Bhaskar Mookerji
2015-02-18 19:08:32 UTC
Hello!
... so I've been messing around with the new Aleph, Manifold, and
core.async and am running into a Netty reference-counting error. The
snippet below creates a TCP service that reads from a single source
(*client*), accepts inbound connections, and then forwards/peers data from
that source using core.async's mult+tap. Multiple connections to the
service result in the Netty stack trace at the end.
I'm unfortunately quite new to Netty, but I'd like to believe that the
client connection I'm peering from is essentially a black box to the
downstream channels. Design issues aside, is there some subtlety I'm
missing here?
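For context, my mental model of mult+tap is plain fan-out: every tap
receives the same value — the same object, not a copy. A minimal sketch
(channel names are just for illustration):

```clojure
(ns fanout.sketch
  (:require [clojure.core.async :as a]))

;; One upstream channel fanned out via mult to two taps.
(def source (a/chan))
(def m (a/mult source))
(def tap-1 (a/tap m (a/chan (a/sliding-buffer 8))))
(def tap-2 (a/tap m (a/chan (a/sliding-buffer 8))))

(a/>!! source :msg)
(def v1 (a/<!! tap-1))
(def v2 (a/<!! tap-2))
;; v1 and v2 are both :msg, and are the identical object
```

If that model is right, every downstream connection in my code below is
handed the very same ByteBuf instance.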
--------------------------------
(ns foo.handler
  (:require
   [clojure.core.async :as a]
   [aleph.tcp :as tcp]
   [manifold.stream :as s])
  (:import
   [io.netty.buffer ByteBuf]))

;; Make a client connection to *client*, read into a sliding-buffer, and
;; return a channel. This peers the stream from this client to other
;; channels via core.async/mult+tap.
(defn get-client-connection []
  (let [source @(tcp/client *client*)
        buffer-size 4096
        chan (-> buffer-size a/sliding-buffer a/chan)]
    (a/go-loop []
      (try
        (if-let [val @(s/take! source)]
          (let [sz (.readableBytes ^ByteBuf val)]
            ;; Do something unrelated with sz
            (a/>!! chan val)))
        (catch Throwable ex
          (s/close! source)
          (a/close! chan)))
      (recur))
    chan))

;; Read from the sliding buffer (msg-bus) into the accepted TCP
;; connection (sink).
(defn fwd-handler [msg-bus sink _]
  (try
    (a/go-loop []
      (try
        (let [[val channel] (a/alts!! [msg-bus])]
          (s/put! sink val))
        (catch Throwable ex
          nil))
      (recur))
    (catch Throwable ex
      (s/close! sink))))

;; Get a mult channel, tap new clients into a sliding-buffer.
(defn init-tcp []
  (let [client (a/mult (get-client-connection))]
    (tcp/start-server (fn [sink info]
                        (fwd-handler (a/tap client (a/chan (a/sliding-buffer 4096)))
                                     sink
                                     info))
                      {:port *port*})))
;; [aleph "0.4.0-beta2"]
;; [http-kit "2.1.19"]
;; [io.netty/netty-all "4.0.25.Final"]
;; [manifold "0.1.0-beta10"]
;; [org.clojure/core.async "0.1.267.0-0d7780-alpha"]
;; [org.clojure/clojure "1.6.0"]
------------------------
... and multiple connections and reconnections by clients to the TCP
server result in stack traces like this:
Feb 18, 2015 6:39:31 PM io.netty.channel.AbstractChannelHandlerContext notifyOutboundHandlerException
WARNING: Failed to fail the promise because it's done already: ***@15871(failure(java.nio.channels.ClosedChannelException)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:59)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:657)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1113)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
	at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
	at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:268)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:745)
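If I'm reading the exception right, something is calling release on a
ByteBuf whose refCnt has already hit 0. A minimal reproduction of just
that Netty behavior (using Netty's Unpooled allocator, purely for
illustration — this needs io.netty on the classpath):

```clojure
(import '(io.netty.buffer Unpooled))

(let [buf (Unpooled/buffer 16)] ;; allocated with refCnt 1
  (.release buf)                ;; refCnt drops to 0; buffer is freed
  (.release buf))               ;; throws IllegalReferenceCountException
```

My guess — and I may well be off — is that since every tap hands the
same buffer to its sink, each connection's Netty write releases it once,
and any release after the first one blows up like this.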
Thanks for the help (and all the hard work),
Buro
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.