> {-# LANGUAGE DeriveDataTypeable #-}
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> module Control.Concurrent.MSemN
> (MSemN
> ,new
> ,with
> ,wait
> ,signal
> ,withF
> ,waitF
> ,signalF
> ,peekAvail
> ) where
>
> import Prelude( Integral,Eq,IO,Int,Integer,Maybe(Just,Nothing),Num((+),(-)),Bool(False,True)
> , return,const,fmap,snd,maybe,seq
> , (.),(<=),($),($!) )
> import Control.Concurrent.MVar( MVar
> , withMVar,modifyMVar,modifyMVar_,newMVar
> , newEmptyMVar,tryPutMVar,takeMVar,tryTakeMVar )
> import Control.Exception(bracket,bracket_,uninterruptibleMask_,onException,evaluate,mask_)
> import Control.Monad(when)
> import Data.Typeable(Typeable)
> import Data.Word(Word)
>
The only MVars allocated are the three created be 'new'. Their three roles are
1) to have a FIFO queue of waiters (queueWait)
2) for the head waiter to block on, if necessary (headWait)
3) to protect the actual state of the semaphore (quantityStore)
>
> data MS i = MS { forall i. MS i -> i
avail :: !i
> , forall i. MS i -> Maybe i
headWants :: !(Maybe i)
> }
> deriving (MS i -> MS i -> Bool
(MS i -> MS i -> Bool) -> (MS i -> MS i -> Bool) -> Eq (MS i)
forall i. Eq i => MS i -> MS i -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MS i -> MS i -> Bool
$c/= :: forall i. Eq i => MS i -> MS i -> Bool
== :: MS i -> MS i -> Bool
$c== :: forall i. Eq i => MS i -> MS i -> Bool
Eq,Typeable)
>
>
> data MSemN i = MSemN { forall i. MSemN i -> MVar (MS i)
quantityStore :: !(MVar (MS i))
> , forall i. MSemN i -> MVar ()
queueWait :: !(MVar ())
> , forall i. MSemN i -> MVar ()
headWait :: !(MVar ())
> }
> deriving (MSemN i -> MSemN i -> Bool
(MSemN i -> MSemN i -> Bool)
-> (MSemN i -> MSemN i -> Bool) -> Eq (MSemN i)
forall i. MSemN i -> MSemN i -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MSemN i -> MSemN i -> Bool
$c/= :: forall i. MSemN i -> MSemN i -> Bool
== :: MSemN i -> MSemN i -> Bool
$c== :: forall i. MSemN i -> MSemN i -> Bool
Eq,Typeable)
The data structure for 'MSemN' is slightly more complicated than the one in 'MSem'. Here the
quantityStore holds not just a value of type 'i' but also a 'Maybe i' called 'headWants'.
'headWants' is Nothing when there are no blocked threads waiting on quantity. 'headWants' is (Just
x) when there is at least one blocked thread and the head of the queue needs positive quantity x to
proceed.
There are two possible lifecycles of a wait request. Like in MSem, all waiters do all work while
holding queueWait. This is what forces the waiters into a FIFO order.
The first is when the waiter gets to head of the queue and finds that the quantityStore has enough
in 'avail' to be satisfied. This waiter subtracts its wanted value from 'avail' and returns.
The second is when the waiter does not find a larger enough value in 'avail' must block. It sets
headWants from Nothing to 'Just wanted' and then releases quantityStore, followed by blocked in
headWait. When a signal arrives that puts the available quantity above the value in 'headWants'
then it puts () into 'headWait' to wake the blocked waiting thread. Here the subtraction of the
value in 'Just wanted' from the available quantity is handled by the signalling thread.
The difficulty is maintaining the desired invariants in the face of exceptions. If a frustrated
waiter dies before the 'takeMVar' on 'headWait' succeeds then the waiter's changes to
'quantityStore' must be undone! This requires the 'uninterruptibleMask_' around the onException
action in 'waitF'.
When the head waiter releases the queueWait MVar, either by succeeding or being interrupted, there
are three invariants:
(wait invariant 1) The headWait MVar must be empty.
(wait invariant 2) The headWants value is Nothing.
This means that when a waiter first acquires the queueWait MVar both the above hold. If the waiter
succeeded then there is a progress invariant:
(wait progress invariant) The value of 'avail' is non-negative when wait succeeds.
When the signal operation release the quantityStore MVar then one of three situations holds:
(signal possibility 1) headWants was Nothing and it and headWait are unchanged, or
(signal possibility 2) headWants was (Just x) and it and headWait are unchanged, or
(signal possibility 3) headWants was (Just x) and is changed to Nothing and headWait has () put into it.
If headWait had () put into it then headWants is Nothing. The only way headWants can change back to
(Just x) is if a new waiter does it. This requires the original waiter to hand over the queueWait
MVar, and we can be certain that (wait invariant 1) means that the () put into headWait is taken out
before this handoff.
Thus when a signal first acquires the quantityStore MVar there is a dynamically maintained invariant:
(signal invariant 1) A signal that finds headWants of (Just x) also finds headWait empty.
Note that a () put into headWait signifies amount: it is worth the quantity x in the (Just x) in
headWants that was just changed to Nothing. After (signal possibility 3) only the receiving waiting
thread knows the amount that this () in headWait represents, and only this thread can fix the MSemN
if an exception occurs. The waitF function below is careful to fix MSemN.
>
>
> new :: Integral i => i -> IO (MSemN i)
> {-# SPECIALIZE new :: Int -> IO (MSemN Int) #-}
> {-# SPECIALIZE new :: Word -> IO (MSemN Word) #-}
> {-# SPECIALIZE new :: Integer -> IO (MSemN Integer) #-}
> new :: forall i. Integral i => i -> IO (MSemN i)
new i
initial = do
> MVar (MS i)
newMS <- MS i -> IO (MVar (MS i))
forall a. a -> IO (MVar a)
newMVar (MS i -> IO (MVar (MS i))) -> MS i -> IO (MVar (MS i))
forall a b. (a -> b) -> a -> b
$! (MS :: forall i. i -> Maybe i -> MS i
MS { avail :: i
avail = i
initial
> , headWants :: Maybe i
headWants = Maybe i
forall a. Maybe a
Nothing })
> MVar ()
newQueueWait <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
> MVar ()
newHeadWait <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
> MSemN i -> IO (MSemN i)
forall (m :: * -> *) a. Monad m => a -> m a
return (MSemN :: forall i. MVar (MS i) -> MVar () -> MVar () -> MSemN i
MSemN { quantityStore :: MVar (MS i)
quantityStore = MVar (MS i)
newMS
> , queueWait :: MVar ()
queueWait = MVar ()
newQueueWait
> , headWait :: MVar ()
headWait = MVar ()
newHeadWait })
>
>
>
>
> with :: Integral i => MSemN i -> i -> IO a -> IO a
> {-# SPECIALIZE with :: MSemN Int -> Int -> IO a -> IO a #-}
> {-# SPECIALIZE with :: MSemN Word -> Word -> IO a -> IO a #-}
> {-# SPECIALIZE with :: MSemN Integer -> Integer -> IO a -> IO a #-}
> with :: forall i a. Integral i => MSemN i -> i -> IO a -> IO a
with MSemN i
m i
wanted = i -> (IO a -> IO a) -> IO a -> IO a
seq i
wanted ((IO a -> IO a) -> IO a -> IO a) -> (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ IO () -> IO () -> IO a -> IO a
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (MSemN i -> i -> IO ()
forall i. Integral i => MSemN i -> i -> IO ()
wait MSemN i
m i
wanted) (MSemN i -> i -> IO ()
forall i. Integral i => MSemN i -> i -> IO ()
signal MSemN i
m i
wanted)
>
>
>
>
>
>
>
>
> withF :: Integral i
> => MSemN i
> -> (i -> (i,b))
> -> ((i,b) -> IO a)
> -> IO a
> {-# SPECIALIZE withF :: MSemN Int -> (Int -> (Int,b)) -> ((Int,b) -> IO a) -> IO a #-}
> {-# SPECIALIZE withF :: MSemN Word -> (Word -> (Word,b)) -> ((Word,b) -> IO a) -> IO a #-}
> {-# SPECIALIZE withF :: MSemN Integer -> (Integer -> (Integer,b)) -> ((Integer,b) -> IO a) -> IO a #-}
> withF :: forall i b a.
Integral i =>
MSemN i -> (i -> (i, b)) -> ((i, b) -> IO a) -> IO a
withF MSemN i
m i -> (i, b)
f = IO (i, b) -> ((i, b) -> IO ()) -> ((i, b) -> IO a) -> IO a
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (MSemN i -> (i -> (i, b)) -> IO (i, b)
forall i b. Integral i => MSemN i -> (i -> (i, b)) -> IO (i, b)
waitF MSemN i
m i -> (i, b)
f) (\(i
wanted,b
_) -> MSemN i -> i -> IO ()
forall i. Integral i => MSemN i -> i -> IO ()
signal MSemN i
m i
wanted)
>
>
>
>
>
>
>
>
>
>
> wait :: Integral i => MSemN i -> i -> IO ()
> {-# SPECIALIZE wait :: MSemN Int -> Int -> IO () #-}
> {-# SPECIALIZE wait :: MSemN Word -> Word -> IO () #-}
> {-# SPECIALIZE wait :: MSemN Integer -> Integer -> IO () #-}
> wait :: forall i. Integral i => MSemN i -> i -> IO ()
wait MSemN i
m i
wanted = i -> IO () -> IO ()
seq i
wanted (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ((i, ()) -> ()) -> IO (i, ()) -> IO ()
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (i, ()) -> ()
forall a b. (a, b) -> b
snd (IO (i, ()) -> IO ()) -> IO (i, ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ MSemN i -> (i -> (i, ())) -> IO (i, ())
forall i b. Integral i => MSemN i -> (i -> (i, b)) -> IO (i, b)
waitF MSemN i
m ((i, ()) -> i -> (i, ())
forall a b. a -> b -> a
const (i
wanted,()))
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> waitF :: Integral i => MSemN i -> (i -> (i,b)) -> IO (i,b)
> {-# SPECIALIZE waitF :: MSemN Int -> (Int -> (Int,b)) -> IO (Int,b) #-}
> {-# SPECIALIZE waitF :: MSemN Word -> (Word -> (Word,b)) -> IO (Word,b) #-}
> {-# SPECIALIZE waitF :: MSemN Integer -> (Integer -> (Integer,b)) -> IO (Integer,b) #-}
> waitF :: forall i b. Integral i => MSemN i -> (i -> (i, b)) -> IO (i, b)
waitF MSemN i
m i -> (i, b)
f = (i -> (i, b)) -> IO (i, b) -> IO (i, b)
seq i -> (i, b)
f (IO (i, b) -> IO (i, b)) -> IO (i, b) -> IO (i, b)
forall a b. (a -> b) -> a -> b
$ IO (i, b) -> IO (i, b)
forall a. IO a -> IO a
mask_ (IO (i, b) -> IO (i, b))
-> ((() -> IO (i, b)) -> IO (i, b))
-> (() -> IO (i, b))
-> IO (i, b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVar () -> (() -> IO (i, b)) -> IO (i, b)
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (MSemN i -> MVar ()
forall i. MSemN i -> MVar ()
queueWait MSemN i
m) ((() -> IO (i, b)) -> IO (i, b)) -> (() -> IO (i, b)) -> IO (i, b)
forall a b. (a -> b) -> a -> b
$ \ () -> do
>
> (out :: (i, b)
out@(i
wanted,b
_),Bool
mustWait) <- MVar (MS i)
-> (MS i -> IO (MS i, ((i, b), Bool))) -> IO ((i, b), Bool)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (MSemN i -> MVar (MS i)
forall i. MSemN i -> MVar (MS i)
quantityStore MSemN i
m) ((MS i -> IO (MS i, ((i, b), Bool))) -> IO ((i, b), Bool))
-> (MS i -> IO (MS i, ((i, b), Bool))) -> IO ((i, b), Bool)
forall a b. (a -> b) -> a -> b
$ \ MS i
ms -> do
>
> let outVal :: (i, b)
outVal@(i
wantedVal,b
_) = i -> (i, b)
f (MS i -> i
forall i. MS i -> i
avail MS i
ms)
>
>
> if i
wantedVal i -> i -> Bool
forall a. Ord a => a -> a -> Bool
<= MS i -> i
forall i. MS i -> i
avail MS i
ms
> then do
> let avail'down :: i
avail'down = MS i -> i
forall i. MS i -> i
avail MS i
ms i -> i -> i
forall a. Num a => a -> a -> a
- i
wantedVal
> MS i
ms' <- MS i -> IO (MS i)
forall a. a -> IO a
evaluate MS i
ms { avail :: i
avail = i
avail'down }
> (MS i, ((i, b), Bool)) -> IO (MS i, ((i, b), Bool))
forall (m :: * -> *) a. Monad m => a -> m a
return (MS i
ms', ((i, b)
outVal,Bool
False))
> else do
> MS i
ms' <- MS i -> IO (MS i)
forall a. a -> IO a
evaluate MS i
ms { headWants :: Maybe i
headWants = i -> Maybe i
forall a. a -> Maybe a
Just i
wantedVal }
> (MS i, ((i, b), Bool)) -> IO (MS i, ((i, b), Bool))
forall (m :: * -> *) a. Monad m => a -> m a
return (MS i
ms', ((i, b)
outVal,Bool
True))
>
>
>
>
>
> Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
mustWait (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
> let cleanup :: IO ()
cleanup = IO () -> IO ()
forall a. IO a -> IO a
uninterruptibleMask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar (MS i) -> (MS i -> IO (MS i)) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (MSemN i -> MVar (MS i)
forall i. MSemN i -> MVar (MS i)
quantityStore MSemN i
m) ((MS i -> IO (MS i)) -> IO ()) -> (MS i -> IO (MS i)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \MS i
ms -> do
> Maybe ()
recovered <- MVar () -> IO (Maybe ())
forall a. MVar a -> IO (Maybe a)
tryTakeMVar (MSemN i -> MVar ()
forall i. MSemN i -> MVar ()
headWait MSemN i
m)
> let total :: i
total = MS i -> i
forall i. MS i -> i
avail MS i
ms i -> i -> i
forall a. Num a => a -> a -> a
+ i -> (() -> i) -> Maybe () -> i
forall b a. b -> (a -> b) -> Maybe a -> b
maybe i
0 (i -> () -> i
forall a b. a -> b -> a
const i
wanted) Maybe ()
recovered
> MS i -> IO (MS i)
forall a. a -> IO a
evaluate MS :: forall i. i -> Maybe i -> MS i
MS {avail :: i
avail = i
total, headWants :: Maybe i
headWants = Maybe i
forall a. Maybe a
Nothing}
> MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (MSemN i -> MVar ()
forall i. MSemN i -> MVar ()
headWait MSemN i
m) IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` IO ()
cleanup
> (i, b) -> IO (i, b)
forall (m :: * -> *) a. Monad m => a -> m a
return (i, b)
out
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> signal :: Integral i => MSemN i -> i -> IO ()
> {-# SPECIALIZE signal :: MSemN Int -> Int -> IO () #-}
> {-# SPECIALIZE signal :: MSemN Word -> Word -> IO () #-}
> {-# SPECIALIZE signal :: MSemN Integer -> Integer -> IO () #-}
> signal :: forall i. Integral i => MSemN i -> i -> IO ()
signal MSemN i
_ i
0 = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
> signal MSemN i
m i
size = IO () -> IO ()
forall a. IO a -> IO a
uninterruptibleMask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ((i, ()) -> ()) -> IO (i, ()) -> IO ()
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (i, ()) -> ()
forall a b. (a, b) -> b
snd (IO (i, ()) -> IO ()) -> IO (i, ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ MSemN i -> (i -> (i, ())) -> IO (i, ())
forall i b. Integral i => MSemN i -> (i -> (i, b)) -> IO (i, b)
signalF MSemN i
m ((i, ()) -> i -> (i, ())
forall a b. a -> b -> a
const (i
size,()))
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> signalF :: Integral i
> => MSemN i
> -> (i -> (i,b))
> -> IO (i,b)
> {-# SPECIALIZE signalF :: MSemN Int -> (Int -> (Int,b)) -> IO (Int,b) #-}
> {-# SPECIALIZE signalF :: MSemN Word -> (Word -> (Word,b)) -> IO (Word,b) #-}
> {-# SPECIALIZE signalF :: MSemN Integer -> (Integer -> (Integer,b)) -> IO (Integer,b) #-}
> signalF :: forall i b. Integral i => MSemN i -> (i -> (i, b)) -> IO (i, b)
signalF MSemN i
m i -> (i, b)
f = (i -> (i, b)) -> IO (i, b) -> IO (i, b)
seq i -> (i, b)
f (IO (i, b) -> IO (i, b)) -> IO (i, b) -> IO (i, b)
forall a b. (a -> b) -> a -> b
$ IO (i, b) -> IO (i, b)
forall a. IO a -> IO a
mask_ (IO (i, b) -> IO (i, b))
-> ((MS i -> IO (MS i, (i, b))) -> IO (i, b))
-> (MS i -> IO (MS i, (i, b)))
-> IO (i, b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVar (MS i) -> (MS i -> IO (MS i, (i, b))) -> IO (i, b)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (MSemN i -> MVar (MS i)
forall i. MSemN i -> MVar (MS i)
quantityStore MSemN i
m) ((MS i -> IO (MS i, (i, b))) -> IO (i, b))
-> (MS i -> IO (MS i, (i, b))) -> IO (i, b)
forall a b. (a -> b) -> a -> b
$ \ MS i
ms -> do
>
>
> let out :: (i, b)
out@(i
size,b
_) = i -> (i, b)
f (MS i -> i
forall i. MS i -> i
avail MS i
ms)
> i
total <- i -> IO i
forall a. a -> IO a
evaluate (i -> IO i) -> i -> IO i
forall a b. (a -> b) -> a -> b
$ MS i -> i
forall i. MS i -> i
avail MS i
ms i -> i -> i
forall a. Num a => a -> a -> a
+ i
size
> MS i
ms' <- case MS i -> Maybe i
forall i. MS i -> Maybe i
headWants MS i
ms of
> Just i
wanted | i
wanted i -> i -> Bool
forall a. Ord a => a -> a -> Bool
<= i
total -> do
>
> Bool
didPlace <- MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (MSemN i -> MVar ()
forall i. MSemN i -> MVar ()
headWait MSemN i
m) ()
> MS i -> IO (MS i)
forall a. a -> IO a
evaluate (MS i -> IO (MS i)) -> MS i -> IO (MS i)
forall a b. (a -> b) -> a -> b
$ if Bool
didPlace
> then MS :: forall i. i -> Maybe i -> MS i
MS { avail :: i
avail = i
total i -> i -> i
forall a. Num a => a -> a -> a
- i
wanted, headWants :: Maybe i
headWants = Maybe i
forall a. Maybe a
Nothing }
> else MS :: forall i. i -> Maybe i -> MS i
MS { avail :: i
avail = i
total, headWants :: Maybe i
headWants = Maybe i
forall a. Maybe a
Nothing }
> Maybe i
_ -> MS i -> IO (MS i)
forall a. a -> IO a
evaluate MS i
ms { avail :: i
avail = i
total }
> (MS i, (i, b)) -> IO (MS i, (i, b))
forall (m :: * -> *) a. Monad m => a -> m a
return (MS i
ms',(i, b)
out)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> peekAvail :: Integral i => MSemN i -> IO i
> {-# SPECIALIZE peekAvail :: MSemN Int -> IO Int #-}
> {-# SPECIALIZE peekAvail :: MSemN Word -> IO Word #-}
> {-# SPECIALIZE peekAvail :: MSemN Integer -> IO Integer #-}
> peekAvail :: forall i. Integral i => MSemN i -> IO i
peekAvail MSemN i
m = MVar (MS i) -> (MS i -> IO i) -> IO i
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (MSemN i -> MVar (MS i)
forall i. MSemN i -> MVar (MS i)
quantityStore MSemN i
m) (i -> IO i
forall (m :: * -> *) a. Monad m => a -> m a
return (i -> IO i) -> (MS i -> i) -> MS i -> IO i
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MS i -> i
forall i. MS i -> i
avail)