So something to remember is that the time spent not only encompasses the
invoking of the function, but also the actions being taken downstream of
the `periodically` stream, such that all downstream operations take place
on that executor rather than synchronously. I notice that you're doing
some sort of blocking operation inside the `consume`, which is likely the
culprit.
If you want to avoid this, you can connect the periodically stream to a
stream which is "on" an executor, like so:
(s/connect (s/periodically ...) (s/stream* {:executor executor}))
This will make it so the consumption of messages takes place on a different
thread. However, if you can just use the `throttle` operator that seems
best.
Zach
Post by David SmithHmm, well the task assigned to periodically just returns `:token` so not
(defn limit-stream "returns a channel whose output rate will be limited
to one message every period ms. If no messages are enqueued for a time
then we allow a 'burst' for new messages. This burst is limited by the
buffer-size. Lets assume a period of 1 second and buffer-size of 10 *
We leave the queue empty for 10 seconds * We put 30 messages in the
queue * 10 messages will be sent immediately * The remaining 20
messsages will be limited to 1 per second You should set the period to
be faster than you expect to enqueue messages, otherwise you will fill
up the queue until it bursts! The buffer should be set to the largest
number of messages you want to be able to send in one burst." [in period
buffer-size] (let [buffer (atom 0) out-channel (stream/stream)
[tc token-input] (token-channel period buffer buffer-size)] (
stream/consume (fn [msg] (stream/put! token-input 1)
@(stream/take! tc) (swap!
buffer dec) (stream/put! in msg))
out-channel) out-channel))
Post by Zach TellmanOh! That's not correct, it should definitely only be a fixed number of
threads. Is it possible that the task you're assigning it takes longer
than the interval `periodically` is being run at?
Post by David SmithI think it might to break prod tonight (I'm in Europe) if the number of
threads is a problem, I've never tried to up the number of threads before
but we're currently at around 1500 threads and it seems to create a new one
every second or so :( A restart should do the trick but it's not good. In
the mean time I'm converting our code to use throttle so hopefully that
should be ok. It does seem a bit dodgy that I could write such innocent
looking code and end up with all these threads though....
Post by Zach TellmanHey David,
`periodically` uses a ScheduledThreadPoolExecutor under the covers,
which will generate threads as needed to schedule the tasks at the given
interval. I had thought it would lazily generate those as needed, but
after some tests I realize that it will spin up as many threads as there
are cores even if you're doing something trivial. This is kinda harmless,
but obviously not exactly what you'd hope for. Is this behavior breaking
anything for you, or were you just surprised and curious as to what's going
on?
Zach
Post by David SmithBTW I am moving across to `stream/throttle` but still I need to
understand what is wrong here.
Post by David SmithI've created a token bucket using `stream/periodically`
(defn token-channel
[period buffer size]
(let [token-input (stream/stream)
tc (stream/periodically period (fn [] :token))
out-chan (stream/stream)]
(stream/consume (fn [msg]
(stream/put! out-chan msg)
(swap! buffer inc)))
tc)
[out-chan token-input]))
I previously had something similar with lamina however now we seem to
be getting quite high CPU all the time and I see that loads of daemon
threads are being created by `manifold.utils/thread-factory`. I've
attached an html export from VisualVM where you can see
`manifold.time/every` seems to be triggering this creation of daemon
threads. This is beyond my understanding at the moment so any help would
be appreciated.
--
You received this message because you are subscribed to the Google
Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google
Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.