{- | Liveness heartbeat for the mirror worker's consume loop.

The worker stamps a shared cell with the wall-clock time of each successful
queue poll; a liveness probe compares that stamp with the current time to
decide whether the loop has stalled.
-}
module Ecluse.Core.Worker.Liveness
  ( WorkerHeartbeat
  , newWorkerHeartbeat
  , recordPoll
  , lastPoll
  , workerHeartbeatStaleAfter
  , heartbeatHealthy
  , heartbeatHealthyNow
  ) where

import Data.Time (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)

{- | Heartbeat of the mirror worker's consume loop: a transactional cell holding
the wall-clock time of the worker's __most recent successful poll__ of the
queue, or 'Nothing' before the first one.

This is the worker's own liveness signal. It is deliberately separate from the
server's HTTP readiness, so that single-process health can report a stalled
worker, and a future standalone worker binary can reuse the same probe.

The worker calls 'recordPoll' after every successful @receive@, whether or not
the batch was empty: an empty long-poll is a healthy idle, not a stall. A
liveness probe calls 'lastPoll' (or 'heartbeatHealthyNow') and compares the
stamp against the wall clock.
-}
newtype WorkerHeartbeat = WorkerHeartbeat (TVar (Maybe UTCTime))

{- | Allocate a heartbeat on which no poll has been recorded yet: 'lastPoll'
answers 'Nothing' until the worker's first successful @receive@.
-}
newWorkerHeartbeat :: IO WorkerHeartbeat
newWorkerHeartbeat = fmap WorkerHeartbeat (newTVarIO Nothing)

{- | Stamp the heartbeat with the time of a successful queue poll, replacing any
earlier stamp. The worker calls this each time @receive@ returns, including on
an empty batch, because the loop is still alive.
-}
recordPoll :: WorkerHeartbeat -> UTCTime -> IO ()
recordPoll (WorkerHeartbeat cell) polledAt =
  atomically $ writeTVar cell (Just polledAt)

{- | Read the most recent poll stamp. The result is 'Nothing' if the worker has
not yet completed a poll.
-}
lastPoll :: WorkerHeartbeat -> IO (Maybe UTCTime)
lastPoll (WorkerHeartbeat cell) = readTVarIO cell

{- | Maximum age of the last successful poll before the consume loop counts as
stalled. This is the staleness threshold the liveness probe applies.

It is a generous multiple of the long-poll cadence. An idle but healthy worker
still finishes a poll at least every 'Ecluse.Core.Queue.Sqs.sqsWaitSeconds'
(at most 20s by default), so a gap several windows long indicates a genuine
stall rather than an empty queue. Keeping it well above one poll window
prevents liveness from flapping on ordinary scheduling jitter.
-}
workerHeartbeatStaleAfter :: NominalDiffTime
workerHeartbeatStaleAfter = 2 * 60 -- two minutes, i.e. six 20-second poll windows

{- | Decide whether the consume loop is healthy at @now@, given the time of its
last successful poll.

This is the liveness signal that the single-process @\/livez@ probe folds in
(see "Ecluse.Server"). It is distinct from HTTP readiness.

* 'Nothing' (no poll completed yet) is __healthy__: the worker is still
  starting up. As a consequence, a worker whose very first poll never
  completes is never reported unhealthy by this check.

* A poll no more than 'workerHeartbeatStaleAfter' before @now@ is healthy. The
  threshold is inclusive, and a stamp later than @now@ (a negative gap) also
  counts as healthy.

* An older poll is __unhealthy__: the loop has gone quiet for too long.

>>> import Data.Time (UTCTime (UTCTime), fromGregorian, secondsToDiffTime)
>>> let t0 = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 0)
>>> heartbeatHealthy t0 Nothing
True
>>> let now = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 10)
>>> heartbeatHealthy now (Just t0)
True
>>> let edge = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 120)
>>> heartbeatHealthy edge (Just t0)
True
>>> let later = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 300)
>>> heartbeatHealthy later (Just t0)
False
-}
heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool
heartbeatHealthy now = maybe True withinThreshold
  where
    -- Age of the stamp relative to @now@, compared inclusively to the threshold.
    withinThreshold polledAt =
      now `diffUTCTime` polledAt <= workerHeartbeatStaleAfter

{- | Effectful wrapper that the liveness probe calls. It reads the wall clock
first, then the heartbeat, and applies 'heartbeatHealthy' to the two values.

The result is 'True' while the consume loop is alive or still starting. It is
'False' once the last successful poll is older than
'workerHeartbeatStaleAfter'.
-}
heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool
heartbeatHealthyNow heartbeat = do
  now <- getCurrentTime
  polled <- lastPoll heartbeat
  pure (heartbeatHealthy now polled)