module Ecluse.Core.Cve.Sync (
CveFetch (..),
DbEtag (..),
OsvDbFetchFault (..),
s3CveFetch,
cappedAt,
SyncEnv (..),
SyncOutcome (..),
syncStep,
SyncSchedule (..),
runCveSync,
bootBackoffDelays,
) where
import Conduit (ConduitT, await, runResourceT, yield, (.|))
import Data.ByteString qualified as BS
import Data.Conduit.Combinators qualified as C
import Katip (KatipContext, Severity (DebugS, ErrorS, InfoS, WarningS), logFM, ls)
import Network.HTTP.Types.Status (statusCode)
import System.Directory (removeFile, renameFile)
import UnliftIO (MonadUnliftIO, tryAny)
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Exception (catchAny, mask, onException, throwIO)
import Amazonka qualified as AWS
import Amazonka.S3 qualified as S3
import Amazonka.S3.Lens qualified as S3L
import Lens.Micro ((^.))
import Ecluse.Core.Cve (CveDb (cveDbClose, cveDbMeta), CveDbRejected, openCveDb)
import Ecluse.Core.Cve.Slot (CveSlot, swapIn)
import Ecluse.Core.Ecosystem (Ecosystem)
newtype DbEtag = DbEtag Text
deriving stock (DbEtag -> DbEtag -> Bool
(DbEtag -> DbEtag -> Bool)
-> (DbEtag -> DbEtag -> Bool) -> Eq DbEtag
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DbEtag -> DbEtag -> Bool
== :: DbEtag -> DbEtag -> Bool
$c/= :: DbEtag -> DbEtag -> Bool
/= :: DbEtag -> DbEtag -> Bool
Eq, Int -> DbEtag -> ShowS
[DbEtag] -> ShowS
DbEtag -> String
(Int -> DbEtag -> ShowS)
-> (DbEtag -> String) -> ([DbEtag] -> ShowS) -> Show DbEtag
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DbEtag -> ShowS
showsPrec :: Int -> DbEtag -> ShowS
$cshow :: DbEtag -> String
show :: DbEtag -> String
$cshowList :: [DbEtag] -> ShowS
showList :: [DbEtag] -> ShowS
Show)
data CveFetch = CveFetch
{ CveFetch -> IO (Maybe DbEtag)
fetchHeadEtag :: IO (Maybe DbEtag)
, CveFetch -> String -> IO DbEtag
fetchDownload :: FilePath -> IO DbEtag
}
data OsvDbFetchFault
=
OsvDbTooLarge Int
|
OsvDbNoEtag
deriving stock (OsvDbFetchFault -> OsvDbFetchFault -> Bool
(OsvDbFetchFault -> OsvDbFetchFault -> Bool)
-> (OsvDbFetchFault -> OsvDbFetchFault -> Bool)
-> Eq OsvDbFetchFault
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: OsvDbFetchFault -> OsvDbFetchFault -> Bool
== :: OsvDbFetchFault -> OsvDbFetchFault -> Bool
$c/= :: OsvDbFetchFault -> OsvDbFetchFault -> Bool
/= :: OsvDbFetchFault -> OsvDbFetchFault -> Bool
Eq, Int -> OsvDbFetchFault -> ShowS
[OsvDbFetchFault] -> ShowS
OsvDbFetchFault -> String
(Int -> OsvDbFetchFault -> ShowS)
-> (OsvDbFetchFault -> String)
-> ([OsvDbFetchFault] -> ShowS)
-> Show OsvDbFetchFault
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> OsvDbFetchFault -> ShowS
showsPrec :: Int -> OsvDbFetchFault -> ShowS
$cshow :: OsvDbFetchFault -> String
show :: OsvDbFetchFault -> String
$cshowList :: [OsvDbFetchFault] -> ShowS
showList :: [OsvDbFetchFault] -> ShowS
Show)
instance Exception OsvDbFetchFault
data SyncEnv = SyncEnv
{ SyncEnv -> CveFetch
syncFetch :: CveFetch
, SyncEnv -> Ecosystem
syncEcosystem :: Ecosystem
, SyncEnv -> String
syncDbPath :: FilePath
, SyncEnv -> CveSlot
syncSlot :: CveSlot
}
data SyncOutcome
=
SyncSwapped DbEtag [(Text, Text)]
|
SyncUnchanged
|
SyncAbsent
|
SyncRejected DbEtag CveDbRejected
deriving stock (Int -> SyncOutcome -> ShowS
[SyncOutcome] -> ShowS
SyncOutcome -> String
(Int -> SyncOutcome -> ShowS)
-> (SyncOutcome -> String)
-> ([SyncOutcome] -> ShowS)
-> Show SyncOutcome
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SyncOutcome -> ShowS
showsPrec :: Int -> SyncOutcome -> ShowS
$cshow :: SyncOutcome -> String
show :: SyncOutcome -> String
$cshowList :: [SyncOutcome] -> ShowS
showList :: [SyncOutcome] -> ShowS
Show)
syncStep :: SyncEnv -> Maybe DbEtag -> IO SyncOutcome
syncStep :: SyncEnv -> Maybe DbEtag -> IO SyncOutcome
syncStep SyncEnv
env Maybe DbEtag
lastSeen =
CveFetch -> IO (Maybe DbEtag)
fetchHeadEtag (SyncEnv -> CveFetch
syncFetch SyncEnv
env) IO (Maybe DbEtag)
-> (Maybe DbEtag -> IO SyncOutcome) -> IO SyncOutcome
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe DbEtag
Nothing -> SyncOutcome -> IO SyncOutcome
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SyncOutcome
SyncAbsent
Just DbEtag
remote
| DbEtag -> Maybe DbEtag
forall a. a -> Maybe a
Just DbEtag
remote Maybe DbEtag -> Maybe DbEtag -> Bool
forall a. Eq a => a -> a -> Bool
== Maybe DbEtag
lastSeen -> SyncOutcome -> IO SyncOutcome
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure SyncOutcome
SyncUnchanged
| Bool
otherwise -> SyncEnv -> IO SyncOutcome
syncNewArtifact SyncEnv
env
syncNewArtifact :: SyncEnv -> IO SyncOutcome
syncNewArtifact :: SyncEnv -> IO SyncOutcome
syncNewArtifact SyncEnv
env = do
let temp :: String
temp = SyncEnv -> String
syncDbPath SyncEnv
env String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
".tmp"
fetched <- CveFetch -> String -> IO DbEtag
fetchDownload (SyncEnv -> CveFetch
syncFetch SyncEnv
env) String
temp IO DbEtag -> IO () -> IO DbEtag
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`onException` String -> IO ()
discardTemp String
temp
opened <- openCveDb (syncEcosystem env) temp `onException` discardTemp temp
case opened of
Left CveDbRejected
rejection -> do
String -> IO ()
discardTemp String
temp
SyncOutcome -> IO SyncOutcome
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DbEtag -> CveDbRejected -> SyncOutcome
SyncRejected DbEtag
fetched CveDbRejected
rejection)
Right CveDb
db -> SyncEnv -> String -> DbEtag -> CveDb -> IO SyncOutcome
publishVerified SyncEnv
env String
temp DbEtag
fetched CveDb
db
publishVerified :: SyncEnv -> FilePath -> DbEtag -> CveDb -> IO SyncOutcome
publishVerified :: SyncEnv -> String -> DbEtag -> CveDb -> IO SyncOutcome
publishVerified SyncEnv
env String
temp DbEtag
fetched CveDb
db = ((forall a. IO a -> IO a) -> IO SyncOutcome) -> IO SyncOutcome
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> m a) -> m b) -> m b
mask (((forall a. IO a -> IO a) -> IO SyncOutcome) -> IO SyncOutcome)
-> ((forall a. IO a -> IO a) -> IO SyncOutcome) -> IO SyncOutcome
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
IO () -> IO ()
forall a. IO a -> IO a
restore (String -> String -> IO ()
renameFile String
temp (SyncEnv -> String
syncDbPath SyncEnv
env))
IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`onException` (CveDb -> IO ()
cveDbClose CveDb
db IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> String -> IO ()
discardTemp String
temp)
CveSlot -> CveDb -> IO ()
swapIn (SyncEnv -> CveSlot
syncSlot SyncEnv
env) CveDb
db
SyncOutcome -> IO SyncOutcome
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DbEtag -> [(Text, Text)] -> SyncOutcome
SyncSwapped DbEtag
fetched (CveDb -> [(Text, Text)]
cveDbMeta CveDb
db))
discardTemp :: FilePath -> IO ()
discardTemp :: String -> IO ()
discardTemp String
temp = String -> IO ()
removeFile String
temp IO () -> (SomeException -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> (SomeException -> m a) -> m a
`catchAny` IO () -> SomeException -> IO ()
forall a b. a -> b -> a
const IO ()
forall (f :: * -> *). Applicative f => f ()
pass
data SyncSchedule = SyncSchedule
{ SyncSchedule -> [Int]
schedBootBackoff :: [Int]
, SyncSchedule -> Int
schedPollDelay :: Int
}
bootBackoffDelays :: [Int]
bootBackoffDelays :: [Int]
bootBackoffDelays = [Int
1_000_000, Int
2_000_000, Int
4_000_000, Int
8_000_000, Int
16_000_000]
runCveSync :: (MonadUnliftIO m, KatipContext m) => SyncEnv -> SyncSchedule -> IO () -> m ()
runCveSync :: forall (m :: * -> *).
(MonadUnliftIO m, KatipContext m) =>
SyncEnv -> SyncSchedule -> IO () -> m ()
runCveSync SyncEnv
env SyncSchedule
schedule IO ()
notifyFirstSync = do
seen <- Maybe DbEtag -> [Int] -> m (Maybe DbEtag)
forall {m :: * -> *}.
(MonadUnliftIO m, KatipContext m) =>
Maybe DbEtag -> [Int] -> m (Maybe DbEtag)
burst Maybe DbEtag
forall a. Maybe a
Nothing (SyncSchedule -> [Int]
schedBootBackoff SyncSchedule
schedule)
poll seen
where
eco :: Text
eco = Ecosystem -> Text
forall b a. (Show a, IsString b) => a -> b
show (SyncEnv -> Ecosystem
syncEcosystem SyncEnv
env) :: Text
burst :: Maybe DbEtag -> [Int] -> m (Maybe DbEtag)
burst Maybe DbEtag
lastSeen [Int]
delays = do
(settled, seen') <- SyncEnv -> Text -> IO () -> Maybe DbEtag -> m (Bool, Maybe DbEtag)
forall (m :: * -> *).
(MonadUnliftIO m, KatipContext m) =>
SyncEnv -> Text -> IO () -> Maybe DbEtag -> m (Bool, Maybe DbEtag)
supervisedStep SyncEnv
env Text
eco IO ()
notifyFirstSync Maybe DbEtag
lastSeen
case (settled, delays) of
(Bool
True, [Int]
_) -> Maybe DbEtag -> m (Maybe DbEtag)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe DbEtag
seen'
(Bool
False, []) -> do
Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
WarningS (Text -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (Text
"cve-sync[" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eco Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"]: boot fetch did not produce an advisory database; continuing without one, polling"))
Maybe DbEtag -> m (Maybe DbEtag)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe DbEtag
seen'
(Bool
False, Int
d : [Int]
rest) -> do
Int -> m ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay Int
d
Maybe DbEtag -> [Int] -> m (Maybe DbEtag)
burst Maybe DbEtag
seen' [Int]
rest
poll :: Maybe DbEtag -> m b
poll Maybe DbEtag
lastSeen = do
Int -> m ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay (SyncSchedule -> Int
schedPollDelay SyncSchedule
schedule)
(_, seen') <- SyncEnv -> Text -> IO () -> Maybe DbEtag -> m (Bool, Maybe DbEtag)
forall (m :: * -> *).
(MonadUnliftIO m, KatipContext m) =>
SyncEnv -> Text -> IO () -> Maybe DbEtag -> m (Bool, Maybe DbEtag)
supervisedStep SyncEnv
env Text
eco IO ()
notifyFirstSync Maybe DbEtag
lastSeen
poll seen'
supervisedStep :: (MonadUnliftIO m, KatipContext m) => SyncEnv -> Text -> IO () -> Maybe DbEtag -> m (Bool, Maybe DbEtag)
supervisedStep :: forall (m :: * -> *).
(MonadUnliftIO m, KatipContext m) =>
SyncEnv -> Text -> IO () -> Maybe DbEtag -> m (Bool, Maybe DbEtag)
supervisedStep SyncEnv
env Text
eco IO ()
notifyFirstSync Maybe DbEtag
lastSeen =
m SyncOutcome -> m (Either SomeException SyncOutcome)
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny (IO SyncOutcome -> m SyncOutcome
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SyncEnv -> Maybe DbEtag -> IO SyncOutcome
syncStep SyncEnv
env Maybe DbEtag
lastSeen)) m (Either SomeException SyncOutcome)
-> (Either SomeException SyncOutcome -> m (Bool, Maybe DbEtag))
-> m (Bool, Maybe DbEtag)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
err -> do
Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
ErrorS (Text -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (Text
"cve-sync[" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eco Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"]: sync attempt failed: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall b a. (Show a, IsString b) => a -> b
show SomeException
err))
(Bool, Maybe DbEtag) -> m (Bool, Maybe DbEtag)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
False, Maybe DbEtag
lastSeen)
Right (SyncSwapped DbEtag
etag [(Text, Text)]
meta) -> do
Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
InfoS (Text -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (Text
"cve-sync[" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eco Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"]: advisory database swapped in: etag=" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> DbEtag -> Text
forall b a. (Show a, IsString b) => a -> b
show DbEtag
etag Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" meta=" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [(Text, Text)] -> Text
forall b a. (Show a, IsString b) => a -> b
show [(Text, Text)]
meta))
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
notifyFirstSync
(Bool, Maybe DbEtag) -> m (Bool, Maybe DbEtag)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
True, DbEtag -> Maybe DbEtag
forall a. a -> Maybe a
Just DbEtag
etag)
Right SyncOutcome
SyncUnchanged -> do
Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
DebugS (Text -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (Text
"cve-sync[" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eco Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"]: advisory database unchanged"))
(Bool, Maybe DbEtag) -> m (Bool, Maybe DbEtag)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
True, Maybe DbEtag
lastSeen)
Right SyncOutcome
SyncAbsent -> do
Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
DebugS (Text -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (Text
"cve-sync[" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eco Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"]: no advisory database published yet"))
(Bool, Maybe DbEtag) -> m (Bool, Maybe DbEtag)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
False, Maybe DbEtag
lastSeen)
Right (SyncRejected DbEtag
etag CveDbRejected
rejection) -> do
Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
ErrorS (Text -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (Text
"cve-sync[" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eco Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"]: downloaded artifact refused (keeping last good): " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> CveDbRejected -> Text
forall b a. (Show a, IsString b) => a -> b
show CveDbRejected
rejection))
(Bool, Maybe DbEtag) -> m (Bool, Maybe DbEtag)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
True, DbEtag -> Maybe DbEtag
forall a. a -> Maybe a
Just DbEtag
etag)
s3CveFetch :: AWS.Env -> Text -> Text -> Int -> CveFetch
s3CveFetch :: Env -> Text -> Text -> Int -> CveFetch
s3CveFetch Env
awsEnv Text
bucket Text
key Int
maxBytes =
CveFetch
{ fetchHeadEtag :: IO (Maybe DbEtag)
fetchHeadEtag = Env -> Text -> Text -> IO (Maybe DbEtag)
s3HeadEtag Env
awsEnv Text
bucket Text
key
, fetchDownload :: String -> IO DbEtag
fetchDownload = Env -> Text -> Text -> Int -> String -> IO DbEtag
s3Download Env
awsEnv Text
bucket Text
key Int
maxBytes
}
s3HeadEtag :: AWS.Env -> Text -> Text -> IO (Maybe DbEtag)
s3HeadEtag :: Env -> Text -> Text -> IO (Maybe DbEtag)
s3HeadEtag Env
awsEnv Text
bucket Text
key =
ResourceT IO (Either Error HeadObjectResponse)
-> IO (Either Error HeadObjectResponse)
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (Env
-> HeadObject
-> ResourceT IO (Either Error (AWSResponse HeadObject))
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a) =>
Env -> a -> m (Either Error (AWSResponse a))
AWS.sendEither Env
awsEnv (BucketName -> ObjectKey -> HeadObject
S3.newHeadObject (Text -> BucketName
S3.BucketName Text
bucket) (Text -> ObjectKey
S3.ObjectKey Text
key))) IO (Either Error HeadObjectResponse)
-> (Either Error HeadObjectResponse -> IO (Maybe DbEtag))
-> IO (Maybe DbEtag)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right HeadObjectResponse
resp -> Maybe DbEtag -> IO (Maybe DbEtag)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ETag -> DbEtag
dbEtag (ETag -> DbEtag) -> Maybe ETag -> Maybe DbEtag
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> HeadObjectResponse
resp HeadObjectResponse
-> Getting (Maybe ETag) HeadObjectResponse (Maybe ETag)
-> Maybe ETag
forall s a. s -> Getting a s a -> a
^. Getting (Maybe ETag) HeadObjectResponse (Maybe ETag)
Lens' HeadObjectResponse (Maybe ETag)
S3L.headObjectResponse_eTag)
Left Error
err
| Error -> Bool
isNotFound Error
err -> Maybe DbEtag -> IO (Maybe DbEtag)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe DbEtag
forall a. Maybe a
Nothing
| Bool
otherwise -> Error -> IO (Maybe DbEtag)
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO Error
err
s3Download :: AWS.Env -> Text -> Text -> Int -> FilePath -> IO DbEtag
s3Download :: Env -> Text -> Text -> Int -> String -> IO DbEtag
s3Download Env
awsEnv Text
bucket Text
key Int
maxBytes String
dest = ResourceT IO DbEtag -> IO DbEtag
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (ResourceT IO DbEtag -> IO DbEtag)
-> ResourceT IO DbEtag -> IO DbEtag
forall a b. (a -> b) -> a -> b
$ do
resp <- Env -> GetObject -> ResourceT IO (AWSResponse GetObject)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a) =>
Env -> a -> m (AWSResponse a)
AWS.send Env
awsEnv (BucketName -> ObjectKey -> GetObject
S3.newGetObject (Text -> BucketName
S3.BucketName Text
bucket) (Text -> ObjectKey
S3.ObjectKey Text
key))
for_ (resp ^. S3L.getObjectResponse_contentLength) $ \Integer
len ->
Bool -> ResourceT IO () -> ResourceT IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Integer
len Integer -> Integer -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
maxBytes) (OsvDbFetchFault -> ResourceT IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (Int -> OsvDbFetchFault
OsvDbTooLarge Int
maxBytes))
AWS.sinkBody (resp ^. S3L.getObjectResponse_body) (cappedAt maxBytes .| C.sinkFile dest)
maybe (throwIO OsvDbNoEtag) (pure . dbEtag) (resp ^. S3L.getObjectResponse_eTag)
dbEtag :: S3.ETag -> DbEtag
dbEtag :: ETag -> DbEtag
dbEtag (S3.ETag ByteString
bytes) = Text -> DbEtag
DbEtag (ByteString -> Text
forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 ByteString
bytes)
isNotFound :: AWS.Error -> Bool
isNotFound :: Error -> Bool
isNotFound = \case
AWS.ServiceError ServiceError
se -> Status -> Int
statusCode (ServiceError
se ServiceError -> Getting Status ServiceError Status -> Status
forall s a. s -> Getting a s a -> a
^. Getting Status ServiceError Status
Lens' ServiceError Status
AWS.serviceError_status) Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
404
Error
_ -> Bool
False
cappedAt :: (MonadIO m) => Int -> ConduitT ByteString ByteString m ()
cappedAt :: forall (m :: * -> *).
MonadIO m =>
Int -> ConduitT ByteString ByteString m ()
cappedAt Int
maxBytes = Int -> ConduitT ByteString ByteString m ()
forall (m :: * -> *).
MonadIO m =>
Int -> ConduitT ByteString ByteString m ()
go Int
0
where
go :: Int -> ConduitT ByteString ByteString m ()
go Int
seen =
ConduitT ByteString ByteString m (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await ConduitT ByteString ByteString m (Maybe ByteString)
-> (Maybe ByteString -> ConduitT ByteString ByteString m ())
-> ConduitT ByteString ByteString m ()
forall a b.
ConduitT ByteString ByteString m a
-> (a -> ConduitT ByteString ByteString m b)
-> ConduitT ByteString ByteString m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ByteString
Nothing -> ConduitT ByteString ByteString m ()
forall (f :: * -> *). Applicative f => f ()
pass
Just ByteString
chunk -> do
let seen' :: Int
seen' = Int
seen Int -> Int -> Int
forall a. Num a => a -> a -> a
+ ByteString -> Int
BS.length ByteString
chunk
Bool
-> ConduitT ByteString ByteString m ()
-> ConduitT ByteString ByteString m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
seen' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxBytes) (OsvDbFetchFault -> ConduitT ByteString ByteString m ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (Int -> OsvDbFetchFault
OsvDbTooLarge Int
maxBytes))
ByteString -> ConduitT ByteString ByteString m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield ByteString
chunk
Int -> ConduitT ByteString ByteString m ()
go Int
seen'