create notification subscriptions in batch subscribe (#477)
* create notification subscriptions in batch subscribe * refactor * refactor
This commit is contained in:
parent
aafe2d43f5
commit
1ce63bee44
|
@ -414,6 +414,9 @@ subscribeConnections' c connIds = do
|
|||
srvRcvQs :: Map SMPServer (Map ConnId (RcvQueue, ConnData)) = M.foldlWithKey' addRcvQueue M.empty rcvQs
|
||||
mapM_ (mapM_ (uncurry $ resumeMsgDelivery c) . sndQueue) cs
|
||||
rcvRs <- mapConcurrently subscribe (M.assocs srvRcvQs)
|
||||
ns <- asks ntfSupervisor
|
||||
tkn <- readTVarIO (ntfTkn ns)
|
||||
when (instantNotifications tkn) . void . forkIO $ sendNtfCreate ns rcvRs
|
||||
let rs = M.unions $ errs' : sndRs : rcvRs
|
||||
notifyResultError rs
|
||||
pure rs
|
||||
|
@ -433,6 +436,11 @@ subscribeConnections' c connIds = do
|
|||
addRcvQueue m connId rq@(RcvQueue {server}, _) = M.alter (Just . maybe (M.singleton connId rq) (M.insert connId rq)) server m
|
||||
subscribe :: (SMPServer, Map ConnId (RcvQueue, ConnData)) -> m (Map ConnId (Either AgentErrorType ()))
|
||||
subscribe (srv, qs) = subscribeQueues c srv (M.map fst qs)
|
||||
sendNtfCreate :: NtfSupervisor -> [Map ConnId (Either AgentErrorType ())] -> m ()
|
||||
sendNtfCreate ns rcvRs =
|
||||
forM_ (concatMap M.assocs rcvRs) $ \case
|
||||
(connId, Right _) -> atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate)
|
||||
_ -> pure ()
|
||||
sndQueue :: SomeConn -> Maybe (ConnData, SndQueue)
|
||||
sndQueue = \case
|
||||
SomeConn _ (DuplexConnection cData _ sq) -> Just (cData, sq)
|
||||
|
|
|
@ -11,6 +11,7 @@ module Simplex.Messaging.Agent.NtfSubSupervisor
|
|||
nsUpdateToken,
|
||||
nsRemoveNtfToken,
|
||||
sendNtfSubCommand,
|
||||
instantNotifications,
|
||||
closeNtfSupervisor,
|
||||
getNtfServer,
|
||||
)
|
||||
|
@ -327,13 +328,14 @@ nsRemoveNtfToken :: NtfSupervisor -> STM ()
|
|||
nsRemoveNtfToken ns = writeTVar (ntfTkn ns) Nothing
|
||||
|
||||
sendNtfSubCommand :: NtfSupervisor -> (ConnId, NtfSupervisorCommand) -> STM ()
|
||||
sendNtfSubCommand ns cmd =
|
||||
readTVar (ntfTkn ns)
|
||||
>>= mapM_
|
||||
( \NtfToken {ntfTknStatus, ntfMode} ->
|
||||
when (ntfTknStatus == NTActive && ntfMode == NMInstant) $
|
||||
writeTBQueue (ntfSubQ ns) cmd
|
||||
)
|
||||
sendNtfSubCommand ns cmd = do
|
||||
tkn <- readTVar (ntfTkn ns)
|
||||
when (instantNotifications tkn) $ writeTBQueue (ntfSubQ ns) cmd
|
||||
|
||||
instantNotifications :: Maybe NtfToken -> Bool
|
||||
instantNotifications = \case
|
||||
Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> True
|
||||
_ -> False
|
||||
|
||||
closeNtfSupervisor :: NtfSupervisor -> IO ()
|
||||
closeNtfSupervisor ns = do
|
||||
|
|
Reference in New Issue