module Ecluse.Core.Server.Stream (
streamUpstream,
streamUpstreamWhen,
probeUpstreamWhen,
pumpBody,
) where
import Data.ByteString qualified as BS
import Data.ByteString.Builder (Builder, byteString)
import Network.HTTP.Client (BodyReader, Manager, Request, brRead, responseClose, responseHeaders, responseOpen, responseStatus, withResponse)
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Types (ResponseHeaders, Status)
import Network.Wai (Response, ResponseReceived, responseLBS, responseStream)
import UnliftIO.Exception (finally, tryAny)
import Ecluse.Core.Server.Conditional (isNotModified)
-- | Forward an upstream HTTP response to the client as a streamed body.
--
-- The request is issued with 'withResponse', so the upstream response is
-- only used inside that callback. The upstream status and headers are passed
-- through @relay@ to obtain the status and headers sent to the client, and
-- the body is copied chunk by chunk with 'pumpBody'.
--
-- Nothing is caught here: an exception raised while opening the upstream
-- response or while responding propagates to the caller.
streamUpstream ::
    -- | connection manager used to reach the upstream
    Manager ->
    -- | request to send upstream
    Request ->
    -- | maps the upstream status and headers to the ones sent to the client
    (Status -> ResponseHeaders -> (Status, ResponseHeaders)) ->
    -- | WAI continuation that sends the response to the client
    (Response -> IO ResponseReceived) ->
    IO ResponseReceived
streamUpstream manager request relay respond =
    withResponse request manager $ \upstream ->
        let (status, headers) =
                relay (responseStatus upstream) (responseHeaders upstream)
         in respond $
                responseStream status headers $ \write flush ->
                    pumpBody (brRead (HTTP.responseBody upstream)) write flush
-- | Stream an upstream response to the client, but only when @accept@
-- approves the upstream status.
--
-- Returns:
--
-- * @Nothing@ when opening the upstream response fails with an exception
--   caught by 'tryAny', or when @accept@ rejects the upstream status. In
--   both cases @respond@ has not been called, so the caller is still free to
--   answer the client some other way.
-- * @Just received@ once a response has been handed to @respond@.
--
-- @accept@ and 'isNotModified' are both applied to the /upstream/ status,
-- before @relay@ rewrites it. A status for which 'isNotModified' holds is
-- answered with the relayed status and headers and an empty body; every
-- other accepted status has its body copied with 'pumpBody'.
--
-- The upstream response is closed with 'responseClose' on every path after a
-- successful open, including when @respond@ throws.
streamUpstreamWhen ::
    -- | connection manager used to reach the upstream
    Manager ->
    -- | request to send upstream
    Request ->
    -- | decides, from the upstream status, whether to relay the response
    (Status -> Bool) ->
    -- | maps the upstream status and headers to the ones sent to the client
    (Status -> ResponseHeaders -> (Status, ResponseHeaders)) ->
    -- | WAI continuation that sends the response to the client
    (Response -> IO ResponseReceived) ->
    IO (Maybe ResponseReceived)
streamUpstreamWhen manager request accept relay respond =
    tryAny (responseOpen request manager) >>= \case
        -- Failure to open is reported as "nothing relayed", not rethrown.
        Left _ -> pure Nothing
        Right upstream -> stream upstream `finally` responseClose upstream
  where
    stream :: HTTP.Response BodyReader -> IO (Maybe ResponseReceived)
    stream upstream
        | not (accept upstreamStatus) = pure Nothing
        | isNotModified upstreamStatus =
            -- Reply with headers only; the upstream body is not read.
            Just <$> respond (responseLBS status headers "")
        | otherwise =
            Just <$> respond (responseStream status headers pump)
      where
        upstreamStatus = responseStatus upstream
        (status, headers) = relay upstreamStatus (responseHeaders upstream)
        pump = pumpBody (brRead (HTTP.responseBody upstream))
-- | Issue a request upstream and relay only its status and headers, provided
-- @accept@ approves the upstream status.
--
-- Unlike 'streamUpstreamWhen', the upstream body is never read: an accepted
-- response is always answered with the relayed status and headers and an
-- empty body.
--
-- Returns:
--
-- * @Nothing@ when opening the upstream response fails with an exception
--   caught by 'tryAny', or when @accept@ rejects the upstream status. In
--   both cases @respond@ has not been called.
-- * @Just received@ once the headers-only response has been handed to
--   @respond@.
--
-- The upstream response is closed with 'responseClose' on every path after a
-- successful open.
probeUpstreamWhen ::
    -- | connection manager used to reach the upstream
    Manager ->
    -- | request to send upstream
    Request ->
    -- | decides, from the upstream status, whether to relay the response
    (Status -> Bool) ->
    -- | maps the upstream status and headers to the ones sent to the client
    (Status -> ResponseHeaders -> (Status, ResponseHeaders)) ->
    -- | WAI continuation that sends the response to the client
    (Response -> IO ResponseReceived) ->
    IO (Maybe ResponseReceived)
probeUpstreamWhen manager request accept relay respond =
    tryAny (responseOpen request manager) >>= \case
        -- Failure to open is reported as "nothing relayed", not rethrown.
        Left _ -> pure Nothing
        Right upstream -> probe upstream `finally` responseClose upstream
  where
    probe :: HTTP.Response body -> IO (Maybe ResponseReceived)
    probe upstream
        | not (accept upstreamStatus) = pure Nothing
        | otherwise =
            let (status, headers) =
                    relay upstreamStatus (responseHeaders upstream)
             in Just <$> respond (responseLBS status headers "")
      where
        upstreamStatus = responseStatus upstream
-- | Copy a response body to a WAI streaming sink, one chunk at a time.
--
-- Chunks are pulled from @readChunk@ until it yields an empty 'BS.ByteString',
-- which is treated as end of body. The first non-empty chunk is written and
-- then explicitly flushed; later chunks are written without an explicit
-- flush. If the very first read is empty, nothing is written or flushed.
--
-- NOTE(review): flushing only after the first chunk presumably gets the
-- start of the body to the client promptly while leaving the rest to the
-- server's own buffering. Confirm against the intended behaviour.
pumpBody ::
    -- | action yielding the next body chunk, empty at end of body
    BodyReader ->
    -- | sink for a chunk of output
    (Builder -> IO ()) ->
    -- | forces buffered output to the client
    IO () ->
    IO ()
pumpBody readChunk write flush = do
    opening <- readChunk
    unless (BS.null opening) $ do
        write (byteString opening)
        flush
        rest
  where
    -- Drain the remaining chunks; stops at the first empty read.
    rest :: IO ()
    rest = do
        chunk <- readChunk
        unless (BS.null chunk) $ do
            write (byteString chunk)
            rest