{-# Language BlockArguments, ScopedTypeVariables, LambdaCase #-}
module Hookup.Concurrent (concurrentAttempts) where
import Control.Concurrent (forkIO, throwTo)
import Control.Concurrent.Async (Async, AsyncCancelled(..), async, asyncThreadId, cancel, waitCatch, waitCatchSTM)
import Control.Concurrent.STM (STM, atomically, check, orElse, readTVar, registerDelay, retry)
import Control.Exception (SomeException, finally, mask_, onException)
import Control.Monad (join, void)
import Data.Foldable (for_)
concurrentAttempts ::
Int ->
(a -> IO ()) ->
[IO a] ->
IO (Either [SomeException] a)
concurrentAttempts :: forall a.
Int -> (a -> IO ()) -> [IO a] -> IO (Either [SomeException] a)
concurrentAttempts Int
delay a -> IO ()
release [IO a]
actions =
IO (Either [SomeException] a) -> IO (Either [SomeException] a)
forall a. IO a -> IO a
mask_ (St a -> IO (Either [SomeException] a)
forall a. St a -> Answer a
loop St{
threads :: [Async a]
threads = [],
errors :: [SomeException]
errors = [],
work :: [IO a]
work = [IO a]
actions,
delay :: Int
delay = Int
delay,
clean :: a -> IO ()
clean = a -> IO ()
release,
readySTM :: STM ()
readySTM = STM ()
forall a. STM a
retry })
data St a = St
{ forall a. St a -> [Async a]
threads :: [Async a]
, forall a. St a -> [SomeException]
errors :: [SomeException]
, forall a. St a -> [IO a]
work :: [IO a]
, forall a. St a -> Int
delay :: !Int
, forall a. St a -> a -> IO ()
clean :: a -> IO ()
, forall a. St a -> STM ()
readySTM :: STM ()
}
type Answer a = IO (Either [SomeException] a)
loop :: forall a. St a -> Answer a
loop :: forall a. St a -> Answer a
loop St a
st
| [Async a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (St a -> [Async a]
forall a. St a -> [Async a]
threads St a
st) = St a -> Answer a
forall a. St a -> Answer a
nothingRunning St a
st
| Bool
otherwise = St a -> Answer a
forall a. St a -> Answer a
waitForEvent St a
st
nothingRunning :: St a -> Answer a
nothingRunning :: forall a. St a -> Answer a
nothingRunning St a
st =
case St a -> [IO a]
forall a. St a -> [IO a]
work St a
st of
[] -> Either [SomeException] a -> Answer a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([SomeException] -> Either [SomeException] a
forall a b. a -> Either a b
Left (St a -> [SomeException]
forall a. St a -> [SomeException]
errors St a
st))
IO a
x:[IO a]
xs -> IO a -> St a -> Answer a
forall a. IO a -> St a -> Answer a
start IO a
x St a
st{work = xs}
start :: IO a -> St a -> Answer a
start :: forall a. IO a -> St a -> Answer a
start IO a
io St a
st =
do thread <- IO a -> IO (Async a)
forall a. IO a -> IO (Async a)
async IO a
io
ready <- if null (work st) then pure retry else startTimer (delay st)
loop st{ threads = thread : threads st, readySTM = ready }
waitForEvent :: St a -> Answer a
waitForEvent :: forall a. St a -> Answer a
waitForEvent St a
st =
IO (IO (Either [SomeException] a)) -> IO (Either [SomeException] a)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM (IO (Either [SomeException] a))
-> IO (IO (Either [SomeException] a))
forall a. STM a -> IO a
atomically (St a
-> [Async a] -> [Async a] -> STM (IO (Either [SomeException] a))
forall a. St a -> [Async a] -> [Async a] -> STM (Answer a)
finish St a
st [] (St a -> [Async a]
forall a. St a -> [Async a]
threads St a
st))
IO (IO (Either [SomeException] a))
-> IO () -> IO (IO (Either [SomeException] a))
forall a b. IO a -> IO b -> IO a
`onException` (a -> IO ()) -> [Async a] -> IO ()
forall a. (a -> IO ()) -> [Async a] -> IO ()
cleanup (St a -> a -> IO ()
forall a. St a -> a -> IO ()
clean St a
st) (St a -> [Async a]
forall a. St a -> [Async a]
threads St a
st))
finish :: St a -> [Async a] -> [Async a] -> STM (Answer a)
finish :: forall a. St a -> [Async a] -> [Async a] -> STM (Answer a)
finish St a
st [Async a]
threads' = \case
[] -> St a -> STM (Answer a)
forall a. St a -> STM (Answer a)
fresh St a
st
Async a
t:[Async a]
ts -> St a -> [Async a] -> Async a -> STM (Answer a)
forall a. St a -> [Async a] -> Async a -> STM (Answer a)
finish1 St a
st ([Async a]
threads' [Async a] -> [Async a] -> [Async a]
forall a. [a] -> [a] -> [a]
++ [Async a]
ts) Async a
t STM (Answer a) -> STM (Answer a) -> STM (Answer a)
forall a. STM a -> STM a -> STM a
`orElse` St a -> [Async a] -> [Async a] -> STM (Answer a)
forall a. St a -> [Async a] -> [Async a] -> STM (Answer a)
finish St a
st (Async a
tAsync a -> [Async a] -> [Async a]
forall a. a -> [a] -> [a]
:[Async a]
threads') [Async a]
ts
finish1 :: St a -> [Async a] -> Async a -> STM (Answer a)
finish1 :: forall a. St a -> [Async a] -> Async a -> STM (Answer a)
finish1 St a
st [Async a]
threads' Async a
t =
do res <- Async a -> STM (Either SomeException a)
forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
t
pure case res of
Right a
s -> a -> Either [SomeException] a
forall a b. b -> Either a b
Right a
s Either [SomeException] a -> IO () -> Answer a
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ (a -> IO ()) -> [Async a] -> IO ()
forall a. (a -> IO ()) -> [Async a] -> IO ()
cleanup (St a -> a -> IO ()
forall a. St a -> a -> IO ()
clean St a
st) [Async a]
threads'
Left SomeException
e -> St a -> Answer a
forall a. St a -> Answer a
loop St a
st{ errors = e : errors st, threads = threads' }
fresh :: St a -> STM (Answer a)
fresh :: forall a. St a -> STM (Answer a)
fresh St a
st =
case St a -> [IO a]
forall a. St a -> [IO a]
work St a
st of
[] -> STM (Answer a)
forall a. STM a
retry
IO a
x:[IO a]
xs -> IO a -> St a -> Answer a
forall a. IO a -> St a -> Answer a
start IO a
x St a
st{work = xs} Answer a -> STM () -> STM (Answer a)
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ St a -> STM ()
forall a. St a -> STM ()
readySTM St a
st
startTimer :: Int -> IO (STM ())
startTimer :: Int -> IO (STM ())
startTimer Int
n =
do v <- Int -> IO (TVar Bool)
registerDelay Int
n
pure (check =<< readTVar v)
cleanup :: (a -> IO ()) -> [Async a] -> IO ()
cleanup :: forall a. (a -> IO ()) -> [Async a] -> IO ()
cleanup a -> IO ()
release [Async a]
xs =
IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO
do [Async a] -> (Async a -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [Async a]
xs \Async a
x -> ThreadId -> AsyncCancelled -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo (Async a -> ThreadId
forall a. Async a -> ThreadId
asyncThreadId Async a
x) AsyncCancelled
AsyncCancelled
[Async a] -> (Async a -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [Async a]
xs \Async a
x ->
do res <- Async a -> IO (Either SomeException a)
forall a. Async a -> IO (Either SomeException a)
waitCatch Async a
x
for_ res release