Stream.buffer with TimeSpan combinator #132
Hi, I'm very busy this week, so I'm not sure when I'll have time to look into this in full detail. After a quick look I believe you are right to be a bit suspicious of the use of […].
See pull request #133 for a draft version. I spent some time drafting code for this and it seems that there are a number of things a time-based buffer should handle properly. First of all, the timeout should be respected, as mentioned in my previous comment. Also, if there are no elements, then no timeout should be started, to avoid busy-waiting (start timeout -> nothing received -> start timeout -> ...). There are potentially many ways to handle this; the approach I chose is to start the timeout when the first element is received. A third issue is that, unlike with plain […]
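For illustration, the points above can be sketched roughly like this. This is a sketch under assumptions, not the actual #133 code: it assumes Hopac's lazy stream representation (`Stream<'x> = Promise<Cons<'x>>`) with `Cons`/`Nil`, `memo`, and `timeOut` in scope as in the snippets elsewhere in this thread, and the name `bufferTime` is illustrative.

```fsharp
open System
open Hopac
open Hopac.Infixes
open Hopac.Stream // assumed to bring Cons/Nil/nil into scope, as in the snippets below

// Illustrative size-or-time buffer sketch:
// - no timeout is armed while the buffer is empty (no busy-waiting),
// - the timeout is started when the first element arrives (memo keeps a
//   single timer across choice attempts instead of restarting it),
// - either a full buffer or an elapsed timeout flushes the batch.
let bufferTime (maxSize: int) (span: TimeSpan) (xs: Stream<'x>) : Stream<ResizeArray<'x>> =
  let rec start xs =
    memo (xs >>= function
            | Nil -> Job.result Nil
            | Cons (x, xs) -> loop (ResizeArray [x]) (memo (timeOut span)) xs)
  and loop b timeout xs =
    if b.Count >= maxSize then Job.result (Cons (b, start xs)) else
    Alt.choose
      [ timeout ^=> fun () -> Job.result (Cons (b, start xs))
        xs ^=> function
          | Nil -> Job.result (Cons (b, nil))
          | Cons (x, xs) -> b.Add x; loop b timeout xs ] :> Job<_>
  start xs
```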
Thanks! Will test it today.
This code works well; I'll think about how to write tests.
I don't know if it's better to open a new issue or comment here. I have taken a look at the code of […]. The first one is about implementing blocking back-pressure for streams: it should convert the stream into an "eager" one that consumes the source stream into a limited cache and blocks when the cache reaches its limit. The second one is pretty much the same except for one thing: it discards elements using a specified handler […].

The use case: an application receives data from a 3rd-party server and stores it into a database, for example. There are two possible behaviors: it stops pulling data from the source when the database is unreachable or unresponsive, or it discards the oldest samples of data with a proper handler (it could store them on disk, and I need to handle the discarded files to delete them properly).

It's very likely that I want something wrong, or I am trying to abuse the wrong primitives.
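To pin the proposal down, the two combinators could have signatures along these lines (an fsi-style sketch; the names match the naive implementation further down in this thread, and the parameter names are illustrative):

```fsharp
// blockingCache: eagerly pulls the source into a bounded cache and
// blocks the producer side when the cache is full.
val blockingCache : capacity: int -> Stream<'x> -> Stream<'x>

// discardingCache: same, but when the cache is full it evicts the
// oldest element and passes it to the supplied handler instead of
// blocking the producer.
val discardingCache : capacity: int -> onDiscard: ('x -> Job<unit>) -> Stream<'x> -> Stream<'x>
```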
First of all I'd like to just say that some of the stream combinators have fairly subtle implementations. Indeed, it has not been a priori obvious (to me) that some of the combinators are even possible. The implementation of (sequential) lazy streams is subtle business already; adding concurrency and non-deterministic choice to the mix opens a whole new dimension of subtlety. The stream buffering combinators you describe sound fairly straightforward, but after reading the use case, I'm not sure it needs a stream combinator. Does the use case require a stream transformer ([…])?
The simple use case: the worker gets data from a remote source (RabbitMQ), batches it in chunks of 100k entries (or every 30 sec), stores it on disk and then tries to push it into the database. The database is sometimes unavailable, but when it works it can handle the stored amount of data. So the code is quite simple:

```fsharp
let flushTimeout = timeOut <| TimeSpan.FromSeconds 30.

let storeToDisk (chunk, ack) =
  let file = saveToFile chunk
  ack ()
  chunk, File.Delete file

let reportDiscard = notImpl ""

let liveStream =
  subscribe sources
  |> Stream.map (Stream.bufferTime 100000 flushTimeout)
  |> Stream.mergeAll
  |> Stream.mapFun (fun chunks -> ResizeArray.map fst chunks, snd chunks.[chunks.Length - 1])
  |> Stream.mapJob (fun (chunk, ack) -> storeToDisk (chunk, ack))

let! savedChunks = readSavedChunks ()
do! savedChunks
    |> Stream.append liveStream
    |> Stream.discardingCache 10000 reportDiscard // or blockingCache
    |> Stream.iterJob (storeToDataBase >=> deleteTempFile)
```

I would prefer to have a functional implementation like […]. It has been implemented in a more imperative way with channels, mailboxes and workers, and I'm looking to simplify the code by using Stream combinators.
And this is a naive implementation:

```fsharp
let blockingCache capacity (xs: Stream<'t>) : Stream<'t> =
  if capacity < 1 then failwith "Capacity < 1" else
  let mb = BoundedMb capacity
  Job.iterateServer xs (fun xs ->
    xs ^=> function
      | Nil -> BoundedMb.put mb None >>=. Alt.never ()
      | Cons (x, xs) -> BoundedMb.put mb (Some x) >>-. xs)
  >>= Stream.unfoldJob (fun _ -> BoundedMb.take mb >>- Option.map (fun x -> x, ()))
  |> memo

let discardingCache capacity dj (xs: Stream<'t>) : Stream<'t> =
  if capacity < 2 then failwith "Capacity < 2" else
  let mb = BoundedMb capacity
  let ch = Ch ()
  Job.iterateServer (0, xs) (fun (count, xs) ->
    if count >= capacity then
      BoundedMb.take mb >>= fun x -> dj (Option.get x) >>-. (count - 1, xs)
    else
      Alt.choose
        [ xs ^=> function
            | Cons (x, xs) -> BoundedMb.put mb (Some x) >>-. (count + 1, xs)
            | Nil -> BoundedMb.put mb None >>= Alt.never
          (Ch.take ch ^=> fun reply -> Job.start (BoundedMb.take mb >>= IVar.fill reply))
            ^->. (count - 1, xs) ] :> _)
  >>= Stream.unfoldJob (fun _ ->
        job { let reply = IVar ()
              do! Ch.send ch reply
              let! consOrNil = IVar.read reply
              return Option.map (fun x -> x, ()) consOrNil })
  |> memo
```
You may also be interested in RingBuffer, @pavelhritonenko.
Hello.

I need to buffer stream events not only by buffer size but by some timeout too. For example, I need to flush statistics to a DB from an external source in batches of size 10000 or every 30 seconds (if there are any entries). What is the best way to achieve it? I modified the `buffer` function in this way and it even works, however I don't like `DateTime.UtcNow` here. Is there a more convenient way?
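For context, the usual alternative to polling the clock is Hopac's `timeOut`, which returns an `Alt<unit>` that becomes available once the given `TimeSpan` has elapsed, so a buffer loop can race "next element" against "deadline" with `Alt.choose`. A minimal sketch of that race, where `takeNext` and `flushBatch` are hypothetical stand-ins for the buffer loop's pieces:

```fsharp
open System
open Hopac
open Hopac.Infixes

// Race the next element against a 30-second deadline: if an element
// arrives first we keep buffering; if the timeout wins we flush.
// takeNext and flushBatch are assumed, not real library functions.
let flushOrTake (takeNext: Alt<'x>) (flushBatch: Job<unit>) : Job<unit> =
  Alt.choose
    [ takeNext ^=> fun _x -> Job.result ()            // element arrived before the deadline
      timeOut (TimeSpan.FromSeconds 30.) ^=> fun () -> // deadline elapsed
        flushBatch ] :> Job<unit>
```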