Discussion:
Piping a java.io.OutputStream to Aleph
Malcolm Sparks
2017-01-20 13:04:29 UTC
Hi,

I've got to lazily stream a large amount of JSON using Cheshire's
generate-stream fn.

The closest I've managed to get is this:

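(fn [ctx]
  (let [pipe   (java.nio.channels.Pipe/open)
        writer (java.nio.channels.Channels/newWriter (.sink pipe) "utf-8")]
    ;; Encode on a separate thread; the handler returns the readable end of
    ;; the pipe straight away.
    (doto (Thread.
           (fn []
             (try
               (cheshire.core/generate-stream some-obj writer)
               (finally
                 ;; Closing the writer flushes it and closes the pipe's sink,
                 ;; which signals end-of-stream to the reader.
                 (.close writer)))))
      (.start))
    (.source pipe)))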

I've tried hard to use byte-streams/manifold to do this directly, but
couldn't figure it out, hence this posting.

The closest I've found is this code in byte-streams.clj:

(def-conversion ^{:cost 1.5} [(seq-of ByteBuffer) ReadableByteChannel]
  [bufs]
  (let [pipe (Pipe/open)
        ^WritableByteChannel sink (.sink pipe)
        source (doto ^AbstractSelectableChannel (.source pipe)
                 (.configureBlocking true))]
    (future
      (try
        (loop [s bufs]
          (when (and (not (empty? s)) (.isOpen sink))
            (let [buf (.duplicate ^ByteBuffer (first s))]
              (.write sink buf)
              (recur (rest s)))))
        (finally
          (.close sink))))
    source))

But it seems I want the 'write' counterpart which doesn't exist. I don't
feel confident enough with the details to offer a PR.

Is there a better way of doing this in byte-streams/manifold as it is today?

Many thanks,

Malcolm
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Zach Tellman
2017-01-22 23:39:43 UTC
Hey Malcolm,

There is currently no built-in transform from a Manifold sink to an
OutputStream. It's a perfectly reasonable thing to have, and I'd point you
to the existing shim for InputStreams [1] as a guide to how this would be
done, but it's work that you'd need to do yourself. It's worth noting that
this isn't *exactly* lazy, since you're trying to generate the encoded JSON
as quickly as the sink will let you, so this will need to be done on a
separate thread, and the `put!` calls inside the OutputStream shim will
need to use the `blocking?` param.

If this isn't an urgent need, I'll get around to adding it to byte-streams
eventually, but pull requests are of course welcome.

Zach

[1]
https://github.com/ztellman/byte-streams/blob/master/src/byte_streams/InputStream.java
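
For readers following the thread, here is a rough sketch of the OutputStream side of such a shim. It is not the byte-streams implementation and not part of Manifold: `callback-output-stream`, `put-chunk!` and `on-close` are hypothetical names chosen for illustration. The sketch uses only the JDK and Clojure's `proxy`, and assumes the caller supplies a function that blocks until the sink has accepted each chunk.

```clojure
(import '(java.io OutputStream)
        '(java.util Arrays))

(defn callback-output-stream
  "Hypothetical helper, not part of byte-streams or Manifold. Returns an
  OutputStream that copies every written chunk into a fresh byte array and
  hands it to `put-chunk!`, then calls `on-close` when the stream is closed."
  [put-chunk! on-close]
  (proxy [OutputStream] []
    (write
      ;; The one-argument arity covers both write(int) and write(byte[]).
      ([b]
       (if (number? b)
         (put-chunk! (byte-array [(unchecked-byte b)]))
         (put-chunk! (aclone ^bytes b))))
      ;; write(byte[], int, int): copy only the requested slice, because
      ;; callers are free to reuse their buffer once the call returns.
      ([b off len]
       (put-chunk! (Arrays/copyOfRange ^bytes b (int off) (int (+ off len))))))
    (close []
      (on-close))))
```

With Manifold on the classpath, `put-chunk!` could plausibly be `(fn [chunk] @(manifold.stream/put! sink chunk))`, where the deref blocks the generating thread until the sink accepts the chunk, and `on-close` could be `(fn [] (manifold.stream/close! sink))`. The resulting stream would then be wrapped in a `java.io.OutputStreamWriter` and passed to `generate-stream` on a separate thread, as described above.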