{- | Loop robustness and supervision for the worker.

The loop is wrapped so a single bad iteration cannot kill the worker thread: a
transient @receive@ / fetch / publish error, or an undecodable body, is caught,
logged, and the loop backs off and continues. (Job-level "retry is don't ack" is a
separate concern -- it governs whether one message redelivers; it does not protect
the loop, since an escaping exception would still tear the thread down.) The
composition root holds the worker under @concurrently_@ alongside the server, so an
exception that does escape the worker propagates and takes the process down
(fail-stop). A synchronous exception inside an iteration is caught by 'tryAny', so in
practice only asynchronous exceptions escape. Transient faults self-recover here. A
successful poll advances the 'WorkerHeartbeat', so a stalled loop is visible to the
liveness probe.

Shutdown tears the loop down cleanly: the composition root runs it under
@concurrently_@ within its resource bracket, so process teardown cancels the loop
thread and an in-flight, un-acked message simply redelivers -- safe, because
publishing is idempotent (a version already present is success).
-}
module Ecluse.Core.Worker.Loop (
    workerLoop,
) where

import Data.Time (getCurrentTime)
import Katip (Severity (DebugS, ErrorS), logFM, ls)
import UnliftIO (tryAny)
import UnliftIO.Concurrent (threadDelay)

import Ecluse.Core.Queue (MirrorQueue (receive))
import Ecluse.Core.Worker.Job (displayExceptionT, processBatch)
import Ecluse.Core.Worker.Liveness (recordPoll)
import Ecluse.Core.Worker.Types

{- | The continuous consume loop: long-poll for a batch, process it, repeat.

Each iteration runs under 'tryAny'. A single synchronous failure is caught, logged at
'ErrorS', and followed by a pause ('backoff') before the loop continues, so one bad
iteration cannot kill the worker thread. Such failures include a @receive@ that
throws, a fetch or publish error, or an undecodable body. 'tryAny' does not catch
asynchronous exceptions, so cancelling the loop thread still stops it.

A successful poll advances the heartbeat whether or not the batch was empty. A
liveness probe therefore sees the loop is alive: an idle queue is a healthy empty
poll, not a stall. The heartbeat is recorded before the batch is handed to
'processBatch', so it advances even for a poll whose batch then fails.

The action never returns normally; it loops until an exception escapes it.
-}
workerLoop :: WorkerM ()
workerLoop = forever $ do
    outcome <- tryAny pollAndProcess
    whenLeft_ outcome $ \err -> do
        logFM ErrorS (ls ("worker iteration failed, backing off: " <> displayExceptionT err))
        backoff
  where
    -- One iteration: receive a batch, record the heartbeat, then hand the batch to
    -- the job layer. Any synchronous exception it raises is handled by the caller.
    pollAndProcess :: WorkerM ()
    pollAndProcess = do
        queue <- asks wrQueue
        messages <- liftIO (receive queue)
        case messages of
            [] -> pass
            _ ->
                logFM DebugS $
                    ls ("worker received " <> show (length messages) <> " messages" :: Text)
        -- Heartbeat on every successful poll -- an empty long-poll is a healthy idle.
        heartbeat <- asks wrHeartbeat
        now <- liftIO getCurrentTime
        liftIO (recordPoll heartbeat now)
        processBatch messages

-- | The fixed pause after a failed iteration. A persistently failing dependency
-- (queue, upstream) is then retried at a bounded rate -- at most about once per
-- second -- rather than hot-looping.
backoff :: WorkerM ()
backoff = threadDelay backoffMicros
  where
    -- 'threadDelay' takes microseconds: 1_000_000 us = 1 s.
    backoffMicros :: Int
    backoffMicros = 1_000_000