{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
module Ecluse.Pilot.Osv.Stream (
streamOsvUrl,
parseOsvStream,
) where
import Codec.Archive.Zip.Conduit.Types (ZipEntry (..))
import Codec.Archive.Zip.Conduit.UnZip (unZipStream)
import Conduit
import Data.Aeson (decodeStrict)
import Data.ByteString qualified as BS
import Katip (KatipContext, Severity (..), logFM, ls)
import Network.HTTP.Simple (getResponseBody, httpSource, parseRequest, setRequestCheckStatus)
import OpenTelemetry.Context qualified as Ctx
import OpenTelemetry.Trace (SpanKind (Internal), defaultSpanArguments, kind, makeTracer, tracerOptions)
import OpenTelemetry.Trace.Core (createSpan, endSpan)
import Ecluse.Pilot.Osv (ExtractedOsv, OsvAdvisory, extractFromAdvisory)
import Ecluse.Telemetry (Telemetry, telemetryTracerProvider)
streamOsvUrl :: (MonadResource m, MonadThrow m, KatipContext m) => Telemetry -> String -> ConduitT i ExtractedOsv m ()
streamOsvUrl :: forall (m :: * -> *) i.
(MonadResource m, MonadThrow m, KatipContext m) =>
Telemetry -> String -> ConduitT i ExtractedOsv m ()
streamOsvUrl Telemetry
telemetry String
urlStr = do
m () -> ConduitT i ExtractedOsv m ()
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT i ExtractedOsv m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ConduitT i ExtractedOsv m ())
-> m () -> ConduitT i ExtractedOsv m ()
forall a b. (a -> b) -> a -> b
$ Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
InfoS (String -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (String
"Initializing OSV stream from URL: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
urlStr))
let mTracer :: Maybe Tracer
mTracer = (\TracerProvider
tp -> TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer TracerProvider
tp InstrumentationLibrary
"ecluse" TracerOptions
tracerOptions) (TracerProvider -> Tracer) -> Maybe TracerProvider -> Maybe Tracer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Telemetry -> Maybe TracerProvider
telemetryTracerProvider Telemetry
telemetry
IO (Maybe Span)
-> (Maybe Span -> IO ())
-> (Maybe Span -> ConduitT i ExtractedOsv m ())
-> ConduitT i ExtractedOsv m ()
forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
bracketP
((Tracer -> IO Span) -> Maybe Tracer -> IO (Maybe Span)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (\Tracer
t -> Tracer -> Context -> Text -> SpanArguments -> IO Span
forall (m :: * -> *).
(MonadIO m, HasCallStack) =>
Tracer -> Context -> Text -> SpanArguments -> m Span
createSpan Tracer
t Context
Ctx.empty Text
"ecluse.pilot.osv.stream" SpanArguments
defaultSpanArguments{kind = Internal}) Maybe Tracer
mTracer)
((Span -> IO ()) -> Maybe Span -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Span -> Maybe Timestamp -> IO ()
forall (m :: * -> *). MonadIO m => Span -> Maybe Timestamp -> m ()
`endSpan` Maybe Timestamp
forall a. Maybe a
Nothing))
( \Maybe Span
_ -> do
req <- IO Request -> ConduitT i ExtractedOsv m Request
forall a. IO a -> ConduitT i ExtractedOsv m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Request -> ConduitT i ExtractedOsv m Request)
-> IO Request -> ConduitT i ExtractedOsv m Request
forall a b. (a -> b) -> a -> b
$ Request -> Request
setRequestCheckStatus (Request -> Request) -> IO Request -> IO Request
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> IO Request
forall (m :: * -> *). MonadThrow m => String -> m Request
parseRequest String
urlStr
httpSource req (\Response (ConduitM i ByteString m ())
res -> Response (ConduitM i ByteString m ()) -> ConduitM i ByteString m ()
forall a. Response a -> a
getResponseBody Response (ConduitM i ByteString m ())
res ConduitM i ByteString m ()
-> ConduitT ByteString ExtractedOsv m ()
-> ConduitT i ExtractedOsv m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Telemetry -> ConduitT ByteString ExtractedOsv m ()
forall (m :: * -> *).
(MonadResource m, MonadThrow m, KatipContext m) =>
Telemetry -> ConduitT ByteString ExtractedOsv m ()
parseOsvStream Telemetry
telemetry)
)
parseOsvStream :: (MonadResource m, MonadThrow m, KatipContext m) => Telemetry -> ConduitT ByteString ExtractedOsv m ()
parseOsvStream :: forall (m :: * -> *).
(MonadResource m, MonadThrow m, KatipContext m) =>
Telemetry -> ConduitT ByteString ExtractedOsv m ()
parseOsvStream Telemetry
telemetry = do
m () -> ConduitT ByteString ExtractedOsv m ()
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT ByteString ExtractedOsv m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ConduitT ByteString ExtractedOsv m ())
-> m () -> ConduitT ByteString ExtractedOsv m ()
forall a b. (a -> b) -> a -> b
$ Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
InfoS (String -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (String
"Starting OSV zip extraction and parsing pipeline" :: String))
let mTracer :: Maybe Tracer
mTracer = (\TracerProvider
tp -> TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer TracerProvider
tp InstrumentationLibrary
"ecluse" TracerOptions
tracerOptions) (TracerProvider -> Tracer) -> Maybe TracerProvider -> Maybe Tracer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Telemetry -> Maybe TracerProvider
telemetryTracerProvider Telemetry
telemetry
IO (Maybe Span)
-> (Maybe Span -> IO ())
-> (Maybe Span -> ConduitT ByteString ExtractedOsv m ())
-> ConduitT ByteString ExtractedOsv m ()
forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
bracketP
((Tracer -> IO Span) -> Maybe Tracer -> IO (Maybe Span)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (\Tracer
t -> Tracer -> Context -> Text -> SpanArguments -> IO Span
forall (m :: * -> *).
(MonadIO m, HasCallStack) =>
Tracer -> Context -> Text -> SpanArguments -> m Span
createSpan Tracer
t Context
Ctx.empty Text
"ecluse.pilot.osv.parse" SpanArguments
defaultSpanArguments{kind = Internal}) Maybe Tracer
mTracer)
((Span -> IO ()) -> Maybe Span -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Span -> Maybe Timestamp -> IO ()
forall (m :: * -> *). MonadIO m => Span -> Maybe Timestamp -> m ()
`endSpan` Maybe Timestamp
forall a. Maybe a
Nothing))
(\Maybe Span
_ -> ConduitT ByteString (Either ZipEntry ByteString) m ZipInfo
-> ConduitT ByteString (Either ZipEntry ByteString) m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void ((forall a. IO a -> m a)
-> ConduitT ByteString (Either ZipEntry ByteString) IO ZipInfo
-> ConduitT ByteString (Either ZipEntry ByteString) m ZipInfo
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
transPipe IO a -> m a
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ConduitT ByteString (Either ZipEntry ByteString) IO ZipInfo
forall (m :: * -> *).
(MonadThrow m, PrimMonad m) =>
ConduitM ByteString (Either ZipEntry ByteString) m ZipInfo
unZipStream) ConduitT ByteString (Either ZipEntry ByteString) m ()
-> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
-> ConduitT ByteString ExtractedOsv m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
forall (m :: * -> *).
(MonadThrow m, KatipContext m) =>
ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
processZipEntries)
processZipEntries :: (MonadThrow m, KatipContext m) => ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
processZipEntries :: forall (m :: * -> *).
(MonadThrow m, KatipContext m) =>
ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
processZipEntries =
ConduitT
(Either ZipEntry ByteString)
ExtractedOsv
m
(Maybe (Either ZipEntry ByteString))
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await ConduitT
(Either ZipEntry ByteString)
ExtractedOsv
m
(Maybe (Either ZipEntry ByteString))
-> (Maybe (Either ZipEntry ByteString)
-> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ())
-> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
forall a b.
ConduitT (Either ZipEntry ByteString) ExtractedOsv m a
-> (a -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m b)
-> ConduitT (Either ZipEntry ByteString) ExtractedOsv m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (Either ZipEntry ByteString)
Nothing -> m () -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ())
-> m () -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
forall a b. (a -> b) -> a -> b
$ Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
InfoS (String -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (String
"OSV stream fully processed" :: String))
Just (Left ZipEntry
entry) -> do
fileBytes <- ConduitT (Either ZipEntry ByteString) ExtractedOsv m ByteString
forall (m :: * -> *) o.
Monad m =>
ConduitT (Either ZipEntry ByteString) o m ByteString
collectFile
case decodeStrict fileBytes :: Maybe OsvAdvisory of
Just OsvAdvisory
adv -> [ExtractedOsv]
-> ConduitT
(Either ZipEntry ByteString) (Element [ExtractedOsv]) m ()
forall (m :: * -> *) mono i.
(Monad m, MonoFoldable mono) =>
mono -> ConduitT i (Element mono) m ()
yieldMany (OsvAdvisory -> [ExtractedOsv]
extractFromAdvisory OsvAdvisory
adv)
Maybe OsvAdvisory
Nothing -> m () -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ())
-> m () -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
forall a b. (a -> b) -> a -> b
$ Severity -> LogStr -> m ()
forall (m :: * -> *).
(Applicative m, KatipContext m) =>
Severity -> LogStr -> m ()
logFM Severity
WarningS (Text -> LogStr
forall a. StringConv a Text => a -> LogStr
ls (Text
"Failed to parse OSV advisory JSON from entry: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ZipEntry -> Text
zipEntryNameText ZipEntry
entry))
processZipEntries
Just (Right ByteString
_) -> ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
forall (m :: * -> *).
(MonadThrow m, KatipContext m) =>
ConduitT (Either ZipEntry ByteString) ExtractedOsv m ()
processZipEntries
zipEntryNameText :: ZipEntry -> Text
zipEntryNameText :: ZipEntry -> Text
zipEntryNameText ZipEntry
entry = case ZipEntry -> Either Text ByteString
zipEntryName ZipEntry
entry of
Left Text
txt -> Text
txt
Right ByteString
bs -> OnDecodeError -> ByteString -> Text
decodeUtf8With OnDecodeError
lenientDecode ByteString
bs
collectFile :: (Monad m) => ConduitT (Either ZipEntry ByteString) o m ByteString
collectFile :: forall (m :: * -> *) o.
Monad m =>
ConduitT (Either ZipEntry ByteString) o m ByteString
collectFile = [ByteString]
-> ConduitT (Either ZipEntry ByteString) o m ByteString
forall {m :: * -> *} {a} {o}.
Monad m =>
[ByteString] -> ConduitT (Either a ByteString) o m ByteString
go []
where
go :: [ByteString] -> ConduitT (Either a ByteString) o m ByteString
go [ByteString]
acc =
ConduitT (Either a ByteString) o m (Maybe (Either a ByteString))
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await ConduitT (Either a ByteString) o m (Maybe (Either a ByteString))
-> (Maybe (Either a ByteString)
-> ConduitT (Either a ByteString) o m ByteString)
-> ConduitT (Either a ByteString) o m ByteString
forall a b.
ConduitT (Either a ByteString) o m a
-> (a -> ConduitT (Either a ByteString) o m b)
-> ConduitT (Either a ByteString) o m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (Either a ByteString)
Nothing -> ByteString -> ConduitT (Either a ByteString) o m ByteString
forall a. a -> ConduitT (Either a ByteString) o m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([ByteString] -> ByteString
BS.concat ([ByteString] -> [ByteString]
forall a. [a] -> [a]
reverse [ByteString]
acc))
Just (Left a
entry) -> do
Either a ByteString -> ConduitT (Either a ByteString) o m ()
forall i o (m :: * -> *). i -> ConduitT i o m ()
leftover (a -> Either a ByteString
forall a b. a -> Either a b
Left a
entry)
ByteString -> ConduitT (Either a ByteString) o m ByteString
forall a. a -> ConduitT (Either a ByteString) o m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([ByteString] -> ByteString
BS.concat ([ByteString] -> [ByteString]
forall a. [a] -> [a]
reverse [ByteString]
acc))
Just (Right ByteString
bs) -> [ByteString] -> ConduitT (Either a ByteString) o m ByteString
go (ByteString
bs ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: [ByteString]
acc)