module Ecluse.Core.Worker.Loop (
workerLoop,
) where
import Data.Time (getCurrentTime)
import Katip (Severity (DebugS, ErrorS), logFM, ls)
import UnliftIO (tryAny)
import UnliftIO.Concurrent (threadDelay)
import Ecluse.Core.Queue (MirrorQueue (receive))
import Ecluse.Core.Worker.Job (displayExceptionT, processBatch)
import Ecluse.Core.Worker.Liveness (recordPoll)
import Ecluse.Core.Worker.Types
-- | Run the worker forever: poll the queue, hand the batch to
-- 'processBatch', and repeat.
--
-- Every iteration is guarded by 'tryAny'. If an iteration fails, the
-- exception is logged at 'ErrorS' and the loop pauses (see @backoff@) before
-- trying again. A successful iteration is followed immediately by the next
-- one, with no pause.
workerLoop :: WorkerM ()
workerLoop = forever guardedIteration
  where
    -- One pass of the loop, with failures turned into a log line plus a
    -- pause instead of terminating the worker.
    guardedIteration :: WorkerM ()
    guardedIteration = do
      result <- tryAny pollAndProcess
      case result of
        Right _ -> pass
        Left failure -> do
          logFM
            ErrorS
            (ls ("worker iteration failed, backing off: " <> displayExceptionT failure))
          backoff

    -- Receive a batch from the queue taken from the runtime, record a poll
    -- on the heartbeat, then process the batch.
    pollAndProcess :: WorkerM ()
    pollAndProcess = do
      messages <- asks wrQueue >>= liftIO . receive
      let receivedSummary :: Text
          receivedSummary =
            "worker received " <> show (length messages) <> " messages"
      -- Only log when something actually arrived; empty polls stay silent.
      unless (null messages) $
        logFM DebugS (ls receivedSummary)
      -- The poll is recorded after 'receive' has returned and before the
      -- batch is processed, for empty and non-empty batches alike.
      heartbeat <- asks wrHeartbeat
      liftIO (getCurrentTime >>= recordPoll heartbeat)
      processBatch messages

    -- Pause applied after a failed iteration.
    backoff :: WorkerM ()
    backoff = threadDelay backoffMicros

    -- 'threadDelay' takes microseconds, so this is one second.
    backoffMicros :: Int
    backoffMicros = 1_000_000