{- | The advisory database's sync mechanics: detect a new @osv.db@ artifact in
object storage, download it bounded, verify it, and shadow-swap it into the
read path, one ecosystem per task, driven by the configured mounts.

The write side of "Ecluse.Core.Cve.Slot": 'syncStep' performs exactly one
detect-download-verify-swap cycle over an injected 'CveFetch' (so unit tests
drive it without a network), and 'runCveSync' schedules those steps: an eager
__boot burst__ (an immediate attempt, retried with incremental backoff, that is
eventually allowed to fail so a broken bucket never wedges startup) followed by
the steady ETag poll. The proxy is rules-engine complete as early as the
artifact can be had; before then it serves deny-by-default.

The swap's file discipline: the download lands in a temp file beside the
canonical per-ecosystem path, and 'Ecluse.Core.Cve.openCveDb' verifies the
temp file (epoch stamp, table shape, ecosystem), the artifact contract's
verify-before-swap. __The connection that verified is the connection that
serves__: the accepted temp file is renamed atomically onto the canonical
name, the open connection follows the inode through the rename, and that same
'CveDb' is swapped in; there is no reopen and so no verify-to-serve gap. The
displaced generation drains and closes inside 'Ecluse.Core.Cve.Slot.swapIn', releasing
the old inode's last reference; reclamation is the kernel's, never a delete
this code could mistime. A rejected artifact is deleted, its ETag remembered
(re-downloading a known-bad object every poll buys nothing), and the last-good
generation keeps serving.
-}
module Ecluse.Core.Cve.Sync (
    -- * The injected transport
    CveFetch (..),
    DbEtag (..),
    OsvDbFetchFault (..),
    s3CveFetch,
    cappedAt,

    -- * One sync cycle
    SyncEnv (..),
    SyncOutcome (..),
    syncStep,

    -- * The scheduled task
    SyncSchedule (..),
    runCveSync,
    bootBackoffDelays,
) where

import Conduit (ConduitT, await, runResourceT, yield, (.|))
import Data.ByteString qualified as BS
import Data.Conduit.Combinators qualified as C
import Katip (KatipContext, Severity (DebugS, ErrorS, InfoS, WarningS), logFM, ls)
import Network.HTTP.Types.Status (statusCode)
import System.Directory (removeFile, renameFile)
import UnliftIO (MonadUnliftIO, tryAny)
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Exception (catchAny, mask, onException, throwIO)

import Amazonka qualified as AWS
import Amazonka.S3 qualified as S3
import Amazonka.S3.Lens qualified as S3L
import Lens.Micro ((^.))

import Ecluse.Core.Cve (CveDb (cveDbClose, cveDbMeta), CveDbRejected, openCveDb)
import Ecluse.Core.Cve.Slot (CveSlot, swapIn)
import Ecluse.Core.Ecosystem (Ecosystem)

{- | An artifact version marker: S3's ETag, opaque text compared for equality
only. Two objects with equal ETags carry equal bytes, so an unchanged ETag is
"nothing to do" and a rejected artifact's remembered ETag is "still the same
bad artifact".
-}
newtype DbEtag = DbEtag Text
    deriving stock (Eq, Show)

{- | The sync transport, as data: how to learn the remote artifact's current
version and how to fetch its bytes. Injected so 'syncStep' is unit-testable
without a network; the composition root supplies 's3CveFetch'.
-}
data CveFetch = CveFetch
    { fetchHeadEtag :: IO (Maybe DbEtag)
    {- ^ The remote artifact's current ETag; 'Nothing' when the object does not
    exist (not yet published for this ecosystem). A transport fault throws.
    -}
    , fetchDownload :: FilePath -> IO DbEtag
    {- ^ Download the artifact to the given path (byte-bounded) and return the
    ETag of the bytes actually fetched, the download's own rather than an
    earlier @HEAD@'s, so a publish racing the poll is recorded truthfully.
    Throws on transport faults and on 'OsvDbFetchFault'.
    -}
    }

{- | A fetch refused by this side: the object oversteps the configured byte
cap, or a response carried no ETag to record. Thrown (it is an 'Exception')
from the transport so the sync task's supervision logs it like any other fault.
-}
data OsvDbFetchFault
    = -- | The object exceeds the configured byte cap (the cap is carried, in bytes).
      OsvDbTooLarge Int
    | -- | The response carried no ETag; nothing truthful to record.
      OsvDbNoEtag
    deriving stock (Eq, Show)

instance Exception OsvDbFetchFault

-- | Everything one ecosystem's sync task operates on.
data SyncEnv = SyncEnv
    { syncFetch :: CveFetch
    -- ^ The transport for this ecosystem's object key.
    , syncEcosystem :: Ecosystem
    -- ^ The ecosystem the artifact must verify as.
    , syncDbPath :: FilePath
    -- ^ The canonical on-disk artifact path (the stable per-ecosystem name).
    , syncSlot :: CveSlot
    -- ^ The slot this task's swaps publish to.
    }

{- | What one 'syncStep' concluded; the caller ('runCveSync') logs it and
decides scheduling. Failures of the transport itself surface as exceptions.
-}
data SyncOutcome
    = -- | A new artifact was verified and is now live (its ETag and provenance carried).
      SyncSwapped DbEtag [(Text, Text)]
    | -- | The remote ETag matches the last seen one; nothing to do.
      SyncUnchanged
    | -- | The object does not exist in the bucket (not yet published).
      SyncAbsent
    | {- | The artifact was downloaded and __refused__ by verification; the
      last-good generation keeps serving and the ETag is remembered.
      -}
      SyncRejected DbEtag CveDbRejected
    deriving stock (Show)

{- | One detect-download-verify-swap cycle against the last seen ETag. Total
over verification (a refused artifact is an outcome, not an exception);
transport faults propagate for the caller to log and retry. See the module
header for the file discipline.

Only the cheap @HEAD@ runs when the remote ETag is absent or unchanged; the
download happens only for an ETag this task has not seen.
-}
syncStep :: SyncEnv -> Maybe DbEtag -> IO SyncOutcome
syncStep env lastSeen =
    fetchHeadEtag (syncFetch env) >>= \case
        Nothing -> pure SyncAbsent
        Just remote
            | Just remote == lastSeen -> pure SyncUnchanged
            | otherwise -> syncNewArtifact env

-- Download the artifact into a temp file beside the canonical path, verify it
-- there, and either publish it ('publishVerified') or discard it as rejected.
-- The outcome records the download's own ETag, not the preceding HEAD's.
-- Any exception from the download or the verification deletes the temp file
-- before propagating.
syncNewArtifact :: SyncEnv -> IO SyncOutcome
syncNewArtifact env = do
    let temp = syncDbPath env <> ".tmp"
    fetched <- fetchDownload (syncFetch env) temp `onException` discardTemp temp
    opened <- openCveDb (syncEcosystem env) temp `onException` discardTemp temp
    case opened of
        Left rejection -> do
            -- The refused bytes are useless; the last-good generation keeps serving.
            discardTemp temp
            pure (SyncRejected fetched rejection)
        Right db -> publishVerified env temp fetched db

-- Publish an accepted artifact: rename the verified temp file onto the
-- canonical name, then hand the already-open connection to the slot. Runs
-- masked so the ownership of the connection is never ambiguous.
publishVerified :: SyncEnv -> FilePath -> DbEtag -> CveDb -> IO SyncOutcome
publishVerified env temp fetched db = mask $ \restore -> do
    -- The verified connection follows the inode through the rename; the
    -- canonical name now holds the newest accepted artifact and the temp name
    -- is gone. Up to here this side still owns the connection, so a failure
    -- closes it and discards the download.
    restore (renameFile temp (syncDbPath env))
        `onException` (cveDbClose db >> discardTemp temp)
    -- 'swapIn' publishes atomically before it retires the displaced
    -- generation, and owns the connection from entry: a failure or
    -- cancellation while the displaced generation drains must never close the
    -- newly live database, so no cleanup wraps it. The mask pins the
    -- ownership handoff; the drain wait inside stays interruptible.
    swapIn (syncSlot env) db
    pure (SyncSwapped fetched (cveDbMeta db))

-- Best-effort removal of the temp download: the temp may already be renamed
-- away or never created, so any synchronous failure of 'removeFile' is
-- deliberately ignored ('catchAny' does not intercept asynchronous exceptions).
discardTemp :: FilePath -> IO ()
discardTemp temp = removeFile temp `catchAny` const pass

{- | The task's timing: the boot burst's backoff delays and the steady poll
interval, both in microseconds. The composition root ships 'bootBackoffDelays'
and the configured poll interval; tests inject tiny values.
-}
data SyncSchedule = SyncSchedule
    { schedBootBackoff :: [Int]
    -- ^ Delays before each boot-burst retry; the list's length is the budget.
    , schedPollDelay :: Int
    -- ^ The steady ETag-poll interval.
    }

{- | The shipped boot-burst backoff, in microseconds: an immediate first
attempt, then retries after each of these (1s, 2s, 4s, 8s, 16s), then the
burst concedes and the steady poll takes over. Constants by design; the poll
interval is the operator-facing knob.
-}
bootBackoffDelays :: [Int]
bootBackoffDelays = [1_000_000, 2_000_000, 4_000_000, 8_000_000, 16_000_000]

{- | One ecosystem's sync task: the boot burst, then the steady poll, forever.

The __boot burst__ attempts a sync immediately and retries per the schedule's
backoff until an artifact is live, so a healthy deployment is
rules-engine complete within seconds of boot. It concedes early on a
__rejected__ artifact (retrying the same bytes cannot end differently) and
gives up after the schedule with a warning. The proxy serves regardless, since
an empty slot only ever abstains into deny-by-default, and the poll keeps
trying.

Every iteration is supervised: a transport fault is caught and logged, never
fatal to the task. @notifyFirstSync@ runs after each successful swap (its
consumer, the readiness signal, is an idempotent one-way flip).
-}
runCveSync :: (MonadUnliftIO m, KatipContext m) => SyncEnv -> SyncSchedule -> IO () -> m ()
runCveSync env schedule notifyFirstSync = do
    seen <- burst Nothing (schedBootBackoff schedule)
    poll seen
  where
    eco = show (syncEcosystem env) :: Text

    -- A function binding (not a bare pattern binding) so the monomorphism
    -- restriction does not apply to its class constraints.
    step lastSeen = supervisedStep env eco notifyFirstSync lastSeen

    -- Attempt now; on an unsettled attempt wait the next delay and retry,
    -- conceding with a warning once the delays run out.
    burst lastSeen delays = do
        (settled, seen') <- step lastSeen
        case (settled, delays) of
            (True, _) -> pure seen'
            (False, []) -> do
                logFM WarningS (ls ("cve-sync[" <> eco <> "]: boot fetch did not produce an advisory database; continuing without one, polling"))
                pure seen'
            (False, d : rest) -> do
                threadDelay d
                burst seen' rest

    -- The steady poll: wait, step, carry the last seen ETag forward, forever.
    poll lastSeen = do
        threadDelay (schedPollDelay schedule)
        (_, seen') <- step lastSeen
        poll seen'

-- One supervised step: run 'syncStep', catching (synchronous) failures, log
-- the outcome, and report (whether the boot burst may stop, the ETag now last
-- seen). A failed attempt keeps the previous last-seen ETag so the next poll
-- retries the same remote version.
supervisedStep ::
    (MonadUnliftIO m, KatipContext m) =>
    SyncEnv ->
    Text ->
    IO () ->
    Maybe DbEtag ->
    m (Bool, Maybe DbEtag)
supervisedStep env eco notifyFirstSync lastSeen =
    tryAny (liftIO (syncStep env lastSeen)) >>= \case
        Left err -> do
            say ErrorS ("sync attempt failed: " <> show err)
            pure (False, lastSeen)
        Right (SyncSwapped etag meta) -> do
            say InfoS ("advisory database swapped in: etag=" <> show etag <> " meta=" <> show meta)
            liftIO notifyFirstSync
            pure (True, Just etag)
        Right SyncUnchanged -> do
            say DebugS "advisory database unchanged"
            pure (True, lastSeen)
        Right SyncAbsent -> do
            say DebugS "no advisory database published yet"
            pure (False, lastSeen)
        Right (SyncRejected etag rejection) -> do
            say ErrorS ("downloaded artifact refused (keeping last good): " <> show rejection)
            -- Remembered so the same bad artifact is not re-downloaded
            -- every poll; a fixed re-publish carries a new ETag. The burst
            -- stops: retrying identical bytes cannot end differently.
            pure (True, Just etag)
  where
    -- Every line of this task is tagged with its ecosystem.
    say severity msg = logFM severity (ls ("cve-sync[" <> eco <> "]: " <> msg))

{- | The real transport: S3 @HEAD@ for the ETag, bounded streaming @GET@ for
the bytes, against one bucket and key. A @404@ on @HEAD@ is the honest
'Nothing' (not yet published); every other service or transport fault throws
for the sync task's supervision to log. The 'Int' is the download's byte cap.
-}
s3CveFetch :: AWS.Env -> Text -> Text -> Int -> CveFetch
s3CveFetch awsEnv bucket key maxBytes =
    CveFetch
        { fetchHeadEtag = s3HeadEtag awsEnv bucket key
        , fetchDownload = s3Download awsEnv bucket key maxBytes
        }

-- The @HEAD@ half of 's3CveFetch': the object's current ETag, or 'Nothing'
-- on a @404@ (not yet published). An object that exists but whose response
-- carries no ETag throws 'OsvDbNoEtag' rather than masquerading as absent,
-- matching the download side; every other fault is rethrown as-is.
s3HeadEtag :: AWS.Env -> Text -> Text -> IO (Maybe DbEtag)
s3HeadEtag awsEnv bucket key =
    runResourceT (AWS.sendEither awsEnv request) >>= \case
        Right resp ->
            maybe (throwIO OsvDbNoEtag) (pure . Just . dbEtag) (resp ^. S3L.headObjectResponse_eTag)
        Left err
            | isNotFound err -> pure Nothing
            | otherwise -> throwIO err
  where
    request = S3.newHeadObject (S3.BucketName bucket) (S3.ObjectKey key)

-- The @GET@ half of 's3CveFetch': stream the object into @dest@ under the byte
-- cap and return the ETag of this very response. Throws 'OsvDbTooLarge' on an
-- oversized object, 'OsvDbNoEtag' on a response with no ETag, and any
-- transport fault as-is.
s3Download :: AWS.Env -> Text -> Text -> Int -> FilePath -> IO DbEtag
s3Download awsEnv bucket key maxBytes dest = runResourceT $ do
    resp <- AWS.send awsEnv (S3.newGetObject (S3.BucketName bucket) (S3.ObjectKey key))
    -- The declared length fails fast; the streaming cap is the
    -- enforcement (a declared length is not a guarantee).
    for_ (resp ^. S3L.getObjectResponse_contentLength) $ \len ->
        when (len > fromIntegral maxBytes) (throwIO (OsvDbTooLarge maxBytes))
    -- Resolve the ETag before streaming: a response with nothing truthful to
    -- record is refused without first spending its bytes on disk.
    etag <- maybe (throwIO OsvDbNoEtag) (pure . dbEtag) (resp ^. S3L.getObjectResponse_eTag)
    AWS.sinkBody (resp ^. S3L.getObjectResponse_body) (cappedAt maxBytes .| C.sinkFile dest)
    pure etag

-- Unwrap S3's raw ETag bytes into the opaque text marker compared by the sync.
dbEtag :: S3.ETag -> DbEtag
dbEtag (S3.ETag bytes) = DbEtag (decodeUtf8 bytes)

-- Whether an AWS error is a service error answered with HTTP 404; transport
-- and serialization errors never count as "not found".
isNotFound :: AWS.Error -> Bool
isNotFound = \case
    AWS.ServiceError se -> statusCode (se ^. AWS.serviceError_status) == 404
    _ -> False

{- | A pass-through conduit that refuses to stream past the byte cap
('OsvDbTooLarge'): the enforcement behind 's3CveFetch''s bounded download,
where the declared content length is only the fast-fail.

Chunks are forwarded unchanged while the running total stays within
@maxBytes@; the chunk that would cross the cap is never yielded, and the
fault is thrown in its place.
-}
cappedAt :: (MonadIO m) => Int -> ConduitT ByteString ByteString m ()
cappedAt maxBytes = go 0
  where
    go seen =
        await >>= \case
            Nothing -> pass
            Just chunk -> do
                let seen' = seen + BS.length chunk
                when (seen' > maxBytes) (throwIO (OsvDbTooLarge maxBytes))
                yield chunk
                go seen'