{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}

module Ecluse.Pilot.Osv.Stream (
    streamOsvUrl,
    parseOsvStream,
) where

import Codec.Archive.Zip.Conduit.Types (ZipEntry (..))
import Codec.Archive.Zip.Conduit.UnZip (unZipStream)
import Conduit
import Data.Aeson (decodeStrict)
import Data.ByteString qualified as BS
import Katip (KatipContext, Severity (..), logFM, ls)
import Network.HTTP.Simple (getResponseBody, httpSource, parseRequest, setRequestCheckStatus)
import OpenTelemetry.Context qualified as Ctx
import OpenTelemetry.Trace (SpanKind (Internal), defaultSpanArguments, kind, makeTracer, tracerOptions)
import OpenTelemetry.Trace.Core (createSpan, endSpan)

import Ecluse.Pilot.Osv (ExtractedOsv, OsvAdvisory, extractFromAdvisory)
import Ecluse.Telemetry (Telemetry, telemetryTracerProvider)

-- | Fetch the OSV zip archive at the given URL and stream the advisories
-- extracted from it.
--
-- The URL is logged at 'InfoS', the HTTP response body is piped through
-- 'parseOsvStream', and everything that pipeline yields is yielded here.
--
-- When the 'Telemetry' value provides a tracer provider, the download is
-- wrapped in an @ecluse.pilot.osv.stream@ span of kind 'Internal'. The span is
-- acquired and released through 'bracketP', so ending it is registered as the
-- conduit's cleanup action. Without a tracer provider no span is created.
--
-- Throws if 'parseRequest' rejects the URL, or (see the comment in the body)
-- if the server answers with a non-2xx status.
streamOsvUrl :: (MonadResource m, MonadThrow m, KatipContext m) => Telemetry -> String -> ConduitT i ExtractedOsv m ()
streamOsvUrl telemetry urlStr = do
    lift $ logFM InfoS (ls ("Initializing OSV stream from URL: " <> urlStr))
    -- 'Nothing' when telemetry is not configured; every span operation below
    -- is then a no-op via 'traverse' / 'mapM_' over the 'Maybe'.
    let mTracer = (\tp -> makeTracer tp "ecluse" tracerOptions) <$> telemetryTracerProvider telemetry
    bracketP
        (traverse (\t -> createSpan t Ctx.empty "ecluse.pilot.osv.stream" defaultSpanArguments{kind = Internal}) mTracer)
        (mapM_ (`endSpan` Nothing))
        ( \_ -> do
            -- 'setRequestCheckStatus' makes a non-2xx response throw a
            -- 'StatusCodeException' at the header boundary. This is deliberate: it
            -- lets the backoff wrapper (see 'Ecluse.Pilot.Osv.Retry') see a 502
            -- from osv.dev as a retryable fault, rather than streaming the error
            -- page into the unzip parser where it would surface as a parse error a
            -- retry could not fix.
            req <- liftIO $ setRequestCheckStatus <$> parseRequest urlStr
            httpSource req (\res -> getResponseBody res .| parseOsvStream telemetry)
        )

-- | Decode a zip archive arriving as a stream of 'ByteString' chunks and emit
-- the 'ExtractedOsv' values found in its entries.
--
-- A start message is logged at 'InfoS'. The input is then unzipped
-- incrementally with 'unZipStream' and the resulting entry/content stream is
-- handed to 'processZipEntries'.
--
-- When the 'Telemetry' value provides a tracer provider, the work is wrapped
-- in an @ecluse.pilot.osv.parse@ span of kind 'Internal'. The span is acquired
-- and released through 'bracketP', so ending it is registered as the conduit's
-- cleanup action. Without a tracer provider no span is created.
parseOsvStream :: (MonadResource m, MonadThrow m, KatipContext m) => Telemetry -> ConduitT ByteString ExtractedOsv m ()
parseOsvStream telemetry = do
    lift $ logFM InfoS (ls ("Starting OSV zip extraction and parsing pipeline" :: String))
    -- 'Nothing' when telemetry is not configured; every span operation below
    -- is then a no-op via 'traverse' / 'mapM_' over the 'Maybe'.
    let mTracer = (\tp -> makeTracer tp "ecluse" tracerOptions) <$> telemetryTracerProvider telemetry
    bracketP
        (traverse (\t -> createSpan t Ctx.empty "ecluse.pilot.osv.parse" defaultSpanArguments{kind = Internal}) mTracer)
        (mapM_ (`endSpan` Nothing))
        -- 'unZipStream' is run in 'IO' and lifted into @m@ with 'transPipe';
        -- its final result is discarded with 'void' because only the
        -- streamed entries are used.
        (\_ -> void (transPipe liftIO unZipStream) .| processZipEntries)

-- | Consume the unzipped stream and yield the advisories it contains.
--
-- Each upstream item is handled as follows:
--
-- * End of input: log @OSV stream fully processed@ at 'InfoS' and stop.
-- * @'Left' entry@: gather the entry's content with 'collectFile' and decode
--   it as an 'OsvAdvisory'. On success every value returned by
--   'extractFromAdvisory' is yielded; on failure a 'WarningS' message naming
--   the entry is logged and the entry is skipped. Processing then continues
--   with the next item.
-- * @'Right' _@ seen here (a content chunk not consumed by 'collectFile'):
--   ignored.
processZipEntries :: (MonadThrow m, KatipContext m) => ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
processZipEntries =
    await >>= \case
        Nothing -> lift $ logFM InfoS (ls ("OSV stream fully processed" :: String))
        Just (Left entry) -> do
            fileBytes <- collectFile
            case decodeStrict fileBytes :: Maybe OsvAdvisory of
                Just adv -> yieldMany (extractFromAdvisory adv)
                -- A malformed entry is reported and skipped so that one bad
                -- file does not abort the rest of the archive.
                Nothing -> lift $ logFM WarningS (ls ("Failed to parse OSV advisory JSON from entry: " <> zipEntryNameText entry))
            processZipEntries
        Just (Right _) -> processZipEntries

-- | Render a zip entry's name as 'Text' for log messages.
--
-- 'zipEntryName' is either already 'Text' ('Left'), which is returned as is,
-- or a raw 'ByteString' ('Right'), which is decoded with
-- @'decodeUtf8With' 'lenientDecode'@.
zipEntryNameText :: ZipEntry -> Text
zipEntryNameText entry = either id (decodeUtf8With lenientDecode) (zipEntryName entry)

-- | Read the run of 'Right' chunks at the head of the input and return them
-- concatenated into a single strict 'ByteString'.
--
-- Collection stops at end of input or at the first 'Left' item. That 'Left'
-- is pushed back with 'leftover', so the caller sees it on its next 'await'.
-- If no 'Right' chunk is available the result is the empty 'ByteString'.
--
-- Note that the whole run is held in memory before being returned.
collectFile :: (Monad m) => ConduitT (Either ZipEntry ByteString) o m ByteString
collectFile = go []
  where
    -- Chunks are consed onto the accumulator (newest first) and put back in
    -- stream order once, when the run ends.
    go acc =
        await >>= \case
            Nothing -> finish
            Just (Left entry) -> do
                leftover (Left entry)
                finish
            Just (Right bs) -> go (bs : acc)
      where
        finish = pure (BS.concat (reverse acc))