| Safe Haskell | None |
|---|---|
| Language | GHC2021 |
Ecluse.Core.Worker
Description
The mirror worker: the supervised consume loop that turns enqueued jobs into mirrored packages.
The worker is the consumer end of the demand-driven mirror queue (see Ecluse.Core.Queue). The consume loop long-polls the queue, and for each received job:
- probes the mirror target for the job's version, acking a confirmed-present duplicate outright (demand-driven enqueue means a fleet-wide install of a novel version enqueues many jobs for it; only the first has work to do),
- re-evaluates current policy for the version through the same rules and single-version fetch the serve path gates with, so a version denied since its serve-time admit is dropped rather than mirrored,
- fetches the artifact bytes from the public upstream named on the job,
- verifies those bytes against the integrity digest the job carries -- the digest the rules admitted at serve time, not a fresh re-fetch,
- assembles the npm publish document and publishes it to the mirror target (the publish-side registry handle on the WorkerRuntime, resolved at the composition root with the bearer from the Ecluse.Core.Credential provider), and
- acknowledges the job.
See the individual modules for detailed behaviour:
- Ecluse.Core.Worker.Integrity for the security gate on artifact digests.
- Ecluse.Core.Worker.Loop for supervision and graceful shutdown.
- Ecluse.Core.Worker.Job for ack semantics within the visibility budget.
See docs/architecture/cloud-backends.md → "Mirror Queue" and "Process model".
Synopsis
- data WorkerRuntime = WorkerRuntime, a record with fields:
  - wrQueue :: MirrorQueue
  - wrRegistry :: RegistryClient
  - wrManager :: Manager
  - wrHeartbeat :: WorkerHeartbeat
  - wrMetrics :: WorkerMetricsPort
  - wrTracing :: WorkerTracingPort
  - wrInjectTraceContext :: forall (m :: Type -> Type) a. (KatipContext m, MonadIO m) => m a -> m a
  - wrPolicies :: WorkerPolicies
- data WorkerPolicy = WorkerPolicy, a record with fields:
  - wpResolveVersion :: PackageName -> Version -> IO VersionEvaluation
  - wpRules :: [PreparedRule]
  - wpNow :: IO UTCTime
- type WorkerPolicies = Map Ecosystem WorkerPolicy
- data WorkerM a
- runWorkerM :: LogEnv -> SimpleLogPayload -> WorkerRuntime -> WorkerM a -> IO a
- workerLoop :: WorkerM ()
- processBatch :: [QueueMessage] -> WorkerM ()
- processJob :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome
- data JobOutcome
- data WorkerHeartbeat
- newWorkerHeartbeat :: IO WorkerHeartbeat
- recordPoll :: WorkerHeartbeat -> UTCTime -> IO ()
- lastPoll :: WorkerHeartbeat -> IO (Maybe UTCTime)
- workerHeartbeatStaleAfter :: NominalDiffTime
- heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool
- heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool
- data IntegrityResult
- verifyIntegrity :: NonEmpty Hash -> ByteString -> IntegrityResult
Worker runtime
data WorkerRuntime
The runtime backends the mirror worker is closed over: exactly the effectful
capabilities the consume loop needs to poll, fetch, verify, publish, and record. A
record of concrete handles and abstract ports (the Handle pattern), assembled by the
composition root (workerRuntimeOf) and read by the loop through the
WorkerM reader.
The mirror queue is the demand-driven hand-off the loop consumes; the publish-side
registry client writes approved artifacts to the mirror target; the untrusted
data-plane manager fetches the artifact bytes (the validating TLS manager, over an
https-only dist.tarball); the heartbeat is the loop's liveness surface. The metric and
tracing ports are the abstract recording interfaces (Ecluse.Core.Telemetry.Record,
Ecluse.Core.Telemetry.Span); the application supplies their OpenTelemetry-backed
implementations, so the loop records without naming a telemetry backend. There is no log
field: the loop logs through the ambient katip context the entry point establishes.
Constructors
- WorkerRuntime (fields: wrQueue, wrRegistry, wrManager, wrHeartbeat, wrMetrics, wrTracing, wrInjectTraceContext, wrPolicies)
Instances
- MonadReader WorkerRuntime WorkerM (defined in Ecluse.Core.Worker.Types). Methods: ask :: WorkerM WorkerRuntime; local :: (WorkerRuntime -> WorkerRuntime) -> WorkerM a -> WorkerM a; reader :: (WorkerRuntime -> a) -> WorkerM a
Per-ecosystem ingest re-evaluation
data WorkerPolicy
The per-ecosystem re-evaluation bundle the worker re-runs current policy through before it mirrors a job: a resolver that fetches and projects the single version's metadata, the prepared rule set, and the wall-clock the age rules read.
The resolver is the shared single-version fetch-and-project
(fetchVersionDetails over the guarded public origin,
wired by the composition root), and the rules are the same prepared rules the serve
path gates with, so the worker's ingest decision and the serve-time decision run one
codepath and any per-source breaker state is shared, never forked.
Constructors
- WorkerPolicy (fields: wpResolveVersion, wpRules, wpNow)
type WorkerPolicies = Map Ecosystem WorkerPolicy
The worker's per-ecosystem re-evaluation bundles, keyed by the ecosystem a job's
package belongs to (pkgEcosystem). Built once at boot and shared
with the serve mounts; a job whose ecosystem is absent here is fail-closed (dropped), never
mirrored unvetted.
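The fail-closed lookup can be sketched as a small pure function. The Ecosystem constructors, the single-predicate Policy, and the Decision type are hypothetical simplifications: the real bundle resolves version metadata and runs prepared rules. What the sketch preserves is the control flow. A missing bundle drops the job, and a bundle that denies the version also drops it.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical ecosystem tag (the real key is the package's ecosystem).
data Ecosystem = Npm | PyPI
  deriving (Eq, Ord, Show)

-- Hypothetical, reduced policy bundle: just an admit/deny predicate on a version.
newtype Policy = Policy {policyAdmits :: String -> Bool}

data Decision = Proceed | Drop String
  deriving (Eq, Show)

-- Fail closed: a job whose ecosystem has no bundle is dropped, never mirrored
-- unvetted; a bundle that denies the version also drops it.
decide :: Map.Map Ecosystem Policy -> Ecosystem -> String -> Decision
decide policies eco version =
  case Map.lookup eco policies of
    Nothing -> Drop ("no policy for " ++ show eco)
    Just policy
      | policyAdmits policy version -> Proceed
      | otherwise -> Drop "denied by current policy"
```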
The worker monad
data WorkerM a
The mirror worker's monad: a reader over the WorkerRuntime layered on katip's
logging context.
A newtype over ReaderT WorkerRuntime (KatipContextT IO), so its instances are
this module's to control and call sites name one concrete monad. The derived instances
give reader access to the runtime (MonadReader WorkerRuntime), arbitrary effects
(MonadIO), the unlift capability (MonadUnliftIO) that the loop's tryAny and the per-job
span bracket need, and the katip classes (Katip, KatipContext), so a structured log
call composes through the ambient context the entry point establishes.
The katip base is a reader, never a StateT, so the logging context behaves correctly
across the loop (see docs/architecture/technology-stack.md → "Key Decisions").
Instances
- MonadIO WorkerM (defined in Ecluse.Core.Worker.Types)
- Applicative WorkerM
- Functor WorkerM
- Monad WorkerM
- Katip WorkerM
- KatipContext WorkerM (defined in Ecluse.Core.Worker.Types). Methods: getKatipContext :: WorkerM LogContexts; localKatipContext :: (LogContexts -> LogContexts) -> WorkerM a -> WorkerM a; getKatipNamespace :: WorkerM Namespace; localKatipNamespace :: (Namespace -> Namespace) -> WorkerM a -> WorkerM a
- MonadUnliftIO WorkerM (defined in Ecluse.Core.Worker.Types)
- MonadReader WorkerRuntime WorkerM (defined in Ecluse.Core.Worker.Types). Methods: ask :: WorkerM WorkerRuntime; local :: (WorkerRuntime -> WorkerRuntime) -> WorkerM a -> WorkerM a; reader :: (WorkerRuntime -> a) -> WorkerM a
runWorkerM :: LogEnv -> SimpleLogPayload -> WorkerRuntime -> WorkerM a -> IO a
Run a WorkerM against the WorkerRuntime and the katip logging environment and
initial context the entry point supplies, yielding the underlying IO action. This is
the boundary where the worker's WorkerM code is discharged to IO.
The LogEnv (the structured-log scribes) and the initial context payload are passed in
rather than read from the runtime, so the application owns the log stream and the
trace-correlation dd enrichment: it resolves the dd identity and hands it here as the
initial context, so every line the loop emits carries dd. The loop narrows the
namespace with katip's combinators on top as it logs.
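The newtype-over-reader shape and its discharge boundary can be sketched without katip. This is an assumption-laden stand-in. AppM, Env, newEnv, runAppM, and bumpPolls are hypothetical names. The base is plain IO where the real WorkerM uses KatipContextT IO, and MonadUnliftIO is omitted because it needs a package outside GHC's boot set. The sketch shows the instances derived through the newtype and the single point where the monad is run down to IO.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader, ReaderT, asks, runReaderT)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical environment: one counter stands in for the real runtime handles.
newtype Env = Env {envPolls :: IORef Int}

newEnv :: IO Env
newEnv = Env <$> newIORef 0

-- A newtype over a reader, with instances derived through it. The real
-- WorkerM sits on KatipContextT IO; plain IO is the base here so the
-- sketch needs only mtl.
newtype AppM a = AppM (ReaderT Env IO a)
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)

-- The discharge boundary: unwrap the newtype and run the reader.
runAppM :: Env -> AppM a -> IO a
runAppM env (AppM m) = runReaderT m env

-- AppM code reads its handles from the environment with asks.
bumpPolls :: AppM Int
bumpPolls = do
  ref <- asks envPolls
  liftIO (modifyIORef' ref (+ 1))
  liftIO (readIORef ref)
```

Because the base is a reader rather than a StateT, anything mutable lives behind a handle such as the IORef here. That is the same reason the document gives for keeping the katip base a reader.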
Loop and job processing (exposed for direct testing)
workerLoop :: WorkerM ()
The continuous consume loop: long-poll for a batch, process it, repeat.
Each iteration is wrapped so a single failure -- a receive that throws, a fetch or
publish error, an undecodable body -- is caught and logged, then the loop backs off
briefly and continues, so one bad iteration cannot kill the worker thread. A
successful poll advances the heartbeat (whether or not the batch was empty), so a
liveness probe sees the loop is alive; an idle queue is a healthy empty poll, not a
stall.
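The loop's failure isolation and heartbeat rule can be sketched in plain IO. The following are assumptions of the sketch: the iteration count (the real loop is unbounded), the back-off length, and modelling the heartbeat as an IORef (Maybe UTCTime). One difference is deliberate and needs flagging. This sketch uses base's try at SomeException, which also swallows asynchronous exceptions. The tryAny the document names rethrows those, which matters for graceful shutdown. The batch is processed with mapM_, that is, sequentially, as processBatch describes.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, try)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Time.Clock (UTCTime, getCurrentTime)

-- One iteration: poll, advance the heartbeat on a successful poll
-- (empty batches included), then process the batch sequentially.
iteration :: IORef (Maybe UTCTime) -> IO [job] -> (job -> IO ()) -> IO ()
iteration heartbeat poll process = do
  batch <- poll
  now <- getCurrentTime
  writeIORef heartbeat (Just now)
  mapM_ process batch

-- Run a bounded number of iterations and return how many failed. A failing
-- iteration is caught and reported, then the loop backs off briefly and
-- continues, so one bad iteration cannot end the loop.
runLoop :: Int -> Int -> IORef (Maybe UTCTime) -> IO [job] -> (job -> IO ()) -> IO Int
runLoop iterations backoffMicros heartbeat poll process = go iterations 0
  where
    go 0 failures = pure failures
    go n failures = do
      r <- try (iteration heartbeat poll process) :: IO (Either SomeException ())
      case r of
        Right () -> go (n - 1) failures
        Left err -> do
          putStrLn ("iteration failed: " ++ show err)
          threadDelay backoffMicros
          go (n - 1) (failures + 1)
```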
processBatch :: [QueueMessage] -> WorkerM ()
Process one received batch sequentially, so each job gets the full visibility budget rather than competing with its batch-mates for it. A batch is at most the queue's configured batch size (≤ 10), so sequential processing is a deliberate throughput-vs-budget choice, not a scaling bottleneck.
processJob :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome
Process one mirror job end to end: probe the mirror target for the job's version
(a confirmed-present version is acked outright, the duplicate-suppression short-circuit),
then re-evaluate current policy, and only on a current admit fetch the artifact,
verify it against the job's serve-time-admitted integrity digest, and publish it to the
mirror target. Returns the JobOutcome that decides ack vs. redeliver.
The presence probe exists for the enqueue-to-availability window: mirroring is
demand-driven, so every public-leg admit of a still-unmirrored version enqueues its own
job, and a fleet-wide install of a novel version enqueues many. Without the probe each
duplicate pays a full artifact download and an integrity recompute before the publish
discovers the version is already present (the idempotent 409); with it, a duplicate
costs one metadata round trip. The probe is an optimisation, never a gate: it skips
only work whose publish would have been that no-op, so the policy re-evaluation below
still guards every artifact that actually publishes.
The policy re-evaluation is the ingest-time gate. The version was gated at serve time, but the enqueue-to-process window is asynchronous and unbounded, so policy may have tightened toward deny since (a new denylist entry, a freshly-published advisory, a rule-config change). The worker re-runs the same rules the serve path gates with, over the version resolved through the same single-version fetch-and-project, so a now-denied version is dropped (acked, never published) rather than frozen into the rule-exempt trusted mirror store; a version the upstream has since withdrawn is likewise dropped, while metadata that cannot be re-fetched (or a rule that cannot be computed) leaves the job for redelivery. A current admit proceeds to the integrity gate: a tampered or corrupt artifact fails the job with no publish, since the mirror is later served without the rules.
The receipt handle is taken so a long publish can extendVisibility
to hold the message before its window lapses.
The per-job domain span (the worker tracing port) wraps the whole probe → re-evaluate →
fetch → verify → publish, projecting the terminal outcome onto the span so a refused or dropped job
is explainable from the trace, and linking back to the request that enqueued the job
through the trace context the job carries (jobTraceContext). The span body is discharged
to IO through the unlift, so the loop's structured log lines still compose through the
ambient katip context.
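The step ordering described above can be modelled as a pure function over pre-computed step results. This is a hypothetical model, not processJob itself. Steps, Presence, Verdict, and decideJob are invented names, and String stands in for Text.

Some of the outcome mapping follows the prose: a denial or upstream withdrawal is dropped, unresolvable metadata is retried, a transient fetch failure is retried, and an integrity mismatch is dropped. Two mappings are assumptions of the sketch:
- a confirmed-present probe becomes Succeeded;
- every publish error becomes Retried.

A failed probe falls through to the re-evaluation, because the probe is an optimisation, never a gate.

```haskell
-- Hypothetical, pre-computed results of each effectful step, so the
-- ordering and outcome mapping can be shown as a pure function.
data Presence = Present | Absent | ProbeFailed
data Verdict = Admit | Deny | Withdrawn | Unresolvable

-- Mirrors the three JobOutcome constructors, with String for Text.
data JobOutcome = Succeeded | Dropped String | Retried String
  deriving (Eq, Show)

data Steps = Steps
  { probe :: Presence                      -- presence probe at the mirror target
  , reevaluate :: Verdict                  -- current-policy re-evaluation
  , fetch :: Either String String          -- Left: transient error; Right: bytes
  , verify :: String -> Bool               -- integrity gate on the fetched bytes
  , publish :: String -> Either String ()  -- Left: publish error
  }

decideJob :: Steps -> JobOutcome
decideJob s = case probe s of
  Present -> Succeeded -- confirmed duplicate: ack without further work
  _ -> case reevaluate s of -- a failed probe is not a gate; carry on
    Deny -> Dropped "denied by current policy"
    Withdrawn -> Dropped "withdrawn upstream"
    Unresolvable -> Retried "metadata could not be re-fetched"
    Admit -> case fetch s of
      Left err -> Retried err
      Right bytes
        | not (verify s bytes) -> Dropped "integrity mismatch"
        | otherwise -> either Retried (const Succeeded) (publish s bytes)
```

Putting the probe first and the fetch last keeps the expensive work (download and digest) behind the cheap checks. The integrity gate still sits between fetch and publish.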
data JobOutcome
The terminal outcome of processing one mirror job, deciding whether the message is acked or left to redeliver.
Constructors
| Succeeded | The publish succeeded, so the job is acked. This covers an idempotent redelivery too: a version already present at the mirror target is a |
| Dropped Text | A non-retryable fault: the bytes did not match the serve-time digest (tamper), or the publish URL was unformable (misconfiguration). Redelivery cannot help, so the job is dropped after alarming. Carries the reason. |
| Retried Text | A transient fault: a fetch failure, or a registry rejection worth retrying. The message is left un-acked so it redelivers. Carries the reason. |
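The ack decision these constructors encode can be sketched as follows. The sketch mirrors the three constructors with String in place of Text. The names shouldAck and settle, and the ack action passed in, are hypothetical. Following the document, a dropped job is acked without publishing, and a retried job is left un-acked so it redelivers.

```haskell
-- Mirrors the three JobOutcome constructors, with String for Text.
data JobOutcome = Succeeded | Dropped String | Retried String
  deriving (Eq, Show)

-- Succeeded and Dropped both remove the message; only Retried leaves it.
shouldAck :: JobOutcome -> Bool
shouldAck (Retried _) = False
shouldAck _ = True

-- Run the (hypothetical) ack action only when the outcome calls for it;
-- otherwise leave the message to reappear after its visibility window.
settle :: IO () -> JobOutcome -> IO ()
settle ack outcome
  | shouldAck outcome = ack
  | otherwise = pure ()
```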
Instances
- Show JobOutcome (defined in Ecluse.Core.Worker.Job). Methods: showsPrec :: Int -> JobOutcome -> ShowS; show :: JobOutcome -> String; showList :: [JobOutcome] -> ShowS
- Eq JobOutcome (defined in Ecluse.Core.Worker.Job)
Liveness
data WorkerHeartbeat
The mirror worker's consume-loop heartbeat: the wall-clock time of the worker's last successful poll of the queue.
It is the worker's own liveness signal, kept apart from the server's HTTP
readiness so single-process health reflects a stalled worker today and a future
standalone worker binary keeps the same probe. The worker calls recordPoll after each
successful receive (whether or not the batch was empty -- an empty long-poll is a
healthy idle, not a stall); a liveness probe reads lastPoll and compares it
against the wall clock to decide whether the loop has gone quiet for too long.
newWorkerHeartbeat :: IO WorkerHeartbeat
Build a fresh WorkerHeartbeat with no poll yet recorded (lastPoll is
Nothing until the worker's first successful receive).
recordPoll :: WorkerHeartbeat -> UTCTime -> IO ()
Record the time of a successful queue poll, advancing the heartbeat. Called
by the worker after each receive returns (the loop is alive even on an empty
batch).
lastPoll :: WorkerHeartbeat -> IO (Maybe UTCTime)
The time of the worker's last successful poll, or Nothing before its first.
A liveness probe reads this and compares it against the wall clock.
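One plausible shape for the heartbeat is a mutable cell holding Maybe UTCTime, sketched below. The IORef representation and the names newHeartbeat, markPoll, markPollNow, and readLastPoll are assumptions; the document does not say how WorkerHeartbeat is stored. atomicWriteIORef is used because the probe reads the cell from another thread, and that write carries a barrier against reordering.

```haskell
import Data.IORef (IORef, atomicWriteIORef, newIORef, readIORef)
import Data.Time.Clock (UTCTime, getCurrentTime)

-- Hypothetical heartbeat: a mutable cell holding the last successful poll time.
newtype Heartbeat = Heartbeat (IORef (Maybe UTCTime))

-- No poll recorded yet, so the cell starts at Nothing.
newHeartbeat :: IO Heartbeat
newHeartbeat = Heartbeat <$> newIORef Nothing

-- Advance the heartbeat; atomicWriteIORef prevents the write from being
-- reordered with other memory operations, a safe choice for a cross-thread reader.
markPoll :: Heartbeat -> UTCTime -> IO ()
markPoll (Heartbeat ref) t = atomicWriteIORef ref (Just t)

-- Convenience: stamp the heartbeat with the current wall-clock time.
markPollNow :: Heartbeat -> IO ()
markPollNow hb = getCurrentTime >>= markPoll hb

readLastPoll :: Heartbeat -> IO (Maybe UTCTime)
readLastPoll (Heartbeat ref) = readIORef ref
```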
workerHeartbeatStaleAfter :: NominalDiffTime
How long the worker's last successful poll may be stale before the loop is considered stalled -- the staleness threshold the liveness probe applies.
It is a generous multiple of the long-poll cadence: a healthy idle worker still
completes a poll at least every sqsWaitSeconds (≤ 20s by
default), so a gap several times that is a genuine stall, not an idle queue. Set
well above one poll window so liveness never flaps on normal scheduling jitter.
heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool
Whether the worker's consume loop is healthy as of now, given its last
successful poll. This is the liveness signal the single-process /livez probe
folds in (see Ecluse.Server), distinct from HTTP readiness.
- Nothing (no poll yet) is healthy: the worker is still starting, not stalled.
- A poll within workerHeartbeatStaleAfter is healthy.
- A poll older than that is unhealthy: the loop has gone quiet for too long.
>>> import Data.Time (UTCTime (UTCTime), fromGregorian, secondsToDiffTime)
>>> let t0 = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 0)
>>> heartbeatHealthy t0 Nothing
True
>>> let now = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 10)
>>> heartbeatHealthy now (Just t0)
True
>>> let later = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 300)
>>> heartbeatHealthy later (Just t0)
False
heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool
Read the worker heartbeat and decide liveness against the current wall clock --
the IO wrapper the liveness probe calls. True while the consume loop is alive
(or still starting); False once the last successful poll is staler than
workerHeartbeatStaleAfter.
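The liveness rule and its IO wrapper can be sketched as below. The 60-second threshold is a hypothetical value. The real workerHeartbeatStaleAfter is not stated here, and 60 s is merely consistent with the doctests above (10 s is healthy, 300 s is stale). Treating the boundary as inclusive is also an assumption. The names staleAfter, healthy, and healthyNow are illustrative.

```haskell
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)

-- Hypothetical threshold, consistent with the doctests but not the real value.
staleAfter :: NominalDiffTime
staleAfter = 60

-- Nothing (no poll yet) is healthy; otherwise the last poll must be no older
-- than staleAfter as of now.
healthy :: UTCTime -> Maybe UTCTime -> Bool
healthy _ Nothing = True
healthy now (Just lastSeen) = now <= addUTCTime staleAfter lastSeen

-- IO wrapper: read the last poll and judge it against the current wall clock.
healthyNow :: IO (Maybe UTCTime) -> IO Bool
healthyNow readLast = healthy <$> getCurrentTime <*> readLast
```

Keeping the decision pure and passing in now is what makes the doctests above deterministic. Only the thin wrapper touches the clock.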
Integrity verification
data IntegrityResult
The result of verifying fetched bytes against the admitted integrity digests.
A sum type, not a Bool, so the mismatch carries the detail an operator needs to
explain why a publish was refused.
Constructors
| IntegrityVerified | The bytes matched the most authoritative admitted digest. |
| IntegrityMismatch Text | The bytes failed the integrity gate. Carries a human-readable detail (the digest they were checked against, or that the strongest one was uncomputable). |
Instances
- Show IntegrityResult (defined in Ecluse.Core.Worker.Integrity). Methods: showsPrec :: Int -> IntegrityResult -> ShowS; show :: IntegrityResult -> String; showList :: [IntegrityResult] -> ShowS
- Eq IntegrityResult (defined in Ecluse.Core.Worker.Integrity). Methods: (==) :: IntegrityResult -> IntegrityResult -> Bool; (/=) :: IntegrityResult -> IntegrityResult -> Bool
verifyIntegrity :: NonEmpty Hash -> ByteString -> IntegrityResult
Verify fetched artifact bytes against the most authoritative integrity digest the version carries -- never against a weaker one while a stronger is present.
A real npm version carries both a modern SRI sha512 digest and the legacy SHA-1
shasum. Passing on any match would let an artifact that matches the weak SHA-1
but fails the strong sha512 through -- and SHA-1 collision resistance is broken, so
that is exploitable. So the gate ranks the admitted digests by algorithm authority
(strongest first: sha512 / blake2b > sha384 > sha256 > sha1 > md5), and
checks the bytes against the strongest one present: the bytes pass iff that digest
matches.
A weaker digest can neither override nor rescue a failed strong one.
The bytes are recomputed in the strongest digest's own algorithm through the shared
computeDigest, the one definition of which algorithms Écluse can
verify. That computable set covers every algorithm the public integrity floor admits, so an
admitted artifact is always verifiable here. If the strongest digest is nonetheless in an
algorithm computeDigest declines (MD5, a forgeable hash), or is an SRI digest whose inner algorithm
does not resolve, the gate fails closed rather than falling back to a weaker digest: a
tampered artifact must never be admitted on the strength of a hash an attacker could forge.
This is the tamper gate before a publish: a mismatch fails the job and never publishes a corrupt or substituted artifact into the private upstream.
>>> :set -XOverloadedStrings
>>> import Data.List.NonEmpty (NonEmpty ((:|)))
>>> import Ecluse.Core.Package (mkHash, HashAlg (SHA1))
>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "0a4d55a8d778e5022fab701977c5d840bbc486d0")
Right IntegrityVerified
>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
Right (IntegrityMismatch "the SHA1 digest did not match the fetched bytes")
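The strongest-digest rule can be sketched with an injected hasher, so no crypto library is needed. The following are hypothetical stand-ins for the package's hash types and for computeDigest: the Alg and Digest types, the rank function, the compute parameter, and the result messages. The ranking follows the order stated above, with sha512 and blake2b tied at the top and the tie resolved arbitrarily. The sketch checks only the strongest digest, and it fails closed when that algorithm cannot be computed.

```haskell
import Data.Foldable (maximumBy)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Ord (comparing)

-- Hypothetical algorithm tag; the real code uses the package's own hash types.
data Alg = MD5 | SHA1 | SHA256 | SHA384 | SHA512 | Blake2b
  deriving (Eq, Show)

-- Authority rank, strongest highest; sha512 and blake2b share the top rank.
rank :: Alg -> Int
rank MD5 = 0
rank SHA1 = 1
rank SHA256 = 2
rank SHA384 = 3
rank SHA512 = 4
rank Blake2b = 4

data Digest = Digest {digestAlg :: Alg, digestHex :: String}
  deriving (Eq, Show)

data Result = Verified | Mismatch String
  deriving (Eq, Show)

-- Check the bytes only against the strongest digest present. 'compute' is an
-- injected, hypothetical hasher: Nothing means "cannot compute this algorithm",
-- which fails closed instead of falling back to a weaker digest.
verifyStrongest :: (Alg -> String -> Maybe String) -> NonEmpty Digest -> String -> Result
verifyStrongest compute digests bytes =
  case compute alg bytes of
    Nothing -> Mismatch ("cannot compute " ++ show alg ++ "; failing closed")
    Just actual
      | actual == digestHex strongest -> Verified
      | otherwise -> Mismatch ("the " ++ show alg ++ " digest did not match")
  where
    strongest = maximumBy (comparing (rank . digestAlg)) digests
    alg = digestAlg strongest
```

A matching SHA-1 next to a mismatching sha512 still yields Mismatch, because the weaker digest is never consulted once a stronger one is present.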