| Safe Haskell | None |
|---|---|
| Language | GHC2021 |
Ecluse.Core.Queue
Description
The mirror-queue handle: the durable hand-off from the request path to the mirror worker.
Mirroring is demand-driven: when a client fetches an artifact whose version
passes the rules, the proxy enqueues a MirrorJob and serves the artifact
immediately, never blocking on the mirror. A separate worker receives jobs,
fetches and verifies the artifact, publishes it to the mirror target, and acks
the job (see docs/architecture/cloud-backends.md → "Mirror Queue").
The queue is the one cloud surface with materially different APIs per provider
(AWS SQS SendMessage/ReceiveMessage+visibility-timeout/DeleteMessage; GCP
Pub/Sub Publish/Pull+ack-deadline/Acknowledge), so it is its own handle --
a record of functions (the Handle pattern). Both providers fit the same
receive → process → ack shape; their differences (visibility timeout vs ack
deadline, batch limits, dead-letter wiring) stay behind the handle, and
ReceiptHandle is opaque so neither leaks.
Like the other handles, the effectful fields return IO, not App, so an
adapter stays decoupled from the proxy's Env/App (see
docs/architecture/technology-stack.md → "Key Decisions").
Conventions
The two cloud backends both give at-least-once delivery, which is safe here because publishing is idempotent (a registry treats versions as immutable). The handle's contract reflects that:
- enqueue is best-effort. It runs on the request hot path (enqueue, then serve immediately), so a failure must be logged/metered and __never fail the client response__ -- the artifact is already served, and a later pull re-enqueues.
- Retry is "don't ack". A job that fails processing is simply not acked; the visibility timeout / ack deadline redelivers it, and the backend's native dead-letter path catches the persistently failing ones. There is deliberately no nack.
- extendVisibility lets the worker hold a long publish (a large artifact) past the visibility window. It is an optimization, not correctness-critical, since idempotency already makes redelivery harmless.
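The conventions above can be sketched from the caller's side. This is a minimal illustration, not the module's code: the types are simplified stand-ins (the job is a `String`, `extendVisibility` is omitted), and `qmJob`, `qmReceipt`, `enqueueBestEffort`, `workerPoll`, and `demo` are hypothetical names introduced here.

```haskell
import Control.Exception (SomeException, try)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Simplified stand-ins for the module's types (assumptions for this sketch).
newtype ReceiptHandle = ReceiptHandle Int deriving (Eq, Show)
type MirrorJob = String

data QueueMessage = QueueMessage
  { qmJob :: MirrorJob          -- hypothetical field name
  , qmReceipt :: ReceiptHandle  -- hypothetical field name
  }

data MirrorQueue = MirrorQueue
  { enqueue :: MirrorJob -> IO ()
  , receive :: IO [QueueMessage]
  , ack :: ReceiptHandle -> IO ()
  }

-- | Best-effort enqueue for the request path: a backend failure is handed to
-- the report callback and never propagates to the caller. (Catching
-- SomeException also catches asynchronous exceptions; a production version
-- would probably let those through.)
enqueueBestEffort :: (String -> IO ()) -> MirrorQueue -> MirrorJob -> IO ()
enqueueBestEffort report q job = do
  r <- try (enqueue q job) :: IO (Either SomeException ())
  case r of
    Left e -> report ("mirror enqueue failed: " ++ show e)
    Right () -> pure ()

-- | One worker poll: ack a job only when processing succeeded. A failed job
-- is left un-acked so the backend redelivers it; there is no nack call.
workerPoll :: (MirrorJob -> IO Bool) -> MirrorQueue -> IO ()
workerPoll process q = do
  msgs <- receive q
  mapM_ step msgs
 where
  step m = do
    ok <- process (qmJob m)
    if ok then ack q (qmReceipt m) else pure ()

-- | Usage against a fake queue whose enqueue always fails and whose receive
-- returns two jobs; processing the job "bad" fails.
demo :: IO ([Int], [String])
demo = do
  acked <- newIORef []
  logged <- newIORef []
  let q = MirrorQueue
        { enqueue = \_ -> ioError (userError "backend down")
        , receive = pure [QueueMessage "good" (ReceiptHandle 1), QueueMessage "bad" (ReceiptHandle 2)]
        , ack = \(ReceiptHandle n) -> modifyIORef' acked (n :)
        }
  enqueueBestEffort (\msg -> modifyIORef' logged (msg :)) q "job"
  workerPoll (\job -> pure (job /= "bad")) q
  (,) <$> readIORef acked <*> readIORef logged
```

In `demo`, the failed enqueue produces one log line instead of an exception, and only the receipt of the successfully processed job is acked.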
This module provides the handle and its payload types, plus two STM-backed in-memory implementations:
- newInMemoryQueue -- the test double that models the cloud backends' visibility-timeout semantics (receive → ack / redeliver-on-no-ack), used to exercise the worker's retry path without a cloud queue.
- newBoundedInMemoryQueue -- the bounded, best-effort production backend selected by ECLUSE_QUEUE_BACKEND=memory. See its own Haddock for why it is correctness-safe (a dropped job is re-enqueued on the next demand) and why it deliberately does not redeliver.
It also provides newEnqueueBuffer, a bounded producer-side hand-off buffer
wrapped in front of either backend so the serve path's enqueue completes in
microseconds while a composition-root drain loop delivers to the (possibly slow)
backend off the request path.
Synopsis
- data MirrorQueue = MirrorQueue {
- enqueue :: MirrorJob -> IO ()
- receive :: IO [QueueMessage]
- ack :: ReceiptHandle -> IO ()
- extendVisibility :: ReceiptHandle -> Seconds -> IO ()
- data MirrorJob = MirrorJob {}
- data MirrorArtifact = MirrorArtifact {}
- data RemoteSpanContext = RemoteSpanContext {}
- data QueueMessage = QueueMessage {}
- data ReceiptHandle
- mkReceiptHandle :: Text -> ReceiptHandle
- unReceiptHandle :: ReceiptHandle -> Text
- newtype Seconds = Seconds Int
- newInMemoryQueue :: IO MirrorQueue
- data MemoryQueueConfig = MemoryQueueConfig {}
- defaultMemoryQueueConfig :: Int -> MemoryQueueConfig
- newBoundedInMemoryQueue :: MemoryQueueConfig -> (Int -> IO ()) -> IO MirrorQueue
- memoryQueueBatchSize :: Int
- memoryQueueDropReportInterval :: Int
- newEnqueueBuffer :: Int -> (Int -> IO ()) -> (Int -> Text -> IO ()) -> MirrorQueue -> IO (MirrorQueue, IO ())
Queue handle
data MirrorQueue Source #
The mirror-queue handle -- a record of functions over a backend whose private
state the closures capture. See the module header for the enqueue /
don't-ack-to-retry / no-nack conventions; all fields are IO.
Constructors
| MirrorQueue | |
Fields
| |
Payloads
data MirrorJob Source #
A mirror job: everything the worker needs to back-fill one artifact into the mirror target. The version was already gated by the rules at serve time (when the job was enqueued), so the worker does not re-run the rules; it fetches the bytes, verifies them against the serve-time-admitted integrity digest the job carries, and publishes.
The integrity digest and the artifact descriptor are captured at enqueue time
(jobArtifact), not re-fetched: the worker mirrors exactly what the rules
admitted, so an upstream packument mutated in the enqueue → process window cannot
substitute a different artifact for the one that was gated. The descriptor also
carries the filename and declared size the worker needs to assemble the publish
document.
Constructors
| MirrorJob | |
Fields
| |
data MirrorArtifact Source #
The serve-time-admitted artifact descriptor carried on a MirrorJob: exactly
the fields the worker needs to verify the fetched bytes and assemble the publish
document, captured when the version was gated.
maHashes is a NonEmpty because the serve path admits a public version only when
it carries at least one integrity digest (the integrity-presence admission policy),
so a job with no digest to verify against is unrepresentable -- the worker always
has a fingerprint to check the bytes against before they reach the private upstream.
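A small sketch of how a `NonEmpty` digest list makes the digest-less job unrepresentable. Only `maHashes` is named in the text; the `Digest` type, the other two field names, and both functions are assumptions made for illustration.

```haskell
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import qualified Data.List.NonEmpty as NE

-- Hypothetical digest representation: algorithm name plus encoded value.
data Digest = Digest { digestAlgo :: String, digestValue :: String }
  deriving (Eq, Show)

-- Stand-in for MirrorArtifact. 'maHashes' is the field named in the text;
-- 'maFilename' and 'maSize' are assumed names for the filename and declared size.
data MirrorArtifact = MirrorArtifact
  { maFilename :: String
  , maSize :: Int
  , maHashes :: NonEmpty Digest
  } deriving (Eq, Show)

-- | Admission: a version that carries no digest yields no artifact
-- descriptor, and therefore no job.
admitArtifact :: String -> Int -> [Digest] -> Maybe MirrorArtifact
admitArtifact name size digests =
  MirrorArtifact name size <$> nonEmpty digests

-- | The worker can always pick a digest to verify against; no partial
-- function and no "missing digest" branch is needed.
primaryDigest :: MirrorArtifact -> Digest
primaryDigest = NE.head . maHashes
```

The check happens once, at admission; every later consumer of the descriptor gets the guarantee from the type.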
Constructors
| MirrorArtifact | |
Fields
| |
Instances
| Show MirrorArtifact Source # | |
Defined in Ecluse.Core.Queue Methods showsPrec :: Int -> MirrorArtifact -> ShowS # show :: MirrorArtifact -> String # showList :: [MirrorArtifact] -> ShowS # | |
| Eq MirrorArtifact Source # | |
Defined in Ecluse.Core.Queue Methods (==) :: MirrorArtifact -> MirrorArtifact -> Bool # (/=) :: MirrorArtifact -> MirrorArtifact -> Bool # | |
data RemoteSpanContext Source #
A serialised W3C trace-context carrier riding on a MirrorJob: the
traceparent (and any tracestate) of the span that enqueued the job, in the
standard wire encoding. It is captured at enqueue time and read back by the worker's
tracing port to re-establish a span link from the per-job span to the enqueueing
request, so the asynchronous mirror hand-off is navigable in a trace.
The two fields are the W3C header values verbatim; the queue carries them opaquely (it neither parses nor validates them -- an unparseable carrier simply yields no link), so this type names what is carried without coupling the queue to any tracing backend.
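To illustrate "an unparseable carrier simply yields no link", here is a sketch of a lenient reader on the tracing side. It assumes the W3C version-00 `traceparent` layout (`00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>`); the field names `rscTraceparent` / `rscTracestate` and the function `linkTarget` are hypothetical, and the real tracing port may work differently.

```haskell
import Data.Char (isDigit)

-- Stand-in for RemoteSpanContext; both field names are assumptions.
data RemoteSpanContext = RemoteSpanContext
  { rscTraceparent :: String
  , rscTracestate :: Maybe String
  } deriving (Eq, Show)

-- | Split a string on '-'.
splitDash :: String -> [String]
splitDash s = case break (== '-') s of
  (a, []) -> [a]
  (a, _ : rest) -> a : splitDash rest

isLowerHex :: Char -> Bool
isLowerHex c = isDigit c || (c >= 'a' && c <= 'f')

-- | Lenient read of a version-00 traceparent: the (trace-id, parent-id) pair
-- to link to, or Nothing when the carrier does not parse. The queue itself
-- never calls anything like this; it carries the header values verbatim.
linkTarget :: RemoteSpanContext -> Maybe (String, String)
linkTarget ctx = case splitDash (rscTraceparent ctx) of
  ["00", traceId, spanId, flags]
    | wellFormed 32 traceId
    , wellFormed 16 spanId
    , length flags == 2
    , all isLowerHex flags ->
        Just (traceId, spanId)
  _ -> Nothing
 where
  -- Right length, lowercase hex, and not all zeros.
  wellFormed n s = length s == n && all isLowerHex s && any (/= '0') s
```

A malformed value produces `Nothing` rather than an error, so a bad carrier costs the trace link and nothing else.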
Constructors
| RemoteSpanContext | |
Fields
| |
Instances
| Show RemoteSpanContext Source # | |
Defined in Ecluse.Core.Queue Methods showsPrec :: Int -> RemoteSpanContext -> ShowS # show :: RemoteSpanContext -> String # showList :: [RemoteSpanContext] -> ShowS # | |
| Eq RemoteSpanContext Source # | |
Defined in Ecluse.Core.Queue Methods (==) :: RemoteSpanContext -> RemoteSpanContext -> Bool # (/=) :: RemoteSpanContext -> RemoteSpanContext -> Bool # | |
data QueueMessage Source #
A received message: the MirrorJob to process together with the
ReceiptHandle used to ack it (or extendVisibility on it) once processed.
Constructors
| QueueMessage | |
Fields
| |
Instances
| Show QueueMessage Source # | |
Defined in Ecluse.Core.Queue Methods showsPrec :: Int -> QueueMessage -> ShowS # show :: QueueMessage -> String # showList :: [QueueMessage] -> ShowS # | |
| Eq QueueMessage Source # | |
Defined in Ecluse.Core.Queue | |
Opaque receipt
data ReceiptHandle Source #
An opaque handle identifying a received message for ack /
extendVisibility. It carries the backend's own delivery token -- an SQS receipt
handle or a Pub/Sub ackId -- as text; the constructor is hidden so neither
provider's representation leaks into worker code, and a handle is only ever
obtained from a QueueMessage returned by receive. Build one (in a backend)
with mkReceiptHandle and read the token back with unReceiptHandle.
Instances
| Show ReceiptHandle Source # | |
Defined in Ecluse.Core.Queue Methods showsPrec :: Int -> ReceiptHandle -> ShowS # show :: ReceiptHandle -> String # showList :: [ReceiptHandle] -> ShowS # | |
| Eq ReceiptHandle Source # | |
Defined in Ecluse.Core.Queue Methods (==) :: ReceiptHandle -> ReceiptHandle -> Bool # (/=) :: ReceiptHandle -> ReceiptHandle -> Bool # | |
| Ord ReceiptHandle Source # | |
Defined in Ecluse.Core.Queue Methods compare :: ReceiptHandle -> ReceiptHandle -> Ordering # (<) :: ReceiptHandle -> ReceiptHandle -> Bool # (<=) :: ReceiptHandle -> ReceiptHandle -> Bool # (>) :: ReceiptHandle -> ReceiptHandle -> Bool # (>=) :: ReceiptHandle -> ReceiptHandle -> Bool # max :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle # min :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle # | |
mkReceiptHandle :: Text -> ReceiptHandle Source #
Wrap a backend's delivery token (an SQS receipt handle, a Pub/Sub ackId)
as an opaque ReceiptHandle. For backend implementations only -- worker code
obtains handles from receive, never builds them.
unReceiptHandle :: ReceiptHandle -> Text Source #
Recover the backend's delivery token from a ReceiptHandle, to pass back to
the backend on ack / extendVisibility. For backend implementations only.
Durations
newtype Seconds Source #
A duration in whole seconds, for extendVisibility. A newtype so a raw
Int of seconds is never confused with some other count.
In-memory double
newInMemoryQueue :: IO MirrorQueue Source #
Build a fresh STM-backed in-memory MirrorQueue.
Honours the handle's contract: enqueue appends (FIFO), receive delivers all
currently-visible jobs and moves them in-flight, ack removes an in-flight job,
and an in-flight job that is never acked is redelivered on the next receive
("retry is don't ack"). extendVisibility holds a job in-flight across one such
redelivery pass. This is a test double -- there is no long-poll blocking; an empty
receive returns [] at once.
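One way such a double could be modelled, as a sketch under stated assumptions rather than the module's implementation: the state layout, the `Bool` "held for one pass" flag, issuing a fresh receipt on each redelivery, and dropping the `Seconds` argument of `extendVisibility` are all choices made here for brevity. `newSketchQueue`, `qmJob`, and `qmReceipt` are hypothetical names.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

type MirrorJob = String  -- stand-in for the real payload
newtype ReceiptHandle = ReceiptHandle Int deriving (Eq, Ord, Show)

data QueueMessage = QueueMessage
  { qmJob :: MirrorJob, qmReceipt :: ReceiptHandle } deriving (Eq, Show)

data MirrorQueue = MirrorQueue
  { enqueue :: MirrorJob -> IO ()
  , receive :: IO [QueueMessage]
  , ack :: ReceiptHandle -> IO ()
  , extendVisibility :: ReceiptHandle -> IO ()  -- duration omitted in this sketch
  }

-- Private state captured by the closures.
data St = St
  { stVisible :: [MirrorJob]                              -- FIFO, oldest first
  , stInFlight :: Map.Map ReceiptHandle (MirrorJob, Bool) -- Bool: held for one pass
  , stNext :: Int                                         -- next receipt number
  }

newSketchQueue :: IO MirrorQueue
newSketchQueue = do
  var <- newTVarIO (St [] Map.empty 0)
  pure MirrorQueue
    { enqueue = \job -> atomically $ modifyTVar' var $ \s ->
        s { stVisible = stVisible s ++ [job] }
    , receive = atomically $ do
        s <- readTVar var
        -- Held jobs sit out this pass (and lose their hold); un-acked,
        -- un-held jobs are redelivered ahead of newly visible ones.
        let (held, expired) = Map.partition snd (stInFlight s)
            jobs = map fst (Map.elems expired) ++ stVisible s
            delivered = zip (map ReceiptHandle [stNext s ..]) jobs
            inFlight' = Map.union
              (Map.map (\(j, _) -> (j, False)) held)
              (Map.fromList [(h, (j, False)) | (h, j) <- delivered])
        writeTVar var (St [] inFlight' (stNext s + length jobs))
        pure [QueueMessage j h | (h, j) <- delivered]
    , ack = \h -> atomically $ modifyTVar' var $ \s ->
        s { stInFlight = Map.delete h (stInFlight s) }
    , extendVisibility = \h -> atomically $ modifyTVar' var $ \s ->
        s { stInFlight = Map.adjust (\(j, _) -> (j, True)) h (stInFlight s) }
    }
```

With this model, enqueueing `"a"` and `"b"`, acking only `"a"`, and receiving again redelivers `"b"`; calling `extendVisibility` on that delivery makes the following `receive` return `[]`, and the one after it delivers `"b"` once more.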
Bounded in-memory production backend
data MemoryQueueConfig Source #
What the bounded in-memory backend needs: its depth cap and its idle-poll
window. A record (like SqsConfig) so each knob is named rather
than a bare Int; build it with defaultMemoryQueueConfig for the production poll
window.
Constructors
| MemoryQueueConfig | |
Fields
| |
Instances
| Show MemoryQueueConfig Source # | |
Defined in Ecluse.Core.Queue Methods showsPrec :: Int -> MemoryQueueConfig -> ShowS # show :: MemoryQueueConfig -> String # showList :: [MemoryQueueConfig] -> ShowS # | |
| Eq MemoryQueueConfig Source # | |
Defined in Ecluse.Core.Queue Methods (==) :: MemoryQueueConfig -> MemoryQueueConfig -> Bool # (/=) :: MemoryQueueConfig -> MemoryQueueConfig -> Bool # | |
defaultMemoryQueueConfig :: Int -> MemoryQueueConfig Source #
A MemoryQueueConfig for a given depth cap with the idle-poll window at its
production default -- 20s, mirroring the SQS long-poll cadence
(defaultSqsConfig) and comfortably under the worker's 120s
heartbeat-staleness budget (workerHeartbeatStaleAfter), so an idle
receive returns a healthy empty poll long before /livez would flag the loop
stalled. The depth cap stays the operator-tunable knob; the poll window is a fixed
cadence, exposed on the record only so a test can shorten it.
newBoundedInMemoryQueue Source #
Arguments
| :: MemoryQueueConfig | The depth cap (and any future knobs). |
| -> (Int -> IO ()) | Invoked on each report-worthy cap-overflow drop with the running total drops,
so the composition root can log it (and, once the |
| -> IO MirrorQueue |
Build a bounded, best-effort in-memory MirrorQueue -- the production backend
behind ECLUSE_QUEUE_BACKEND=memory, a TBQueue shared between the serve path's
enqueue and the worker's receive.
It is correctness-safe despite being lossy: mirroring is a demand-driven optimization over the always-available public upstream, so a job lost to the cap or to process teardown just means the package is served from public again and re-enqueued on the next pull -- a deferred performance win, never a correctness loss. That admits two deliberate departures from the cloud backends' contract:
- Bounded, drop-newest on overflow. The queue holds at most memQueueMaxDepth jobs; an enqueue that would exceed the cap is rejected (the newest job is dropped) rather than growing memory without bound -- the load-bearing constraint, since a cold-cache npm ci enqueues thousands of jobs at once. enqueue never throws (it runs on the serve hot path), and each report-worthy drop invokes the injected drop callback with the running drop count, rate-limited by memoryQueueDropReportInterval so a flood does not spam.
- No redelivery; ack / extendVisibility are no-ops. Unlike the cloud backends (and newInMemoryQueue), there is no visibility-timeout in-flight tracking: a receive removes a job for good. A job whose processing fails is therefore not redelivered -- it is simply re-enqueued on the next demand. This bounds memory hardest (nothing is retained after delivery) and is admissible precisely because a lost job is safe.
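The drop-newest producer side might look roughly like the following. This is a sketch, not the module's code: `newDropNewestEnqueue`, `reportWorthy`, and `dropReportInterval` are hypothetical names, the interval value of 100 is an arbitrary placeholder for memoryQueueDropReportInterval, and the sketch assumes the drop callback itself does not throw.

```haskell
import Control.Concurrent.STM
import Numeric.Natural (Natural)

-- | Placeholder for memoryQueueDropReportInterval; the real value is not
-- stated in this page, so 100 is an assumption.
dropReportInterval :: Int
dropReportInterval = 100

-- | Report the first drop, then every multiple of the interval.
reportWorthy :: Int -> Bool
reportWorthy n = n == 1 || n `mod` dropReportInterval == 0

-- | Build a drop-newest enqueue over a TBQueue with the given depth cap.
-- Returns the producer call and the queue the consumer would read from.
newDropNewestEnqueue :: Natural -> (Int -> IO ()) -> IO (job -> IO (), TBQueue job)
newDropNewestEnqueue cap onDrop = do
  q <- newTBQueueIO cap
  drops <- newTVarIO (0 :: Int)
  let enq job = do
        -- Decide and count in one transaction, so the full-check and the
        -- write cannot be separated by a concurrent producer.
        dropped <- atomically $ do
          full <- isFullTBQueue q
          if full
            then do
              modifyTVar' drops (+ 1)
              Just <$> readTVar drops
            else do
              writeTBQueue q job
              pure Nothing
        -- The callback runs outside STM and only for report-worthy totals.
        case dropped of
          Just n | reportWorthy n -> onDrop n
          _ -> pure ()
  pure (enq, q)
```

Because the full-check happens before `writeTBQueue`, the producer never blocks on a full queue; it counts the drop and returns.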
receive is a bounded long-poll: it waits up to memQueuePollWaitMicros for a
job, then drains up to memoryQueueBatchSize without blocking, or returns [] when
the window lapses -- the in-process analogue of the cloud long-poll. The bound is
load-bearing: the worker advances its liveness heartbeat only when receive returns
(an empty poll is a healthy idle), so an idle receive that blocked forever would
let the heartbeat go stale and /livez flag the loop stalled. The wait is the
timeout-over-atomically idiom rather than registerDelay so it works on the
non-threaded RTS too; an interrupted poll aborts the STM transaction, consuming
nothing.
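The timeout-over-atomically idiom described above can be sketched as follows. `receiveBatch` and its parameter names are hypothetical; running the first blocking read and the non-blocking drain in a single transaction is an assumption about how the real backend arranges it.

```haskell
import Control.Concurrent.STM
import System.Timeout (timeout)

-- | Wait up to waitMicros for a first job, then take up to batchSize jobs in
-- total without blocking further. An empty list means the window lapsed.
-- If the timeout fires while the transaction is blocked in readTBQueue, the
-- transaction is abandoned and nothing is removed from the queue.
receiveBatch :: Int -> Int -> TBQueue job -> IO [job]
receiveBatch waitMicros batchSize q = do
  got <- timeout waitMicros $ atomically $ do
    firstJob <- readTBQueue q          -- retries (blocks) while the queue is empty
    rest <- drain (batchSize - 1)
    pure (firstJob : rest)
  pure (maybe [] id got)
 where
  -- Non-blocking: stop at the batch cap or when the queue runs dry.
  drain n
    | n <= 0 = pure []
    | otherwise = do
        next <- tryReadTBQueue q
        case next of
          Nothing -> pure []
          Just x -> (x :) <$> drain (n - 1)
```

A caller that updates a liveness heartbeat after each `receiveBatch` return sees it advance at least once per poll window, even when the queue stays empty.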
memoryQueueBatchSize :: Int Source #
The most jobs one receive delivers from the bounded in-memory backend. Held
at the SQS batch cap so the worker -- which processes a batch sequentially and
advances its liveness heartbeat once per poll -- sees the same bounded batch shape
regardless of backend, rather than one poll returning a whole cold-cache burst and
starving the heartbeat past its staleness window.
memoryQueueDropReportInterval :: Int Source #
How many cap-overflow drops the bounded in-memory backend absorbs between warning reports. The first drop is always reported, then every multiple of this, so a sustained flood logs at most about one line per this many drops rather than one per dropped job.
Buffered producer hand-off
newEnqueueBuffer Source #
Arguments
| :: Int | Buffer depth: how many undelivered jobs the hand-off retains before dropping the newest. |
| -> (Int -> IO ()) | Invoked on every hand-off drop, with the running drop total. |
| -> (Int -> Text -> IO ()) | Invoked on every backend delivery failure, with the running failure total and the failure's detail. |
| -> MirrorQueue | The backend whose |
| -> IO (MirrorQueue, IO ()) |
Wrap a bounded producer-side hand-off buffer in front of a queue, so the
serve path's enqueue is an in-process STM write (microseconds) no matter how slow
the backend's own producer call is.
The motivating case is the SQS backend: its enqueue is an HTTP round trip
(SendMessage), and the serve path runs the mirror enqueue after the response body
has been sent but before the handler returns -- so on a keep-alive connection those
milliseconds hold the connection's turn and tax the next request on it. Buffered,
the handler hands the job off and returns; the returned drain loop -- which the
composition root runs alongside the server -- delivers buffered jobs to the backend
at the backend's own pace. The consumer fields (receive, ack,
extendVisibility) pass through untouched.
Loss stays safe, so the buffer keeps the handle's best-effort producer contract
(mirroring is demand-driven: a lost job is re-enqueued on the next demand for its
artifact -- the same argument newBoundedInMemoryQueue makes):
- Drop-newest on overflow. A hand-off finding the buffer full drops the job and invokes onDrop with the running drop total. The callback fires on every drop (metric-grade); the caller owns any log rate-limiting.
- A backend failure inside the drain loop invokes onDeliveryFailure with the running failure total and the failure's detail, and the loop moves on to the next job; the failed job is not redelivered here.
- Cancellation loses the buffer. The drain loop never returns, so the composition root races it against the services; shutdown cancels it and any still-buffered jobs are dropped -- the same safe loss.
The wrapped enqueue never throws.
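A reduced sketch of the buffer-plus-drain arrangement, under stated assumptions: it wraps a bare backend enqueue function instead of a whole MirrorQueue, reports the failure detail as a `String` instead of `Text`, and returns an extra single-step `drainOne` action so the behaviour can be exercised deterministically. `newBufferSketch`, `drainOne`, and `isAsync` are hypothetical names, and the callbacks are assumed not to throw.

```haskell
import Control.Concurrent.STM
import Control.Exception (SomeAsyncException (..), SomeException, fromException, throwIO, try)
import Control.Monad (forever)
import Numeric.Natural (Natural)

-- | Returns the wrapped producer call, one drain step, and the
-- never-returning drain loop built from that step.
newBufferSketch
  :: Natural                   -- buffer depth
  -> (Int -> IO ())            -- onDrop: running drop total
  -> (Int -> String -> IO ())  -- onDeliveryFailure: running failure total, detail
  -> (job -> IO ())            -- the backend's own (possibly slow) enqueue
  -> IO (job -> IO (), IO (), IO ())
newBufferSketch depth onDrop onDeliveryFailure backendEnqueue = do
  buf <- newTBQueueIO depth
  drops <- newTVarIO (0 :: Int)
  failures <- newTVarIO (0 :: Int)
  let -- Producer side: an in-process STM write; drops the newest job when full.
      handOff job = do
        dropped <- atomically $ do
          full <- isFullTBQueue buf
          if full
            then modifyTVar' drops (+ 1) >> Just <$> readTVar drops
            else writeTBQueue buf job >> pure Nothing
        maybe (pure ()) onDrop dropped
      -- Consumer side: deliver one buffered job; a backend failure is
      -- reported and the job is not retried here.
      drainOne = do
        job <- atomically (readTBQueue buf)
        r <- try (backendEnqueue job)
        case r of
          Right () -> pure ()
          Left e
            | isAsync e -> throwIO e  -- let cancellation stop the loop
            | otherwise -> do
                n <- atomically (modifyTVar' failures (+ 1) >> readTVar failures)
                onDeliveryFailure n (show e)
  pure (handOff, drainOne, forever drainOne)

-- | True for asynchronous exceptions such as thread cancellation.
isAsync :: SomeException -> Bool
isAsync e = case fromException e of
  Just (SomeAsyncException _) -> True
  Nothing -> False
```

Rethrowing asynchronous exceptions matters here: it is what allows shutdown to cancel the drain loop instead of having the cancellation counted as a delivery failure.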