Discussion:
Manifold — s/buffer a stream in connect
d***@juxt.pro
2017-01-24 17:00:40 UTC
Permalink
I seem to have got myself into a slightly confusing situation building
something similar to


(defn connect-mypubsub
[pubsub output]
(manifold.stream/connect
(manifold.bus/subscribe pubsub :event)
(manifold.stream/buffer 50 output)
{:timeout 300}))

(def output (manifold.stream/stream))
(connect-mypubsub (:event-queue system) output)


This is a version of my code, where pubsub is created via

(manifold.bus/event-bus)

For some reason, this causes an Exception like so:

java.lang.IllegalArgumentException: cannot convert manifold.stream.
SourceProxy to sink
clojure.lang.Compiler$CompilerException: java.lang.IllegalArgumentException:
cannot convert manifold.stream.SourceProxy to sink, compiling:(app/event.clj
:31:1)

Any advice on what I am doing wrong? I think I've narrowed it down to this
snippet, but I'm not sure why it doesn't work:
(s/->sink (s/buffer 50 output))

Thanks,
Dominic
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Zach Tellman
2017-01-24 18:36:22 UTC
Permalink
Hi Dominic,

I apologize, that example was a bit simplified to fit on the screen. The
`buffer` method takes a source, and returns a buffered view of that
source. In other words, it sets up a buffer downstream of whatever's
passed into it. In your code, you're trying to set up a buffer upstream of
`output`, which would look something like:

(doto (s/buffered-stream 50) (s/connect output))

Again, sorry for the confusion, please let me know if you have any other
questions.

Zach
Post by d***@juxt.pro
I seem to have got myself into a slightly confusing situation building
something similar to http://youtu.be/1bNOO3xxMc0
(defn connect-mypubsub
[pubsub output]
(manifold.stream/connect
(manifold.bus/subscribe pubsub :event)
(manifold.stream/buffer 50 output)
{:timeout 300}))
(def output (manifold.stream/stream))
(connect-mypubsub (:event-queue system) output)
This is a version of my code, where pubsub is created via
(manifold.bus/event-bus)
java.lang.IllegalArgumentException: cannot convert manifold.stream.
SourceProxy to sink
clojure.lang.Compiler$CompilerException: java.lang.
IllegalArgumentException: cannot convert manifold.stream.SourceProxy to
sink, compiling:(app/event.clj:31:1)
Any advice on what I am doing wrong? I think I've narrowed it down to this
(s/->sink (s/buffer 50 output))
Thanks,
Dominic
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Dominic Monroe
2017-01-24 21:42:45 UTC
Permalink
Hey Zach,

That makes perfect sense. I had thought to try something like that, but
hadn't quite wrapped my head around how timeout would play into that.

Thanks for your quick reply.

Dominic
Post by Zach Tellman
Hi Dominic,
I apologize, that example was a bit simplified to fit on the screen. The
`buffer` method takes a source, and returns a buffered view of that
source. In other words, it sets up a buffer downstream of whatever's
passed into it. In your code, you're trying to set up a buffer upstream of
(doto (s/buffered-stream 50) (s/connect output))
Again, sorry for the confusion, please let me know if you have any other
questions.
Zach
Post by d***@juxt.pro
I seem to have got myself into a slightly confusing situation building
something similar to http://youtu.be/1bNOO3xxMc0
(defn connect-mypubsub
[pubsub output]
(manifold.stream/connect
(manifold.bus/subscribe pubsub :event)
(manifold.stream/buffer 50 output)
{:timeout 300}))
(def output (manifold.stream/stream))
(connect-mypubsub (:event-queue system) output)
This is a version of my code, where pubsub is created via
(manifold.bus/event-bus)
java.lang.IllegalArgumentException: cannot convert manifold.stream.
SourceProxy to sink
clojure.lang.Compiler$CompilerException: java.lang.IllegalArgumentExcep
tion: cannot convert manifold.stream.SourceProxy to sink, compiling:(app/
event.clj:31:1)
Any advice on what I am doing wrong? I think I've narrowed it down to
(s/->sink (s/buffer 50 output))
Thanks,
Dominic
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the
Google Groups "Aleph" group.
To unsubscribe from this topic, visit https://groups.google.com/d/
topic/aleph-lib/osLwBq_fKu0/unsubscribe.
To unsubscribe from this group and all its topics, send an email to
For more options, visit https://groups.google.com/d/optout.
--
Dominic Monroe
Developer

Email: ***@juxt.pro
Web: https://juxt.pro

JUXT LTD.
Software Consulting, Delivery, Training
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Loading...