Jeroen van Dijk
2015-11-25 15:21:10 UTC
I have an SSE endpoint that sends messages from a core.async channel. I'm
using core.async's pub/sub to support multiple clients reading from the
same channel. When I added log statements around this SSE endpoint I
noticed that after closing several connections the events were still being
written to the stream that was created during the start of SSE request.
I would like to detect somehow that the SSE request has been terminated by
the client so that I can close relevant resources. Going through the Aleph
code and Manifold code I couldn't find a way to do this. Is it possible?
Below is the code I have to return a Manifold stream to Aleph:
(defn hystrix-stream [subscription req] (let [subscription-with-ping (a/chan
) ping-result (Object.) ;; Return a ping every two seconds when channels
are not being filled _ (a/go-loop [] (let [timeout-ch (a/timeout 2000) [v
ch] (a/alts! [subscription timeout-ch]) res (if (= ch timeout-ch)
ping-result v)] (when res (a/>! subscription-with-ping res) (recur)))) s (
s/->source subscription-with-ping) strm (->> s (s/mapcat (fn [x] (if (= x
ping-result) ["ping"] (metrics->hystrix-data x)))) (s/map (fn [msg] (str "\n
data:" (json/encode msg) "\n"))))] {:status 200 :body strm :headers {"
Content-Type" "text/event-stream;charset=UTF-8" "Cache-Control" "no-cache,
no-store, max-age=0, must-revalidate" "Pragma" "no-cache"}}))
(From https://gist.github.com/jeroenvandijk/67d064e0bb08b900e656)
Thanks in advance,
Jeroen
using core.async's pub/sub to support multiple clients reading from the
same channel. When I added log statements around this SSE endpoint I
noticed that after closing several connections the events were still being
written to the stream that was created during the start of SSE request.
I would like to detect somehow that the SSE request has been terminated by
the client so that I can close relevant resources. Going through the Aleph
code and Manifold code I couldn't find a way to do this. Is it possible?
Below is the code I have to return a Manifold stream to Aleph:
(defn hystrix-stream [subscription req] (let [subscription-with-ping (a/chan
) ping-result (Object.) ;; Return a ping every two seconds when channels
are not being filled _ (a/go-loop [] (let [timeout-ch (a/timeout 2000) [v
ch] (a/alts! [subscription timeout-ch]) res (if (= ch timeout-ch)
ping-result v)] (when res (a/>! subscription-with-ping res) (recur)))) s (
s/->source subscription-with-ping) strm (->> s (s/mapcat (fn [x] (if (= x
ping-result) ["ping"] (metrics->hystrix-data x)))) (s/map (fn [msg] (str "\n
data:" (json/encode msg) "\n"))))] {:status 200 :body strm :headers {"
Content-Type" "text/event-stream;charset=UTF-8" "Cache-Control" "no-cache,
no-store, max-age=0, must-revalidate" "Pragma" "no-cache"}}))
(From https://gist.github.com/jeroenvandijk/67d064e0bb08b900e656)
Thanks in advance,
Jeroen
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.