Discussion:
Aleph (client) & Back-pressure
Reynald Borer
2018-07-02 11:06:36 UTC
Hi everyone,

I'm currently integrating Aleph (the client part) into a platform built on a
Kafka messaging bus in order to crawl a lot of URLs. To prevent loss of
messages and to handle load spikes, I need back-pressure so I can throttle
Kafka message consumption when Aleph is busy.

To achieve that, I've built a solution on top of a java.util.concurrent
Semaphore, in order to allow a maximum of X concurrent crawling requests at a
time (I couldn't find any other way to achieve this with either Aleph or
Manifold). Here is a (very simplified) example of the code I am using:

(defn crawl
  [^Semaphore s url]
  ;; blocking if full
  (.acquire s)
  (try
    (-> url
        (http/get {:option :goes :here :dummy})
        (d/chain'
          (do-some-processing-here))
        (d/catch' Exception
          (exception-handler-here))
        (d/finally'
          (.release s)))
    (catch Exception e
      (.release s)
      (d/error-deferred e))))

Now here comes the problem: it seems to randomly *not* release a Semaphore
permit, as if the chain doesn't run all the way to the end!

In my calling code, I don't do anything with the returned deferred (that
is, it is never dereferenced). I assume that once I have called my crawl
function, the execution happens on a separate thread, so I don't need to do
anything with the returned deferred.

This issue is starting to drive me crazy. Is this supposed to work the way I
expect, or have I completely misunderstood how deferreds work?

Thanks in advance if you're able to shed some light on my problem.

Cheers,
Reynald
Alexey Kachayev
2018-07-02 11:26:12 UTC
Hello Reynald,

We implemented backpressure for the server-side use case using
j.u.c.Semaphore as well; I've mentioned it here:
https://speakerdeck.com/kachayev/clojure-at-attendify-2nd-ed?slide=30. But
operating with semaphores can be pretty clumsy due to the nature of the
problem and the usual issues with concurrent coordination. I'm not sure this
is the best approach for the client side; maybe it's better to play with the
total number of connections and the queue size for the pool? I mean, you
don't need 1-to-1 control, you just need to be sure it won't exceed some
reasonable threshold. Would that work for your use case?
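To give you an idea of the kind of setup I mean, something roughly like this
(an untested sketch; the option names should match aleph.http/connection-pool,
but double-check them, and the numbers are just placeholders):

(require '[aleph.http :as http])

;; sketch: cap concurrency with the connection pool instead of a semaphore
(def crawl-pool
  (http/connection-pool
    {:connections-per-host 8    ;; max simultaneous connections to one host
     :total-connections    64   ;; max simultaneous connections overall
     :max-queue-size       256})) ;; pending acquires allowed before rejection

(defn crawl [url]
  ;; every request goes through the bounded pool
  (http/get url {:pool crawl-pool}))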

From the code you've shared, I can suggest moving the release to a finally
block instead of catch, as right now you're obviously missing a few valuable
throwables, like IOError, ThreadDeath, etc. (those are not subclasses of
Exception).

Let me know how it goes, I'm really curious about your progress with the
project you're working on!

BR,
--
Oleksii Kachaiev,
CTO | Attendify
M: +1-650-451-2271
Alexey Kachayev
2018-07-02 11:40:54 UTC
Hello again,

My bad 😞 a finally block would not work, as you create a deferred there.
Missed that. In that case you can catch Throwable instead of Exception. But
it's better to try to deal with the limitation using the pool settings and
avoid semaphores.
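
To make it concrete, here is roughly what I mean, based on your simplified
example (an untested sketch; I'm passing the placeholder handlers as
functions, and d/finally' gets a no-arg callback, which is what it expects):

(defn crawl
  [^java.util.concurrent.Semaphore s url]
  ;; blocks until a permit is available
  (.acquire s)
  (try
    (-> (http/get url {:option :goes :here :dummy})
        (d/chain' do-some-processing-here)
        (d/catch' Exception exception-handler-here)
        ;; release the permit once the deferred is realized, success or error
        (d/finally' #(.release s)))
    (catch Throwable t
      ;; anything thrown before the chain is built, including Error subclasses
      (.release s)
      (d/error-deferred t))))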

BR,
--
Oleksii Kachaiev,
CTO | Attendify
M: +1-650-451-2271
Reynald Borer
2018-07-02 12:36:19 UTC
Hi Alexey,

Thanks a lot for your feedback, happy to see I'm not the only one trying to
use a Semaphore with Aleph :-)

I'll try switching my catch block to Throwable, maybe it's hiding something
in there.

I had considered playing with the total-connections and max-queue-size
parameters in the past, yes. If I have understood them properly, once we
reach total-connections + max-queue-size, calls to http/get will receive a
RejectedExecutionException, right? Do you know of any way to configure the
pool so that this operation blocks instead?

Of course, I could write a little wrapper that retries (with an exponential
backoff) as long as it gets a RejectedExecutionException, but that would mean
more CPU cycles burnt just on waiting.
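
Something along these lines is what I have in mind (a rough, untested sketch;
crawl-pool stands for whatever pool I end up configuring, and the delays are
arbitrary):

(require '[manifold.deferred :as d]
         '[manifold.time :as mt])
(import 'java.util.concurrent.RejectedExecutionException)

(defn crawl-with-backoff
  "Retries http/get with exponential backoff while the pool rejects us."
  [url]
  (d/loop [delay-ms 100]
    (-> (http/get url {:pool crawl-pool})
        (d/catch' RejectedExecutionException
          (fn [_]
            ;; wait delay-ms, then loop again with a doubled (capped) delay
            (mt/in delay-ms
                   #(d/recur (min (* 2 delay-ms) 10000))))))))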

Cheers,
Reynald