ecluse:ecluse-core
Safe Haskell: None
Language: GHC2021

Ecluse.Core.Server.Stream

Description

Bounded-memory artifact streaming -- the constant-memory serve path.

The proxy serves an artifact by streaming it through from upstream, never buffering it whole: a multi-hundred-megabyte tarball must not become a local memory spike. The trap is resource lifetime. A WAI streaming body __runs after the handler returns__ (Warp serialises it while writing to the socket), so an upstream connection released when the handler returns lexically is already gone by the time the body streams -- a use-after-free.

Raw WAI avoids it by construction: Application is continuation-passing, so the upstream connection is acquired with withResponse bracketed around the respond call itself. The connection then lives for exactly the duration of the streamed body and is closed only when Warp returns ResponseReceived.

pumpBody pulls one chunk from upstream, writes it through the sink's bounded output buffer -- blocking on the socket send whenever it spills -- before pulling the next, so the proxy reads from upstream only as fast as the client drains, giving constant memory regardless of artifact size with backpressure for free. Only the first chunk is explicitly flushed (prompt first byte); the rest coalesce in the output buffer, so the relay pays fewer socket sends than upstream chunks. No ResourceT, no conduit on the hot path (see docs/architecture/web-layer.md → "Streaming and resource lifetime").

This is the serve path; it streams, never buffers. The whole-artifact-in-memory fetchArtifact is the separate mirroring concern, not this.
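The lifetime trap described above can be modelled without any HTTP library. The following is a toy sketch using only base, not the module's implementation: Conn, withConn, Body, bodyOf, and respondWith are hypothetical stand-ins for the upstream connection, withResponse, a WAI streaming body, and Warp running that body.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | Hypothetical stand-in for an upstream connection: just an "open" flag.
newtype Conn = Conn (IORef Bool)

-- | Bracket a connection's lifetime around an action (models withResponse).
withConn :: (Conn -> IO a) -> IO a
withConn = bracket open close
  where
    open = Conn <$> newIORef True
    close (Conn ref) = writeIORef ref False

-- | Stand-in for a streaming body: when run, it reports whether the
-- connection it reads from is still open.
type Body = IO Bool

bodyOf :: Conn -> Body
bodyOf (Conn ref) = readIORef ref

-- | Stand-in for the server: it runs the body while "writing to the socket".
respondWith :: Body -> IO Bool
respondWith body = body

-- | The trap: the bracket exits when the handler's inner action returns,
-- so the body later runs against a closed connection.
leakyHandler :: IO Bool
leakyHandler = do
  body <- withConn (\conn -> pure (bodyOf conn))
  respondWith body

-- | The fix: the bracket wraps the respond call itself, so the connection
-- outlives the body and is closed only after respond returns.
bracketedHandler :: IO Bool
bracketedHandler = withConn (\conn -> respondWith (bodyOf conn))
```

In this model leakyHandler yields False (the body saw a closed connection) and bracketedHandler yields True, which is the difference between releasing at handler return and releasing after the respond call.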

Synopsis

Streaming a response through

streamUpstream :: Manager -> Request -> (Status -> ResponseHeaders -> (Status, ResponseHeaders)) -> (Response -> IO ResponseReceived) -> IO ResponseReceived

Stream an upstream response through to the client with constant memory.

The upstream connection is opened with withResponse __bracketed around the respond call__, so it lives exactly as long as the streamed body and is released only after Warp returns ResponseReceived -- the WAI streaming-lifetime contract. The body is pumped chunk-by-chunk via pumpBody, whose write blocks on the socket, so upstream is read only as fast as the client drains (backpressure).

The relay argument chooses the client-facing status and headers from upstream's, so the caller controls what is forwarded (relaying an artifact's status and content headers unchanged, passing a 304 straight back, or filtering hop-by-hop headers) without this helper hard-coding a policy.
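As an illustration of the kind of policy a caller might pass as the relay argument, here is a minimal sketch of a hop-by-hop header filter. It is an assumption-laden model, not this package's code: Header is a hypothetical stand-in built on plain ByteString pairs (real code would use the http-types Status and ResponseHeaders), the status is left polymorphic, and relayFiltered and hopByHop are illustrative names. It also does not handle extra header names listed inside a Connection header value.

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
import Data.Char (toLower)

-- | Hypothetical stand-in for a header: (name, value).
type Header = (ByteString, ByteString)

-- | The classic hop-by-hop header names, lower-cased for comparison.
hopByHop :: [ByteString]
hopByHop =
  map
    B8.pack
    [ "connection"
    , "keep-alive"
    , "proxy-authenticate"
    , "proxy-authorization"
    , "te"
    , "trailer"
    , "transfer-encoding"
    , "upgrade"
    ]

-- | Pass the status through unchanged and drop hop-by-hop headers,
-- comparing header names case-insensitively.
relayFiltered :: status -> [Header] -> (status, [Header])
relayFiltered status headers = (status, filter keep headers)
  where
    keep (name, _) = B8.map toLower name `notElem` hopByHop
```

Because the policy is an ordinary pure function, it can be checked on its own, independently of any connection handling.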

streamUpstreamWhen :: Manager -> Request -> (Status -> Bool) -> (Status -> ResponseHeaders -> (Status, ResponseHeaders)) -> (Response -> IO ResponseReceived) -> IO (Maybe ResponseReceived)

Stream an upstream response through only when its status passes the accept predicate, keeping a recoverable miss distinct from an unrecoverable mid-stream failure.

This is the conditional relay the serve path's private-origin fetch needs: open the upstream, learn its status, stream the body on a hit, and on a miss fall through to another upstream -- without buffering and without leaking the connection. The two outcomes are deliberately kept apart:

  • Recoverable miss -- the connection could not be opened, or the status fails accept. No response has been committed, so the connection is closed and Nothing is returned, leaving the caller free to fall through to another upstream.
  • Committed stream -- the status passed, so the response is begun on the wire. From that point a failure pumping the body is unrecoverable: it is not collapsed into a miss (that would call respond a second time over a half-sent response), but propagates -- the connection torn down as it unwinds -- so the caller fails internally rather than responding again.
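From the caller's side, the two outcomes suggest a simple fall-through loop. The sketch below is a hypothetical caller-side helper (firstCommitted is not part of this module) written against base only: each attempt stands for one partially applied conditional relay returning Maybe, a Nothing moves on to the next upstream, and nothing is caught, so a mid-stream exception from a committed attempt propagates rather than triggering another attempt.

```haskell
-- | Try each upstream attempt in order and stop at the first that commits.
-- 'Nothing' is a recoverable miss, so the next attempt is tried.
-- Exceptions are deliberately not caught: a failure after commitment
-- must not lead to a second response.
firstCommitted :: [IO (Maybe r)] -> IO (Maybe r)
firstCommitted [] = pure Nothing
firstCommitted (attempt : rest) = do
  outcome <- attempt
  case outcome of
    Just committed -> pure (Just committed)
    Nothing -> firstCommitted rest
```

If every attempt misses, the result is Nothing and the caller still owns the decision of what to answer, since no response has been started.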

A passing isNotModified (304 Not Modified) status is the __pass-through conditional-GET relay__: it is committed like any accepted status, but answered bodiless (responseLBS over an empty body) rather than pumped, since a 304 carries no body (RFC 9110 §15.4.5) -- the upstream body reader is never read. This is how a client validator that is relayed upstream and still matches comes straight back as a 304, with the artifact never re-downloaded.

Only a failure to open the connection is caught here; once respond is reached, exceptions propagate uncaught. The connection is released on every path: a rejected status closes it before returning, a streamed (or failed) body closes it as the stream unwinds.

The accept predicate sees only the status (the hit/miss decision a serve fetch makes); a passing response is relayed exactly as streamUpstream would, the relay choosing the client-facing status and headers.

Probing without a body (HEAD)

probeUpstreamWhen :: Manager -> Request -> (Status -> Bool) -> (Status -> ResponseHeaders -> (Status, ResponseHeaders)) -> (Response -> IO ResponseReceived) -> IO (Maybe ResponseReceived)

Probe an upstream without pumping a body -- the bodiless relay a HEAD takes, so a client cannot force the proxy to open the upstream artifact connection and stream a whole artifact to nowhere (the GET-pump amplification a HEAD must never trigger).

The request must already carry the HEAD method (the caller sets it), so the upstream sees a bodiless request too and replies with headers and no body. This mirrors streamUpstreamWhen's hit/miss split, but the committed phase answers with responseLBS over an empty body rather than the streaming pump:

  • Recoverable miss -- the connection could not be opened, or the status fails accept; no response is committed, the connection is closed, and Nothing is returned so the caller may fall through to another upstream.
  • Committed reply -- the status passed, so a bodiless response is sent with the relayed status and headers. The upstream body reader is never read.

The relay chooses the client-facing status and headers from upstream's (the same header-filtering the streamed path applies), so a HEAD relays an artifact's content headers -- Content-Type, Content-Length, ETag, and the like -- exactly as the matching GET would, only without the bytes. The connection is released on every path; nothing is pumped, so there is no mid-stream phase to guard.

The pump

pumpBody :: BodyReader -> (Builder -> IO ()) -> IO () -> IO ()

Pump a chunked body from a reader to a WAI stream sink with constant memory.

Each pull reads one chunk and writes it before the next is pulled, so at most one chunk (plus the sink's fixed output buffer) is ever resident. An empty chunk is the http-client BodyReader end-of-body terminator -- the pump stops on it and never writes it. Because write fills the sink's bounded output buffer and blocks on the socket send whenever it spills, the loop pulls from upstream only as fast as the client consumes: backpressure, and bounded memory independent of body size.

Only the first chunk is explicitly flushed, so the response's status, headers, and opening bytes reach the client promptly (time to first byte) even when upstream trickles. Later chunks are deliberately not flushed per chunk: at relay byte rates a per-chunk flush degenerates into a socket send per upstream read, and letting the sink coalesce writes into its buffer raises the streaming ceiling. The sink flushes whatever remains when the stream ends (Warp's stream-close contract), so the tail is never stranded.

Taking the reader and sink as plain actions (not a http-client response or a WAI Response) keeps the pump's memory and backpressure behaviour testable in process against an instrumented source and sink, with no socket.
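The behaviour documented for the pump can be sketched and exercised in process as follows. This is a reconstruction from the description above, not the module's source: pumpSketch, chunkSource, and runPump are hypothetical names, the reader is written as a plain IO ByteString (the shape of the http-client BodyReader), and the in-memory sink never blocks, so it demonstrates the flush and termination rules but not socket backpressure.

```haskell
import Control.Monad (unless)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder, byteString, toLazyByteString)
import qualified Data.ByteString.Lazy as BL
import Data.IORef (atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- | Pull one chunk, write it, then pull the next. An empty chunk ends the
-- body and is never written. Only the first chunk is flushed explicitly.
pumpSketch :: IO ByteString -> (Builder -> IO ()) -> IO () -> IO ()
pumpSketch readChunk write flush = do
  first <- readChunk
  unless (BS.null first) $ do
    write (byteString first)
    flush
    rest
  where
    rest = do
      chunk <- readChunk
      unless (BS.null chunk) $ do
        write (byteString chunk)
        rest

-- | Instrumented source: yields the queued chunks, then the empty terminator.
chunkSource :: [ByteString] -> IO (IO ByteString)
chunkSource chunks = do
  ref <- newIORef chunks
  pure $ atomicModifyIORef' ref $ \queue -> case queue of
    [] -> ([], BS.empty)
    (c : cs) -> (cs, c)

-- | Run the pump against an in-memory source and sink, returning the bytes
-- written, the number of writes, and the number of explicit flushes.
runPump :: [ByteString] -> IO (BL.ByteString, Int, Int)
runPump chunks = do
  source <- chunkSource chunks
  out <- newIORef mempty
  writes <- newIORef 0
  flushes <- newIORef 0
  pumpSketch
    source
    (\b -> modifyIORef' out (<> b) >> modifyIORef' writes (+ 1))
    (modifyIORef' flushes (+ 1))
  (,,) <$> (toLazyByteString <$> readIORef out) <*> readIORef writes <*> readIORef flushes
```

Running runPump over three non-empty chunks reports three writes and a single flush, with the concatenated bytes intact; over an empty body it reports no writes and no flushes. A blocking sink could be substituted to observe that the next pull does not happen until the previous write returns.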