Discussion:
Batching incoming messages on tcp server
vvadi
2015-09-13 05:49:28 UTC
Permalink
Hi Zach,
Is there a way to batch the messages coming to the tcp server.

as per your example code in tcp.clj
https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/tcp.clj#L159

(d/let-flow [msg (s/take! s ::none)]

Can we take first 1000 messages or the batch of messages < 1000 , until we
hit :none.

In our implementation, we have less number clients ( < 5 ), but they send
data at very high rate > 10K/Sec. We need a server that can batch requests
and process them in bulk.

Thanks,
Kalidas
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Zach Tellman
2015-09-13 17:11:54 UTC
Permalink
check out the `batch` operator in `manifold.stream`.
Post by vvadi
Hi Zach,
Is there a way to batch the messages coming to the tcp server.
as per your example code in tcp.clj
https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/tcp.clj#L159
(d/let-flow [msg (s/take! s ::none)]
Can we take first 1000 messages or the batch of messages < 1000 , until we
hit :none.
In our implementation, we have less number clients ( < 5 ), but they send
data at very high rate > 10K/Sec. We need a server that can batch requests
and process them in bulk.
Thanks,
Kalidas
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
vvadi
2015-09-15 05:18:18 UTC
Permalink
Hi Zach,

I'm using batch command as

(d/let-flow [ bt (s/batch 100 100 s) msgv (s/stream->seq bt 10) ]

It is not doing any processing in this mode.

What is the correct way to use batch on a incoming stream ?

Thanks,
Kalidas
Post by Zach Tellman
check out the `batch` operator in `manifold.stream`.
Post by vvadi
Hi Zach,
Is there a way to batch the messages coming to the tcp server.
as per your example code in tcp.clj
https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/tcp.clj#L159
(d/let-flow [msg (s/take! s ::none)]
Can we take first 1000 messages or the batch of messages < 1000 , until
we hit :none.
In our implementation, we have less number clients ( < 5 ), but they send
data at very high rate > 10K/Sec. We need a server that can batch requests
and process them in bulk.
Thanks,
Kalidas
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Zach Tellman
2015-09-16 20:41:06 UTC
Permalink
Are you doing anything with the seq you're creating? It's a lazy sequence,
so unless you start iterating over it nothing will happen.
Post by vvadi
Hi Zach,
I'm using batch command as
(d/let-flow [ bt (s/batch 100 100 s) msgv (s/stream->seq bt 10) ]
It is not doing any processing in this mode.
What is the correct way to use batch on a incoming stream ?
Thanks,
Kalidas
Post by Zach Tellman
check out the `batch` operator in `manifold.stream`.
Hi Zach,
Post by Zach Tellman
Post by vvadi
Is there a way to batch the messages coming to the tcp server.
as per your example code in tcp.clj
https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/tcp.clj#L159
(d/let-flow [msg (s/take! s ::none)]
Can we take first 1000 messages or the batch of messages < 1000 , until
we hit :none.
In our implementation, we have less number clients ( < 5 ), but they
send data at very high rate > 10K/Sec. We need a server that can batch
requests and process them in bulk.
Thanks,
Kalidas
--
You received this message because you are subscribed to the Google
Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
Post by vvadi
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
vvadi
2015-09-17 14:44:56 UTC
Permalink
Hi Zach,
I'm just iterating over the seq, as in this code.

Thanks,
Kalidas

(defn batch-data-handler
[f]
(fn [s info]
(d/loop []
(->
(d/let-flow [ bt (s/batch 1000 10 s) msgv (s/stream->seq bt 10)]
(when-not (empty? msgv)
(doseq [msg msgv]
(d/let-flow [ msg' (d/future (f msg))
result (s/put! s msg')] )))
(d/recur))
(d/catch
(fn [ex]
(s/put! s (str "ERROR: " ex))
(s/close! s)))))))
Post by Zach Tellman
Are you doing anything with the seq you're creating? It's a lazy
sequence, so unless you start iterating over it nothing will happen.
Post by vvadi
Hi Zach,
I'm using batch command as
(d/let-flow [ bt (s/batch 100 100 s) msgv (s/stream->seq bt 10) ]
It is not doing any processing in this mode.
What is the correct way to use batch on a incoming stream ?
Thanks,
Kalidas
Post by Zach Tellman
check out the `batch` operator in `manifold.stream`.
Hi Zach,
Post by Zach Tellman
Post by vvadi
Is there a way to batch the messages coming to the tcp server.
as per your example code in tcp.clj
https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/tcp.clj#L159
(d/let-flow [msg (s/take! s ::none)]
Can we take first 1000 messages or the batch of messages < 1000 , until
we hit :none.
In our implementation, we have less number clients ( < 5 ), but they
send data at very high rate > 10K/Sec. We need a server that can batch
requests and process them in bulk.
Thanks,
Kalidas
--
You received this message because you are subscribed to the Google
Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send
Post by vvadi
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Atamert Ölçgen
2015-09-17 19:46:05 UTC
Permalink
Post by vvadi
Hi Zach,
I'm just iterating over the seq, as in this code.
Thanks,
Kalidas
(defn batch-data-handler
[f]
(fn [s info]
(d/loop []
(->
(d/let-flow [ bt (s/batch 1000 10 s) msgv (s/stream->seq bt 10)]
(when-not (empty? msgv)
(doseq [msg msgv]
(d/let-flow [ msg' (d/future (f msg))
result (s/put! s msg')] )))
(d/recur))
(d/catch
(fn [ex]
(s/put! s (str "ERROR: " ex))
(s/close! s)))))))
Shouldn't s/batch & s/steam->seq calls be made once, outside of the loop?
Post by vvadi
Post by Zach Tellman
Are you doing anything with the seq you're creating? It's a lazy
sequence, so unless you start iterating over it nothing will happen.
Post by vvadi
Hi Zach,
I'm using batch command as
(d/let-flow [ bt (s/batch 100 100 s) msgv (s/stream->seq bt 10) ]
It is not doing any processing in this mode.
What is the correct way to use batch on a incoming stream ?
Thanks,
Kalidas
Post by Zach Tellman
check out the `batch` operator in `manifold.stream`.
Hi Zach,
Post by Zach Tellman
Post by vvadi
Is there a way to batch the messages coming to the tcp server.
as per your example code in tcp.clj
https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/tcp.clj#L159
(d/let-flow [msg (s/take! s ::none)]
Can we take first 1000 messages or the batch of messages < 1000 ,
until we hit :none.
In our implementation, we have less number clients ( < 5 ), but they
send data at very high rate > 10K/Sec. We need a server that can batch
requests and process them in bulk.
Thanks,
Kalidas
--
You received this message because you are subscribed to the Google
Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send
Post by vvadi
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google
Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an
For more options, visit https://groups.google.com/d/optout.
--
Kind Regards,
Atamert Ölçgen

◻◌◻
◻◻◌
◌◌◌

www.muhuk.com
--
You received this message because you are subscribed to the Google Groups "Aleph" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aleph-lib+***@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Loading...