{- | The mirror-queue handle: the durable hand-off from the request path to the
mirror worker.

Mirroring is __demand-driven__: when a client fetches an artifact whose version
passes the rules, the proxy 'enqueue's a 'MirrorJob' and serves the artifact
immediately, never blocking on the mirror. A separate worker 'receive's jobs,
fetches and verifies the artifact, publishes it to the mirror target, and 'ack's
the job (see @docs\/architecture\/cloud-backends.md@ → "Mirror Queue").

The queue is the one cloud surface with materially different APIs per provider
(AWS SQS @SendMessage@\/@ReceiveMessage@+visibility-timeout\/@DeleteMessage@; GCP
Pub\/Sub @Publish@\/@Pull@+ack-deadline\/@Acknowledge@), so it is its own handle --
a __record of functions__ (the Handle pattern). Both providers fit the same
receive → process → ack shape; their differences (visibility timeout vs ack
deadline, batch limits, dead-letter wiring) stay behind the handle, and
'ReceiptHandle' is opaque so neither leaks.

Like the other handles, the effectful fields return __'IO', not @App@__, so an
adapter stays decoupled from the proxy's @Env@\/@App@ (see
@docs\/architecture\/technology-stack.md@ → "Key Decisions").

== Conventions

The two cloud backends both give __at-least-once delivery__, which is safe here
because publishing is idempotent (a registry treats versions as immutable). The
handle's contract reflects that:

* __'enqueue' is best-effort.__ It runs on the request hot path (enqueue, then
  serve immediately), so a failure must be logged\/metered and __never fail the
  client response__ -- the artifact is already served, and a later pull
  re-enqueues.
* __Retry is "don't 'ack'".__ A job that fails processing is simply not acked;
  the visibility timeout \/ ack deadline redelivers it, and the backend's native
  dead-letter path catches the persistently failing ones. There is deliberately
  __no @nack@__.
* __'extendVisibility'__ lets the worker hold a long publish (a large artifact)
  past the visibility window. It is an /optimization/, not correctness-critical,
  since idempotency already makes redelivery harmless.

This module provides the handle and its payload types, plus two STM-backed
in-memory implementations:

* 'newInMemoryQueue' -- the __test double__ that models the cloud backends'
  visibility-timeout semantics (receive → ack \/ redeliver-on-no-ack), used to
  exercise the worker's retry path without a cloud queue.
* 'newBoundedInMemoryQueue' -- the __bounded, best-effort production backend__
  selected by @ECLUSE_QUEUE_BACKEND=memory@. See its own Haddock for why it is
  correctness-safe (a dropped job is re-enqueued on the next demand) and why it
  deliberately does __not__ redeliver.

It also provides 'newEnqueueBuffer', a __bounded producer-side hand-off buffer__
wrapped in front of either backend so the serve path's 'enqueue' completes in
microseconds while a composition-root drain loop delivers to the (possibly slow)
backend off the request path.
-}
module Ecluse.Core.Queue (
    -- * Queue handle
    MirrorQueue (..),

    -- * Payloads
    MirrorJob (..),
    MirrorArtifact (..),
    RemoteSpanContext (..),
    QueueMessage (..),

    -- * Opaque receipt
    ReceiptHandle,
    mkReceiptHandle,
    unReceiptHandle,

    -- * Durations
    Seconds (..),

    -- * In-memory double
    newInMemoryQueue,

    -- * Bounded in-memory production backend
    MemoryQueueConfig (..),
    defaultMemoryQueueConfig,
    newBoundedInMemoryQueue,
    memoryQueueBatchSize,
    memoryQueueDropReportInterval,

    -- * Buffered producer hand-off
    newEnqueueBuffer,
) where

import Control.Concurrent.STM.TBQueue (TBQueue, isFullTBQueue, newTBQueueIO, readTBQueue, tryReadTBQueue, writeTBQueue)
import Data.Map.Strict qualified as Map
import Data.Sequence qualified as Seq
import System.Timeout (timeout)
import UnliftIO.Exception (tryAny)

import Ecluse.Core.Package (Hash, PackageName)
import Ecluse.Core.Version (Version)

{- | A mirror job: everything the worker needs to back-fill one artifact into the
mirror target. The version was already gated by the rules at serve time (when
the job was enqueued), so the worker does not re-run the rules; it fetches the
bytes, verifies them against the __serve-time-admitted__ integrity digest the job
carries, and publishes.

The integrity digest and the artifact descriptor are captured __at enqueue time__
('jobArtifact'), not re-fetched: the worker mirrors exactly what the rules
admitted, so an upstream packument mutated in the enqueue → process window cannot
substitute a different artifact for the one that was gated. The descriptor also
carries the filename and declared size the worker needs to assemble the publish
document.
-}
data MirrorJob = MirrorJob
    { jobPackage :: PackageName
    -- ^ The package whose artifact is being mirrored.
    , jobVersion :: Version
    -- ^ The specific version to mirror.
    , jobArtifactUrl :: Text
    -- ^ Where to fetch the artifact bytes from (the public upstream).
    , jobMirrorTarget :: Text
    -- ^ The mirror-target endpoint the artifact is published to.
    , jobArtifact :: MirrorArtifact
    {- ^ The serve-time-admitted artifact descriptor: the integrity digest the
    fetched bytes are verified against, plus the filename and declared size the
    publish document is assembled from.
    -}
    , jobTraceContext :: Maybe RemoteSpanContext
    {- ^ The trace context of the serve-time span that enqueued the job, captured
    at enqueue time so the worker's per-job span can __link__ back to the request
    that produced the work across the asynchronous hop. 'Nothing' when tracing was
    off at enqueue time (or for a job from a producer that carried none). The queue
    treats it as opaque transport; only the tracing port reads it.
    -}
    }
    deriving stock (Eq, Show)

{- | The serve-time-admitted artifact descriptor carried on a 'MirrorJob': exactly
the fields the worker needs to verify the fetched bytes and assemble the publish
document, captured when the version was gated.

'maHashes' is a 'NonEmpty' because the serve path admits a public version only when
it carries at least one integrity digest (the integrity-presence admission policy),
so a job with __no__ digest to verify against is unrepresentable -- the worker always
has a fingerprint to check the bytes against before they reach the private upstream.
-}
data MirrorArtifact = MirrorArtifact
    { maFilename :: Text
    {- ^ The artifact's on-the-wire filename, the @_attachments@ key in the publish
    document.
    -}
    , maHashes :: NonEmpty Hash
    {- ^ The serve-time-admitted integrity digests (at least one). The worker
    verifies the fetched bytes against these before publishing; a mismatch fails
    the job with no publish.
    -}
    , maSize :: Maybe Int
    -- ^ The declared artifact size in bytes, if the registry reported it.
    }
    deriving stock (Eq, Show)

{- | A serialised W3C trace-context carrier riding on a 'MirrorJob': the
@traceparent@ (and any @tracestate@) of the span that enqueued the job, in the
standard wire encoding. It is captured at enqueue time and read back by the worker's
tracing port to re-establish a span __link__ from the per-job span to the enqueueing
request, so the asynchronous mirror hand-off is navigable in a trace.

The two fields are the W3C header values verbatim; the queue carries them opaquely
(it neither parses nor validates them -- an unparseable carrier simply yields no link),
so this type names what is carried without coupling the queue to any tracing backend.
-}
data RemoteSpanContext = RemoteSpanContext
    { rscTraceparent :: Text
    -- ^ The W3C @traceparent@ header value of the enqueueing span.
    , rscTracestate :: Text
    {- ^ The W3C @tracestate@ header value (possibly empty) carried alongside, so
    vendor trace state survives the hop.
    -}
    }
    deriving stock (Eq, Show)

{- | An __opaque__ handle identifying a received message for 'ack' \/
'extendVisibility'. It carries the backend's own delivery token -- an SQS receipt
handle or a Pub\/Sub @ackId@ -- as text; the constructor is hidden so neither
provider's representation leaks into worker code, and a handle is only ever
obtained from a 'QueueMessage' returned by 'receive'. Build one (in a backend)
with 'mkReceiptHandle' and read the token back with 'unReceiptHandle'.
-}
newtype ReceiptHandle = ReceiptHandle Text
    deriving stock (Eq, Ord, Show)

{- | Wrap a backend's delivery token (an SQS receipt handle, a Pub\/Sub @ackId@)
as an opaque 'ReceiptHandle'. For backend implementations only -- worker code
obtains handles from 'receive', never builds them.
-}
mkReceiptHandle :: Text -> ReceiptHandle
mkReceiptHandle = ReceiptHandle

{- | Recover the backend's delivery token from a 'ReceiptHandle', to pass back to
the backend on 'ack' \/ 'extendVisibility'. For backend implementations only.
-}
unReceiptHandle :: ReceiptHandle -> Text
unReceiptHandle (ReceiptHandle t) = t

{- | A received message: the 'MirrorJob' to process together with the
'ReceiptHandle' used to 'ack' it (or 'extendVisibility' on it) once processed.
-}
data QueueMessage = QueueMessage
    { msgJob :: MirrorJob
    -- ^ The job to process.
    , msgReceipt :: ReceiptHandle
    -- ^ The handle identifying this delivery, for 'ack' \/ 'extendVisibility'.
    }
    deriving stock (Eq, Show)

{- | A duration in whole seconds, for 'extendVisibility'. A 'newtype' so a raw
@Int@ of seconds is never confused with some other count.
-}
newtype Seconds = Seconds Int
    deriving stock (Eq, Ord, Show)

{- | The mirror-queue handle -- a record of functions over a backend whose private
state the closures capture. See the module header for the @enqueue@ \/
don't-@ack@-to-retry \/ no-@nack@ conventions; all fields are 'IO'.
-}
data MirrorQueue = MirrorQueue
    { enqueue :: MirrorJob -> IO ()
    {- ^ Producer. __Best-effort__: runs on the request hot path, so a failure is
    logged\/metered and never fails the client response (see the header).
    -}
    , receive :: IO [QueueMessage]
    {- ^ Consumer. One long-poll for a batch of messages; returns @[]@ on timeout
    (an empty poll), so the worker loop simply polls again.
    -}
    , ack :: ReceiptHandle -> IO ()
    {- ^ Acknowledge a processed message so it is not redelivered. __Not__ acking
    is how a failed job is retried (the header's "retry is don't ack").
    -}
    , extendVisibility :: ReceiptHandle -> Seconds -> IO ()
    {- ^ Extend a received message's visibility window to hold a long publish. An
    optimization, not correctness-critical (redelivery is harmless).
    -}
    }

{- The mutable state of the in-memory queue.

Modelled as visible (waiting) jobs plus in-flight (received-but-unacked) ones,
exactly mirroring the visibility-timeout model the cloud backends use: a 'receive'
makes visible jobs in-flight, an 'ack' drops an in-flight job, and an unacked
in-flight job becomes visible again -- redelivered -- on a subsequent 'receive'.
-}
data QueueState = QueueState
    { -- A monotonic counter giving each delivery a unique 'ReceiptHandle'.
      qsNextReceipt :: Word64
    , -- Jobs waiting to be delivered, oldest first (FIFO). 'Seq' gives
      -- O(1) amortised snoc so enqueue cost does not grow with queue depth.
      qsVisible :: Seq MirrorJob
    , -- Delivered-but-unacked jobs, keyed by the numeric receipt counter (not the
      -- rendered 'ReceiptHandle' text) so iteration stays in delivery -- hence
      -- FIFO-reclaim -- order rather than the lexicographic order text keys give.
      qsInFlight :: Map Word64 InFlight
    }

{- One in-flight job and whether its visibility has been extended.

A held ('inFlightHeld' = 'True') job survives one reclaim pass (the effect of
'extendVisibility'); otherwise an in-flight job is reclaimed -- made visible again
for redelivery -- on the next 'receive', modelling expiry of the visibility
window.
-}
data InFlight = InFlight
    { -- The job awaiting acknowledgement.
      inFlightJob :: MirrorJob
    , -- Whether 'extendVisibility' has held it past the next reclaim.
      inFlightHeld :: Bool
    }

{- | Build a fresh STM-backed in-memory 'MirrorQueue'.

Honours the handle's contract: 'enqueue' appends (FIFO), 'receive' delivers all
currently-visible jobs and moves them in-flight, 'ack' removes an in-flight job,
and an in-flight job that is never acked is __redelivered__ on the next 'receive'
("retry is don't ack"). 'extendVisibility' holds a job in-flight across one such
redelivery pass. This is a test double -- there is no long-poll blocking; an empty
'receive' returns @[]@ at once.
-}
newInMemoryQueue :: IO MirrorQueue
newInMemoryQueue = do
    -- Receipt counter starts at 0; nothing visible, nothing in flight.
    stateVar <- newTVarIO (QueueState 0 Seq.empty mempty)
    let modifyState :: (QueueState -> QueueState) -> IO ()
        modifyState = atomically . modifyTVar' stateVar
    pure
        MirrorQueue
            { enqueue = modifyState . enqueueJob
            , -- Reclaim, deliver and store the new state in one transaction, so a
              -- concurrent enqueue or ack cannot interleave with a delivery pass.
              receive = atomically $ do
                qs <- readTVar stateVar
                let (messages, qs') = deliverAll qs
                writeTVar stateVar qs'
                pure messages
            , ack = modifyState . ackJob
            , -- The requested duration is ignored: a hold always lasts exactly one
              -- reclaim pass (see 'holdJob').
              extendVisibility = \handle _seconds -> modifyState (holdJob handle)
            }

-- Append a job to the back of the visible queue (FIFO). O(1) amortised.
enqueueJob :: MirrorJob -> QueueState -> QueueState
enqueueJob job qs = qs{qsVisible = qsVisible qs <> Seq.singleton job}

-- Drop an acked in-flight job; a handle that is unknown (already acked, never
-- issued, or not one of ours) is a harmless no-op.
ackJob :: ReceiptHandle -> QueueState -> QueueState
ackJob handle qs =
    case receiptKey handle of
        Just key -> qs{qsInFlight = Map.delete key (qsInFlight qs)}
        Nothing -> qs

-- Hold an in-flight job past the next reclaim pass. Unknown handle: no-op.
holdJob :: ReceiptHandle -> QueueState -> QueueState
holdJob handle qs =
    case receiptKey handle of
        Just key ->
            qs{qsInFlight = Map.adjust (\f -> f{inFlightHeld = True}) key (qsInFlight qs)}
        Nothing -> qs

-- Recover the numeric counter a handle was minted from (the inverse of the
-- 'show' in 'assignReceipts'); 'Nothing' for a handle this queue never issued.
--
-- The parsed key is accepted only if rendering it back reproduces the token
-- exactly. 'readMaybe' alone is laxer than the inverse of 'show': it also
-- accepts surrounding whitespace, parentheses and hex\/octal literals, and the
-- 'Read' instance for 'Word64' wraps negative or out-of-range numerals, so a
-- foreign token such as @"-1"@ or @" 3"@ would otherwise alias a real key.
receiptKey :: ReceiptHandle -> Maybe Word64
receiptKey handle =
    case readMaybe (toString token) of
        Just key | show key == token -> Just key
        _ -> Nothing
  where
    token = unReceiptHandle handle

{- A single receive over the in-memory queue state. First reclaim any un-held
in-flight jobs (their visibility window has lapsed) back to the front of the
visible queue, and clear the held flag on the rest so they are reclaimed next
time unless held again. Then deliver every visible job: assign each a fresh
receipt, move it in-flight, and return it as a 'QueueMessage'. -}
deliverAll :: QueueState -> ([QueueMessage], QueueState)
deliverAll qs =
    let (reclaimed, stillHeld) = reclaim (Map.toList (qsInFlight qs))
        -- Reclaimed jobs go ahead of the jobs that were already waiting.
        toDeliver = Seq.fromList reclaimed <> qsVisible qs
        (messages, nextReceipt, delivered) =
            assignReceipts (qsNextReceipt qs) (toList toDeliver)
     in ( messages
        , QueueState
            { qsNextReceipt = nextReceipt
            , -- Everything visible was just delivered.
              qsVisible = Seq.empty
            , -- Held entries keep their old keys; fresh deliveries get new ones.
              qsInFlight = Map.fromList stillHeld <> delivered
            }
        )

{- Partition in-flight entries into (jobs reclaimed to visible, entries that
stay in flight). A held entry stays in flight but has its hold cleared, so a
later un-held receive reclaims it. -}
reclaim ::
    [(Word64, InFlight)] ->
    ([MirrorJob], [(Word64, InFlight)])
reclaim = foldr step ([], [])
  where
    -- A right fold that conses onto the accumulators, so both output lists keep
    -- the order of the input list.
    step (key, f) (jobs, held)
        | inFlightHeld f = (jobs, (key, f{inFlightHeld = False}) : held)
        | otherwise = (inFlightJob f : jobs, held)

{- Give each job a fresh receipt, threading the monotonic counter. Returns
the messages, the next free counter value, and the new in-flight entries. The
in-flight map is keyed by the numeric counter; the 'ReceiptHandle' the message
carries is that counter rendered as text. -}
assignReceipts ::
    Word64 ->
    [MirrorJob] ->
    ([QueueMessage], Word64, Map Word64 InFlight)
assignReceipts next [] = ([], next, mempty)
assignReceipts next (job : rest) =
    let message = QueueMessage{msgJob = job, msgReceipt = mkReceiptHandle (show next)}
        (messages, next', inFlight) = assignReceipts (next + 1) rest
     in ( message : messages
        , next'
        , -- A fresh delivery always starts un-held.
          Map.insert next (InFlight{inFlightJob = job, inFlightHeld = False}) inFlight
        )

{- | What the bounded in-memory backend needs: its depth cap and its idle-poll
window. A record (like 'Ecluse.Core.Queue.Sqs.SqsConfig') so each knob is named rather
than a bare 'Int'; build it with 'defaultMemoryQueueConfig' for the production poll
window.
-}
data MemoryQueueConfig = MemoryQueueConfig
    { memQueueMaxDepth :: Int
    {- ^ The maximum number of jobs the queue holds. A fresh 'enqueue' past this cap
    is __dropped-newest__ (the enqueue is rejected); a dropped job is safe, as it is
    re-enqueued on the next demand. Must be positive (the config layer enforces it).
    -}
    , memQueuePollWaitMicros :: Int
    {- ^ The idle long-poll window in microseconds: how long a 'receive' waits for a
    job before returning @[]@ (an empty, healthy poll). Bounds the idle wait so the
    worker's liveness heartbeat keeps advancing -- see 'newBoundedInMemoryQueue'.
    -}
    }
    deriving stock (Eq, Show)

{- | A 'MemoryQueueConfig' for a given depth cap with the idle-poll window at its
production default -- @20s@, mirroring the SQS long-poll cadence
('Ecluse.Core.Queue.Sqs.defaultSqsConfig') and comfortably under the worker's @120s@
heartbeat-staleness budget ('Ecluse.Core.Worker.workerHeartbeatStaleAfter'), so an idle
'receive' returns a healthy empty poll long before @\/livez@ would flag the loop
stalled. The depth cap stays the operator-tunable knob; the poll window is a fixed
cadence, exposed on the record only so a test can shorten it.
-}
defaultMemoryQueueConfig :: Int -> MemoryQueueConfig
defaultMemoryQueueConfig maxDepth =
    MemoryQueueConfig
        { memQueueMaxDepth = maxDepth
        , -- 20 seconds, expressed in microseconds.
          memQueuePollWaitMicros = 20_000_000
        }

{- | The most jobs one 'receive' delivers from the bounded in-memory backend. Held
at the SQS batch cap so the worker -- which processes a batch __sequentially__ and
advances its liveness heartbeat once per poll -- sees the same bounded batch shape
regardless of backend, rather than one poll returning a whole cold-cache burst and
starving the heartbeat past its staleness window.
-}
memoryQueueBatchSize :: Int
memoryQueueBatchSize = 10

{- | How many cap-overflow drops the bounded in-memory backend absorbs between
warning reports. The first drop is always reported, then every multiple of this, so
a sustained flood logs at most about one line per this many drops rather than one
per dropped job.
-}
memoryQueueDropReportInterval :: Int
memoryQueueDropReportInterval = 1000

{- | Build a bounded, best-effort in-memory 'MirrorQueue' -- the production backend
behind @ECLUSE_QUEUE_BACKEND=memory@, a 'TBQueue' shared between the serve path's
'enqueue' and the worker's 'receive'.

It is __correctness-safe despite being lossy__: mirroring is a demand-driven
optimization over the always-available public upstream, so a job lost to the cap or
to process teardown just means the package is served from public again and
re-enqueued on the next pull -- a deferred performance win, never a correctness loss.
That admits two deliberate departures from the cloud backends' contract:

* __Bounded, drop-newest on overflow.__ The queue holds at most 'memQueueMaxDepth'
  jobs; an 'enqueue' that would exceed the cap is rejected (the newest job is
  dropped) rather than growing memory without bound -- the load-bearing constraint,
  since a cold-cache @npm ci@ enqueues thousands of jobs at once. 'enqueue' never
  throws (it runs on the serve hot path), and each report-worthy drop invokes the
  injected drop callback with the running drop count, rate-limited by
  'memoryQueueDropReportInterval' so a flood does not spam.
* __No redelivery; 'ack' \/ 'extendVisibility' are no-ops.__ Unlike the cloud
  backends (and 'newInMemoryQueue'), there is no visibility-timeout in-flight
  tracking: a 'receive' removes a job for good. A job whose processing fails is
  therefore __not__ redelivered -- it is simply re-enqueued on the next demand. This
  bounds memory hardest (nothing is retained after delivery) and is admissible
  precisely because a lost job is safe.

'receive' is a __bounded long-poll__: it waits up to 'memQueuePollWaitMicros' for a
job, then drains up to 'memoryQueueBatchSize' without blocking, or returns @[]@ when
the window lapses -- the in-process analogue of the cloud long-poll. The bound is
load-bearing: the worker advances its liveness heartbeat only when 'receive' returns
(an empty poll is a healthy idle), so an idle 'receive' that blocked forever would
let the heartbeat go stale and @\/livez@ flag the loop stalled. The wait is the
@timeout@-over-@atomically@ idiom rather than @registerDelay@ so it works on the
non-threaded RTS too; an interrupted poll aborts the STM transaction, consuming
nothing.
-}
newBoundedInMemoryQueue ::
    -- | Supplies the depth cap ('memQueueMaxDepth') and the 'receive' poll window
    -- ('memQueuePollWaitMicros').
    MemoryQueueConfig ->
    {- | Drop reporter: called with the running drop total for each cap-overflow
    drop that 'shouldReportDrop' selects, so the caller decides how to surface it.
    -}
    (Int -> IO ()) ->
    IO MirrorQueue
newBoundedInMemoryQueue cfg onDrop = do
    -- Clamp the capacity to one or more, so a non-positive configured depth cannot
    -- yield a queue that is full from the start and sheds every job.
    jobs <- newTBQueueIO (fromIntegral (max 1 (memQueueMaxDepth cfg)))
    drops <- newTVarIO (0 :: Int)
    receipts <- newTVarIO (0 :: Word64)
    let
        -- Producer side: one transaction either stores the job or counts a drop;
        -- only the drops the report policy selects reach the callback.
        offer job =
            atomically (writeOrDrop jobs drops job) >>= \case
                Just total | shouldReportDrop total -> onDrop total
                _ -> pass
        -- Consumer side: wait at most the poll window for a batch. A lapsed window
        -- yields the empty batch instead of blocking the caller indefinitely.
        poll = do
            batch <- timeout (memQueuePollWaitMicros cfg) (atomically (receiveBatch jobs receipts))
            pure (fromMaybe [] batch)
    pure
        MirrorQueue
            { enqueue = offer
            , receive = poll
            , -- Delivery already removed the job from the queue, so acknowledging
              -- has nothing to retire.
              ack = \_ -> pass
            , -- No in-flight tracking exists on this backend, so there is no
              -- window to extend.
              extendVisibility = \_ _ -> pass
            }

-- Drop-report policy: the very first drop is always reported; after that only
-- totals that are a multiple of 'memoryQueueDropReportInterval' are, which
-- rate-limits reporting during a sustained flood.
shouldReportDrop :: Int -> Bool
shouldReportDrop 1 = True
shouldReportDrop total = total `mod` memoryQueueDropReportInterval == 0

{- Pull one batch inside a single STM transaction. The first read blocks (retries)
until a job exists; the remainder of the batch is gathered with non-blocking reads,
capped so the batch never exceeds 'memoryQueueBatchSize' jobs in total. Because the
whole batch is one transaction, aborting it before it commits consumes nothing.
Every delivered job is stamped with the next value of a monotonic counter, rendered
through 'mkReceiptHandle', so the messages carry distinct receipts. -}
receiveBatch :: TBQueue MirrorJob -> TVar Word64 -> STM [QueueMessage]
receiveBatch queue nextReceipt = do
    firstJob <- readTBQueue queue
    others <- drainUpTo (memoryQueueBatchSize - 1)
    traverse assignReceipt (firstJob : others)
  where
    -- Collect at most @remaining@ further jobs, stopping early as soon as the
    -- queue runs empty; never blocks.
    drainUpTo :: Int -> STM [MirrorJob]
    drainUpTo remaining
        | remaining <= 0 = pure []
        | otherwise = do
            next <- tryReadTBQueue queue
            case next of
                Nothing -> pure []
                Just job -> do
                    more <- drainUpTo (remaining - 1)
                    pure (job : more)

    -- Wrap a job in a message carrying the current counter value as its receipt,
    -- and advance the counter for the next delivery.
    assignReceipt :: MirrorJob -> STM QueueMessage
    assignReceipt job =
        readTVar nextReceipt >>= \n ->
            writeTVar nextReceipt (n + 1)
                $> QueueMessage{msgJob = job, msgReceipt = mkReceiptHandle (show n)}

{- Offer a job to a bounded queue as part of the caller's transaction. With room
available the job is written and the result is 'Nothing'; at the cap the job is
discarded instead (drop-newest), the drop counter is incremented, and the new
running total comes back in a 'Just' for the caller's report policy. The producer
never blocks on a full queue. -}
writeOrDrop :: TBQueue MirrorJob -> TVar Int -> MirrorJob -> STM (Maybe Int)
writeOrDrop queue dropCount job =
    isFullTBQueue queue >>= \case
        True -> Just <$> bumpCount dropCount
        False -> Nothing <$ writeTBQueue queue job

-- Increment a running counter and return the new total.
--
-- The new total is forced before it is stored. A lazy write would leave an
-- unevaluated @previous + 1@ in the 'TVar'; when callers ignore the returned
-- total, every further bump would wrap the previous thunk and the chain would
-- grow with the count.
bumpCount :: TVar Int -> STM Int
bumpCount counter = do
    previous <- readTVar counter
    let n = previous + 1
    -- '$!' evaluates the sum before 'writeTVar' runs, so the TVar only ever
    -- holds an evaluated Int.
    writeTVar counter $! n
    pure n

{- | Wrap a bounded __producer-side hand-off buffer__ in front of a queue, so the
serve path's 'enqueue' is an in-process STM write (microseconds) no matter how slow
the backend's own producer call is.

The motivating case is the SQS backend: its 'enqueue' is an HTTP round trip
(@SendMessage@), and the serve path runs the mirror enqueue after the response body
has been sent but before the handler returns -- so on a keep-alive connection those
milliseconds hold the connection's turn and tax the next request on it. Buffered,
the handler hands the job off and returns; the returned __drain loop__ -- which the
composition root runs alongside the server -- delivers buffered jobs to the backend
at the backend's own pace. The consumer fields ('receive', 'ack',
'extendVisibility') pass through untouched.

Loss stays safe, so the buffer keeps the handle's best-effort producer contract
(mirroring is demand-driven: a lost job is re-enqueued on the next demand for its
artifact -- the same argument 'newBoundedInMemoryQueue' makes):

* __Drop-newest on overflow.__ A hand-off finding the buffer full drops the job and
  invokes @onDrop@ with the running drop total. The callback fires on __every__
  drop (metric-grade); the caller owns any log rate-limiting.
* __A backend failure inside the drain loop__ invokes @onDeliveryFailure@ with the
  running failure total and the failure's detail, and the loop moves on to the next
  job; the failed job is not redelivered here.
* __Cancellation loses the buffer.__ The drain loop never returns, so the
  composition root races it against the services; shutdown cancels it and any
  still-buffered jobs are dropped -- the same safe loss.

The wrapped 'enqueue' never throws.
-}
newEnqueueBuffer ::
    {- | Buffer depth: the number of undelivered jobs the hand-off holds before it
    starts dropping the newest. Values below one are treated as one.
    -}
    Int ->
    -- | Called on every hand-off drop with the running drop total.
    (Int -> IO ()) ->
    {- | Called on every failed backend delivery with the running failure total and
    a rendering of the failure.
    -}
    (Int -> Text -> IO ()) ->
    -- | The backend whose 'enqueue' is decoupled from its callers.
    MirrorQueue ->
    IO (MirrorQueue, IO ())
newEnqueueBuffer depth onDrop onDeliveryFailure backend = do
    -- Clamp the capacity to one or more, so a non-positive depth cannot produce a
    -- buffer that is full from the start and drops every hand-off.
    pending <- newTBQueueIO (fromIntegral (max 1 depth))
    drops <- newTVarIO (0 :: Int)
    failures <- newTVarIO (0 :: Int)
    let
        -- Every drop is reported, with no rate limit applied here; throttling any
        -- resulting log output is left to the callback.
        handOff job = atomically (writeOrDrop pending drops job) >>= maybe pass onDrop
        -- Runs until cancelled: deliver buffered jobs to the backend one at a time.
        drainLoop = forever (deliverNext pending failures onDeliveryFailure backend)
    -- Only 'enqueue' is replaced; the record update keeps every other field of the
    -- backend as it was.
    pure (backend{enqueue = handOff}, drainLoop)

{- One step of the drain loop: block until a job is buffered, take it, and pass it
to the backend's own 'enqueue'. If that call fails, the failure counter is
incremented and the failure callback receives the new running total together with
the rendered exception. The job has already left the buffer by then and is not put
back, so it is not retried here. -}
deliverNext :: TBQueue MirrorJob -> TVar Int -> (Int -> Text -> IO ()) -> MirrorQueue -> IO ()
deliverNext buffer failureCount onDeliveryFailure backend = do
    job <- atomically (readTBQueue buffer)
    tryAny (enqueue backend job) >>= either reportFailure pure
  where
    -- Count the failure first so the callback sees the up-to-date total.
    reportFailure failure = do
        total <- atomically (bumpCount failureCount)
        onDeliveryFailure total (toText (displayException failure))