{-# LANGUAGE ScopedTypeVariables #-}
module General.Thread(
withThreadsBoth,
withThreadSlave,
allocateThread,
Thread, newThreadFinally, stopThreads
) where
import General.Cleanup
import Data.Hashable
import Control.Concurrent.Extra
import Control.Exception
import General.Extra
import Control.Monad.Extra
data Thread = Thread ThreadId (Barrier ())
instance Eq Thread where
Thread ThreadId
a Barrier ()
_ == :: Thread -> Thread -> Bool
== Thread ThreadId
b Barrier ()
_ = ThreadId
a ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
== ThreadId
b
instance Hashable Thread where
hashWithSalt :: Int -> Thread -> Int
hashWithSalt Int
salt (Thread ThreadId
a Barrier ()
_) = Int -> ThreadId -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt ThreadId
a
newThreadFinally :: IO a -> (Thread -> Either SomeException a -> IO ()) -> IO Thread
newThreadFinally :: forall a.
IO a -> (Thread -> Either SomeException a -> IO ()) -> IO Thread
newThreadFinally IO a
act Thread -> Either SomeException a -> IO ()
cleanup = do
bar <- IO (Barrier ())
forall a. IO (Barrier a)
newBarrier
t <- mask_ $ forkIOWithUnmask $ \forall a. IO a -> IO a
unmask -> (IO () -> IO () -> IO ()) -> IO () -> IO () -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
finally (Barrier () -> () -> IO ()
forall a. Partial => Barrier a -> a -> IO ()
signalBarrier Barrier ()
bar ()) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
res <- IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO a -> IO (Either SomeException a))
-> IO a -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ IO a -> IO a
forall a. IO a -> IO a
unmask IO a
act
me <- myThreadId
cleanup (Thread me bar) res
pure $ Thread t bar
stopThreads :: [Thread] -> IO ()
stopThreads :: [Thread] -> IO ()
stopThreads [Thread]
threads = do
bars <- [IO (Barrier ())] -> IO [Barrier ()]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence [do IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ ThreadId -> IO ()
killThread ThreadId
t; Barrier () -> IO (Barrier ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Barrier ()
bar | Thread ThreadId
t Barrier ()
bar <- [Thread]
threads]
mapM_ waitBarrier bars
withThreadsBoth :: IO a -> IO b -> IO (a, b)
withThreadsBoth :: forall a b. IO a -> IO b -> IO (a, b)
withThreadsBoth IO a
act1 IO b
act2 = do
bar1 <- IO (Barrier (Either SomeException a))
forall a. IO (Barrier a)
newBarrier
bar2 <- newBarrier
parent <- myThreadId
ignore <- newVar False
mask $ \forall a. IO a -> IO a
unmask -> do
t1 <- ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask (((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> do
res1 :: Either SomeException a <- IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO a -> IO (Either SomeException a))
-> IO a -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ IO a -> IO a
forall a. IO a -> IO a
unmask IO a
act1
unlessM (readVar ignore) $ whenLeft res1 $ throwTo parent
signalBarrier bar1 res1
t2 <- forkIOWithUnmask $ \forall a. IO a -> IO a
unmask -> do
res2 :: Either SomeException b <- IO b -> IO (Either SomeException b)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO b -> IO (Either SomeException b))
-> IO b -> IO (Either SomeException b)
forall a b. (a -> b) -> a -> b
$ IO b -> IO b
forall a. IO a -> IO a
unmask IO b
act2
unlessM (readVar ignore) $ whenLeft res2 $ throwTo parent
signalBarrier bar2 res2
res :: Either SomeException (a,b) <- try $ unmask $ do
Right v1 <- waitBarrier bar1
Right v2 <- waitBarrier bar2
pure (v1,v2)
writeVar ignore True
killThread t1
forkIO $ killThread t2
waitBarrier bar1
waitBarrier bar2
either throwIO pure res
withThreadSlave :: IO () -> IO a -> IO a
withThreadSlave :: forall a. IO () -> IO a -> IO a
withThreadSlave IO ()
slave IO a
act = (Cleanup -> IO a) -> IO a
forall a. (Cleanup -> IO a) -> IO a
withCleanup ((Cleanup -> IO a) -> IO a) -> (Cleanup -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Cleanup
cleanup -> do
Cleanup -> IO () -> IO ()
allocateThread Cleanup
cleanup IO ()
slave
IO a
act
allocateThread :: Cleanup -> IO () -> IO ()
allocateThread :: Cleanup -> IO () -> IO ()
allocateThread Cleanup
cleanup IO ()
act = do
bar <- IO (Barrier ())
forall a. IO (Barrier a)
newBarrier
parent <- myThreadId
ignore <- newVar False
void $ allocate cleanup
(mask_ $ forkIOWithUnmask $ \forall a. IO a -> IO a
unmask -> do
res :: Either SomeException () <- IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall a. IO a -> IO a
unmask IO ()
act
unlessM (readVar ignore) $ whenLeft res $ throwTo parent
signalBarrier bar ()
)
(\ThreadId
t -> do Var Bool -> Bool -> IO ()
forall a. Var a -> a -> IO ()
writeVar Var Bool
ignore Bool
True; ThreadId -> IO ()
killThread ThreadId
t; Barrier () -> IO ()
forall a. Barrier a -> IO a
waitBarrier Barrier ()
bar)