Discussion:
How to detect end of connection for an SSE request?
Jeroen van Dijk
2015-11-25 15:21:10 UTC
I have an SSE endpoint that sends messages from a core.async channel. I'm
using core.async's pub/sub to support multiple clients reading from the
same channel. When I added log statements around this SSE endpoint, I
noticed that after closing several connections, events were still being
written to the stream that was created at the start of the SSE request.

I would like to detect that the SSE request has been terminated by the
client so that I can close the relevant resources. Going through the Aleph
and Manifold code, I couldn't find a way to do this. Is it possible?

Below is the code I have to return a Manifold stream to Aleph:

(defn hystrix-stream [subscription req]
  (let [subscription-with-ping (a/chan)
        ping-result (Object.)
        ;; Return a ping every two seconds when channels are not being filled
        _ (a/go-loop []
            (let [timeout-ch (a/timeout 2000)
                  [v ch] (a/alts! [subscription timeout-ch])
                  res (if (= ch timeout-ch) ping-result v)]
              (when res
                (a/>! subscription-with-ping res)
                (recur))))
        s (s/->source subscription-with-ping)
        strm (->> s
                  (s/mapcat (fn [x]
                              (if (= x ping-result)
                                ["ping"]
                                (metrics->hystrix-data x))))
                  (s/map (fn [msg]
                           (str "\ndata:" (json/encode msg) "\n"))))]
    {:status 200
     :body strm
     :headers {"Content-Type" "text/event-stream;charset=UTF-8"
               "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
               "Pragma" "no-cache"}}))
(From https://gist.github.com/jeroenvandijk/67d064e0bb08b900e656)

Thanks in advance,
Jeroen
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Jeroen van Dijk
2015-11-25 15:39:15 UTC
I went a bit further through the Manifold code and saw that closing a
stream generated from (->source some-channel) also closes the channel. With
this in mind I created an alternative implementation of the above, but it
didn't help either:
https://gist.github.com/jeroenvandijk/67d064e0bb08b900e656#file-aleph-sse-clj-L41
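
The close behaviour described in this message can be checked in isolation. The following is only a minimal sketch, assuming core.async and Manifold are on the classpath; the function name closed-after-stream-close? is made up for illustration and is not part of either library. It coerces a channel to a source, closes the source, and then probes the channel with a put.

```clojure
(require '[clojure.core.async :as a]
         '[manifold.stream :as s])

(defn closed-after-stream-close?
  "Hypothetical helper: coerces a fresh core.async channel to a Manifold
  source, closes the source, and reports whether the channel now rejects puts."
  []
  (let [ch  (a/chan 1)          ; buffered, so the probe put never blocks
        src (s/->source ch)]
    (s/close! src)
    ;; >!! returns false only when the channel is already closed
    (false? (a/>!! ch :probe))))
```

If closing the source does close the underlying channel, as observed above, calling (closed-after-stream-close?) should return true. It says nothing about whether Aleph closes the body stream when the client disconnects.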
Jeroen van Dijk
2015-11-25 16:40:17 UTC
I found a working solution: I have to return the result of (->source
my-channel) directly as the response body, or the closing doesn't happen
properly. I've moved the transformation from Manifold to core.async using
transducers. Here is the new version:
https://gist.github.com/jeroenvandijk/67d064e0bb08b900e656#file-aleph-sse-clj-L84
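
For readers without access to the gist, the general shape of that approach might look like the sketch below. It is not the gist's code: sse-frame and sse-body are hypothetical names, JSON encoding and the ping logic are left out, and the buffer size of 16 is arbitrary. The transformation lives in a transducer on a core.async channel, and the value returned for the response body is the direct result of ->source.

```clojure
(require '[clojure.core.async :as a]
         '[manifold.stream :as s])

(defn sse-frame
  "Hypothetical helper: wraps an already-encoded message in an SSE data frame."
  [msg]
  (str "\ndata:" msg "\n"))

(defn sse-body
  "Hypothetical helper: pipes `subscription` into a channel whose transducer
  formats each message, then returns that channel coerced to a Manifold source."
  [subscription]
  (let [out (a/chan 16 (map sse-frame))]
    ;; pipe closes `out` when `subscription` closes, and stops taking
    ;; from `subscription` once `out` has been closed
    (a/pipe subscription out)
    (s/->source out)))
```

The handler would then return {:status 200 :body (sse-body subscription) ...} with the same headers as in the original post.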

I'm not sure if this is a bug, but it is a bit counterintuitive to me,
especially since some core.async and manifold.stream examples are
presented as equivalent here: http://ideolalia.com/aleph/literate.html. In
this case they are not.

Btw, thanks for all the libraries, Zach. Aleph has literally been a drop-in
replacement for httpkit (still pre-production, but it looks promising).

Thanks,
Jeroen
Jeroen van Dijk
2015-11-25 17:04:11 UTC
I found yet another (better) alternative after looking at the Yada code for
SSE [1]. I should have used manifold.stream/transform, so here is the
latest example:
https://gist.github.com/jeroenvandijk/67d064e0bb08b900e656#file-aleph-sse-clj-L108

Apologies for all the noise. Hopefully it is useful to someone.

[1] https://github.com/juxt/yada/blob/94f3ee93de155a8513b27e0508608691ed556a55/src/yada/resources/sse.clj#L26
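
As a rough illustration of what using manifold.stream/transform means here, the sketch below applies a transducer to a source and returns the resulting source. It is not the gist or Yada code; sse-stream is a hypothetical name, and JSON encoding is omitted.

```clojure
(require '[manifold.stream :as s])

(defn sse-stream
  "Hypothetical helper: returns a source that emits each message from
  `source` wrapped in an SSE data frame."
  [source]
  (s/transform (map (fn [msg] (str "\ndata:" msg "\n")))
               source))
```

Because transform takes an ordinary transducer, the same (map ...) or (mapcat ...) forms could be shared between a core.async channel and a Manifold stream.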
Zach Tellman
2015-11-25 19:30:59 UTC
Registering a callback via `on-drained` on the channel you've coerced to a
source will give you what you want, I think.
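
A minimal sketch of this suggestion, assuming core.async and Manifold are available: source-with-cleanup and cleanup! are hypothetical names, and whether the callback fires on a client disconnect depends on how Aleph closes the body stream, which this sketch does not verify.

```clojure
(require '[clojure.core.async :as a]
         '[manifold.stream :as s])

(defn source-with-cleanup
  "Hypothetical helper: coerces `ch` to a Manifold source and registers
  `cleanup!` (a no-argument function) to run once the source is drained."
  [ch cleanup!]
  (let [src (s/->source ch)]
    (s/on-drained src cleanup!)
    src))
```

With pub/sub, cleanup! could be something like #(a/unsub publication topic ch), where publication and topic stand for whatever the application uses. The source only learns that the channel is drained when a take on it comes back empty, so the callback may not run at the instant the channel is closed.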
Jeroen van Dijk
2015-11-27 12:34:05 UTC
Thanks for the suggestion, Zach. I'll try it a bit later. For now I have
achieved what I had in mind. For some reason transform didn't work as I
expected, so I went back to the core.async/pipe version. I'll look into that
once I'm done with the feature I'm currently working on.