ecluse:ecluse-core
Safe Haskell None
Language GHC2021

Ecluse.Core.Worker

Description

The mirror worker: the supervised consume loop that turns enqueued jobs into mirrored packages.

The worker is the consumer end of the demand-driven mirror queue (see Ecluse.Core.Queue). The consume loop long-polls the queue, and for each received job:

  1. probes the mirror target for the job's version, acking a confirmed-present duplicate outright (demand-driven enqueue means a fleet-wide install of a novel version enqueues many jobs for it; only the first has work to do),
  2. re-evaluates current policy for the version through the same rules and single-version fetch the serve path gates with, so a version denied since its serve-time admit is dropped rather than mirrored,
  3. fetches the artifact bytes from the public upstream named on the job,
  4. verifies those bytes against the integrity digest the job carries -- the digest the rules admitted at serve time, not a fresh re-fetch,
  5. assembles the npm publish document and publishes it to the mirror target (the publish-side registry handle on the WorkerRuntime, resolved at the composition root with the bearer from the Ecluse.Core.Credential provider), and
  6. acknowledges the job.
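
The ordering of the steps above, and where each one short-circuits, can be sketched as a small pure model. This is only an illustration under simplifying assumptions: `Steps`, `Outcome`, and `runJob` are hypothetical names, not part of Ecluse.Core.Worker, and each step's result is supplied up front instead of being performed as an effect.

```haskell
-- Hypothetical, simplified model of the six steps; none of these names are Écluse's API.
data Outcome
  = Acked String             -- the message is acknowledged (work done, or job dropped)
  | LeftForRedelivery String -- the message is left un-acked so the queue redelivers it
  deriving (Eq, Show)

-- The observable result of each step, supplied up front so the sketch stays pure.
data Steps = Steps
  { alreadyMirrored :: Bool           -- step 1: presence probe on the mirror target
  , policyAdmits    :: Bool           -- step 2: re-evaluation under current policy
  , fetched         :: Maybe String   -- step 3: artifact bytes, Nothing on a fetch failure
  , digestMatches   :: String -> Bool -- step 4: integrity check against the job's digest
  , publishOk       :: Bool           -- step 5: publish to the mirror target
  }

-- Walk the steps in order, stopping at the first one that decides the job.
runJob :: Steps -> Outcome
runJob s
  | alreadyMirrored s = Acked "duplicate: already present"
  | not (policyAdmits s) = Acked "dropped: denied under current policy"
  | otherwise = case fetched s of
      Nothing -> LeftForRedelivery "fetch failed"
      Just bytes
        | not (digestMatches s bytes) -> Acked "dropped: integrity mismatch"
        | publishOk s -> Acked "published"
        | otherwise -> LeftForRedelivery "publish failed"
```

In this model a confirmed duplicate never reaches the fetch, and a denied version never reaches the publish, which is the property the step ordering is there to provide.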

See individual modules for detailed behaviour:

  • Ecluse.Core.Worker.Integrity for the security gate on artifact digests.
  • Ecluse.Core.Worker.Loop for supervision and graceful shutdown.
  • Ecluse.Core.Worker.Job for ack semantics within the visibility budget.

See docs/architecture/cloud-backends.md → "Mirror Queue" and "Process model".

Synopsis

Worker runtime

data WorkerRuntime Source #

The runtime backends the mirror worker is closed over: exactly the effectful capabilities the consume loop needs to poll, fetch, verify, publish, and record. A record of concrete handles and abstract ports (the Handle pattern), assembled by the composition root (workerRuntimeOf) and read by the loop through the WorkerM reader.

The mirror queue is the demand-driven hand-off the loop consumes; the publish-side registry client writes approved artifacts to the mirror target; the untrusted data-plane manager fetches the artifact bytes (the validating TLS manager, over an https-only dist.tarball); the heartbeat is the loop's liveness surface. The metric and tracing ports are the abstract recording interfaces (Ecluse.Core.Telemetry.Record, Ecluse.Core.Telemetry.Span); the application supplies their OpenTelemetry-backed implementations, so the loop records without naming a telemetry backend. There is no log field: the loop logs through the ambient katip context the entry point establishes.

Constructors

WorkerRuntime 

Fields

  • wrQueue :: MirrorQueue

    The mirror-queue handle the consume loop long-polls and acks against.

  • wrRegistry :: RegistryClient

    The publish-side registry handle through which approved artifacts are written to the mirror target.

  • wrManager :: Manager

    The validating-TLS data-plane manager for the untrusted artifact fetch (over an https-only dist.tarball).

  • wrHeartbeat :: WorkerHeartbeat

    The consume-loop heartbeat, advanced on every successful poll and read by the liveness probe.

  • wrMetrics :: WorkerMetricsPort

    The metric-recording port through which the worker emits its ecluse.mirror.* job signals.

  • wrTracing :: WorkerTracingPort

    The tracing port through which the worker opens its per-job span.

  • wrInjectTraceContext :: forall (m :: Type -> Type) a. (KatipContext m, MonadIO m) => m a -> m a

    Evaluate and inject the current OpenTelemetry correlation payload into the katip context for the inner action.

  • wrPolicies :: WorkerPolicies

    The per-ecosystem re-evaluation bundles, keyed by a job's ecosystem. The worker re-runs current policy against a job's version before it mirrors it, so a policy that has tightened toward deny since the job was enqueued drops the job rather than freezing a now-disallowed version into the trusted mirror store.

Instances

MonadReader WorkerRuntime WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types
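
The Handle pattern mentioned above (a record mixing concrete handles with abstract ports) can be illustrated with a much smaller record. This is a sketch only: `MetricsPort`, `Runtime`, `inMemoryMetrics`, `mirrorOne`, and the metric name are all hypothetical and do not reflect the real field types of WorkerRuntime.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical abstract port: a record of functions, so callers never name a backend.
newtype MetricsPort = MetricsPort { recordCount :: String -> IO () }

-- Hypothetical runtime: one abstract port plus one fetch capability.
data Runtime = Runtime
  { rtMetrics :: MetricsPort
  , rtFetch   :: String -> IO String
  }

-- An in-memory implementation of the port, the kind a test would supply.
inMemoryMetrics :: IORef [String] -> MetricsPort
inMemoryMetrics ref = MetricsPort (\name -> modifyIORef' ref (name :))

-- Code written against the record records a metric without knowing the backend.
mirrorOne :: Runtime -> String -> IO String
mirrorOne rt url = do
  bytes <- rtFetch rt url
  recordCount (rtMetrics rt) "ecluse.mirror.example" -- hypothetical metric name
  pure bytes

-- Assemble the record (the role the composition root plays) and run one call.
demo :: IO (String, [String])
demo = do
  ref <- newIORef []
  let rt = Runtime (inMemoryMetrics ref) (\url -> pure ("bytes of " ++ url))
  out <- mirrorOne rt "https://example.invalid/pkg.tgz"
  seen <- readIORef ref
  pure (out, seen)
```

Swapping `inMemoryMetrics` for a telemetry-backed implementation would change only the code that assembles the record, which is the point of keeping the ports abstract.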

Per-ecosystem ingest re-evaluation

data WorkerPolicy Source #

The per-ecosystem re-evaluation bundle the worker re-runs current policy through before it mirrors a job: a resolver that fetches and projects the single version's metadata, the prepared rule set, and the wall-clock the age rules read.

The resolver is the shared single-version fetch-and-project (fetchVersionDetails over the guarded public origin, wired by the composition root), and the rules are the same prepared rules the serve path gates with, so the worker's ingest decision and the serve-time decision run one codepath and any per-source breaker state is shared, never forked.

Constructors

WorkerPolicy 

Fields

  • wpResolveVersion :: PackageName -> Version -> IO VersionEvaluation

    Resolve and project one version's metadata through the guarded public origin, classifying the outcome (fetchVersionDetails). Total: a fetch failure is a VersionMetadataUnavailable value, never an escaping exception.

  • wpRules :: [PreparedRule]

    The prepared rule set evaluated against the resolved version under current policy (the same rules the serve path gates the public version set with).

  • wpNow :: IO UTCTime

    The wall-clock "now" for the rules' EvalContext; injected so the time-sensitive age gate is deterministic under test.

type WorkerPolicies = Map Ecosystem WorkerPolicy Source #

The worker's per-ecosystem re-evaluation bundles, keyed by the ecosystem a job's package belongs to (pkgEcosystem). Built once at boot and shared with the serve mounts; a job whose ecosystem is absent here is fail-closed (dropped), never mirrored unvetted.
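
The fail-closed lookup described above might look like the following sketch. `Ecosystem`, `Policy`, `PolicyLookup`, and `policyFor` are hypothetical stand-ins; only the shape (a Map keyed by ecosystem, with an absent key meaning "drop") is taken from the description.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical stand-ins for Ecosystem and WorkerPolicy.
data Ecosystem = Npm | PyPI
  deriving (Eq, Ord, Show)

newtype Policy = Policy { policyName :: String }
  deriving (Eq, Show)

-- What the worker should do after looking up a job's ecosystem.
data PolicyLookup
  = Evaluate Policy -- a bundle exists: re-run current policy through it
  | DropUnvetted    -- no bundle: fail closed, never mirror unvetted
  deriving (Eq, Show)

-- Resolve the bundle for a job's ecosystem; an absent key is a drop, not a pass.
policyFor :: Map.Map Ecosystem Policy -> Ecosystem -> PolicyLookup
policyFor policies eco = maybe DropUnvetted Evaluate (Map.lookup eco policies)
```

With a map built as `Map.fromList [(Npm, Policy "npm")]`, a job for `PyPI` resolves to `DropUnvetted` in this sketch.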

The worker monad

data WorkerM a Source #

The mirror worker's monad: a reader over the WorkerRuntime layered on katip's logging context.

A newtype over ReaderT WorkerRuntime (KatipContextT IO) so its instances are this module's to control and call sites name one concrete monad. The derived instances give reader access to the runtime (MonadReader WorkerRuntime), arbitrary effects (MonadIO), the unlift capability (MonadUnliftIO) the loop's tryAny and the per-job span bracket need, and the katip classes (Katip, KatipContext) so a structured log call composes through the ambient context the entry point establishes.

The katip base is a reader, never a StateT, so the logging context behaves correctly across the loop (see docs/architecture/technology-stack.md → "Key Decisions").

Instances

MonadIO WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

Methods

liftIO :: IO a -> WorkerM a #

Applicative WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

Methods

pure :: a -> WorkerM a #

(<*>) :: WorkerM (a -> b) -> WorkerM a -> WorkerM b #

liftA2 :: (a -> b -> c) -> WorkerM a -> WorkerM b -> WorkerM c #

(*>) :: WorkerM a -> WorkerM b -> WorkerM b #

(<*) :: WorkerM a -> WorkerM b -> WorkerM a #

Functor WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

Methods

fmap :: (a -> b) -> WorkerM a -> WorkerM b #

(<$) :: a -> WorkerM b -> WorkerM a #

Monad WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

Methods

(>>=) :: WorkerM a -> (a -> WorkerM b) -> WorkerM b #

(>>) :: WorkerM a -> WorkerM b -> WorkerM b #

return :: a -> WorkerM a #

Katip WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

KatipContext WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

MonadUnliftIO WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

Methods

withRunInIO :: ((forall a. WorkerM a -> IO a) -> IO b) -> WorkerM b Source #

MonadReader WorkerRuntime WorkerM Source # 
Instance details

Defined in Ecluse.Core.Worker.Types

runWorkerM :: LogEnv -> SimpleLogPayload -> WorkerRuntime -> WorkerM a -> IO a Source #

Run a WorkerM against the WorkerRuntime and the katip logging environment and initial context the entry point supplies, yielding the underlying IO action. This is the boundary where the worker's WorkerM code is discharged to IO.

The LogEnv (the structured-log scribes) and the initial context payload are passed in rather than read from the runtime, so the application owns the log stream and the trace-correlation dd enrichment: it resolves the dd identity and hands it here as the initial context, so every line the loop emits carries dd. The loop narrows the namespace with katip's combinators on top as it logs.
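
The newtype-over-reader shape of WorkerM and its discharge to IO can be sketched without katip. This is a simplified illustration, assuming the mtl package is available: the real monad layers over KatipContextT IO and also derives MonadUnliftIO, Katip, and KatipContext, all of which are omitted here. `Runtime`, `AppM`, `runAppM`, and `tick` are hypothetical names.

```haskell
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}

import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader, ReaderT, asks, runReaderT)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical runtime record standing in for WorkerRuntime.
newtype Runtime = Runtime { rtCounter :: IORef Int }

-- A newtype over ReaderT, so the instances are chosen here and call sites
-- name one concrete monad. The katip layer of the real WorkerM is left out.
newtype AppM a = AppM (ReaderT Runtime IO a)
  deriving newtype (Functor, Applicative, Monad, MonadIO, MonadReader Runtime)

-- The boundary where AppM code is discharged to IO (the role runWorkerM plays).
runAppM :: Runtime -> AppM a -> IO a
runAppM rt (AppM m) = runReaderT m rt

-- An action that reads a handle from the runtime and performs an effect with it.
tick :: AppM Int
tick = do
  ref <- asks rtCounter
  liftIO (modifyIORef' ref (+ 1) >> readIORef ref)

-- Run two ticks against a fresh runtime; the second tick returns 2.
demo :: IO Int
demo = do
  ref <- newIORef 0
  runAppM (Runtime ref) (tick >> tick)
```

Because the base is a reader and the mutable state lives behind an IORef, the environment is shared unchanged across the whole action.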

Loop and job processing (exposed for direct testing)

workerLoop :: WorkerM () Source #

The continuous consume loop: long-poll for a batch, process it, repeat.

Each iteration is wrapped so a single failure -- a receive that throws, a fetch or publish error, an undecodable body -- is caught and logged, then the loop backs off briefly and continues, so one bad iteration cannot kill the worker thread. A successful poll advances the heartbeat (whether or not the batch was empty), so a liveness probe sees the loop is alive; an idle queue is a healthy empty poll, not a stall.
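
The supervision described above (catch, back off, continue, and advance the heartbeat on every successful receive) can be sketched with base alone. This is a bounded, hypothetical version for illustration: `superviseN` is not the real workerLoop, the back-off delay is made up, and it uses `try` at `SomeException`, which also catches asynchronous exceptions, whereas the real loop is described as using tryAny.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, try)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Run a fixed number of iterations; return how many of them failed.
-- The IORef counts successful polls, standing in for the heartbeat.
superviseN :: Int -> IORef Int -> IO [a] -> (a -> IO ()) -> IO Int
superviseN iterations polls receive process = go iterations 0
  where
    go n failures
      | n <= 0 = pure failures
      | otherwise = do
          result <- try iteration :: IO (Either SomeException ())
          case result of
            Right () -> go (n - 1) failures
            Left _err -> do
              threadDelay 1000 -- brief back-off (1 ms; a hypothetical value)
              go (n - 1) (failures + 1)
    iteration = do
      batch <- receive
      modifyIORef' polls (+ 1) -- advances even when the batch is empty
      mapM_ process batch      -- sequential, one job at a time

-- Three iterations where the second receive throws: one failure, two polls.
demo :: IO (Int, Int)
demo = do
  polls <- newIORef 0
  calls <- newIORef (0 :: Int)
  let receive = do
        modifyIORef' calls (+ 1)
        c <- readIORef calls
        if c == 2 then ioError (userError "receive failed") else pure [c]
  failures <- superviseN 3 polls receive (\_ -> pure ())
  polled <- readIORef polls
  pure (failures, polled)
```

The failed iteration does not advance the poll counter, so a run of consecutive failures would eventually show up as a stale heartbeat.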

processBatch :: [QueueMessage] -> WorkerM () Source #

Process one received batch sequentially, so each job gets the full visibility budget rather than competing with its batch-mates for it. A batch is at most the queue's configured batch size (≤ 10), so sequential processing is a deliberate throughput-vs-budget choice, not a scaling bottleneck.

processJob :: ReceiptHandle -> MirrorJob -> WorkerM JobOutcome Source #

Process one mirror job end to end: probe the mirror target for the job's version (a confirmed-present version is acked outright, the duplicate-suppression short-circuit), then re-evaluate current policy, and only on a current admit fetch the artifact, verify it against the job's serve-time-admitted integrity digest, and publish it to the mirror target. Returns the JobOutcome that decides ack vs. redeliver.

The presence probe exists for the enqueue-to-availability window: mirroring is demand-driven, so every public-leg admit of a still-unmirrored version enqueues its own job, and a fleet-wide install of a novel version enqueues many. Without the probe each duplicate pays a full artifact download and an integrity recompute before the publish discovers the version is already present (the idempotent 409); with it, a duplicate costs one metadata round trip. The probe is an optimisation, never a gate: it skips only work whose publish would have been that no-op, so the policy re-evaluation below still guards every artifact that actually publishes.

The policy re-evaluation is the ingest-time gate. The version was gated at serve time, but the enqueue-to-process window is asynchronous and unbounded, so policy may have tightened toward deny since (a new denylist entry, a freshly-published advisory, a rule-config change). The worker re-runs the same rules the serve path gates with, over the version resolved through the same single-version fetch-and-project, so a now-denied version is dropped (acked, never published) rather than frozen into the rule-exempt trusted mirror store; a version the upstream has since withdrawn is likewise dropped, while metadata that cannot be re-fetched (or a rule that cannot be computed) leaves the job for redelivery. A current admit proceeds to the integrity gate: a tampered or corrupt artifact fails the job with no publish, since the mirror is later served without the rules.

The receipt handle is taken so a long publish can extendVisibility to hold the message before its window lapses.

The per-job domain span (the worker tracing port) wraps the whole probe → re-evaluate → fetch → verify → publish, projecting the terminal outcome onto the span so a refused or dropped job is explainable from the trace, and linking back to the request that enqueued the job through the trace context the job carries (jobTraceContext). The span body is discharged to IO through the unlift, so the loop's structured log lines still compose through the ambient katip context.
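
The mapping from a re-evaluation result to what happens to the job, as described for the ingest-time gate, can be written down as a total function. The types below are hypothetical simplifications; the real classification is the VersionEvaluation produced by wpResolveVersion together with the rule verdict, whose constructors are not shown in this documentation.

```haskell
-- Hypothetical classification of the ingest-time re-evaluation.
data Reevaluation
  = Admitted                   -- current policy still admits the version
  | Denied String              -- policy has tightened toward deny since serve time
  | Withdrawn                  -- the upstream no longer offers the version
  | MetadataUnavailable String -- the metadata could not be re-fetched
  | RuleUncomputable String    -- a rule could not be computed
  deriving (Eq, Show)

-- What the worker does next.
data Action
  = ProceedToFetch            -- continue to fetch, verify, publish
  | DropAndAck String         -- ack without publishing
  | LeaveForRedelivery String -- leave un-acked so the queue redelivers
  deriving (Eq, Show)

-- Definite negatives drop the job; indeterminate results wait for a retry.
decide :: Reevaluation -> Action
decide Admitted = ProceedToFetch
decide (Denied why) = DropAndAck ("denied: " ++ why)
decide Withdrawn = DropAndAck "withdrawn upstream"
decide (MetadataUnavailable why) = LeaveForRedelivery ("metadata unavailable: " ++ why)
decide (RuleUncomputable why) = LeaveForRedelivery ("rule uncomputable: " ++ why)
```

The asymmetry is deliberate in this sketch: only a definite answer removes the job from the queue, and only an admit lets bytes move.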

data JobOutcome Source #

The terminal outcome of processing one mirror job, deciding whether the message is acked or left to redeliver.

Constructors

Succeeded

The publish succeeded, so the job is acked. This covers an idempotent redelivery too: a version already present at the mirror target is a 409 the registry handle treats as success (publishArtifact), so it surfaces here as Succeeded rather than a distinct case -- as does the same presence confirmed by the pre-fetch probe, before any bytes moved.

Dropped Text

A non-retryable fault: the bytes did not match the serve-time digest (tamper), or the publish URL was unformable (misconfiguration). Redelivery cannot help, so the job is dropped after alarming. Carries the reason.

Retried Text

A transient fault: a fetch failure, or a registry rejection worth retrying. The message is left un-acked so it redelivers. Carries the reason.

Instances

Show JobOutcome Source # 
Instance details

Defined in Ecluse.Core.Worker.Job

Eq JobOutcome Source # 
Instance details

Defined in Ecluse.Core.Worker.Job
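
How a caller might act on a JobOutcome can be sketched as two small functions. The type is redeclared locally with String in place of Text to keep the example dependency-free, and `shouldAck` and `alarmReason` are hypothetical helpers; that a Dropped job is acked is an assumption drawn from the processJob description (a dropped job is "acked, never published").

```haskell
-- Local mirror of the documented constructors, with String instead of Text.
data JobOutcome
  = Succeeded
  | Dropped String
  | Retried String
  deriving (Eq, Show)

-- Ack everything except a transient fault, which must redeliver.
shouldAck :: JobOutcome -> Bool
shouldAck Succeeded = True
shouldAck (Dropped _) = True -- assumption: redelivery cannot help, so ack
shouldAck (Retried _) = False

-- Only a non-retryable fault raises an alarm, carrying its reason.
alarmReason :: JobOutcome -> Maybe String
alarmReason (Dropped reason) = Just reason
alarmReason _ = Nothing
```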

Liveness

data WorkerHeartbeat Source #

The mirror worker's consume-loop heartbeat: the wall-clock time of the worker's last successful poll of the queue.

It is the worker's own liveness signal, kept apart from the server's HTTP readiness so single-process health reflects a stalled worker today and a future standalone worker binary keeps the same probe. The worker calls recordPoll after each successful receive (whether or not the batch was empty -- an empty long-poll is a healthy idle, not a stall); a liveness probe reads lastPoll and compares it against the wall clock to decide whether the loop has gone quiet for too long.

newWorkerHeartbeat :: IO WorkerHeartbeat Source #

Build a fresh WorkerHeartbeat with no poll yet recorded (lastPoll is Nothing until the worker's first successful receive).

recordPoll :: WorkerHeartbeat -> UTCTime -> IO () Source #

Record the time of a successful queue poll, advancing the heartbeat. Called by the worker after each receive returns (the loop is alive even on an empty batch).

lastPoll :: WorkerHeartbeat -> IO (Maybe UTCTime) Source #

The time of the worker's last successful poll, or Nothing before its first. A liveness probe reads this and compares it against the wall clock.

workerHeartbeatStaleAfter :: NominalDiffTime Source #

How long the worker's last successful poll may be stale before the loop is considered stalled -- the staleness threshold the liveness probe applies.

It is a generous multiple of the long-poll cadence: a healthy idle worker still completes a poll at least every sqsWaitSeconds (≤ 20s by default), so a gap several times that is a genuine stall, not an idle queue. Set well above one poll window so liveness never flaps on normal scheduling jitter.

heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool Source #

Whether the worker's consume loop is healthy as of now, given its last successful poll. This is the liveness signal the single-process /livez probe folds in (see Ecluse.Server), distinct from HTTP readiness.

  • Nothing (no poll yet) is healthy: the worker is still starting, not stalled.
  • A poll within workerHeartbeatStaleAfter is healthy.
  • A poll older than that is unhealthy: the loop has gone quiet for too long.

>>> import Data.Time (UTCTime (UTCTime), fromGregorian, secondsToDiffTime)
>>> let t0 = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 0)
>>> heartbeatHealthy t0 Nothing
True
>>> let now = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 10)
>>> heartbeatHealthy now (Just t0)
True
>>> let later = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 300)
>>> heartbeatHealthy later (Just t0)
False

heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool Source #

Read the worker heartbeat and decide liveness against the current wall clock -- the IO wrapper the liveness probe calls. True while the consume loop is alive (or still starting); False once the last successful poll is staler than workerHeartbeatStaleAfter.
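
The liveness functions in this section fit together roughly as in the sketch below, assuming the time package. The names are local stand-ins and the representation (an IORef holding a Maybe UTCTime) is a guess. The 120-second threshold is hypothetical: the documentation says only that the real value is a generous multiple of the long-poll wait.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Time (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime, getCurrentTime)

-- Assumed representation: the time of the last successful poll, if any.
newtype Heartbeat = Heartbeat (IORef (Maybe UTCTime))

-- Hypothetical staleness threshold, in seconds.
staleAfter :: NominalDiffTime
staleAfter = 120

newHeartbeat :: IO Heartbeat
newHeartbeat = Heartbeat <$> newIORef Nothing

record :: Heartbeat -> UTCTime -> IO ()
record (Heartbeat ref) t = writeIORef ref (Just t)

-- Pure decision: no poll yet counts as healthy (the worker is still starting).
healthy :: UTCTime -> Maybe UTCTime -> Bool
healthy _ Nothing = True
healthy now (Just polledAt) = diffUTCTime now polledAt <= staleAfter

-- IO wrapper: read the heartbeat and compare it against the wall clock.
healthyNow :: Heartbeat -> IO Bool
healthyNow (Heartbeat ref) = healthy <$> getCurrentTime <*> readIORef ref

-- Starting, freshly polled, and a poll backdated by 300 seconds.
demo :: IO (Bool, Bool, Bool)
demo = do
  hb <- newHeartbeat
  starting <- healthyNow hb
  now <- getCurrentTime
  record hb now
  fresh <- healthyNow hb
  record hb (addUTCTime (-300) now)
  stale <- healthyNow hb
  pure (starting, fresh, stale)
```

Keeping the decision pure and taking "now" as an argument is what makes the threshold testable without sleeping, as the doctests for heartbeatHealthy also show.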

Integrity verification

data IntegrityResult Source #

The result of verifying fetched bytes against the admitted integrity digests. A sum type, not a Bool, so the mismatch carries the detail an operator needs to explain why a publish was refused.

Constructors

IntegrityVerified

The bytes matched the most authoritative admitted digest.

IntegrityMismatch Text

The bytes failed the integrity gate. Carries a human-readable detail (the digest they were checked against, or that the strongest one was uncomputable).

verifyIntegrity :: NonEmpty Hash -> ByteString -> IntegrityResult Source #

Verify fetched artifact bytes against the most authoritative integrity digest the version carries -- never against a weaker one while a stronger is present.

A real npm version carries both a modern SRI sha512 digest and the legacy SHA-1 shasum. Passing on any match would let an artifact that matches the weak SHA-1 but fails the strong sha512 through -- and SHA-1 collision resistance is broken, so that is exploitable. So the gate ranks the admitted digests by algorithm authority (strongest first: sha512 / blake2b > sha384 > sha256 > sha1 > md5), and checks the bytes against the strongest one present: the bytes pass iff that digest matches. A weaker digest can neither override nor rescue a failed strong one.

The bytes are recomputed in the strongest digest's own algorithm through the shared computeDigest, the one definition of which algorithms Écluse can verify. That computable set covers every algorithm the public integrity floor admits, so an admitted artifact is always verifiable here. If the strongest digest is nonetheless in an algorithm computeDigest declines (MD5, a forgeable hash), or is an SRI whose inner algorithm does not resolve, the gate fails closed rather than falling back to a weaker digest: a tampered artifact must never be admitted on the strength of a hash an attacker could forge.

This is the tamper gate before a publish: a mismatch fails the job and never publishes a corrupt or substituted artifact into the private upstream.

>>> import Data.List.NonEmpty (NonEmpty ((:|)))
>>> import Ecluse.Core.Package (mkHash, HashAlg (SHA1))
>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "0a4d55a8d778e5022fab701977c5d840bbc486d0")
Right IntegrityVerified
>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
Right (IntegrityMismatch "the SHA1 digest did not match the fetched bytes")
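
The "strongest digest decides" rule can be sketched independently of any hashing library. Everything here is hypothetical: `Alg`, `Digest`, `Result`, and `verifyStrongest` are local stand-ins, blake2b is left out of the ranking for brevity, and the digest computation is passed in as a parameter. `toyCompute` is a deliberately fake "hash" used only to exercise the control flow.

```haskell
import Data.Foldable (maximumBy)
import Data.List.NonEmpty (NonEmpty ((:|)))
import Data.Ord (comparing)

-- Constructors are listed weakest first, so the derived Ord ranks by authority.
data Alg = MD5 | SHA1 | SHA256 | SHA384 | SHA512
  deriving (Eq, Ord, Show)

data Digest = Digest { digestAlg :: Alg, digestHex :: String }
  deriving (Eq, Show)

data Result = Verified | Mismatch String
  deriving (Eq, Show)

-- Check the bytes against the strongest digest only. The compute function
-- returns Nothing for an algorithm it declines, which fails closed.
verifyStrongest :: (Alg -> String -> Maybe String) -> NonEmpty Digest -> String -> Result
verifyStrongest compute digests bytes =
  case compute (digestAlg strongest) bytes of
    Nothing -> Mismatch ("cannot compute " ++ show (digestAlg strongest))
    Just actual
      | actual == digestHex strongest -> Verified
      | otherwise -> Mismatch (show (digestAlg strongest) ++ " digest did not match")
  where
    strongest = maximumBy (comparing digestAlg) digests

-- NOT a real hash: tags the bytes with the algorithm name, and declines MD5.
toyCompute :: Alg -> String -> Maybe String
toyCompute MD5 _ = Nothing
toyCompute alg bytes = Just (show alg ++ ":" ++ bytes)

-- A matching SHA1 cannot rescue a failing SHA512.
weakCannotRescue :: Result
weakCannotRescue =
  verifyStrongest toyCompute (Digest SHA1 "SHA1:hello" :| [Digest SHA512 "bogus"]) "hello"
```

In this sketch `weakCannotRescue` is a mismatch even though the SHA1 entry matches, because only the strongest digest present is ever consulted.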