module Ecluse.Core.Worker.Liveness (
    WorkerHeartbeat,
    newWorkerHeartbeat,
    recordPoll,
    lastPoll,
    workerHeartbeatStaleAfter,
    heartbeatHealthy,
    heartbeatHealthyNow,
) where

import Data.Time (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)

{- | The mirror worker's consume-loop heartbeat: the wall-clock time of the
worker's __last successful poll__ of the queue.

It is the worker's own liveness signal, kept apart from the server's HTTP
readiness so single-process health reflects a stalled worker today and a future
standalone worker binary keeps the same probe. The worker 'recordPoll's after each
successful @receive@ (whether or not the batch was empty -- an empty long-poll is a
healthy idle, not a stall); a liveness probe reads 'lastPoll' and compares it
against the wall clock to decide whether the loop has gone quiet for too long.

Representation: a single 'TVar' holding @'Maybe' 'UTCTime'@, where 'Nothing'
means no poll has been recorded yet. The constructor is not exported, so the
value is only created with 'newWorkerHeartbeat', written with 'recordPoll' and
read with 'lastPoll' (or 'heartbeatHealthyNow').
-}
newtype WorkerHeartbeat = WorkerHeartbeat (TVar (Maybe UTCTime))

{- | Build a fresh 'WorkerHeartbeat' with no poll yet recorded ('lastPoll' is
'Nothing' until the worker's first successful @receive@).

The heartbeat starts out as 'Nothing' rather than the creation time so that a
probe can tell "never polled" apart from "polled at some instant".
-}
newWorkerHeartbeat :: IO WorkerHeartbeat
newWorkerHeartbeat = WorkerHeartbeat <$> newTVarIO Nothing

{- | Record the time of a successful queue poll, advancing the heartbeat. Called
by the worker after each @receive@ returns (the loop is alive even on an empty
batch).

The supplied time unconditionally replaces whatever was stored before, in a
single STM transaction; the caller chooses the timestamp (normally the current
wall-clock time).
-}
recordPoll :: WorkerHeartbeat -> UTCTime -> IO ()
recordPoll (WorkerHeartbeat var) now = atomically (writeTVar var (Just now))

{- | The time of the worker's last successful poll, or 'Nothing' before its first.
A liveness probe reads this and compares it against the wall clock.

This is a plain read of the underlying 'TVar'; it never modifies the heartbeat.
-}
lastPoll :: WorkerHeartbeat -> IO (Maybe UTCTime)
lastPoll (WorkerHeartbeat var) = readTVarIO var

{- | How long the worker's last successful poll may be stale before the loop is
considered stalled -- the staleness threshold the liveness probe applies.

It is a generous multiple of the long-poll cadence: a healthy idle worker still
completes a poll at least every 'Ecluse.Core.Queue.Sqs.sqsWaitSeconds' (≤ 20s by
default), so a gap several times that is a genuine stall, not an idle queue. Set
well above one poll window so liveness never flaps on normal scheduling jitter.

The value is 120 seconds: a numeric literal at type 'NominalDiffTime' is
interpreted as a number of seconds.
-}
workerHeartbeatStaleAfter :: NominalDiffTime
workerHeartbeatStaleAfter = 120

{- | Whether the worker's consume loop is healthy as of @now@, given its last
successful poll. This is the liveness signal the single-process @\/livez@ probe
folds in (see "Ecluse.Server"), distinct from HTTP readiness.

* 'Nothing' (no poll yet) is __healthy__: the worker is still starting, not stalled.
* A poll within 'workerHeartbeatStaleAfter' is healthy. The bound is inclusive: a
  poll exactly that old still counts as healthy.
* A poll older than that is __unhealthy__: the loop has gone quiet for too long.

A poll time later than @now@ yields a negative difference and is therefore
reported as healthy.

>>> import Data.Time (UTCTime (UTCTime), fromGregorian, secondsToDiffTime)
>>> let t0 = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 0)
>>> heartbeatHealthy t0 Nothing
True

>>> let now = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 10)
>>> heartbeatHealthy now (Just t0)
True

>>> let boundary = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 120)
>>> heartbeatHealthy boundary (Just t0)
True

>>> let later = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 300)
>>> heartbeatHealthy later (Just t0)
False
-}
heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool
-- No poll recorded yet: the worker is still starting up, which is not a stall.
heartbeatHealthy _ Nothing = True
-- Otherwise healthy iff the age of the last poll does not exceed the threshold.
heartbeatHealthy now (Just polledAt) =
    diffUTCTime now polledAt <= workerHeartbeatStaleAfter

{- | Read the worker heartbeat and decide liveness against the current wall clock --
the @IO@ wrapper the liveness probe calls. 'True' while the consume loop is alive
(or still starting); 'False' once the last successful poll is staler than
'workerHeartbeatStaleAfter'.

The clock is sampled first and the heartbeat read second. A poll recorded between
the two steps makes the poll time later than the sampled clock, which
'heartbeatHealthy' reports as healthy.
-}
heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool
heartbeatHealthyNow heartbeat =
    heartbeatHealthy <$> getCurrentTime <*> lastPoll heartbeat