Discussion:
core.async, aleph, Netty reference counting extravaganza
Bhaskar Mookerji
2015-02-18 19:08:32 UTC
Hello!

... so I've been messing around with the new Aleph, manifold, and
core.async and am running into a Netty reference counting error. The
snippet below creates a TCP service that reads from a single source
(*client*), accepts inbound connections, and then forwards/peers data from
that source using core.async/tap+mult. Multiple connections to the service
result in the Netty stack trace at the end.

I'm unfortunately quite new to Netty, but I'd like to believe that the
client connection I'm peering from is essentially blackboxed from the
downstream channels. Design issues aside, is there some subtlety I'm
missing here?

--------------------------------
(ns foo.handler
  (:require
   [clojure.core.async :as a]
   [aleph.tcp :as tcp]
   [manifold.stream :as s])
  (:import
   [io.netty.buffer ByteBuf]))

;; Make a client connection to *client*, read into a sliding-buffer, and
;; return a channel. This peers the stream from this client to other
;; channels via core.async/mult+tap.

(defn get-client-connection []
  (let [source @(tcp/client *client*)
        buffer-size 4096
        chan (-> buffer-size a/sliding-buffer a/chan)]
    (a/go-loop []
      (try
        (if-let [val @(s/take! source)]
          (let [sz (.readableBytes ^ByteBuf val)]
            ;; Do something unrelated with sz
            (a/>!! chan val)))
        (catch Throwable ex
          (s/close! source)
          (a/close! chan)))
      (recur))
    chan))

;; Read from the sliding buffer (msg-bus) into the accepted TCP
;; connection (sink).

(defn fwd-handler [msg-bus sink _]
  (try
    (a/go-loop []
      (try
        (let [[val channel] (a/alts!! [msg-bus])]
          (s/put! sink val))
        (catch Throwable ex
          nil))
      (recur))
    (catch Throwable ex
      (s/close! sink))))

;; Get a mult channel, tap new clients into a sliding-buffer.

(defn init-tcp []
  (let [client (a/mult (get-client-connection))]
    (tcp/start-server (fn [sink info]
                        (fwd-handler (a/tap client (a/chan (a/sliding-buffer 4096)))
                                     sink
                                     info))
                      {:port *port*})))

;; [aleph "0.4.0-beta2"]
;; [http-kit "2.1.19"]
;; [io.netty/netty-all "4.0.25.Final"]
;; [manifold "0.1.0-beta10"]
;; [org.clojure/core.async "0.1.267.0-0d7780-alpha"]
;; [org.clojure/clojure "1.6.0"]

------------------------

... and multiple connections+reconnections by clients to the TCP server
result in output like this:

Feb 18, 2015 6:39:31 PM io.netty.channel.AbstractChannelHandlerContext notifyOutboundHandlerException
WARNING: Failed to fail the promise because it's done already: ***@15871(failure(java.nio.channels.ClosedChannelException)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
  at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
  at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:59)
  at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:657)
  at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1113)
  at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
  at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
  at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
  at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
  at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
  at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
  at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:268)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
  at java.lang.Thread.run(Thread.java:745)

Thanks for the help (and all the hard work),
Buro
Bhaskar Mookerji
2015-02-18 19:26:52 UTC
Oh, and before *any* connections happen, this leak is reported.

Feb 18, 2015 7:22:37 PM io.netty.util.ResourceLeakDetector reportLeak
SEVERE: LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 0
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
io.netty.channel.epoll.EpollSocketChannel$EpollSocketUnsafe.epollInReady(EpollSocketChannel.java:712)
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:326)
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:264)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
java.lang.Thread.run(Thread.java:745)

The first stack trace can be reproduced using two netcat connections.

- Buro

Zach Tellman
2015-02-18 19:47:06 UTC
So, the problem with using Netty's ByteBufs is that you need to deal with
the reference counting. Any ByteBuf which is written into a Netty Channel
is implicitly released, so sending the same ByteBuf into many channels
(which is what I think you're doing) is going to cause problems, as it
will be released multiple times. The easiest thing to do here, by far, is
to transform the ByteBuf into a byte-array via byte-streams/to-byte-array
before passing it on, which will copy the bytes out and release the
ByteBuf for you. This is what's done automatically for streaming HTTP
messages unless :raw-stream? is set to true; I should probably do the
same for the TCP client and server.
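
An untested sketch of that fix, adapted from your snippet (same names as
above, plus a byte-streams require aliased as bs):

--------------------------------
(require '[byte-streams :as bs])

;; Same shape as get-client-connection above, but every ByteBuf is
;; copied out to a plain byte array before it reaches the channel.
;; to-byte-array copies the readable bytes and releases the ByteBuf,
;; so nothing downstream of the mult ever touches a reference count.
(defn get-client-connection []
  (let [source @(tcp/client *client*)
        chan (a/chan (a/sliding-buffer 4096))]
    (s/consume
     (fn [val]
       ;; byte arrays aren't reference-counted, so writing the same one
       ;; to several Netty channels can't double-release anything
       (a/put! chan (bs/to-byte-array val)))
     source)
    chan))
--------------------------------

The copy costs one allocation per message, but it confines all of the
reference counting to this one function. The other option, retaining the
buffer once per extra consumer via .retain and letting each downstream
write release its share, is much easier to get wrong.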
Samuel Nelson
2015-02-18 22:16:27 UTC
Hello,

Our system is falling prey to the same error: SEVERE: LEAK:
ByteBuf.release() was not called before it's garbage-collected.

We have a TCP server that exchanges messages with a number of clients. I
have noticed that as the number of clients increases, so does the
likelihood of the error occurring. It also appears to kill the TCP
connection. So, judging by what you said, a way to fix this would be to
convert messages via byte-streams/to-byte-array whenever they need to be
sent to multiple channels?

Thanks,
Sam
ztellman
2015-02-18 23:00:38 UTC
The leak notifications shouldn't affect any TCP connection, I think. But
yeah, I think based on this thread I'm going to change the default byte
representation, and only expose Netty ByteBufs if someone is extra-sure
that's what they want.
Zach Tellman
2015-02-18 23:34:53 UTC
Okay, I've just pushed a new version of 0.4.0-SNAPSHOT which makes TCP and
UDP not expose the ByteBufs by default, since that should really only be
for "advanced" users. Buro and Samuel, if you can confirm that this solves
the issues you're seeing, that would be very helpful. Thanks!
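
Concretely, the default TCP stream now yields byte arrays, so there is
nothing to release, and raw ByteBufs become opt-in via the same
:raw-stream? flag as on the HTTP side. A quick sketch (host and port
are placeholders):

--------------------------------
;; default: messages arrive as byte arrays, nothing to release
(def conn @(tcp/client {:host "localhost", :port 10001}))

;; opt-in for "advanced" users: messages arrive as ByteBufs, and
;; releasing each one is now your responsibility
(def raw-conn @(tcp/client {:host "localhost", :port 10001,
                            :raw-stream? true}))
--------------------------------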
Bhaskar Mookerji
2015-02-19 17:47:42 UTC
Thanks! I'll check it out today and report back.

- Buro
Bhaskar Mookerji
2015-02-20 08:41:01 UTC
Indeed, it seems to have worked (rather, resolved the issues I noticed, and
then some). Thanks!

- Buro