Limit concurrency with Aff / Aff Pool

I need to run many http requests in parallel, but limit number of cuncurrent requests to require("os").cpus().length

Something like this for promises:

  1. https://github.com/rxaviers/async-pool

Probably I need functions like this

parWithLimit ::
  forall a .
  Int ->
  Array (Aff a) ->
  Aff (Array (Either Error a))
parWithLimit limit actions = ...

parWithLimitAll ::
  forall a .
  Int ->
  Array (Aff a) ->
  Aff ( Either
        { index :: Int
        , error :: Error
        }
        (Array a)
      )
parWithLimitAll limit actions = ...

Have anyone implemented something like this so far? (I have a feeling of deja vu)

Maybe this (untested)

import Prelude
import Control.Parallel (parSequence)
import Data.Array as Arr
import Effect.Aff (Aff)

parLimitSequence :: forall a. Int -> Array (Aff a) -> Aff (Array a)
parLimitSequence limit actions
  | Arr.null actions = pure []
  | otherwise =
    let
      batch = Arr.take limit actions
      pending = Arr.drop limit actions
    in
      append <$> parSequence batch <*> parLimitSequence limit pending
2 Likes

I usually just do this ad-hoc, but I suppose it should be in some sort of aff-utils like library. This will work for any Traversable, by using an AVar as a lock/queue.

It probably warrants some additional exception safety, but it gets the job done.

2 Likes