{- | Ack within the visibility budget during job processing.

A received message is hidden only for the queue's visibility window. The worker
acks on success; before a publish that may run long it calls
'Ecluse.Core.Queue.extendVisibility' to hold the message before the window lapses; on a
transient failure it does __not__ ack, so the message redelivers. A batch is
processed __sequentially__, so each job has the full visibility budget rather than
competing with its batch-mates for it.
-}
module Ecluse.Core.Worker.Job (
    JobOutcome (..),
    processJob,
    processBatch,
    displayExceptionT,
) where

import Data.Map.Strict qualified as Map
import Katip (Severity (DebugS, ErrorS, InfoS, WarningS), katipAddNamespace, logFM, ls)
import UnliftIO (tryAny, withRunInIO)

import Ecluse.Core.Ecosystem (ecosystemName)
import Ecluse.Core.Package (pkgEcosystem, renderPackageName)
import Ecluse.Core.Queue (MirrorArtifact (maHashes), MirrorJob (jobArtifact, jobArtifactUrl, jobPackage, jobTraceContext, jobVersion), MirrorQueue (ack, extendVisibility), QueueMessage (msgJob, msgReceipt), ReceiptHandle, Seconds (Seconds))
import Ecluse.Core.Registry (PublishFault (PublishRejected, PublishUrlUnformable), RegistryClient (fetchMetadata, parseVersionList, publishArtifact))
import Ecluse.Core.Registry.Metadata (VersionEvaluation (VersionMetadataUnavailable, VersionMissing, VersionPresent))
import Ecluse.Core.Rules (evalRules)
import Ecluse.Core.Rules.Types (Decision (Admitted, Blocked, BlockedByDefault, Undecidable), EvalContext (EvalContext))
import Ecluse.Core.Telemetry.Metrics qualified as Metric
import Ecluse.Core.Telemetry.Record (WorkerMetricsPort (..), timedSeconds)
import Ecluse.Core.Telemetry.Span (JobSpanOutcome (JobSpanOutcome), WorkerTracingPort (..))
import Ecluse.Core.Version (renderVersion)
import Ecluse.Core.Worker.Fetch (fetchArtifactBytes)
import Ecluse.Core.Worker.Integrity (IntegrityResult (..), verifyIntegrity)
import Ecluse.Core.Worker.Types

{- | Run every message of one received batch, strictly one after another.

Jobs are never run concurrently, so no job eats into another job's visibility window:
each one gets the whole budget to itself. A batch is bounded by the queue's configured
batch size (at most 10), so running it serially is a deliberate trade of a little
throughput for a full per-job budget, not a scaling bottleneck.
-}
processBatch :: [QueueMessage] -> WorkerM ()
processBatch = mapM_ processMessage

-- Handle one received message: run its job, record the outcome on the
-- @ecluse.mirror.jobs.processed@ metric, then settle the message with the queue.
-- A terminal outcome (a success, or a drop that redelivery cannot fix) is acked so it
-- leaves the queue; a transient failure is deliberately NOT acked ("retry is don't
-- ack"), so the message is handed out again.
processMessage :: QueueMessage -> WorkerM ()
processMessage message = do
    metrics <- asks wrMetrics
    outcome <- processJob receipt (msgJob message)
    liftIO (wmpMirrorJobProcessed metrics (jobResultMetric outcome))
    settle outcome
  where
    receipt :: ReceiptHandle
    receipt = msgReceipt message

    settle :: JobOutcome -> WorkerM ()
    settle Succeeded = ackMessage receipt
    settle (Dropped reason) = do
        -- Non-retryable (e.g. a tampered artifact, an unformable URL, a current
        -- policy deny): the job can never succeed, so ack it rather than let it
        -- redeliver forever. This error line records why it was retired.
        logFM ErrorS (ls ("dropping unrecoverable mirror job: " <> reason))
        ackMessage receipt
    settle (Retried reason) =
        -- Transient: leave the message un-acked. How it comes back depends on the
        -- backend -- a durable queue redelivers it once the visibility window lapses,
        -- while the in-memory backend (no redelivery) re-mirrors it on the next
        -- demand. Either way the job is not lost.
        logFM WarningS (ls ("leaving mirror job un-acked for retry (redelivered by a durable queue, re-mirrored on next demand by the in-memory one): " <> reason))

-- Map a job outcome onto the bounded result label of the
-- @ecluse.mirror.jobs.processed@ metric. Only 'Succeeded' counts as published. That
-- includes the idempotent already-present cases: a pre-fetch probe hit and the
-- publish-side 409 both surface as 'Succeeded'. A drop and a retry are both failures.
jobResultMetric :: JobOutcome -> Metric.MirrorResult
jobResultMetric outcome
    | outcome == Succeeded = Metric.Published
    | otherwise = Metric.Failed

-- Acknowledge a received message on the worker's queue by its receipt handle, retiring
-- it so it is not handed out again.
ackMessage :: ReceiptHandle -> WorkerM ()
ackMessage receipt = asks wrQueue >>= \queue -> liftIO (ack queue receipt)

-- | How processing one mirror job ended. This alone decides whether the job's queue
-- message is acked or left un-acked to come back.
data JobOutcome
    = -- | The job is finished and its message is acked. Idempotent redeliveries land
      -- here too. This covers a version the pre-fetch probe already finds at the
      -- mirror target. It also covers a publish answered with a @409@, which
      -- 'Ecluse.Core.Registry.publishArtifact' treats as success. Neither gets a
      -- separate constructor.
      Succeeded
    | -- | A __non-retryable__ end: redelivery cannot change the result, so the message
      -- is acked to retire it, never published. Examples: the bytes failed the
      -- serve-time integrity digest (tamper or corruption), the publish URL could not
      -- be formed (misconfiguration), current policy now denies the version, the
      -- upstream withdrew it, or no policy is configured for its ecosystem. Carries
      -- the reason.
      Dropped Text
    | -- | A __transient__ fault, such as a failed artifact fetch, a registry rejection,
      -- unobtainable metadata, or a rule that could not be computed. The message is
      -- left un-acked so it comes back. Carries the reason.
      Retried Text
    deriving stock (Eq, Show)

{- | Carry one mirror job from start to finish, returning the 'JobOutcome' that decides
whether its message is acked or redelivered.

The stages run cheapest-first:

  1. __Presence probe.__ Ask the mirror target whether the version is already there.
     A confirmed-present version is acked outright. Mirroring is demand-driven, so
     every public-leg admit of a not-yet-mirrored version enqueues its own job, and a
     fleet-wide install of a new version enqueues many duplicates. The probe retires
     each duplicate for one metadata round trip. Without it, each duplicate would pay
     a full download and an integrity recompute only to hit the idempotent @409@. The
     probe only ever skips work whose publish would have been that no-op. It is an
     optimisation, never a gate.

  2. __Policy re-evaluation.__ The version was gated at serve time, but the
     enqueue-to-process gap is asynchronous and unbounded, so policy may have
     tightened since. Examples are a new denylist entry, a fresh advisory, or a rule
     change. The __same__ rules the serve path uses are re-run over the version,
     resolved through the __same__ single-version fetch-and-project. A now-denied or
     upstream-withdrawn version is dropped (acked, never published) rather than
     frozen into the rule-exempt mirror store. Metadata that cannot be re-fetched, or
     a rule that cannot be computed, leaves the job to redeliver.

  3. __Fetch, verify, publish.__ Only on a current admit: download the artifact,
     check it against the job's serve-time integrity digest, and publish it to the
     mirror target. A mismatch never publishes, because the mirror is later served
     without the rules.

The receipt handle is threaded through so the publish step can
'Ecluse.Core.Queue.extendVisibility' the message before its window lapses.

All three stages run inside the per-job domain span from the worker tracing port. The
span is linked to the enqueuing request through the job's 'jobTraceContext', and the
final outcome is projected onto it, so a refused or dropped job can be explained from
the trace. The span body is run in 'IO' via the unlift, so structured log lines keep
composing through the ambient @katip@ context.
-}
processJob :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome
processJob receipt job = katipAddNamespace "job" $ do
    logFM DebugS (ls ("starting mirror job for " <> renderJob job))
    runtime <- ask
    let tracing = wrTracing runtime
        pipeline = wrInjectTraceContext runtime (reevaluateThenMirror receipt job)
    withRunInIO $ \runInIO ->
        wtpMirrorJobSpan
            tracing
            (jobPackage job)
            (jobVersion job)
            (jobTraceContext job)
            spanOutcomeOf
            (runInIO pipeline)
  where
    -- Project an outcome onto the job span. The bounded outcome label is always set.
    -- For a job that did not publish, the reason is attached as failure detail, which
    -- marks the span errored.
    spanOutcomeOf :: JobOutcome -> JobSpanOutcome
    spanOutcomeOf = \case
        Succeeded -> JobSpanOutcome "succeeded" Nothing
        Dropped reason -> JobSpanOutcome "dropped" (Just reason)
        Retried reason -> JobSpanOutcome "retried" (Just reason)

-- | The verdict of re-running current policy for a job, reached before any artifact
-- bytes are fetched.
data ReevalOutcome
    = -- | Current policy admits the version: go on to fetch, verify, and publish it.
      ReevalAdmit
    | -- | Current policy denies the version, it was withdrawn upstream, or no policy
      -- applies. The job is acked and never published. Carries the reason.
      ReevalDrop Text
    | -- | Metadata could not be obtained or a rule could not be computed. The job is
      -- left un-acked for redelivery. Carries the reason.
      ReevalRetry Text

-- Run the two cheap checks before the (possibly large) artifact download:
--   1. A version already confirmed at the mirror target is a no-op job, acked
--      without moving any bytes. This retires duplicates for one metadata round trip.
--   2. Otherwise current policy is re-evaluated. A drop or retry verdict ends the job
--      here without a download; only an admit proceeds to 'mirrorArtifact'.
reevaluateThenMirror :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome
reevaluateThenMirror receipt job = do
    present <- alreadyMirrored job
    if present
        then do
            logFM InfoS (ls ("already present at the mirror target, acking without re-publish: " <> renderJob job))
            pure Succeeded
        else do
            verdict <- reevaluatePolicy job
            case verdict of
                ReevalAdmit -> mirrorArtifact receipt job
                ReevalDrop reason -> pure (Dropped reason)
                ReevalRetry reason -> pure (Retried reason)

{- Check whether the job's version is already listed at the mirror target. The check
uses the read side of the publish registry client.

Only a positive confirmation yields 'True': the mirror's own metadata must be fetched,
must parse as a version list, and that list must contain the job's version. Anything
else answers 'False' and the job takes the full gated path. That includes a thrown
fetch (for example a mirror @404@ for a package not yet mirrored, an auth refusal, or
an outage) and an unparseable body. A wrong 'False' costs only one redundant download
plus an idempotent @409@, exactly the pre-probe behaviour. So the probe can save work
but can never lose a publish or let one through unvetted. -}
alreadyMirrored :: MirrorJob -> WorkerM Bool
alreadyMirrored job = do
    client <- asks wrRegistry
    let listsJobVersion response =
            either (const False) (elem (jobVersion job)) (parseVersionList client response)
    either (const False) listsJobVersion
        <$> tryAny (liftIO (fetchMetadata client (jobPackage job)))

{- Re-run current policy for the job's single version. The steps are:
find the policy configured for the job's ecosystem, resolve and project the version's
metadata through the policy's single-version fetch, and evaluate the policy's prepared
rules over it.

An ecosystem with no configured policy is fail-closed: the job is dropped rather than
mirrored unvetted. The remaining outcomes track the serve path's degrade:
  * a version the upstream no longer offers is a non-retryable drop;
  * unobtainable metadata is a transient retry;
  * the rule decision itself is classified by 'outcomeOfDecision'. -}
reevaluatePolicy :: MirrorJob -> WorkerM ReevalOutcome
reevaluatePolicy job = do
    policies <- asks wrPolicies
    maybe (pure noPolicy) judgeWith (Map.lookup ecosystem policies)
  where
    ecosystem :: Ecosystem
    ecosystem = pkgEcosystem (jobPackage job)

    subject :: Text
    subject = renderJob job

    -- Fail closed: with no policy to consult, refuse rather than mirror unvetted.
    noPolicy :: ReevalOutcome
    noPolicy =
        ReevalDrop ("no rule policy is configured for the " <> ecosystemName ecosystem <> " ecosystem; refusing to mirror " <> subject)

    -- Resolve the version through the ecosystem's policy, then judge it by its rules.
    judgeWith policy =
        liftIO (wpResolveVersion policy (jobPackage job) (jobVersion job)) >>= \case
            VersionMetadataUnavailable ->
                pure (ReevalRetry ("could not re-fetch metadata to re-evaluate current policy for " <> subject))
            VersionMissing ->
                pure (ReevalDrop ("the public upstream no longer offers " <> subject <> "; refusing to mirror a withdrawn version"))
            VersionPresent details -> do
                ctx <- liftIO (EvalContext <$> wpNow policy)
                outcomeOfDecision job <$> liftIO (evalRules ctx (wpRules policy) details)

-- Turn a re-evaluation 'Decision' into a re-evaluation verdict:
--   * an admit proceeds to mirroring;
--   * a rule block or a deny-by-default drops, because current policy denies the
--     version and it must not be frozen into the trusted mirror store;
--   * an undecidable verdict (a fail-closed rule that could not be computed) retries.
--     A transient advisory-source outage therefore neither drops a serviceable job nor
--     publishes it unvetted; the serve path treats the same cause as a transient 503.
--     The transience tag carried by 'Undecidable' is not consulted: every undecidable
--     verdict retries.
outcomeOfDecision :: MirrorJob -> Decision -> ReevalOutcome
outcomeOfDecision job decision = case decision of
    Admitted{} -> ReevalAdmit
    Blocked ruleName reason -> denied (": blocked by " <> ruleName <> " (" <> reason <> ")")
    BlockedByDefault _ -> denied ": no rule admits it"
    Undecidable _ reason ->
        ReevalRetry ("current policy could not be evaluated for " <> subject <> ": " <> reason)
  where
    subject :: Text
    subject = renderJob job

    -- A policy deny, with the given detail appended to the common prefix.
    denied :: Text -> ReevalOutcome
    denied detail = ReevalDrop ("current policy denies " <> subject <> detail)

-- Download the artifact, check it against the job's serve-time-admitted integrity
-- digest, and publish it only when the digest matches. This is reached only after a
-- current policy admit.
--   * A failed download is transient ('Retried').
--   * A mismatch is the security crux. The private upstream is served without the
--     rules, so a tampered or corrupt artifact must never reach it. A mismatch is
--     logged at error level and dropped with no publish.
mirrorArtifact :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome
mirrorArtifact receipt job = do
    logFM DebugS (ls ("fetching artifact bytes from " <> url))
    fetchArtifactBytes url >>= either (pure . Retried) verifyThenPublish
  where
    url :: Text
    url = jobArtifactUrl job

    -- Gate the downloaded bytes on the integrity digest before any publish.
    verifyThenPublish :: ByteString -> WorkerM JobOutcome
    verifyThenPublish bytes =
        case verifyIntegrity (maHashes (jobArtifact job)) bytes of
            IntegrityVerified -> publishVerified receipt job bytes
            IntegrityMismatch detail -> do
                logFM ErrorS (ls ("artifact integrity mismatch, refusing to publish: " <> detail))
                pure (Dropped ("integrity mismatch: " <> detail))

-- Publish bytes that already passed the integrity gate to the mirror target, and
-- classify the result into a 'JobOutcome'.
--
-- Before publishing, the message's visibility is extended ('holdForLongPublish')
-- because a large-artifact publish may outlast the queue's window. The publish goes
-- through the composition-root registry client, which assembles the
-- ecosystem-specific document. It is timed for the publish-latency histogram however
-- it ends.
--
-- Outcomes:
--   * success (including the idempotent 409 the client treats as success) -> 'Succeeded';
--   * a registry rejection -> release the hold, then 'Retried';
--   * an unformable publish URL -> 'Dropped' (non-retryable);
--   * a publish that THROWS instead of returning a 'PublishFault' -> release the hold,
--     then 'Retried'. Only synchronous exceptions are caught ('tryAny'), so
--     cancellation still propagates. Without this, a thrown publish would skip the
--     hold release (leaving the message hidden for the whole hold), skip the duration
--     metric, and escape 'processJob' with no 'JobOutcome'.
publishVerified :: ReceiptHandle -> MirrorJob -> ByteString -> WorkerM JobOutcome
publishVerified receipt job bytes = do
    holdForLongPublish receipt
    client <- asks wrRegistry
    metrics <- asks wrMetrics
    -- The publish is the long, network-bound step. The timer wraps the exception
    -- guard, so the histogram records it whichever way it ends.
    (attempt, seconds) <-
        timedSeconds (liftIO (tryAny (publishArtifact client (jobPackage job) (jobVersion job) artifact bytes)))
    liftIO (wmpMirrorPublishDuration metrics seconds)
    case attempt of
        Left exc -> do
            -- Transport-level failure surfaced as an exception: treat it as transient,
            -- exactly like a registry rejection, so it redelivers at once.
            releaseForRetry receipt
            pure (Retried ("publish failed with an exception: " <> displayExceptionT exc))
        Right (Right ()) -> do
            logFM InfoS (ls ("mirrored artifact published: " <> renderJob job))
            pure Succeeded
        Right (Left (PublishRejected err)) -> do
            -- Transient: undo the success-path hold so the job redelivers at once rather
            -- than waiting it out (the hold only exists to protect a slow success). The
            -- message is left un-acked, so it redelivers either way.
            releaseForRetry receipt
            pure (Retried ("registry rejected publish: " <> show err))
        Right (Left (PublishUrlUnformable urlErr)) ->
            -- Non-retryable: 'processMessage' acks this to retire it, so there is no
            -- redelivery to hasten -- leave the hold in place.
            pure (Dropped ("unformable publish URL: " <> show urlErr))
  where
    artifact :: MirrorArtifact
    artifact = jobArtifact job

-- Extend a received message's visibility before a publish that may run long. This
-- keeps a slow write from letting the message redeliver mid-publish, which would waste
-- a full re-fetch and re-publish of a possibly large artifact.
--
-- The hold is an optimisation (idempotency makes a redelivery harmless), so a failed
-- extension never fails the job. It is no longer swallowed silently, though. A
-- synchronous exception is caught and logged at warning level, so a persistently
-- failing extension stays visible to operators. An example would be a receipt that
-- has already expired, or a queue permission that is missing.
holdForLongPublish :: ReceiptHandle -> WorkerM ()
holdForLongPublish receipt = do
    queue <- asks wrQueue
    extended <- tryAny (liftIO (extendVisibility queue receipt extendBy))
    case extended of
        Right _ -> pass
        Left exc ->
            logFM WarningS (ls ("could not extend mirror job visibility ahead of publish (continuing; a slow publish may redeliver mid-flight): " <> displayExceptionT exc))
  where
    -- The window one publish gets before the message could redeliver mid-write. It is
    -- sized to cover a publish of the maximum artifact ('workerArtifactLimits',
    -- 512 MiB). Even over a slow mirror-target link (a conservative ~2 MiB/s floor)
    -- that upload takes roughly 256s, inside this 300s hold, so a successful publish
    -- does not redeliver mid-flight.
    -- A *failed* publish does not wait the hold out: the failure path resets the
    -- message to visible at once (see 'releaseForRetry'). So the generous hold costs
    -- nothing on the retry path. This is the background worker's intended trade:
    -- never interrupt a slow success, and accept that retry latency on failure does
    -- not matter.
    extendBy :: Seconds
    extendBy = Seconds 300

-- Make a received message visible again immediately (a visibility extension of 0
-- seconds). A failed publish then redelivers at once rather than waiting out the long
-- success-path hold ('holdForLongPublish').
--
-- This is best-effort: a missed reset only means the message redelivers after the hold
-- instead, so a failure never fails the job. A synchronous exception is caught and
-- logged at warning level rather than swallowed silently, so a persistently failing
-- reset is noticed.
releaseForRetry :: ReceiptHandle -> WorkerM ()
releaseForRetry receipt = do
    queue <- asks wrQueue
    released <- tryAny (liftIO (extendVisibility queue receipt (Seconds 0)))
    case released of
        Right _ -> pass
        Left exc ->
            logFM WarningS (ls ("could not reset mirror job visibility for immediate retry (it will redeliver once the hold lapses): " <> displayExceptionT exc))

-- Render a job as "<package>@<version>", the compact tag used in log lines and in
-- drop/retry reasons.
renderJob :: MirrorJob -> Text
renderJob job =
    mconcat [renderPackageName (jobPackage job), "@", renderVersion (jobVersion job)]

-- | 'displayException' rendered as 'Text' instead of 'String', for use in log lines
-- (relude's 'displayException' produces a 'String').
displayExceptionT :: (Exception e) => e -> Text
displayExceptionT exc = toText (displayException exc)