{- | The AWS SQS backend behind the 'MirrorQueue' handle.

Maps the handle's receive → process → ack shape onto SQS:

* 'enqueue' → @SendMessage@ (the 'MirrorJob' encoded as the message body),
* 'receive' → one long-poll @ReceiveMessage@ (a batch, @[]@ on an empty poll),
* 'ack' → @DeleteMessage@ (the message is gone, never redelivered),
* 'extendVisibility' → @ChangeMessageVisibility@ (hold a long publish).

The provider differences SQS embodies -- the visibility timeout, the long-poll
window, the batch limit -- are 'SqsConfig' knobs with sane defaults, and the SQS
receipt handle is carried opaquely in a 'ReceiptHandle' (via 'mkReceiptHandle'),
so none of it leaks past the handle. __Retry is "don't ack"__: a job whose
processing fails is simply not 'ack'ed, and SQS redelivers it once the visibility
timeout lapses; persistent failures fall to the queue's native dead-letter
(max-receive-count), so there is no @nack@ (see "Ecluse.Core.Queue").

The @amazonka@ 'AWS.Env' is built once at 'newSqsQueue' and captured by the
handle's closures, so the backend's state never reaches the proxy's @Env@\/@App@
(see @docs\/architecture\/technology-stack.md@ → "Key Decisions"). The
'MirrorJob' wire mapping is a plain JSON object, decoded on 'receive'; a body that
fails to parse is dropped rather than yielded as a partial, so -- like any message
left unprocessed -- it is not 'ack'ed and SQS redelivers it, ultimately to the
dead-letter queue.

The SQS queue is a __trusted, operator-declared destination__ (the configured queue
URL, or an endpoint override): like the OTLP telemetry endpoint (see
"Ecluse.Telemetry.Resolve"), it is reached through @amazonka@'s own client and is
__not__ subject to the data-plane egress controls (the host allowlist and the https-only
egress posture of "Ecluse.Core.Security.Egress"), which guard only untrusted package
downloads, never a destination the operator configured.
-}
module Ecluse.Core.Queue.Sqs (
    -- * Configuration
    SqsConfig (..),
    SqsEndpoint (..),
    defaultSqsConfig,

    -- * The backend
    newSqsQueue,

    -- * Job wire mapping
    encodeJob,
    decodeJob,
    parseHashAlg,
) where

import Amazonka qualified as AWS

import Amazonka.SQS.ChangeMessageVisibility qualified as SQS
import Amazonka.SQS.DeleteMessage qualified as SQS
import Amazonka.SQS.ReceiveMessage qualified as SQS
import Amazonka.SQS.SendMessage qualified as SQS
import Amazonka.SQS.Types qualified as SQS
import Control.Monad.Trans.Resource (runResourceT)
import Data.Aeson (
    eitherDecodeStrict',
    object,
    withObject,
    (.:),
    (.:?),
    (.=),
 )
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (Parser, parseEither)
import Lens.Micro ((?~), (^.))

import Ecluse.Core.Ecosystem (ecosystemName, parseEcosystem)
import Ecluse.Core.Package (
    Hash,
    HashAlg (Blake2b, MD5, SHA1, SHA256, SHA384, SHA512, SRI),
    hashAlg,
    hashValue,
    mkHash,
    mkPackageName,
    mkScope,
    pkgEcosystem,
    pkgNamespace,
    unScope,
    unscopedName,
 )
import Ecluse.Core.Package.Integrity (renderHashAlg)
import Ecluse.Core.Queue (
    MirrorArtifact (MirrorArtifact, maFilename, maHashes, maSize),
    MirrorJob (..),
    MirrorQueue (..),
    QueueMessage (..),
    RemoteSpanContext (RemoteSpanContext, rscTraceparent, rscTracestate),
    Seconds (..),
    mkReceiptHandle,
    unReceiptHandle,
 )
import Ecluse.Core.Version (mkVersion, renderVersion)

{- | Where an SQS-compatible endpoint lives, for pointing the backend at a
non-default host: a local emulator (@ministack@) in tests, or a VPC endpoint.
-}
data SqsEndpoint = SqsEndpoint
    { endpointSecure :: Bool
    -- ^ Whether to connect over HTTPS (an emulator is usually plain HTTP).
    , endpointHost :: Text
    -- ^ The host to connect to (e.g. @"localhost"@).
    , endpointPort :: Int
    -- ^ The port to connect to (e.g. @4566@ for ministack).
    }
    deriving stock (Eq, Show)

{- | What the SQS backend needs. The batch size, long-poll window, and visibility
timeout are provider knobs (see "Ecluse.Core.Queue") with defaults in
'defaultSqsConfig'.
-}
data SqsConfig = SqsConfig
    { sqsQueueUrl :: Text
    -- ^ The fully-qualified SQS queue URL mirror jobs are sent to and received from.
    , sqsRegion :: Text
    -- ^ The AWS region the queue lives in (e.g. @"us-east-1"@).
    , sqsEndpoint :: Maybe SqsEndpoint
    {- ^ An endpoint override for an emulator or VPC endpoint; 'Nothing' uses
    @amazonka@'s default resolution and the ambient credential chain.
    -}
    , sqsBatchSize :: Int
    {- ^ Maximum messages to pull per 'receive' (SQS caps this at 10). A larger
    batch amortises the round-trip when the queue is busy.
    -}
    , sqsWaitSeconds :: Int
    {- ^ The long-poll window in seconds (SQS caps this at 20): how long a
    'receive' waits for a message before returning @[]@, so an idle worker does
    not hot-loop on empty polls.
    -}
    , sqsVisibilityTimeout :: Seconds
    {- ^ How long a received message stays hidden from other 'receive's before SQS
    redelivers it -- the budget for processing-then-'ack', extendable per message
    via 'extendVisibility'.
    -}
    }
    deriving stock (Eq, Show)

{- | A 'SqsConfig' for a queue URL and region with the provider knobs at sane
defaults: a full batch of 10, the maximum 20-second long poll, and a 30-second
visibility timeout. No endpoint override is set ('sqsEndpoint' is 'Nothing').
Override the record fields to tune them, or set 'sqsEndpoint' to target an
emulator.

The first argument is the queue URL, the second the AWS region.
-}
defaultSqsConfig :: Text -> Text -> SqsConfig
defaultSqsConfig queueUrl region =
    SqsConfig
        { sqsQueueUrl = queueUrl
        , sqsRegion = region
        , sqsEndpoint = Nothing
        , sqsBatchSize = 10
        , sqsWaitSeconds = 20
        , sqsVisibilityTimeout = Seconds 30
        }

{- | Build an SQS-backed 'MirrorQueue'. The @amazonka@ 'AWS.Env' is constructed
once here (by @mkEnv@) -- region-scoped and, when 'sqsEndpoint' is given, with
the SQS service repointed at that endpoint -- and captured by the returned
handle's closures. Credentials are resolved through 'AWS.discover' whether or
not an endpoint override is configured.

Each handle operation is a single SQS request run in its own 'runResourceT'
scope:

* 'enqueue' sends the 'encodeJob'-encoded job as the message body;
* 'receive' issues one @ReceiveMessage@ and keeps only the messages
  @toQueueMessage@ can lift (an absent message list is treated as @[]@);
* 'ack' deletes the message by its receipt handle;
* 'extendVisibility' changes the message's visibility timeout to the given
  number of seconds.

The responses of the send / delete / change-visibility calls are discarded.
-}
newSqsQueue :: SqsConfig -> IO MirrorQueue
newSqsQueue cfg = do
    env <- mkEnv cfg
    let run :: (AWS.AWSRequest a) => a -> IO (AWS.AWSResponse a)
        run = runResourceT . AWS.send env
        queueUrl = sqsQueueUrl cfg
    pure
        MirrorQueue
            { enqueue = void . run . SQS.newSendMessage queueUrl . encodeJob
            , receive = do
                response <- run (receiveRequest cfg)
                let messages = fromMaybe [] (response ^. SQS.receiveMessageResponse_messages)
                pure (mapMaybe toQueueMessage messages)
            , ack = void . run . SQS.newDeleteMessage queueUrl . unReceiptHandle
            , extendVisibility = \receipt (Seconds secs) ->
                void . run $
                    SQS.newChangeMessageVisibility queueUrl (unReceiptHandle receipt) secs
            }

-- Build the amazonka environment: discover credentials, pin the configured
-- region, then -- only when an endpoint override is configured -- repoint the SQS
-- service at it. The default and the overridden path share the same
-- discover-then-region step, so it is written once rather than per branch.
mkEnv :: SqsConfig -> IO AWS.Env
mkEnv cfg = overridden . regioned <$> AWS.newEnv AWS.discover
  where
    -- Apply the endpoint override when one is configured; with 'Nothing' the
    -- environment is left untouched.
    overridden :: AWS.Env -> AWS.Env
    overridden = maybe id configured (sqsEndpoint cfg)

    -- Scope the environment to the configured region.
    regioned :: AWS.Env -> AWS.Env
    regioned env = env{AWS.region = AWS.Region' (sqsRegion cfg)}

    -- Point the SQS service definition at the override's scheme / host / port.
    configured :: SqsEndpoint -> AWS.Env -> AWS.Env
    configured ep =
        AWS.configureService
            ( AWS.setEndpoint
                (endpointSecure ep)
                (encodeUtf8 (endpointHost ep))
                (endpointPort ep)
                SQS.defaultService
            )

-- One long-poll ReceiveMessage with the configured batch / wait / visibility.
--
-- The batch size and the long-poll window are clamped here to the ranges SQS
-- documents (1..10 messages, 0..20 seconds) instead of relying on how the service
-- treats an out-of-range value, so a misconfigured knob degrades to the nearest
-- valid setting rather than affecting every poll. In-range values pass through
-- unchanged. Keeping the wait at or below 20s is also what the surrounding design
-- relies on to stay within the client's request timeout, so no explicit
-- response-timeout override is set (NOTE(review): confirm against the amazonka
-- version's default timeout).
--
-- The visibility timeout is forwarded as configured.
receiveRequest :: SqsConfig -> SQS.ReceiveMessage
receiveRequest cfg =
    SQS.newReceiveMessage (sqsQueueUrl cfg)
        & SQS.receiveMessage_maxNumberOfMessages
        ?~ clampTo 1 10 (sqsBatchSize cfg)
        & SQS.receiveMessage_waitTimeSeconds
        ?~ clampTo 0 20 (sqsWaitSeconds cfg)
        & SQS.receiveMessage_visibilityTimeout
        ?~ visibilitySeconds
  where
    Seconds visibilitySeconds = sqsVisibilityTimeout cfg

    -- Force a value into the inclusive range [lo, hi].
    clampTo :: Int -> Int -> Int -> Int
    clampTo lo hi = max lo . min hi

{- Lift one SQS Message into a QueueMessage. A message missing its body or
receipt handle (which SQS always supplies) is dropped rather than crashing the
poll; likewise an undecodable body -- the visibility timeout then redelivers it,
and a persistently bad message falls to the dead-letter queue.

Returns 'Nothing' for any of those three cases; the decode error text from
'decodeJob' is discarded here. -}
toQueueMessage :: SQS.Message -> Maybe QueueMessage
toQueueMessage message = do
    body <- message ^. SQS.message_body
    receipt <- message ^. SQS.message_receiptHandle
    job <- rightToMaybe (decodeJob body)
    pure QueueMessage{msgJob = job, msgReceipt = mkReceiptHandle receipt}

{- | Encode a 'MirrorJob' as the JSON text of an SQS message body. The inverse of
'decodeJob': the package identity is split into its ecosystem, optional scope, and
bare name so it round-trips through 'mkPackageName', and the version keeps its raw
string. The serve-time-admitted artifact descriptor ('jobArtifact') -- the filename,
the integrity digests, and the declared size -- round-trips as a nested object so the
worker has the digest to verify the fetched bytes against and the inputs to assemble
the publish document.

The object's keys are @ecosystem@, @scope@, @name@, @version@, @artifactUrl@,
@mirrorTarget@, @artifact@ and @traceContext@; an absent scope or trace context
is written as JSON @null@ rather than omitted.
-}
encodeJob :: MirrorJob -> Text
encodeJob job =
    decodeUtf8 . Aeson.encode $
        object
            [ "ecosystem" .= ecosystemName (pkgEcosystem name)
            , "scope" .= (unScope <$> pkgNamespace name)
            , "name" .= unscopedName name
            , "version" .= renderVersion (jobVersion job)
            , "artifactUrl" .= jobArtifactUrl job
            , "mirrorTarget" .= jobMirrorTarget job
            , "artifact" .= encodeArtifact (jobArtifact job)
            , "traceContext" .= (encodeTraceContext <$> jobTraceContext job)
            ]
  where
    name = jobPackage job

-- Encode the enqueue-span trace-context carrier: the W3C traceparent and
-- tracestate verbatim, so the worker can re-establish the cross-async span link.
-- The carrier is optional on the job; 'encodeJob' maps this function over the
-- 'Maybe', so a 'Nothing' carrier (tracing was off at enqueue) serialises to a
-- JSON null and decodes back to 'Nothing'.
encodeTraceContext :: RemoteSpanContext -> Aeson.Value
encodeTraceContext rsc =
    object
        [ "traceparent" .= rscTraceparent rsc
        , "tracestate" .= rscTracestate rsc
        ]

-- Encode the serve-time-admitted artifact descriptor: filename, the integrity
-- digests (each an algorithm-tagged value), and the declared size when known
-- (an unknown size is written as JSON null).
encodeArtifact :: MirrorArtifact -> Aeson.Value
encodeArtifact artifact =
    object
        [ "filename" .= maFilename artifact
        , "hashes" .= map encodeHash (toList (maHashes artifact))
        , "size" .= maSize artifact
        ]
  where
    -- One digest as an {alg, value} object; the algorithm uses the same wire
    -- name 'parseHashAlg' reads back.
    encodeHash :: Hash -> Aeson.Value
    encodeHash h = object ["alg" .= renderHashAlg (hashAlg h), "value" .= hashValue h]

{- | Decode an SQS message body back into a 'MirrorJob', or a human-readable error
if the body is not the JSON object 'encodeJob' produces (a missing field, an
unknown ecosystem, an empty hash list, malformed JSON).

Decoding runs in two steps -- the body is first parsed as a JSON value, then that
value is interpreted as a job -- and a failure in either step is returned as the
'Left' error text.
-}
decodeJob :: Text -> Either Text MirrorJob
decodeJob body =
    first toText (eitherDecodeStrict' (encodeUtf8 body))
        >>= first toText . parseEither parseMirrorJob

-- Parse the top-level job object 'encodeJob' writes, delegating the nested
-- carriers to 'parseArtifact' and 'parseTraceContext'. Fails on a non-object
-- value, a missing required key, or an ecosystem name 'parseEcosystem' does not
-- recognise; "scope" and "traceContext" are optional.
parseMirrorJob :: Aeson.Value -> Parser MirrorJob
parseMirrorJob = withObject "MirrorJob" $ \o -> do
    ecoName <- o .: "ecosystem"
    eco <- maybe (fail (unknownEcosystem ecoName)) pure (parseEcosystem ecoName)
    scope <- o .:? "scope"
    rawName <- o .: "name"
    rawVersion <- o .: "version"
    artifactUrl <- o .: "artifactUrl"
    mirrorTarget <- o .: "mirrorTarget"
    artifact <- o .: "artifact" >>= parseArtifact
    -- The trace-context carrier is optional: a job from an older producer (or one
    -- enqueued with tracing off) carries no "traceContext", which decodes to
    -- 'Nothing' and simply yields no span link in the worker.
    traceContext <- o .:? "traceContext" >>= traverse parseTraceContext
    pure
        MirrorJob
            { jobPackage = mkPackageName eco (mkScope <$> scope) rawName
            , jobVersion = mkVersion eco rawVersion
            , jobArtifactUrl = artifactUrl
            , jobMirrorTarget = mirrorTarget
            , jobArtifact = artifact
            , jobTraceContext = traceContext
            }
  where
    -- The parse-failure message for an unrecognised ecosystem name.
    unknownEcosystem n = "unknown ecosystem " <> show (n :: Text)

-- Parse the trace-context carrier back into a 'RemoteSpanContext': the W3C
-- traceparent and tracestate verbatim. The carrier is untrusted opaque transport, so
-- both fields are taken as-is -- an unparseable W3C value is the tracing port's concern
-- (it yields no link), never a decode failure that would strand a serviceable job.
-- When the carrier object is present, both keys are required and must be strings.
parseTraceContext :: Aeson.Value -> Parser RemoteSpanContext
parseTraceContext = withObject "RemoteSpanContext" $ \t ->
    RemoteSpanContext <$> t .: "traceparent" <*> t .: "tracestate"

-- Parse the nested artifact descriptor, failing on an empty hash list (the
-- 'NonEmpty' invariant the serve path upholds -- a job must carry a digest to verify
-- against). Every entry of "hashes" must itself parse via 'parseHash'; "size" is
-- optional.
parseArtifact :: Aeson.Value -> Parser MirrorArtifact
parseArtifact = withObject "MirrorArtifact" $ \o -> do
    filename <- o .: "filename"
    rawHashes <- o .: "hashes" >>= traverse parseHash
    size <- o .:? "size"
    case nonEmpty rawHashes of
        Nothing -> fail "MirrorArtifact carries no integrity digest"
        Just hashes ->
            pure MirrorArtifact{maFilename = filename, maHashes = hashes, maSize = size}

-- Parse one algorithm-tagged digest from the artifact descriptor's hash list.
-- Fails when "alg" is not a name 'parseHashAlg' recognises, or when 'mkHash'
-- rejects the value for that algorithm.
parseHash :: Aeson.Value -> Parser Hash
parseHash = withObject "Hash" $ \h -> do
    algName <- h .: "alg"
    alg <- maybe (fail (unknownAlg algName)) pure (parseHashAlg algName)
    value <- h .: "value"
    -- The queue is a trust boundary: validate the digest on decode through the same
    -- 'mkHash' the serve path uses, so the worker can never ingest a malformed digest
    -- to verify the fetched bytes against. A malformed value fails the decode (the job
    -- is left un-acked and redelivers, ultimately to the dead-letter queue).
    either (fail . toString) pure (mkHash alg value)
  where
    -- The parse-failure message for an unrecognised algorithm name.
    unknownAlg n = "unknown hash algorithm " <> show (n :: Text)

{- | Decode a wire algorithm name back to its 'HashAlg' -- the inverse of
'renderHashAlg' over the SQS message vocabulary, including the @sri@ wrapper an
npm @dist.integrity@ digest rides under.

The match is exact (case-sensitive) on the names @sha1@, @sha256@, @sha384@,
@sha512@, @md5@, @blake2b@ and @sri@, so a well-formed message round-trips. Any
other name yields 'Nothing'; the job decoder then rejects the job, since it
would carry a digest the worker could never verify against.
-}
parseHashAlg :: Text -> Maybe HashAlg
parseHashAlg = \case
    "sha1" -> Just SHA1
    "sha256" -> Just SHA256
    "sha384" -> Just SHA384
    "sha512" -> Just SHA512
    "md5" -> Just MD5
    "blake2b" -> Just Blake2b
    "sri" -> Just SRI
    _ -> Nothing