| Safe Haskell | None |
|---|---|
| Language | GHC2021 |
Ecluse.Core.Worker.Job
Description
Ack within the visibility budget during job processing.
A received message is hidden only for the queue's visibility window. The worker
acks on success; before a publish that may run long it calls
extendVisibility to hold the message before the window lapses; on a
transient failure it does not ack, so the message redelivers. A batch is
processed sequentially, so each job has the full visibility budget rather than
competing with its batch-mates for it.
Synopsis
- data JobOutcome
- processJob :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome
- processBatch :: [QueueMessage] -> WorkerM ()
- displayExceptionT :: Exception e => e -> Text
Documentation
data JobOutcome Source #
The terminal outcome of processing one mirror job, deciding whether the message is acked or left to redeliver.
Constructors
| Succeeded | The publish succeeded, so the job is acked. This covers an idempotent
redelivery too: a version already present at the mirror target is a |
| Dropped Text | A non-retryable fault: the bytes did not match the serve-time digest (tamper), or the publish URL was unformable (misconfiguration). Redelivery cannot help, so the job is dropped after alarming. Carries the reason. |
| Retried Text | A transient fault: a fetch failure, or a registry rejection worth retrying. The message is left un-acked so it redelivers. Carries the reason. |
Instances
| Show JobOutcome Source # | |
Defined in Ecluse.Core.Worker.Job Methods showsPrec :: Int -> JobOutcome -> ShowS # show :: JobOutcome -> String # showList :: [JobOutcome] -> ShowS # | |
| Eq JobOutcome Source # | |
Defined in Ecluse.Core.Worker.Job | |
processJob :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome Source #
Process one mirror job end to end: probe the mirror target for the job's version
(a confirmed-present version is acked outright, the duplicate-suppression short-circuit),
then re-evaluate current policy, and only on a current admit fetch the artifact,
verify it against the job's serve-time-admitted integrity digest, and publish it to the
mirror target. Returns the JobOutcome that decides ack vs. redeliver.
The presence probe exists for the enqueue-to-availability window: mirroring is
demand-driven, so every public-leg admit of a still-unmirrored version enqueues its own
job, and a fleet-wide install of a novel version enqueues many. Without the probe each
duplicate pays a full artifact download and an integrity recompute before the publish
discovers the version is already present (the idempotent 409); with it, a duplicate
costs one metadata round trip. The probe is an optimisation, never a gate: it skips
only work whose publish would have been that no-op, so the policy re-evaluation below
still guards every artifact that actually publishes.
The policy re-evaluation is the ingest-time gate. The version was gated at serve time, but the enqueue-to-process window is asynchronous and unbounded, so policy may have tightened toward deny since (a new denylist entry, a freshly-published advisory, a rule-config change). The worker re-runs the same rules the serve path gates with, over the version resolved through the same single-version fetch-and-project, so a now-denied version is dropped (acked, never published) rather than frozen into the rule-exempt trusted mirror store; a version the upstream has since withdrawn is likewise dropped, while metadata that cannot be re-fetched (or a rule that cannot be computed) leaves the job for redelivery. A current admit proceeds to the integrity gate: a tampered or corrupt artifact fails the job with no publish, since the mirror is later served without the rules.
The receipt handle is taken so a long publish can extendVisibility
to hold the message before its window lapses.
The per-job domain span (the worker tracing port) wraps the whole probe → re-evaluate →
fetch → verify → publish, projecting the terminal outcome onto the span so a refused or dropped job
is explainable from the trace, and linking back to the request that enqueued the job
through the trace context the job carries (jobTraceContext). The span body is discharged
to IO through the unlift, so the loop's structured log lines still compose through the
ambient katip context.
processBatch :: [QueueMessage] -> WorkerM () Source #
Process one received batch sequentially, so each job gets the full visibility budget rather than competing with its batch-mates for it. A batch is at most the queue's configured batch size (≤ 10), so sequential processing is a deliberate throughput-vs-budget choice, not a scaling bottleneck.
displayExceptionT :: Exception e => e -> Text Source #