Limit concurrency with Aff / Aff Pool

I need to run many HTTP requests in parallel, but limit the number of concurrent requests to require("os").cpus().length. (P.S. On reflection that doesn’t make sense: Node is single-threaded, so the limit should be a custom number.)

Something like this for promises:

  1. https://github.com/rxaviers/async-pool

I probably need functions like these:

parWithLimit ::
  forall a .
  Int ->
  Array (Aff a) ->
  Aff (Array (Either Error a))
parWithLimit limit actions = ...

parWithLimitAll ::
  forall a .
  Int ->
  Array (Aff a) ->
  Aff ( Either
        { index :: Int
        , error :: Error
        }
        (Array a)
      )
parWithLimitAll limit actions = ...

Has anyone implemented something like this already? (I have a feeling of déjà vu.)


Maybe this (untested)

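import Prelude
import Control.Parallel (parSequence)
import Data.Array as Arr
import Effect.Aff (Aff)

-- Runs the actions in batches of `limit`: each batch runs in parallel,
-- and the next batch starts only after the whole previous one has finished.
parLimitSequence :: forall a. Int -> Array (Aff a) -> Aff (Array a)
parLimitSequence limit actions
  | Arr.null actions = pure []
  | otherwise =
    let
      -- Guard against limit <= 0, which would otherwise recurse forever.
      size = max 1 limit
      batch = Arr.take size actions
      pending = Arr.drop size actions
    in
      append <$> parSequence batch <*> parLimitSequence limit pending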
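
To get the `Array (Either Error a)` shape asked for in the question, one option is to wrap every action in `attempt` before batching, so a failing request no longer aborts its batch. This is a minimal, untested sketch; the module name `ParWithLimit` and the local helper `go` are my own choices, and the non-positive-limit guard is an assumption about the desired behaviour.

```purescript
module ParWithLimit where

import Prelude

import Control.Parallel (parSequence)
import Data.Array as Arr
import Data.Either (Either)
import Effect.Aff (Aff, attempt)
import Effect.Exception (Error)

-- Batch-wise variant that never fails as a whole: each action's
-- outcome is reported as Left error or Right value, in input order.
parWithLimit :: forall a. Int -> Array (Aff a) -> Aff (Array (Either Error a))
parWithLimit limit actions = go (map attempt actions)
  where
  -- Assumption: treat limit <= 0 as 1 so the recursion always makes progress.
  size = max 1 limit

  go pending
    | Arr.null pending = pure []
    | otherwise =
        append
          <$> parSequence (Arr.take size pending)
          <*> go (Arr.drop size pending)
```

Note that this is still batching rather than a true pool: a slow request holds up the start of the next batch even if the other slots are already free.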

I usually just do this ad hoc, but I suppose it should live in some sort of aff-utils-like library. This will work for any Traversable, by using an AVar as a lock/queue.

It probably warrants some additional exception safety, but it gets the job done.
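
For illustration only (this is not the code from the reply above), here is a minimal, untested sketch of how an AVar can act as both the lock and the work queue. The name `parTraverseBounded`, the module name, and the worker-loop design are my assumptions. Each element gets an empty result slot, a fixed number of workers pull jobs from a shared list guarded by one AVar, and the results are read back in the original shape.

```purescript
module ParTraverseBounded where

import Prelude

import Control.Parallel (parTraverse_)
import Data.Array as Array
import Data.List (List(..))
import Data.List as List
import Data.Traversable (class Traversable, traverse)
import Data.Tuple (Tuple(..))
import Effect.Aff (Aff)
import Effect.Aff.AVar as AVar

-- Hypothetical helper: like parTraverse, but with at most `limit`
-- calls to `f` in flight at any time.
parTraverseBounded
  :: forall t a b
   . Traversable t
  => Int
  -> (a -> Aff b)
  -> t a
  -> Aff (t b)
parTraverseBounded limit f xs = do
  -- Pair every input with an empty slot for its result.
  slots <- traverse (\a -> Tuple a <$> AVar.empty) xs
  -- Shared list of pending jobs; taking the AVar locks the list.
  queue <- AVar.new (List.fromFoldable slots)
  let
    worker = do
      jobs <- AVar.take queue
      case jobs of
        -- Nothing left: put the empty list back so other workers can stop too.
        Nil -> AVar.put Nil queue
        Cons (Tuple a slot) rest -> do
          -- Release the lock before running the job.
          AVar.put rest queue
          b <- f a
          AVar.put b slot
          worker
  -- Assumption: a non-positive limit is treated as 1.
  parTraverse_ (const worker) (Array.replicate (max 1 limit) unit)
  -- Every slot is filled once all workers have finished.
  traverse (\(Tuple _ slot) -> AVar.read slot) slots
```

With this, `parTraverseBounded limit identity actions` gives a bounded version of `parSequence`. If any call to `f` throws, the whole traversal fails and `parTraverse_` cancels the remaining workers; wrap `f` in `attempt` if you want per-item results instead.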
