Trying to kill an aff and have the computation exit speedily

Hey all!

I’m trying to find a way to kill off an asynchronous computation as fast as possible with killFiber and all of my attempts are coming up short: they all seem to drag on, as evidenced by the timestamp in the logs below. I’m wondering if there is a way to get killFiber to exit immediately?

module Main where

import Prelude

import Control.Alternative (class Alt, class Alternative, class Plus, alt, empty)
import Control.Monad.State (StateT(..), lift, runState, runStateT)
import Data.Array (length, replicate)
import Data.Either (Either(..), either)
import Data.Foldable (foldl, for_)
import Data.Identity (Identity(..))
import Data.Int (toNumber)
import Data.JSDate (getTime, now)
import Data.List as L
import Data.Traversable (class Foldable, class Traversable, foldMapDefaultR, sequence, traverseDefault)
import Data.Traversable (sequence)
import Data.Tuple (Tuple(..))
import Effect (Effect)
import Effect.Aff (Aff, Milliseconds(..), cancelWith, delay, error, forkAff, joinFiber, killFiber, launchAff_)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Effect.Ref (Ref, modify_, new, read, write)

-- | A computation that either already holds its result (`Here`) or is
-- | suspended behind a function that awaits `unit` (`Wait`).
data Thunkable a
  = Here a
  | Wait (Unit -> Thunkable a)

-- | `true` exactly when the value is a suspended `Wait` step.
isWait :: forall a. Thunkable a -> Boolean
isWait (Wait _) = true
isWait (Here _) = false

-- | `true` exactly when the value is a finished `Here` result.
isHere :: forall a. Thunkable a -> Boolean
isHere (Here _) = true
isHere (Wait _) = false

-- | Force every suspended step in turn and return the final value.
runThunkable :: forall a. Thunkable a -> a
runThunkable thunk = case thunk of
  Here result -> result
  Wait next -> runThunkable (next unit)

-- | Like `runThunkable`, but also reports how many `Wait` layers were forced
-- | on the way to the result.
runThunkableWithCount :: forall a. Thunkable a -> Tuple Int a
runThunkableWithCount thunk = case thunk of
  Here result -> Tuple 0 result
  Wait next -> case runThunkableWithCount (next unit) of
    Tuple steps result -> Tuple (steps + 1) result

-- | Advance by at most one step: a `Wait` is forced once, while a `Here` is
-- | handed back as it is.
thunkThunkable :: forall a. Thunkable a -> Thunkable a
thunkThunkable thunk = case thunk of
  Wait next -> next unit
  Here _ -> thunk

-- | Interpret a `Thunkable` in any monad, running `pure unit` between steps.
monadifyThunkable :: forall m. Monad m => Thunkable ~> m
monadifyThunkable thunk = intercalateThunkable (pure unit) thunk

-- | Run a `Thunkable` inside `Aff`, consulting the flag in `r` before each
-- | further step. While the flag reads `true` the run delays briefly and
-- | continues; once it reads `false` the run stops with `Left unit`.
-- | A completed computation yields `Right` with its value.
affifyThunkable :: forall a. Ref Boolean -> Thunkable a -> Aff (Either Unit a)
affifyThunkable r t = case thunkThunkable t of
  Here result -> pure (Right result)
  Wait next -> do
    keepGoing <- liftEffect (read r)
    if keepGoing then do
      delay (Milliseconds 0.01)
      affifyThunkable r (next unit)
    else pure (Left unit)

-- | Interpret a `Thunkable` in the monad `m`, running the action `m` before
-- | each continuation that is still suspended after one step.
intercalateThunkable :: forall m. Monad m => m Unit -> Thunkable ~> m
intercalateThunkable m t = case thunkThunkable t of
  Here result -> pure result
  Wait next -> do
    -- The recursive call stays inside the bind continuation so that it is
    -- only evaluated after `m` has run.
    _ <- m
    intercalateThunkable m (next unit)

-- | Combine the results of two thunkables with `<>`.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance semigroupThunkable :: Semigroup a => Semigroup (Thunkable a) where
  append (Here a) (Here b) = Here (a <> b)
  append (Here a) (Wait fb) = Wait (\_ -> append (Here a) (fb unit))
  -- The left side is stepped first, whatever the right side is.
  append (Wait fa) tb = Wait (\_ -> append (fa unit) tb)

-- | Folding a `Thunkable` runs it to completion and folds over the single
-- | resulting value.
instance foldableThunkable :: Foldable Thunkable where
  foldl step acc thunk = step acc (runThunkable thunk)
  foldr step acc thunk = step (runThunkable thunk) acc
  foldMap f = foldMapDefaultR f

-- | `sequence` forces every `Wait` layer and wraps the final effect's result
-- | in `Here`; `traverse` is derived from `sequence` and `map`.
instance traversableThunkable :: Traversable Thunkable where
  traverse f thunk = traverseDefault f thunk
  sequence thunk = case thunk of
    Here action -> Here <$> action
    Wait next -> sequence (next unit)

-- | The empty thunkable is an already-available `mempty`.
instance monoidThunkable :: Monoid a => Monoid (Thunkable a) where
  mempty = pure mempty

-- | Lift `Semiring` operations over the eventual results of two thunkables.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance semiringThunkable :: Semiring a => Semiring (Thunkable a) where
  zero = Here zero
  one = Here one
  add (Here a) (Here b) = Here (add a b)
  add (Here a) (Wait fb) = Wait (\_ -> add (Here a) (fb unit))
  -- The left side is stepped first, whatever the right side is.
  add (Wait fa) tb = Wait (\_ -> add (fa unit) tb)
  mul (Here a) (Here b) = Here (mul a b)
  mul (Here a) (Wait fb) = Wait (\_ -> mul (Here a) (fb unit))
  mul (Wait fa) tb = Wait (\_ -> mul (fa unit) tb)

-- | Lift subtraction over the eventual results of two thunkables.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance ringThunkable :: Ring a => Ring (Thunkable a) where
  sub (Here a) (Here b) = Here (sub a b)
  sub (Here a) (Wait fb) = Wait (\_ -> sub (Here a) (fb unit))
  -- The left side is stepped first, whatever the right side is.
  sub (Wait fa) tb = Wait (\_ -> sub (fa unit) tb)

-- | Map over the eventual result, leaving the `Wait` layers in place.
instance functorThunkable :: Functor Thunkable where
  map f (Here a) = Here (f a)
  map f (Wait next) = Wait (\u -> map f (next u))

-- | Apply an eventual function to an eventual argument.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance applyThunkable :: Apply Thunkable where
  apply (Here f) (Here a) = Here (f a)
  apply (Here f) (Wait fa) = Wait (\_ -> apply (Here f) (fa unit))
  -- The function side is stepped first, whatever the argument side is.
  apply (Wait ff) ta = Wait (\_ -> apply (ff unit) ta)

-- | A pure value is immediately available.
instance applicativeThunkable :: Applicative Thunkable where
  pure = Here

-- | Sequence a thunkable with a continuation.
-- |
-- | The `Wait` case wraps the rest of the computation in an explicit lambda.
-- | PureScript is strict, so `Wait (const $ bind (fa unit) fmb)` would run
-- | `fa unit` and the continuation while the `Wait` is being constructed,
-- | which means nothing is deferred and binding onto a never-finishing
-- | thunkable diverges immediately.
instance bindThunkable :: Bind Thunkable where
  bind (Here a) fmb = fmb a
  bind (Wait fa) fmb = Wait (\_ -> bind (fa unit) fmb)

-- | `Monad` has no members of its own; this instance just ties together the
-- | `Applicative` and `Bind` instances for `Thunkable`.
instance monadThunkable :: Monad Thunkable

-- | Pick whichever side is already `Here` (preferring the left one); when
-- | both are still waiting, step both together.
instance altThunkable :: Alt Thunkable where
  alt (Wait l) (Wait r) = Wait (\_ -> alt (l unit) (r unit))
  alt (Here a) _ = Here a
  alt _ (Here a) = Here a

-- | The empty alternative is a thunkable that waits forever: forcing it
-- | only ever yields another `Wait`.
instance plusThunkable :: Plus Thunkable where
  -- The explicit lambda keeps the self-reference unevaluated until the step
  -- is forced.
  empty = Wait (\_ -> empty)

-- | `Alternative` has no members of its own; this instance combines the
-- | `Applicative` and `Plus` instances for `Thunkable`.
instance alternativeThunkable :: Alternative Thunkable

-- | Functors that can inject a value via `wait`. The instances in this file
-- | either suspend the value behind a `Wait` step (`Thunkable`) or simply
-- | use `pure`.
class Waitable f where
  wait :: forall a. a -> f a

-- | Waiting in `Thunkable` hides the value behind exactly one `Wait` step.
instance waitableThunkable :: Waitable Thunkable where
  wait a = Wait (\_ -> Here a)

-- | `Identity` has nothing to suspend: `wait` is just `pure`.
instance waitableIdentity :: Waitable Identity where
  wait a = pure a

-- | For `Aff`, `wait` is plain `pure`: it adds no delay of its own.
instance waitableAff :: Waitable Aff where
  wait a = pure a

-- | Lift the `Identity` notion of waiting through `StateT`.
instance waitableStateTI :: Waitable (StateT s Identity) where
  wait a = lift (wait a)

-- | Lift the `Thunkable` notion of waiting through `StateT`.
instance waitableStateTT :: Waitable (StateT s Thunkable) where
  wait a = lift (wait a)

-- | Lift the `Aff` notion of waiting through `StateT`.
instance waitableStateTA :: Waitable (StateT s Aff) where
  wait a = lift (wait a)

-- | Build a chain of `2 * n` steps, alternating `wait 10` and `pure 10`, and
-- | sum their results. Each step runs before the remainder of the chain.
z n = sumSteps (L.fromFoldable (join (replicate n [ wait 10, pure 10 ])))
  where
  sumSteps steps = case steps of
    L.Nil -> pure 0
    L.Cons step rest -> do
      here <- step
      later <- sumSteps rest
      pure (here + later)

--b n = foldl (\b a -> (+) <$> b <*> (a $> (foldl (\b a -> b + a) 0 (replicate 100000 10)))) (pure 0) (L.fromFoldable $ join (replicate n [ wait unit, pure unit ]))

-- | Build a chain of `2 * n` steps, alternating `wait 10` and `pure 10`.
-- | Unlike `z`, the remainder of the chain runs before each step, and every
-- | step's own value is replaced by the sum of 100000 tens.
b n = sumSteps (L.fromFoldable (join (replicate n [ wait 10, pure 10 ])))
  where
  sumSteps steps = case steps of
    L.Nil -> pure 0
    L.Cons step rest -> do
      later <- sumSteps rest
      -- The fold sits inside the continuation so it is recomputed per step.
      here <- (step $> (foldl (\acc x -> acc + x) 0 (replicate 100000 10)))
      pure (here + later)

-- | Run `affUnit` 1000 times, recording the elapsed time between successive
-- | iterations, then log the mean of those samples labelled with `name`.
singleLoop :: String -> Aff Unit -> Aff Unit
singleLoop name affUnit = do
  startedAt <- liftEffect (map getTime now)
  lastTick <- liftEffect (new startedAt)
  samples <- liftEffect (new ([] :: Array Number))
  for_ (replicate 1000 1) \_ -> do
    affUnit
    liftEffect do
      previous <- read lastTick
      current <- map getTime now
      -- Newest sample goes at the front of the array.
      modify_ (\recorded -> [ current - previous ] <> recorded) samples
      write current lastTick
  recorded <- liftEffect (read samples)
  let
    total = foldl (+) 0.0 recorded
    count = toNumber (length recorded)
  liftEffect (log ("Testing loop: " <> name <> ", average run time: " <> show (total / count)))

-- | Fork `affUnit` and kill the resulting fiber straight away with the
-- | error "killing".
singleCancel :: Aff Unit -> Aff Unit
singleCancel affUnit = forkAff affUnit >>= \fiber ->
  killFiber (error "killing") fiber

-- | Write `false` to the flag `rb`, then fork `affUnit` and wait for the
-- | forked fiber to finish.
thunkCancel :: Ref Boolean -> Aff Unit -> Aff Unit
thunkCancel rb affUnit = do
  liftEffect (write false rb)
  fiber <- forkAff affUnit
  joinFiber fiber

-- | Benchmark entry point: first times repeated runs of `z 500` under four
-- | interpretations, then times fork-and-cancel of `b 1000` under five.
-- | Each cancellation figure is the difference between the `tm` and `ntm`
-- | timestamps taken around the call.
main :: Effect Unit
main =
  launchAff_ do
    -- Loop timings: average time per iteration over 1000 iterations.
    singleLoop "state" (void $ pure (runState (z 500) unit))
    singleLoop "aff" (void (z 500))
    singleLoop "stateTAff" (void (runStateT (z 500) unit))
    singleLoop "thunkable" (void $ monadifyThunkable (z 500))
    -- Cancellation timings. In every section below the `b 1000` expression
    -- appears between the two timestamps, so the reported duration covers
    -- evaluating that argument as well as the fork and the kill.
    tm <- liftEffect $ (getTime <$> now)
    _ <- singleCancel (void $ pure (runState (b 1000) unit))
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancellation: state, took this long to cancel: " <> show (ntm - tm)
    tm <- liftEffect $ (getTime <$> now)
    _ <- singleCancel (void (b 1000))
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancellation: aff, took this long to cancel: " <> show (ntm - tm)
    tm <- liftEffect $ (getTime <$> now)
    _ <- singleCancel (void (runStateT (b 1000) unit))
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancellation: stateTAff, took this long to cancel: " <> show (ntm - tm)
    tm <- liftEffect $ (getTime <$> now)
    _ <- singleCancel (void $ monadifyThunkable (b 1000))
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancellation: thunkable, took this long to cancel: " <> show (ntm - tm)
    -- Cooperative variant: the flag starts out `true`, and `thunkCancel`
    -- writes `false` to it before forking.
    tm <- liftEffect $ (getTime <$> now)
    rf <- liftEffect $ new true
    _ <- thunkCancel rf (either identity (const unit) <$> affifyThunkable rf (b 1000))
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancellation: thunk cancel, took this long to cancel: " <> show (ntm - tm)

yields

10:25 meeshkan-abel@Abel:/tmp/sumew-rm$ npx spago run
purs compile: No files found using pattern: test/**/*.purs
[info] Build succeeded.
Testing loop: state, average run time: 0.015
Testing loop: aff, average run time: 0.286
Testing loop: stateTAff, average run time: 0.544
Testing loop: thunkable, average run time: 0.037
Testing cancellation: state, took this long to cancel: 6803.0
Testing cancellation: aff, took this long to cancel: 6251.0
Testing cancellation: stateTAff, took this long to cancel: 6250.0
Testing cancellation: thunkable, took this long to cancel: 7064.0
Testing cancellation: thunk cancel, took this long to cancel: 7186.0

I would have thought that in some of the cases the cancellation would have been on the order of ~1ms, but it is consistently 6-7 seconds and I’m trying to get it down to ~1-2 ms tops. Thanks in advance for any tips!

P.S. I’ll likely replace my hand-rolled Thunkable with Trampoline, but I believe the problem is the same with Trampoline.

1 Like

Are you sure that’s the time to cancel and not the time to build the very large Aff structure in b?

Yup, it’s that. Building the aff outside of the timestamp makes the last one instantaneous, which is what I wanted.

Btw I’m going to try to reimplement this using Trampoline: I don’t quite have it worked out how to get all the instances with newtype, but as soon as I’m there, I’ll sub it in.

module Main where

import Prelude

import Control.Alternative (class Alt, class Alternative, class Plus, alt, empty)
import Control.Monad.State (StateT(..), lift, runState, runStateT)
import Data.Array (length, replicate)
import Data.Either (Either(..), either)
import Data.Foldable (foldl, for_)
import Data.Identity (Identity(..))
import Data.Int (toNumber)
import Data.JSDate (getTime, now)
import Data.List as L
import Data.Traversable (class Foldable, class Traversable, foldMapDefaultR, sequence, traverseDefault)
import Data.Traversable (sequence)
import Data.Tuple (Tuple(..))
import Effect (Effect)
import Effect.Aff (Aff, Milliseconds(..), cancelWith, delay, error, forkAff, joinFiber, killFiber, launchAff_)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Effect.Ref (Ref, modify_, new, read, write)

-- | A computation that either already holds its result (`Here`) or is
-- | suspended behind a function that awaits `unit` (`Wait`).
data Thunkable a
  = Here a
  | Wait (Unit -> Thunkable a)

-- | `true` exactly when the value is a suspended `Wait` step.
isWait :: forall a. Thunkable a -> Boolean
isWait (Wait _) = true
isWait (Here _) = false

-- | `true` exactly when the value is a finished `Here` result.
isHere :: forall a. Thunkable a -> Boolean
isHere (Here _) = true
isHere (Wait _) = false

-- | Force every suspended step in turn and return the final value.
runThunkable :: forall a. Thunkable a -> a
runThunkable thunk = case thunk of
  Here result -> result
  Wait next -> runThunkable (next unit)

-- | Like `runThunkable`, but also reports how many `Wait` layers were forced
-- | on the way to the result.
runThunkableWithCount :: forall a. Thunkable a -> Tuple Int a
runThunkableWithCount thunk = case thunk of
  Here result -> Tuple 0 result
  Wait next -> case runThunkableWithCount (next unit) of
    Tuple steps result -> Tuple (steps + 1) result

-- | Advance by at most one step: a `Wait` is forced once, while a `Here` is
-- | handed back as it is.
thunkThunkable :: forall a. Thunkable a -> Thunkable a
thunkThunkable thunk = case thunk of
  Wait next -> next unit
  Here _ -> thunk

-- | Interpret a `Thunkable` in any monad, running `pure unit` between steps.
monadifyThunkable :: forall m. Monad m => Thunkable ~> m
monadifyThunkable thunk = intercalateThunkable (pure unit) thunk

-- | Run a `Thunkable` inside `Aff`, consulting the flag in `r` before each
-- | further step. While the flag reads `true` the run delays briefly and
-- | continues; once it reads `false` the run stops with `Left unit`.
-- | A completed computation yields `Right` with its value.
affifyThunkable :: forall a. Ref Boolean -> Thunkable a -> Aff (Either Unit a)
affifyThunkable r t = case thunkThunkable t of
  Here result -> pure (Right result)
  Wait next -> do
    keepGoing <- liftEffect (read r)
    if keepGoing then do
      delay (Milliseconds 0.01)
      affifyThunkable r (next unit)
    else pure (Left unit)

-- | Interpret a `Thunkable` in the monad `m`, running the action `m` before
-- | each continuation that is still suspended after one step.
intercalateThunkable :: forall m. Monad m => m Unit -> Thunkable ~> m
intercalateThunkable m t = case thunkThunkable t of
  Here result -> pure result
  Wait next -> do
    -- The recursive call stays inside the bind continuation so that it is
    -- only evaluated after `m` has run.
    _ <- m
    intercalateThunkable m (next unit)

-- | Combine the results of two thunkables with `<>`.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance semigroupThunkable :: Semigroup a => Semigroup (Thunkable a) where
  append (Here a) (Here b) = Here (a <> b)
  append (Here a) (Wait fb) = Wait (\_ -> append (Here a) (fb unit))
  -- The left side is stepped first, whatever the right side is.
  append (Wait fa) tb = Wait (\_ -> append (fa unit) tb)

-- | Folding a `Thunkable` runs it to completion and folds over the single
-- | resulting value.
instance foldableThunkable :: Foldable Thunkable where
  foldl step acc thunk = step acc (runThunkable thunk)
  foldr step acc thunk = step (runThunkable thunk) acc
  foldMap f = foldMapDefaultR f

-- | `sequence` forces every `Wait` layer and wraps the final effect's result
-- | in `Here`; `traverse` is derived from `sequence` and `map`.
instance traversableThunkable :: Traversable Thunkable where
  traverse f thunk = traverseDefault f thunk
  sequence thunk = case thunk of
    Here action -> Here <$> action
    Wait next -> sequence (next unit)

-- | The empty thunkable is an already-available `mempty`.
instance monoidThunkable :: Monoid a => Monoid (Thunkable a) where
  mempty = pure mempty

-- | Lift `Semiring` operations over the eventual results of two thunkables.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance semiringThunkable :: Semiring a => Semiring (Thunkable a) where
  zero = Here zero
  one = Here one
  add (Here a) (Here b) = Here (add a b)
  add (Here a) (Wait fb) = Wait (\_ -> add (Here a) (fb unit))
  -- The left side is stepped first, whatever the right side is.
  add (Wait fa) tb = Wait (\_ -> add (fa unit) tb)
  mul (Here a) (Here b) = Here (mul a b)
  mul (Here a) (Wait fb) = Wait (\_ -> mul (Here a) (fb unit))
  mul (Wait fa) tb = Wait (\_ -> mul (fa unit) tb)

-- | Lift subtraction over the eventual results of two thunkables.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance ringThunkable :: Ring a => Ring (Thunkable a) where
  sub (Here a) (Here b) = Here (sub a b)
  sub (Here a) (Wait fb) = Wait (\_ -> sub (Here a) (fb unit))
  -- The left side is stepped first, whatever the right side is.
  sub (Wait fa) tb = Wait (\_ -> sub (fa unit) tb)

-- | Map over the eventual result, leaving the `Wait` layers in place.
instance functorThunkable :: Functor Thunkable where
  map f (Here a) = Here (f a)
  map f (Wait next) = Wait (\u -> map f (next u))

-- | Apply an eventual function to an eventual argument.
-- |
-- | Each `Wait` wraps its continuation in an explicit lambda. PureScript is
-- | strict, so `Wait (const $ expr)` would evaluate `expr` while the `Wait`
-- | is being built and nothing would actually be deferred.
instance applyThunkable :: Apply Thunkable where
  apply (Here f) (Here a) = Here (f a)
  apply (Here f) (Wait fa) = Wait (\_ -> apply (Here f) (fa unit))
  -- The function side is stepped first, whatever the argument side is.
  apply (Wait ff) ta = Wait (\_ -> apply (ff unit) ta)

-- | A pure value is immediately available.
instance applicativeThunkable :: Applicative Thunkable where
  pure = Here

-- | Sequence a thunkable with a continuation.
-- |
-- | The `Wait` case wraps the rest of the computation in an explicit lambda.
-- | PureScript is strict, so `Wait (const $ bind (fa unit) fmb)` would run
-- | `fa unit` and the continuation while the `Wait` is being constructed,
-- | which means nothing is deferred and binding onto a never-finishing
-- | thunkable diverges immediately.
instance bindThunkable :: Bind Thunkable where
  bind (Here a) fmb = fmb a
  bind (Wait fa) fmb = Wait (\_ -> bind (fa unit) fmb)

-- | `Monad` has no members of its own; this instance just ties together the
-- | `Applicative` and `Bind` instances for `Thunkable`.
instance monadThunkable :: Monad Thunkable

-- | Pick whichever side is already `Here` (preferring the left one); when
-- | both are still waiting, step both together.
instance altThunkable :: Alt Thunkable where
  alt (Wait l) (Wait r) = Wait (\_ -> alt (l unit) (r unit))
  alt (Here a) _ = Here a
  alt _ (Here a) = Here a

-- | The empty alternative is a thunkable that waits forever: forcing it
-- | only ever yields another `Wait`.
instance plusThunkable :: Plus Thunkable where
  -- The explicit lambda keeps the self-reference unevaluated until the step
  -- is forced.
  empty = Wait (\_ -> empty)

-- | `Alternative` has no members of its own; this instance combines the
-- | `Applicative` and `Plus` instances for `Thunkable`.
instance alternativeThunkable :: Alternative Thunkable

-- | Functors that can inject a value via `wait`. The instances in this file
-- | either suspend the value behind a `Wait` step (`Thunkable`) or simply
-- | use `pure`.
class Waitable f where
  wait :: forall a. a -> f a

-- | Waiting in `Thunkable` hides the value behind exactly one `Wait` step.
instance waitableThunkable :: Waitable Thunkable where
  wait a = Wait (\_ -> Here a)

-- | `Identity` has nothing to suspend: `wait` is just `pure`.
instance waitableIdentity :: Waitable Identity where
  wait a = pure a

-- | For `Aff`, `wait` is plain `pure`: it adds no delay of its own.
instance waitableAff :: Waitable Aff where
  wait a = pure a

-- | Lift the `Identity` notion of waiting through `StateT`.
instance waitableStateTI :: Waitable (StateT s Identity) where
  wait a = lift (wait a)

-- | Lift the `Thunkable` notion of waiting through `StateT`.
instance waitableStateTT :: Waitable (StateT s Thunkable) where
  wait a = lift (wait a)

-- | Lift the `Aff` notion of waiting through `StateT`.
instance waitableStateTA :: Waitable (StateT s Aff) where
  wait a = lift (wait a)

-- | Build a chain of `2 * n` steps, alternating `wait 10` and `pure 10`, and
-- | sum their results. Each step runs before the remainder of the chain.
z n = sumSteps (L.fromFoldable (join (replicate n [ wait 10, pure 10 ])))
  where
  sumSteps steps = case steps of
    L.Nil -> pure 0
    L.Cons step rest -> do
      here <- step
      later <- sumSteps rest
      pure (here + later)

--b n = foldl (\b a -> (+) <$> b <*> (a $> (foldl (\b a -> b + a) 0 (replicate 100000 10)))) (pure 0) (L.fromFoldable $ join (replicate n [ wait unit, pure unit ]))

-- | Build a chain of `2 * n` steps, alternating `wait 10` and `pure 10`.
-- | Unlike `z`, the remainder of the chain runs before each step, and every
-- | step's own value is replaced by the sum of 100000 tens.
b n = sumSteps (L.fromFoldable (join (replicate n [ wait 10, pure 10 ])))
  where
  sumSteps steps = case steps of
    L.Nil -> pure 0
    L.Cons step rest -> do
      later <- sumSteps rest
      -- The fold sits inside the continuation so it is recomputed per step.
      here <- (step $> (foldl (\acc x -> acc + x) 0 (replicate 100000 10)))
      pure (here + later)

-- | Run `affUnit` 1000 times, recording the elapsed time between successive
-- | iterations, then log the mean of those samples labelled with `name`.
singleLoop :: String -> Aff Unit -> Aff Unit
singleLoop name affUnit = do
  startedAt <- liftEffect (map getTime now)
  lastTick <- liftEffect (new startedAt)
  samples <- liftEffect (new ([] :: Array Number))
  for_ (replicate 1000 1) \_ -> do
    affUnit
    liftEffect do
      previous <- read lastTick
      current <- map getTime now
      -- Newest sample goes at the front of the array.
      modify_ (\recorded -> [ current - previous ] <> recorded) samples
      write current lastTick
  recorded <- liftEffect (read samples)
  let
    total = foldl (+) 0.0 recorded
    count = toNumber (length recorded)
  liftEffect (log ("Testing loop: " <> name <> ", average run time: " <> show (total / count)))

-- | Fork `affUnit` and kill the resulting fiber straight away with the
-- | error "killing".
singleCancel :: Aff Unit -> Aff Unit
singleCancel affUnit = forkAff affUnit >>= \fiber ->
  killFiber (error "killing") fiber

-- | Write `false` to the flag `rb`, then fork `affUnit` and wait for the
-- | forked fiber to finish.
thunkCancel :: Ref Boolean -> Aff Unit -> Aff Unit
thunkCancel rb affUnit = do
  liftEffect (write false rb)
  fiber <- forkAff affUnit
  joinFiber fiber

-- | Benchmark entry point: first times repeated runs of `z 500` under four
-- | interpretations, then times fork-and-cancel of `b 1000` under five.
-- | Each cancellation figure is the difference between the `tm` and `ntm`
-- | timestamps taken around the call. Most sections bind `x = b 1000`
-- | before taking `tm` so that the binding sits outside the timed window.
main :: Effect Unit
main =
  launchAff_ do
    -- Loop timings: average time per iteration over 1000 iterations.
    singleLoop "state" (void $ pure (runState (z 500) unit))
    singleLoop "aff" (void (z 500))
    singleLoop "stateTAff" (void (runStateT (z 500) unit))
    singleLoop "thunkable" (void $ monadifyThunkable (z 500))
    -- state: `b 1000` still appears between the two timestamps here, so
    -- evaluating it is part of the measured duration.
    tm <- liftEffect $ (getTime <$> now)
    _ <- singleCancel (void $ pure (runState (b 1000) unit))
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancelation: state, took this long to cancel: " <> show (ntm - tm)
    -- aff: `x` is bound before `tm` is taken.
    let x = (b 1000)
    tm <- liftEffect $ (getTime <$> now)
    _ <- singleCancel (void x)
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancelation: aff, took this long to cancel: " <> show (ntm - tm)
    -- stateTAff: `x` is bound before `tm` is taken.
    let x = (b 1000)
    tm <- liftEffect $ (getTime <$> now)
    _ <- singleCancel (void (runStateT x unit))
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancelation: stateTAff, took this long to cancel: " <> show (ntm - tm)
    -- NOTE(review): unlike the neighbouring sections, `tm` is taken BEFORE
    -- `let x` here, so this binding sits inside the timed window.
    tm <- liftEffect $ (getTime <$> now)
    let x = (b 1000)
    _ <- singleCancel (void $ monadifyThunkable x)
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancelation: thunkable, took this long to cancel: " <> show (ntm - tm)
    -- thunk cancel: `x` is bound before `tm`; the flag starts out `true` and
    -- `thunkCancel` writes `false` to it before forking.
    let x = (b 1000)
    tm <- liftEffect $ (getTime <$> now)
    rf <- liftEffect $ new true
    _ <- thunkCancel rf (either identity (const unit) <$> affifyThunkable rf x)
    ntm <- liftEffect $ (getTime <$> now)
    liftEffect $ log $ "Testing cancelation: thunk cancel, took this long to cancel: " <> show (ntm - tm)
10:29 meeshkan-abel@Abel:/tmp/sumew-rm$ npx spago run
purs compile: No files found using pattern: test/**/*.purs
[info] Build succeeded.
Testing loop: state, average run time: 0.015
Testing loop: aff, average run time: 0.282
Testing loop: stateTAff, average run time: 0.538
Testing loop: thunkable, average run time: 0.037
Testing cancelation: state, took this long to cancel: 6231.0
Testing cancelation: aff, took this long to cancel: 6164.0
Testing cancelation: stateTAff, took this long to cancel: 6263.0
Testing cancelation: thunkable, took this long to cancel: 7028.0
Testing cancelation: thunk cancel, took this long to cancel: 0.0