module Ecluse.Core.Server.Admission (
ServeAdmission,
newServeAdmission,
unlimitedServeAdmission,
withServeAdmission,
serveAdmissionWaitMicros,
newServeAdmissionTuned,
) where
import Control.Concurrent.STM (retry)
import GHC.Conc (registerDelay)
import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception qualified as UE
import Ecluse.Core.Telemetry.Record (MetricsPort (..))
{- | Admission policy handed to 'withServeAdmission'.

The constructors are not exported; values are obtained from
'unlimitedServeAdmission', 'newServeAdmission' or 'newServeAdmissionTuned'.
-}
data ServeAdmission
    = -- | No gating: 'withServeAdmission' runs the action directly.
      UnlimitedServeAdmission
    | -- | Gate the action on the slot and waiting-room state of a 'BoundedAdmission'.
      BoundedServeAdmission BoundedAdmission
-- | Mutable counters and fixed limits behind a bounded 'ServeAdmission'.
data BoundedAdmission = BoundedAdmission
    { baSlots :: TVar Int
    -- ^ Slots currently free. Decremented when a caller is admitted and
    -- incremented again when its action finishes.
    , baWaiting :: TVar Int
    -- ^ Number of callers currently queued for a slot.
    , baWaitingRoom :: Int
    -- ^ Maximum number of callers allowed to queue; arrivals beyond this
    -- are refused.
    , baWaitMicros :: Int
    -- ^ How long a queued caller waits for a slot, in microseconds
    -- (passed to 'registerDelay').
    }
{- | Default time a queued caller waits for a slot, in microseconds
(one second). 'newServeAdmission' uses this value.
-}
serveAdmissionWaitMicros :: Int
serveAdmissionWaitMicros = 1_000_000
{- | Build a bounded 'ServeAdmission' with @capacity@ slots, a waiting room
of the same size, and the default wait of 'serveAdmissionWaitMicros'.

Calls 'error' when @capacity@ is zero or negative.
-}
newServeAdmission :: Int -> IO ServeAdmission
newServeAdmission capacity
    | capacity <= 0 = error "ServeAdmission capacity must be positive"
    | otherwise = newServeAdmissionTuned capacity capacity serveAdmissionWaitMicros
{- | Build a bounded 'ServeAdmission' with every knob explicit.

Arguments, in order: the number of slots, the waiting-room size, and the
queue wait in microseconds. Negative waiting-room sizes and waits are
clamped to zero; the slot count is stored as given.
-}
newServeAdmissionTuned :: Int -> Int -> Int -> IO ServeAdmission
newServeAdmissionTuned capacity room waitMicros = do
    slots <- newTVarIO capacity
    -- Nobody is queued when the gate is created.
    waiting <- newTVarIO 0
    pure $
        BoundedServeAdmission
            BoundedAdmission
                { baSlots = slots
                , baWaiting = waiting
                , baWaitingRoom = max 0 room
                , baWaitMicros = max 0 waitMicros
                }
-- | Admission policy that never queues or refuses a caller.
unlimitedServeAdmission :: ServeAdmission
unlimitedServeAdmission = UnlimitedServeAdmission
{- | Outcome of 'doorDecision' for an arriving caller: 'Admitted' means a slot
was taken immediately, 'Queued' means the caller was counted into the waiting
room, and 'Refused' means neither happened.
-}
data Gate = Admitted | Queued | Refused
{- | Decide, in one transaction, what happens to a newly arrived caller.

* 'Admitted' — a slot is free and nobody is queued; the slot is taken here.
* 'Refused'  — otherwise, when the waiting room is already full.
* 'Queued'   — otherwise; the waiting count is incremented here and the
  caller is expected to decrement it when it stops waiting.
-}
doorDecision :: BoundedAdmission -> STM Gate
doorDecision ba = do
    available <- readTVar (baSlots ba)
    waiting <- readTVar (baWaiting ba)
    decide available waiting
  where
    decide available waiting
        -- A free slot is only handed out directly when no one is queued,
        -- so new arrivals do not jump ahead of existing waiters.
        | available > 0 && waiting == 0 =
            writeTVar (baSlots ba) (available - 1) $> Admitted
        | waiting >= baWaitingRoom ba = pure Refused
        | otherwise = writeTVar (baWaiting ba) (waiting + 1) $> Queued
{- | Transaction run by a queued caller: take a slot as soon as one is free,
or give up once @deadline@ has been set to 'True'.

Returns 'True' when a slot was taken (and decremented here), 'False' when
the deadline fired first. While neither holds the transaction retries.
-}
acquireOrExpire :: BoundedAdmission -> TVar Bool -> STM Bool
acquireOrExpire ba deadline = do
    available <- readTVar (baSlots ba)
    if available > 0
        then writeTVar (baSlots ba) (available - 1) $> True
        else do
            -- A free slot wins over an expired deadline because it is checked first.
            expired <- readTVar deadline
            if expired then pure False else retry
{- | Run @action@ under the given admission policy.

Returns @'Just' result@ when the action ran, and 'Nothing' when the caller
was refused at the door or its queue wait expired before a slot came free.
Exceptions thrown by @action@ propagate after the slot has been released.

For an unlimited policy the action is run directly and no metrics are
recorded. For a bounded policy:

* 'mpServeAdmissionQueued' is recorded when a caller obtains a slot after
  queueing;
* 'mpServeAdmissionInFlight' is recorded with @1@ before the action runs and
  with @-1@ after it finishes, normally or by exception.
-}
withServeAdmission :: (MonadUnliftIO m) => MetricsPort -> ServeAdmission -> m a -> m (Maybe a)
withServeAdmission _ UnlimitedServeAdmission action = Just <$> action
withServeAdmission metrics (BoundedServeAdmission ba) action =
    -- Bookkeeping runs masked; only the caller's action (and the blocking
    -- wait, which is interruptible) can receive asynchronous exceptions.
    UE.mask $ \restore -> do
        gate <- atomically (doorDecision ba)
        case gate of
            Refused -> pure Nothing
            Admitted -> admitted restore (pure ())
            Queued -> do
                -- 'doorDecision' already counted this caller as waiting, so
                -- everything from here to the end of the wait is covered by
                -- the decrement, including a failure to arm the deadline.
                acquired <- waitForSlot `UE.finally` leaveQueue
                if acquired
                    then admitted restore (liftIO (mpServeAdmissionQueued metrics))
                    else pure Nothing
  where
    -- Arm the deadline, then block until a slot is taken or it fires.
    waitForSlot = do
        deadline <- liftIO (registerDelay (baWaitMicros ba))
        atomically (acquireOrExpire ba deadline)

    leaveQueue = atomically (modifyTVar' (baWaiting ba) (subtract 1))

    -- Called with a slot already held. @onEntry@ is extra bookkeeping to
    -- record before the action starts.
    admitted restore onEntry = do
        -- If the entry bookkeeping fails, hand the slot back. The in-flight
        -- gauge is not decremented here because its increment did not complete.
        (onEntry >> liftIO (mpServeAdmissionInFlight metrics 1))
            `UE.onException` releaseSlot
        -- From here the gauge increment is known to have happened, so the
        -- cleanup always pairs it with a decrement.
        Just
            <$> ( restore action
                    `UE.finally` (releaseSlot >> liftIO (mpServeAdmissionInFlight metrics (-1)))
                )

    releaseSlot = atomically (modifyTVar' (baSlots ba) (+ 1))