Discussion:
periodically seems to be creating lots of threads
David Smith
2015-04-21 16:33:29 UTC
I've created a token bucket using `stream/periodically`:

(require '[manifold.stream :as stream])

(defn token-channel
  [period buffer size]
  (let [token-input (stream/stream)
        ;; emits :token every `period` ms
        tc (stream/periodically period (fn [] :token))
        out-chan (stream/stream)]
    ;; forward each token to out-chan while the shared `buffer` count is below `size`
    (stream/consume (fn [msg]
                      (when (> size @buffer)
                        (stream/put! out-chan msg)
                        (swap! buffer inc)))
                    tc)
    [out-chan token-input]))


I previously had something similar with Lamina; however, now we seem to be
getting quite high CPU usage all the time, and I see that loads of daemon
threads are being created by `manifold.utils/thread-factory`. I've attached
an HTML export from VisualVM where you can see that `manifold.time/every`
seems to be triggering the creation of these daemon threads. This is beyond
my understanding at the moment, so any help would be appreciated.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
David Smith
2015-04-21 16:55:18 UTC
BTW, I am moving across to `stream/throttle`, but I still need to understand
what is wrong here.
Zach Tellman
2015-04-21 17:04:00 UTC
Hey David,

`periodically` uses a ScheduledThreadPoolExecutor under the covers, which
will generate threads as needed to schedule the tasks at the given
interval. I had thought it would lazily generate those as needed, but
after some tests I realized that it will spin up as many threads as there
are cores, even if you're doing something trivial. This is kinda harmless,
but obviously not exactly what you'd hope for. Is this behavior breaking
anything for you, or were you just surprised and curious about what's going
on?
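A minimal sketch of the pool behavior described above, using plain `java.util.concurrent` interop rather than Manifold itself. The core pool size of 2 (standing in for "one thread per core") and the 10 ms period are arbitrary assumptions, not Manifold's actual settings. Per its Javadoc, a `ScheduledThreadPoolExecutor` acts as a fixed-size pool of `corePoolSize` threads, so even a trivial periodic task may bring it up to that size, but never beyond it.

```clojure
(import '(java.util.concurrent ScheduledThreadPoolExecutor TimeUnit))

;; Assumption: a core pool size of 2 stands in for "one thread per core".
(def scheduler (ScheduledThreadPoolExecutor. 2))

(def ticks (atom 0))

;; A trivial periodic task, similar in spirit to (periodically 10 (fn [] :token)).
(.scheduleAtFixedRate scheduler
                      (fn [] (swap! ticks inc))
                      0 10 TimeUnit/MILLISECONDS)

(Thread/sleep 200)

;; Highest number of threads the pool has had at the same time.
(def largest-pool (.getLargestPoolSize scheduler))

;; By default, shutdown also cancels future runs of periodic tasks.
(.shutdown scheduler)
(.awaitTermination scheduler 1 TimeUnit/SECONDS)

(println "ticks:" @ticks "largest pool size:" largest-pool)
```

The largest pool size stays within the core size no matter how long the task keeps running, which is why a thread count that keeps growing points at something other than a single scheduler pool.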

Zach
David Smith
2015-04-21 17:08:26 UTC
I think it might break prod tonight (I'm in Europe) if the number of
threads is a problem. I've never tried to up the number of threads before,
but we're currently at around 1500 threads and it seems to create a new one
every second or so :( A restart should do the trick, but it's not good. In
the meantime I'm converting our code to use throttle, so hopefully that
should be OK. It does seem a bit dodgy that I could write such
innocent-looking code and end up with all these threads, though...
Zach Tellman
2015-04-21 17:10:43 UTC
Oh! That's not correct; it should definitely only be a fixed number of
threads. Is it possible that the task you're assigning it takes longer
than the interval `periodically` is being run at?
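The question above matters because of how fixed-rate scheduling treats slow tasks. The Javadoc for `scheduleAtFixedRate` states that if an execution takes longer than its period, subsequent executions may start late, but will not execute concurrently. This sketch checks that with plain Java interop (not Manifold); the 4-thread pool, the 10 ms period and the 50 ms task duration are arbitrary assumptions chosen to make the task overrun its period.

```clojure
(import '(java.util.concurrent ScheduledThreadPoolExecutor TimeUnit))

(def scheduler (ScheduledThreadPoolExecutor. 4))

(def running (atom 0))     ; executions currently in progress
(def max-running (atom 0)) ; largest overlap observed
(def runs (atom 0))        ; completed executions

(defn slow-task
  "Takes ~50 ms, i.e. five times longer than the 10 ms period below."
  []
  (swap! max-running max (swap! running inc))
  (Thread/sleep 50)
  (swap! running dec)
  (swap! runs inc))

(.scheduleAtFixedRate scheduler slow-task 0 10 TimeUnit/MILLISECONDS)

(Thread/sleep 300)
(.shutdown scheduler)                            ; cancels the periodic task
(.awaitTermination scheduler 1 TimeUnit/SECONDS) ; lets the last run finish

(println "runs:" @runs
         "max concurrent runs:" @max-running
         "largest pool size:" (.getLargestPoolSize scheduler))
```

Within a single executor, an overrunning task shows up as delay (fewer runs than periods) rather than as extra threads.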
David Smith
2015-04-21 17:25:06 UTC
Hmm, well, the task assigned to `periodically` just returns `:token`, so not
really. Here is the rest of my code:

(defn limit-stream
  "Returns a channel whose output rate will be limited to one message every
  period ms. If no messages are enqueued for a time, we allow a 'burst' of
  new messages. This burst is limited by the buffer-size.

  Let's assume a period of 1 second and a buffer-size of 10:
  * We leave the queue empty for 10 seconds
  * We put 30 messages in the queue
  * 10 messages will be sent immediately
  * The remaining 20 messages will be limited to 1 per second

  You should set the period to be faster than you expect to enqueue
  messages, otherwise you will fill up the queue until it bursts! The
  buffer should be set to the largest number of messages you want to be
  able to send in one burst."
  [in period buffer-size]
  (let [buffer (atom 0)
        out-channel (stream/stream)
        [tc token-input] (token-channel period buffer buffer-size)]
    (stream/consume
     (fn [msg]
       (stream/put! token-input 1)
       @(stream/take! tc) ; blocks the consuming thread until a token arrives
       (swap! buffer dec)
       (stream/put! in msg))
     out-channel)
    out-channel))
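For comparison, the burst behavior described in the `limit-stream` docstring can be modeled without any threads. This is a hypothetical, pure-function token bucket (`new-bucket`, `refill`, `try-take` and `drain-times` are illustrative names, not Manifold API) that takes the current time in milliseconds as an argument, so the docstring's worked example can be checked deterministically.

```clojure
;; Hypothetical helpers for illustration; not part of Manifold.
;; Times are plain millisecond numbers passed in by the caller.

(defn new-bucket
  "An empty bucket holding at most `capacity` tokens, earning one token
  every `period-ms`, created at time `now-ms`."
  [capacity period-ms now-ms]
  {:capacity capacity :period period-ms :tokens 0 :last-refill now-ms})

(defn refill
  "Credits one token per whole period elapsed since :last-refill,
  capped at :capacity."
  [{:keys [capacity period tokens last-refill] :as bucket} now-ms]
  (let [earned (quot (- now-ms last-refill) period)]
    (if (pos? earned)
      (assoc bucket
             :tokens (min capacity (+ tokens earned))
             :last-refill (+ last-refill (* earned period)))
      bucket)))

(defn try-take
  "Refills at `now-ms`, then returns [true bucket'] if a token was
  available (and consumed) or [false bucket'] otherwise."
  [bucket now-ms]
  (let [b (refill bucket now-ms)]
    (if (pos? (:tokens b))
      [true (update b :tokens dec)]
      [false b])))

(defn drain-times
  "Send time of each of `n` messages queued together at `t-ms`, retrying
  every `step-ms` while the bucket is empty."
  [bucket n t-ms step-ms]
  (loop [b bucket, now t-ms, left n, sent []]
    (if (zero? left)
      sent
      (let [[ok? b'] (try-take b now)]
        (if ok?
          (recur b' now (dec left) (conj sent now))
          (recur b' (+ now step-ms) left sent))))))

;; The docstring's scenario: period 1 s, buffer 10, idle for 10 s,
;; then 30 messages arrive at once.
(def times (drain-times (new-bucket 10 1000 0) 30 10000 1000))
(println (take 12 times))
```

Running the docstring's scenario sends 10 messages at t = 10 s and the remaining 20 at one per second from t = 11 s to t = 30 s. Because nothing here blocks or schedules, the rate limiting itself never ties up a scheduler thread.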
Zach Tellman
2015-04-22 22:02:09 UTC
So something to remember is that the time spent encompasses not only
invoking the function but also the actions taken downstream of the
`periodically` stream, since all downstream operations take place on that
executor. I notice that you're doing some sort of blocking operation inside
the `consume`, which is likely the culprit.

If you want to avoid this, you can connect the periodically stream to a
stream which is "on" an executor, like so:

(s/connect (s/periodically ...) (s/stream* {:executor executor}))

This will make it so the consumption of messages takes place on a different
thread. However, if you can just use the `throttle` operator, that seems
best.
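The idea behind the `s/connect` / `s/stream*` suggestion is to keep the scheduler's own thread cheap and move slow, blocking consumption onto a different thread. As a library-free illustration of that idea (not Manifold's implementation), here is a sketch using only `java.util.concurrent`; the queue, the single-thread pools and the 10 ms / 50 ms timings are arbitrary assumptions. The scheduled task only enqueues a token, and a separate worker thread does the blocking work.

```clojure
(import '(java.util.concurrent Executors LinkedBlockingQueue
                               ScheduledThreadPoolExecutor TimeUnit))

(def scheduler (ScheduledThreadPoolExecutor. 1)) ; produces ticks
(def worker (Executors/newFixedThreadPool 1))    ; does the slow work
(def tokens (LinkedBlockingQueue.))              ; hand-off between them
(def ticks (atom 0))
(def processed (atom 0))

;; Producer: runs on the scheduler thread and never blocks.
(.scheduleAtFixedRate scheduler
                      (fn [] (swap! ticks inc) (.offer tokens :token))
                      0 10 TimeUnit/MILLISECONDS)

;; Consumer: runs on the worker thread and is free to block.
(.execute worker
          (fn []
            (try
              (loop []
                (.take tokens)    ; wait for the next token
                (Thread/sleep 50) ; simulated slow downstream work
                (swap! processed inc)
                (recur))
              (catch InterruptedException _ nil))))

(Thread/sleep 300)
(.shutdownNow scheduler)
(.shutdownNow worker) ; interrupts the worker, ending its loop
(.awaitTermination scheduler 1 TimeUnit/SECONDS)
(.awaitTermination worker 1 TimeUnit/SECONDS)

(println "ticks:" @ticks "processed:" @processed)
```

The scheduler keeps ticking on time while the worker falls behind. In this sketch the backlog simply accumulates in an unbounded queue; in a real system you would want to bound or drop it, which is the kind of policy a rate limiter such as `throttle` exists to handle.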

Zach
David Smith
2015-04-23 06:57:47 UTC
OK, great: it is indeed triggering a function that could take a while.
Unfortunately I don't really have time to play around with this to fully
understand it, and I am sure that your implementation of throttling will be
better than mine :)
David Smith
2015-04-27 15:19:36 UTC
FYI manifold.stream/throttle working well, thanks :)