module Ecluse.Core.Queue (
MirrorQueue (..),
MirrorJob (..),
MirrorArtifact (..),
RemoteSpanContext (..),
QueueMessage (..),
ReceiptHandle,
mkReceiptHandle,
unReceiptHandle,
Seconds (..),
newInMemoryQueue,
MemoryQueueConfig (..),
defaultMemoryQueueConfig,
newBoundedInMemoryQueue,
memoryQueueBatchSize,
memoryQueueDropReportInterval,
newEnqueueBuffer,
) where
import Control.Concurrent.STM.TBQueue (TBQueue, isFullTBQueue, newTBQueueIO, readTBQueue, tryReadTBQueue, writeTBQueue)
import Data.Map.Strict qualified as Map
import Data.Sequence qualified as Seq
import System.Timeout (timeout)
import UnliftIO.Exception (tryAny)
import Ecluse.Core.Package (Hash, PackageName)
import Ecluse.Core.Version (Version)
data MirrorJob = MirrorJob
{ MirrorJob -> PackageName
jobPackage :: PackageName
, MirrorJob -> Version
jobVersion :: Version
, MirrorJob -> Text
jobArtifactUrl :: Text
, MirrorJob -> Text
jobMirrorTarget :: Text
, MirrorJob -> MirrorArtifact
jobArtifact :: MirrorArtifact
, MirrorJob -> Maybe RemoteSpanContext
jobTraceContext :: Maybe RemoteSpanContext
}
deriving stock (MirrorJob -> MirrorJob -> Bool
(MirrorJob -> MirrorJob -> Bool)
-> (MirrorJob -> MirrorJob -> Bool) -> Eq MirrorJob
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: MirrorJob -> MirrorJob -> Bool
== :: MirrorJob -> MirrorJob -> Bool
$c/= :: MirrorJob -> MirrorJob -> Bool
/= :: MirrorJob -> MirrorJob -> Bool
Eq, Int -> MirrorJob -> ShowS
[MirrorJob] -> ShowS
MirrorJob -> String
(Int -> MirrorJob -> ShowS)
-> (MirrorJob -> String)
-> ([MirrorJob] -> ShowS)
-> Show MirrorJob
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> MirrorJob -> ShowS
showsPrec :: Int -> MirrorJob -> ShowS
$cshow :: MirrorJob -> String
show :: MirrorJob -> String
$cshowList :: [MirrorJob] -> ShowS
showList :: [MirrorJob] -> ShowS
Show)
data MirrorArtifact = MirrorArtifact
{ MirrorArtifact -> Text
maFilename :: Text
, MirrorArtifact -> NonEmpty Hash
maHashes :: NonEmpty Hash
, MirrorArtifact -> Maybe Int
maSize :: Maybe Int
}
deriving stock (MirrorArtifact -> MirrorArtifact -> Bool
(MirrorArtifact -> MirrorArtifact -> Bool)
-> (MirrorArtifact -> MirrorArtifact -> Bool) -> Eq MirrorArtifact
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: MirrorArtifact -> MirrorArtifact -> Bool
== :: MirrorArtifact -> MirrorArtifact -> Bool
$c/= :: MirrorArtifact -> MirrorArtifact -> Bool
/= :: MirrorArtifact -> MirrorArtifact -> Bool
Eq, Int -> MirrorArtifact -> ShowS
[MirrorArtifact] -> ShowS
MirrorArtifact -> String
(Int -> MirrorArtifact -> ShowS)
-> (MirrorArtifact -> String)
-> ([MirrorArtifact] -> ShowS)
-> Show MirrorArtifact
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> MirrorArtifact -> ShowS
showsPrec :: Int -> MirrorArtifact -> ShowS
$cshow :: MirrorArtifact -> String
show :: MirrorArtifact -> String
$cshowList :: [MirrorArtifact] -> ShowS
showList :: [MirrorArtifact] -> ShowS
Show)
data RemoteSpanContext = RemoteSpanContext
{ RemoteSpanContext -> Text
rscTraceparent :: Text
, RemoteSpanContext -> Text
rscTracestate :: Text
}
deriving stock (RemoteSpanContext -> RemoteSpanContext -> Bool
(RemoteSpanContext -> RemoteSpanContext -> Bool)
-> (RemoteSpanContext -> RemoteSpanContext -> Bool)
-> Eq RemoteSpanContext
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RemoteSpanContext -> RemoteSpanContext -> Bool
== :: RemoteSpanContext -> RemoteSpanContext -> Bool
$c/= :: RemoteSpanContext -> RemoteSpanContext -> Bool
/= :: RemoteSpanContext -> RemoteSpanContext -> Bool
Eq, Int -> RemoteSpanContext -> ShowS
[RemoteSpanContext] -> ShowS
RemoteSpanContext -> String
(Int -> RemoteSpanContext -> ShowS)
-> (RemoteSpanContext -> String)
-> ([RemoteSpanContext] -> ShowS)
-> Show RemoteSpanContext
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RemoteSpanContext -> ShowS
showsPrec :: Int -> RemoteSpanContext -> ShowS
$cshow :: RemoteSpanContext -> String
show :: RemoteSpanContext -> String
$cshowList :: [RemoteSpanContext] -> ShowS
showList :: [RemoteSpanContext] -> ShowS
Show)
newtype ReceiptHandle = ReceiptHandle Text
deriving stock (ReceiptHandle -> ReceiptHandle -> Bool
(ReceiptHandle -> ReceiptHandle -> Bool)
-> (ReceiptHandle -> ReceiptHandle -> Bool) -> Eq ReceiptHandle
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReceiptHandle -> ReceiptHandle -> Bool
== :: ReceiptHandle -> ReceiptHandle -> Bool
$c/= :: ReceiptHandle -> ReceiptHandle -> Bool
/= :: ReceiptHandle -> ReceiptHandle -> Bool
Eq, Eq ReceiptHandle
Eq ReceiptHandle =>
(ReceiptHandle -> ReceiptHandle -> Ordering)
-> (ReceiptHandle -> ReceiptHandle -> Bool)
-> (ReceiptHandle -> ReceiptHandle -> Bool)
-> (ReceiptHandle -> ReceiptHandle -> Bool)
-> (ReceiptHandle -> ReceiptHandle -> Bool)
-> (ReceiptHandle -> ReceiptHandle -> ReceiptHandle)
-> (ReceiptHandle -> ReceiptHandle -> ReceiptHandle)
-> Ord ReceiptHandle
ReceiptHandle -> ReceiptHandle -> Bool
ReceiptHandle -> ReceiptHandle -> Ordering
ReceiptHandle -> ReceiptHandle -> ReceiptHandle
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: ReceiptHandle -> ReceiptHandle -> Ordering
compare :: ReceiptHandle -> ReceiptHandle -> Ordering
$c< :: ReceiptHandle -> ReceiptHandle -> Bool
< :: ReceiptHandle -> ReceiptHandle -> Bool
$c<= :: ReceiptHandle -> ReceiptHandle -> Bool
<= :: ReceiptHandle -> ReceiptHandle -> Bool
$c> :: ReceiptHandle -> ReceiptHandle -> Bool
> :: ReceiptHandle -> ReceiptHandle -> Bool
$c>= :: ReceiptHandle -> ReceiptHandle -> Bool
>= :: ReceiptHandle -> ReceiptHandle -> Bool
$cmax :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle
max :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle
$cmin :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle
min :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle
Ord, Int -> ReceiptHandle -> ShowS
[ReceiptHandle] -> ShowS
ReceiptHandle -> String
(Int -> ReceiptHandle -> ShowS)
-> (ReceiptHandle -> String)
-> ([ReceiptHandle] -> ShowS)
-> Show ReceiptHandle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ReceiptHandle -> ShowS
showsPrec :: Int -> ReceiptHandle -> ShowS
$cshow :: ReceiptHandle -> String
show :: ReceiptHandle -> String
$cshowList :: [ReceiptHandle] -> ShowS
showList :: [ReceiptHandle] -> ShowS
Show)
mkReceiptHandle :: Text -> ReceiptHandle
mkReceiptHandle :: Text -> ReceiptHandle
mkReceiptHandle = Text -> ReceiptHandle
ReceiptHandle
unReceiptHandle :: ReceiptHandle -> Text
unReceiptHandle :: ReceiptHandle -> Text
unReceiptHandle (ReceiptHandle Text
t) = Text
t
data QueueMessage = QueueMessage
{ QueueMessage -> MirrorJob
msgJob :: MirrorJob
, QueueMessage -> ReceiptHandle
msgReceipt :: ReceiptHandle
}
deriving stock (QueueMessage -> QueueMessage -> Bool
(QueueMessage -> QueueMessage -> Bool)
-> (QueueMessage -> QueueMessage -> Bool) -> Eq QueueMessage
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: QueueMessage -> QueueMessage -> Bool
== :: QueueMessage -> QueueMessage -> Bool
$c/= :: QueueMessage -> QueueMessage -> Bool
/= :: QueueMessage -> QueueMessage -> Bool
Eq, Int -> QueueMessage -> ShowS
[QueueMessage] -> ShowS
QueueMessage -> String
(Int -> QueueMessage -> ShowS)
-> (QueueMessage -> String)
-> ([QueueMessage] -> ShowS)
-> Show QueueMessage
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> QueueMessage -> ShowS
showsPrec :: Int -> QueueMessage -> ShowS
$cshow :: QueueMessage -> String
show :: QueueMessage -> String
$cshowList :: [QueueMessage] -> ShowS
showList :: [QueueMessage] -> ShowS
Show)
newtype Seconds = Seconds Int
deriving stock (Seconds -> Seconds -> Bool
(Seconds -> Seconds -> Bool)
-> (Seconds -> Seconds -> Bool) -> Eq Seconds
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Seconds -> Seconds -> Bool
== :: Seconds -> Seconds -> Bool
$c/= :: Seconds -> Seconds -> Bool
/= :: Seconds -> Seconds -> Bool
Eq, Eq Seconds
Eq Seconds =>
(Seconds -> Seconds -> Ordering)
-> (Seconds -> Seconds -> Bool)
-> (Seconds -> Seconds -> Bool)
-> (Seconds -> Seconds -> Bool)
-> (Seconds -> Seconds -> Bool)
-> (Seconds -> Seconds -> Seconds)
-> (Seconds -> Seconds -> Seconds)
-> Ord Seconds
Seconds -> Seconds -> Bool
Seconds -> Seconds -> Ordering
Seconds -> Seconds -> Seconds
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Seconds -> Seconds -> Ordering
compare :: Seconds -> Seconds -> Ordering
$c< :: Seconds -> Seconds -> Bool
< :: Seconds -> Seconds -> Bool
$c<= :: Seconds -> Seconds -> Bool
<= :: Seconds -> Seconds -> Bool
$c> :: Seconds -> Seconds -> Bool
> :: Seconds -> Seconds -> Bool
$c>= :: Seconds -> Seconds -> Bool
>= :: Seconds -> Seconds -> Bool
$cmax :: Seconds -> Seconds -> Seconds
max :: Seconds -> Seconds -> Seconds
$cmin :: Seconds -> Seconds -> Seconds
min :: Seconds -> Seconds -> Seconds
Ord, Int -> Seconds -> ShowS
[Seconds] -> ShowS
Seconds -> String
(Int -> Seconds -> ShowS)
-> (Seconds -> String) -> ([Seconds] -> ShowS) -> Show Seconds
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Seconds -> ShowS
showsPrec :: Int -> Seconds -> ShowS
$cshow :: Seconds -> String
show :: Seconds -> String
$cshowList :: [Seconds] -> ShowS
showList :: [Seconds] -> ShowS
Show)
data MirrorQueue = MirrorQueue
{ MirrorQueue -> MirrorJob -> IO ()
enqueue :: MirrorJob -> IO ()
, MirrorQueue -> IO [QueueMessage]
receive :: IO [QueueMessage]
, MirrorQueue -> ReceiptHandle -> IO ()
ack :: ReceiptHandle -> IO ()
, MirrorQueue -> ReceiptHandle -> Seconds -> IO ()
extendVisibility :: ReceiptHandle -> Seconds -> IO ()
}
data QueueState = QueueState
{
QueueState -> Word64
qsNextReceipt :: Word64
,
QueueState -> Seq MirrorJob
qsVisible :: Seq MirrorJob
,
QueueState -> Map Word64 InFlight
qsInFlight :: Map Word64 InFlight
}
data InFlight = InFlight
{
InFlight -> MirrorJob
inFlightJob :: MirrorJob
,
InFlight -> Bool
inFlightHeld :: Bool
}
newInMemoryQueue :: IO MirrorQueue
newInMemoryQueue :: IO MirrorQueue
newInMemoryQueue = do
stateVar <- QueueState -> IO (TVar QueueState)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO (Word64 -> Seq MirrorJob -> Map Word64 InFlight -> QueueState
QueueState Word64
0 Seq MirrorJob
forall a. Seq a
Seq.empty Map Word64 InFlight
forall a. Monoid a => a
mempty)
let modifyState :: (QueueState -> QueueState) -> IO ()
modifyState = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ())
-> ((QueueState -> QueueState) -> STM ())
-> (QueueState -> QueueState)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TVar QueueState -> (QueueState -> QueueState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar QueueState
stateVar
pure
MirrorQueue
{ enqueue = modifyState . enqueueJob
, receive = atomically $ do
qs <- readTVar stateVar
let (messages, qs') = deliverAll qs
writeTVar stateVar qs'
pure messages
, ack = modifyState . ackJob
, extendVisibility = \ReceiptHandle
handle Seconds
_seconds -> (QueueState -> QueueState) -> IO ()
modifyState (ReceiptHandle -> QueueState -> QueueState
holdJob ReceiptHandle
handle)
}
enqueueJob :: MirrorJob -> QueueState -> QueueState
enqueueJob :: MirrorJob -> QueueState -> QueueState
enqueueJob MirrorJob
job QueueState
qs = QueueState
qs{qsVisible = qsVisible qs <> Seq.singleton job}
ackJob :: ReceiptHandle -> QueueState -> QueueState
ackJob :: ReceiptHandle -> QueueState -> QueueState
ackJob ReceiptHandle
handle QueueState
qs =
case ReceiptHandle -> Maybe Word64
receiptKey ReceiptHandle
handle of
Just Word64
key -> QueueState
qs{qsInFlight = Map.delete key (qsInFlight qs)}
Maybe Word64
Nothing -> QueueState
qs
holdJob :: ReceiptHandle -> QueueState -> QueueState
holdJob :: ReceiptHandle -> QueueState -> QueueState
holdJob ReceiptHandle
handle QueueState
qs =
case ReceiptHandle -> Maybe Word64
receiptKey ReceiptHandle
handle of
Just Word64
key ->
QueueState
qs{qsInFlight = Map.adjust (\InFlight
f -> InFlight
f{inFlightHeld = True}) key (qsInFlight qs)}
Maybe Word64
Nothing -> QueueState
qs
receiptKey :: ReceiptHandle -> Maybe Word64
receiptKey :: ReceiptHandle -> Maybe Word64
receiptKey = String -> Maybe Word64
forall a. Read a => String -> Maybe a
readMaybe (String -> Maybe Word64)
-> (ReceiptHandle -> String) -> ReceiptHandle -> Maybe Word64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> String
forall a. ToString a => a -> String
toString (Text -> String)
-> (ReceiptHandle -> Text) -> ReceiptHandle -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReceiptHandle -> Text
unReceiptHandle
deliverAll :: QueueState -> ([QueueMessage], QueueState)
deliverAll :: QueueState -> ([QueueMessage], QueueState)
deliverAll QueueState
qs =
let ([MirrorJob]
reclaimed, [(Word64, InFlight)]
stillHeld) = [(Word64, InFlight)] -> ([MirrorJob], [(Word64, InFlight)])
reclaim (Map Word64 InFlight -> [(Word64, InFlight)]
forall k a. Map k a -> [(k, a)]
Map.toList (QueueState -> Map Word64 InFlight
qsInFlight QueueState
qs))
toDeliver :: Seq MirrorJob
toDeliver = [MirrorJob] -> Seq MirrorJob
forall a. [a] -> Seq a
Seq.fromList [MirrorJob]
reclaimed Seq MirrorJob -> Seq MirrorJob -> Seq MirrorJob
forall a. Semigroup a => a -> a -> a
<> QueueState -> Seq MirrorJob
qsVisible QueueState
qs
([QueueMessage]
messages, Word64
nextReceipt, Map Word64 InFlight
delivered) =
Word64
-> [MirrorJob] -> ([QueueMessage], Word64, Map Word64 InFlight)
assignReceipts (QueueState -> Word64
qsNextReceipt QueueState
qs) (Seq MirrorJob -> [MirrorJob]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Seq MirrorJob
toDeliver)
in ( [QueueMessage]
messages
, QueueState
{ qsNextReceipt :: Word64
qsNextReceipt = Word64
nextReceipt
, qsVisible :: Seq MirrorJob
qsVisible = Seq MirrorJob
forall a. Seq a
Seq.empty
, qsInFlight :: Map Word64 InFlight
qsInFlight = [(Word64, InFlight)] -> Map Word64 InFlight
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(Word64, InFlight)]
stillHeld Map Word64 InFlight -> Map Word64 InFlight -> Map Word64 InFlight
forall a. Semigroup a => a -> a -> a
<> Map Word64 InFlight
delivered
}
)
reclaim ::
[(Word64, InFlight)] ->
([MirrorJob], [(Word64, InFlight)])
reclaim :: [(Word64, InFlight)] -> ([MirrorJob], [(Word64, InFlight)])
reclaim = ((Word64, InFlight)
-> ([MirrorJob], [(Word64, InFlight)])
-> ([MirrorJob], [(Word64, InFlight)]))
-> ([MirrorJob], [(Word64, InFlight)])
-> [(Word64, InFlight)]
-> ([MirrorJob], [(Word64, InFlight)])
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (Word64, InFlight)
-> ([MirrorJob], [(Word64, InFlight)])
-> ([MirrorJob], [(Word64, InFlight)])
forall {a}.
(a, InFlight)
-> ([MirrorJob], [(a, InFlight)]) -> ([MirrorJob], [(a, InFlight)])
step ([], [])
where
step :: (a, InFlight)
-> ([MirrorJob], [(a, InFlight)]) -> ([MirrorJob], [(a, InFlight)])
step (a
key, InFlight
f) ([MirrorJob]
jobs, [(a, InFlight)]
held)
| InFlight -> Bool
inFlightHeld InFlight
f = ([MirrorJob]
jobs, (a
key, InFlight
f{inFlightHeld = False}) (a, InFlight) -> [(a, InFlight)] -> [(a, InFlight)]
forall a. a -> [a] -> [a]
: [(a, InFlight)]
held)
| Bool
otherwise = (InFlight -> MirrorJob
inFlightJob InFlight
f MirrorJob -> [MirrorJob] -> [MirrorJob]
forall a. a -> [a] -> [a]
: [MirrorJob]
jobs, [(a, InFlight)]
held)
assignReceipts ::
Word64 ->
[MirrorJob] ->
([QueueMessage], Word64, Map Word64 InFlight)
assignReceipts :: Word64
-> [MirrorJob] -> ([QueueMessage], Word64, Map Word64 InFlight)
assignReceipts Word64
next [] = ([], Word64
next, Map Word64 InFlight
forall a. Monoid a => a
mempty)
assignReceipts Word64
next (MirrorJob
job : [MirrorJob]
rest) =
let message :: QueueMessage
message = QueueMessage{msgJob :: MirrorJob
msgJob = MirrorJob
job, msgReceipt :: ReceiptHandle
msgReceipt = Text -> ReceiptHandle
mkReceiptHandle (Word64 -> Text
forall b a. (Show a, IsString b) => a -> b
show Word64
next)}
([QueueMessage]
messages, Word64
next', Map Word64 InFlight
inFlight) = Word64
-> [MirrorJob] -> ([QueueMessage], Word64, Map Word64 InFlight)
assignReceipts (Word64
next Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
1) [MirrorJob]
rest
in ( QueueMessage
message QueueMessage -> [QueueMessage] -> [QueueMessage]
forall a. a -> [a] -> [a]
: [QueueMessage]
messages
, Word64
next'
, Word64 -> InFlight -> Map Word64 InFlight -> Map Word64 InFlight
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word64
next (InFlight{inFlightJob :: MirrorJob
inFlightJob = MirrorJob
job, inFlightHeld :: Bool
inFlightHeld = Bool
False}) Map Word64 InFlight
inFlight
)
data MemoryQueueConfig = MemoryQueueConfig
{ MemoryQueueConfig -> Int
memQueueMaxDepth :: Int
, MemoryQueueConfig -> Int
memQueuePollWaitMicros :: Int
}
deriving stock (MemoryQueueConfig -> MemoryQueueConfig -> Bool
(MemoryQueueConfig -> MemoryQueueConfig -> Bool)
-> (MemoryQueueConfig -> MemoryQueueConfig -> Bool)
-> Eq MemoryQueueConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: MemoryQueueConfig -> MemoryQueueConfig -> Bool
== :: MemoryQueueConfig -> MemoryQueueConfig -> Bool
$c/= :: MemoryQueueConfig -> MemoryQueueConfig -> Bool
/= :: MemoryQueueConfig -> MemoryQueueConfig -> Bool
Eq, Int -> MemoryQueueConfig -> ShowS
[MemoryQueueConfig] -> ShowS
MemoryQueueConfig -> String
(Int -> MemoryQueueConfig -> ShowS)
-> (MemoryQueueConfig -> String)
-> ([MemoryQueueConfig] -> ShowS)
-> Show MemoryQueueConfig
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> MemoryQueueConfig -> ShowS
showsPrec :: Int -> MemoryQueueConfig -> ShowS
$cshow :: MemoryQueueConfig -> String
show :: MemoryQueueConfig -> String
$cshowList :: [MemoryQueueConfig] -> ShowS
showList :: [MemoryQueueConfig] -> ShowS
Show)
defaultMemoryQueueConfig :: Int -> MemoryQueueConfig
defaultMemoryQueueConfig :: Int -> MemoryQueueConfig
defaultMemoryQueueConfig Int
maxDepth =
MemoryQueueConfig
{ memQueueMaxDepth :: Int
memQueueMaxDepth = Int
maxDepth
, memQueuePollWaitMicros :: Int
memQueuePollWaitMicros = Int
20_000_000
}
memoryQueueBatchSize :: Int
memoryQueueBatchSize :: Int
memoryQueueBatchSize = Int
10
memoryQueueDropReportInterval :: Int
memoryQueueDropReportInterval :: Int
memoryQueueDropReportInterval = Int
1000
newBoundedInMemoryQueue ::
MemoryQueueConfig ->
(Int -> IO ()) ->
IO MirrorQueue
newBoundedInMemoryQueue :: MemoryQueueConfig -> (Int -> IO ()) -> IO MirrorQueue
newBoundedInMemoryQueue MemoryQueueConfig
cfg Int -> IO ()
onDrop = do
queue <- Natural -> IO (TBQueue MirrorJob)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO (Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 (MemoryQueueConfig -> Int
memQueueMaxDepth MemoryQueueConfig
cfg)))
dropCount <- newTVarIO (0 :: Int)
nextReceipt <- newTVarIO (0 :: Word64)
pure
MirrorQueue
{ enqueue = \MirrorJob
job -> do
dropped <- STM (Maybe Int) -> IO (Maybe Int)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TBQueue MirrorJob -> TVar Int -> MirrorJob -> STM (Maybe Int)
writeOrDrop TBQueue MirrorJob
queue TVar Int
dropCount MirrorJob
job)
whenJust dropped (\Int
n -> Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int -> Bool
shouldReportDrop Int
n) (Int -> IO ()
onDrop Int
n))
,
receive = fromMaybe [] <$> timeout (memQueuePollWaitMicros cfg) (atomically (receiveBatch queue nextReceipt))
,
ack = const pass
, extendVisibility = \ReceiptHandle
_ Seconds
_ -> IO ()
forall (f :: * -> *). Applicative f => f ()
pass
}
shouldReportDrop :: Int -> Bool
shouldReportDrop :: Int -> Bool
shouldReportDrop Int
n = Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1 Bool -> Bool -> Bool
|| Int
n Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
memoryQueueDropReportInterval Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
receiveBatch :: TBQueue MirrorJob -> TVar Word64 -> STM [QueueMessage]
receiveBatch :: TBQueue MirrorJob -> TVar Word64 -> STM [QueueMessage]
receiveBatch TBQueue MirrorJob
queue TVar Word64
nextReceipt = do
headJob <- TBQueue MirrorJob -> STM MirrorJob
forall a. TBQueue a -> STM a
readTBQueue TBQueue MirrorJob
queue
rest <- drainUpTo (memoryQueueBatchSize - 1)
traverse assignReceipt (headJob : rest)
where
drainUpTo :: Int -> STM [MirrorJob]
drainUpTo :: Int -> STM [MirrorJob]
drainUpTo Int
budget
| Int
budget Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = [MirrorJob] -> STM [MirrorJob]
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
| Bool
otherwise =
TBQueue MirrorJob -> STM (Maybe MirrorJob)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue MirrorJob
queue STM (Maybe MirrorJob)
-> (Maybe MirrorJob -> STM [MirrorJob]) -> STM [MirrorJob]
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe MirrorJob
Nothing -> [MirrorJob] -> STM [MirrorJob]
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
Just MirrorJob
job -> (MirrorJob
job MirrorJob -> [MirrorJob] -> [MirrorJob]
forall a. a -> [a] -> [a]
:) ([MirrorJob] -> [MirrorJob]) -> STM [MirrorJob] -> STM [MirrorJob]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> STM [MirrorJob]
drainUpTo (Int
budget Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
assignReceipt :: MirrorJob -> STM QueueMessage
assignReceipt :: MirrorJob -> STM QueueMessage
assignReceipt MirrorJob
job = do
n <- TVar Word64 -> STM Word64
forall a. TVar a -> STM a
readTVar TVar Word64
nextReceipt
writeTVar nextReceipt (n + 1)
pure QueueMessage{msgJob = job, msgReceipt = mkReceiptHandle (show n)}
writeOrDrop :: TBQueue MirrorJob -> TVar Int -> MirrorJob -> STM (Maybe Int)
writeOrDrop :: TBQueue MirrorJob -> TVar Int -> MirrorJob -> STM (Maybe Int)
writeOrDrop TBQueue MirrorJob
queue TVar Int
dropCount MirrorJob
job = do
full <- TBQueue MirrorJob -> STM Bool
forall a. TBQueue a -> STM Bool
isFullTBQueue TBQueue MirrorJob
queue
if full
then Just <$> bumpCount dropCount
else writeTBQueue queue job $> Nothing
bumpCount :: TVar Int -> STM Int
bumpCount :: TVar Int -> STM Int
bumpCount TVar Int
counter = do
n <- (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) (Int -> Int) -> STM Int -> STM Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
counter
writeTVar counter n
pure n
newEnqueueBuffer ::
Int ->
(Int -> IO ()) ->
(Int -> Text -> IO ()) ->
MirrorQueue ->
IO (MirrorQueue, IO ())
newEnqueueBuffer :: Int
-> (Int -> IO ())
-> (Int -> Text -> IO ())
-> MirrorQueue
-> IO (MirrorQueue, IO ())
newEnqueueBuffer Int
depth Int -> IO ()
onDrop Int -> Text -> IO ()
onDeliveryFailure MirrorQueue
backend = do
buffer <- Natural -> IO (TBQueue MirrorJob)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO (Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 Int
depth))
dropCount <- newTVarIO (0 :: Int)
failureCount <- newTVarIO (0 :: Int)
let
handOff MirrorJob
job = do
dropped <- STM (Maybe Int) -> IO (Maybe Int)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TBQueue MirrorJob -> TVar Int -> MirrorJob -> STM (Maybe Int)
writeOrDrop TBQueue MirrorJob
buffer TVar Int
dropCount MirrorJob
job)
whenJust dropped onDrop
pure (backend{enqueue = handOff}, forever (deliverNext buffer failureCount onDeliveryFailure backend))
deliverNext :: TBQueue MirrorJob -> TVar Int -> (Int -> Text -> IO ()) -> MirrorQueue -> IO ()
deliverNext :: TBQueue MirrorJob
-> TVar Int -> (Int -> Text -> IO ()) -> MirrorQueue -> IO ()
deliverNext TBQueue MirrorJob
buffer TVar Int
failureCount Int -> Text -> IO ()
onDeliveryFailure MirrorQueue
backend = do
job <- STM MirrorJob -> IO MirrorJob
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TBQueue MirrorJob -> STM MirrorJob
forall a. TBQueue a -> STM a
readTBQueue TBQueue MirrorJob
buffer)
tryAny (enqueue backend job) >>= \case
Left SomeException
failure -> do
n <- STM Int -> IO Int
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TVar Int -> STM Int
bumpCount TVar Int
failureCount)
onDeliveryFailure n (toText (displayException failure))
Right () -> IO ()
forall (f :: * -> *). Applicative f => f ()
pass