suspend ntf operations when agent is suspended (#453)
* suspend ntf operations when agent is suspended
* end and begin ntf operation on loop
This commit is contained in:
parent
77f1d45021
commit
f9cd7e5416
|
@ -811,6 +811,7 @@ activateAgent' c = atomically $ do
|
|||
activate sndNetworkOp
|
||||
activate msgDeliveryOp
|
||||
activate rcvNetworkOp
|
||||
activate ntfNetworkOp
|
||||
where
|
||||
activate opSel = modifyTVar' (opSel c) $ \s -> s {opSuspended = False}
|
||||
|
||||
|
@ -819,6 +820,7 @@ suspendAgent' c@AgentClient {agentState = as} maxDelay = do
|
|||
state <-
|
||||
atomically $ do
|
||||
writeTVar as ASSuspending
|
||||
suspendOperation c AONtfNetwork $ pure ()
|
||||
suspendOperation c AORcvNetwork $
|
||||
suspendOperation c AOMsgDelivery $
|
||||
suspendSendingAndDatabase c
|
||||
|
@ -842,12 +844,11 @@ getSMPServer c = do
|
|||
|
||||
subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
subscriber c@AgentClient {msgQ} = forever $ do
|
||||
atomically $ endAgentOperation c AORcvNetwork
|
||||
t <- atomically $ readTBQueue msgQ
|
||||
atomically $ beginAgentOperation c AORcvNetwork
|
||||
withAgentLock c (runExceptT $ processSMPTransmission c t) >>= \case
|
||||
Left e -> liftIO $ print e
|
||||
Right _ -> return ()
|
||||
agentOperationBracket c AORcvNetwork $
|
||||
withAgentLock c (runExceptT $ processSMPTransmission c t) >>= \case
|
||||
Left e -> liftIO $ print e
|
||||
Right _ -> return ()
|
||||
|
||||
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m ()
|
||||
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) =
|
||||
|
|
|
@ -52,6 +52,7 @@ module Simplex.Messaging.Agent.Client
|
|||
AgentOperation (..),
|
||||
AgentOpState (..),
|
||||
AgentState (..),
|
||||
agentOperationBracket,
|
||||
beginAgentOperation,
|
||||
endAgentOperation,
|
||||
suspendSendingAndDatabase,
|
||||
|
@ -127,6 +128,7 @@ data AgentClient = AgentClient
|
|||
connMsgsQueued :: TMap ConnId Bool,
|
||||
smpQueueMsgQueues :: TMap (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId),
|
||||
smpQueueMsgDeliveries :: TMap (ConnId, SMPServer, SMP.SenderId) (Async ()),
|
||||
ntfNetworkOp :: TVar AgentOpState,
|
||||
rcvNetworkOp :: TVar AgentOpState,
|
||||
msgDeliveryOp :: TVar AgentOpState,
|
||||
sndNetworkOp :: TVar AgentOpState,
|
||||
|
@ -140,11 +142,12 @@ data AgentClient = AgentClient
|
|||
lock :: TMVar ()
|
||||
}
|
||||
|
||||
data AgentOperation = AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase
|
||||
data AgentOperation = AONtfNetwork | AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase
|
||||
deriving (Eq, Show)
|
||||
|
||||
agentOpSel :: AgentOperation -> (AgentClient -> TVar AgentOpState)
|
||||
agentOpSel = \case
|
||||
AONtfNetwork -> ntfNetworkOp
|
||||
AORcvNetwork -> rcvNetworkOp
|
||||
AOMsgDelivery -> msgDeliveryOp
|
||||
AOSndNetwork -> sndNetworkOp
|
||||
|
@ -172,6 +175,7 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do
|
|||
connMsgsQueued <- TM.empty
|
||||
smpQueueMsgQueues <- TM.empty
|
||||
smpQueueMsgDeliveries <- TM.empty
|
||||
ntfNetworkOp <- newTVar $ AgentOpState False 0
|
||||
rcvNetworkOp <- newTVar $ AgentOpState False 0
|
||||
msgDeliveryOp <- newTVar $ AgentOpState False 0
|
||||
sndNetworkOp <- newTVar $ AgentOpState False 0
|
||||
|
@ -182,7 +186,7 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do
|
|||
asyncClients <- newTVar []
|
||||
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
|
||||
lock <- newTMVar ()
|
||||
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock}
|
||||
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock}
|
||||
|
||||
-- | Path of the database file: the 'dbFilePath' field of the 'SQLiteStore'
-- held in the 'store' field of the client's 'agentEnv'.
agentDbPath :: AgentClient -> FilePath
|
||||
agentDbPath AgentClient {agentEnv = Env {store = SQLiteStore {dbFilePath}}} = dbFilePath
|
||||
|
@ -675,6 +679,7 @@ cryptoError = \case
|
|||
|
||||
endAgentOperation :: AgentClient -> AgentOperation -> STM ()
|
||||
endAgentOperation c op = endOperation c op $ case op of
|
||||
AONtfNetwork -> pure ()
|
||||
AORcvNetwork ->
|
||||
suspendOperation c AOMsgDelivery $
|
||||
suspendSendingAndDatabase c
|
||||
|
@ -724,16 +729,21 @@ beginAgentOperation c op = do
|
|||
-- unsafeIOToSTM $ putStrLn $ "beginOperation! " <> show op <> " " <> show (opsInProgress s + 1)
|
||||
writeTVar opVar $! s {opsInProgress = opsInProgress s + 1}
|
||||
|
||||
-- | Runs @action@ as one agent operation of kind @op@:
-- 'beginAgentOperation' is executed (atomically) before the action and
-- 'endAgentOperation' (atomically) after it. Both calls go through 'E.bracket',
-- so - assuming @E@ provides the usual bracket semantics - the end call is
-- made whether @action@ returns normally or throws.
-- The result of @action@ is returned unchanged.
agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> m a -> m a
|
||||
agentOperationBracket c op action =
|
||||
E.bracket
|
||||
(atomically $ beginAgentOperation c op)
|
||||
(\_ -> atomically $ endAgentOperation c op)
|
||||
(\_ -> action)
|
||||
|
||||
-- | Variant of 'withStore' for database actions that do not report a
-- 'StoreError' themselves: the action's result is wrapped in 'Right'
-- before being handed to 'withStore'.
withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a
|
||||
withStore' c action = withStore c $ fmap Right . action
|
||||
|
||||
withStore :: AgentMonad m => AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a
|
||||
withStore c action = do
|
||||
st <- asks store
|
||||
atomically $ beginAgentOperation c AODatabase
|
||||
r <- liftIO $ withTransaction st action `E.catch` handleInternal
|
||||
atomically $ endAgentOperation c AODatabase
|
||||
liftEither $ first storeError r
|
||||
liftEitherError storeError . agentOperationBracket c AODatabase $
|
||||
withTransaction st action `E.catch` handleInternal
|
||||
where
|
||||
handleInternal :: E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal = pure . Left . SEInternal . bshow
|
||||
|
|
|
@ -68,7 +68,8 @@ data AgentConfig = AgentConfig
|
|||
helloTimeout :: NominalDiffTime,
|
||||
resubscriptionConcurrency :: Int,
|
||||
ntfCron :: Word16,
|
||||
ntfWorkerThrottle :: Int,
|
||||
ntfWorkerDelay :: Int,
|
||||
ntfSMPWorkerDelay :: Int,
|
||||
ntfSubCheckInterval :: NominalDiffTime,
|
||||
ntfMaxMessages :: Int,
|
||||
caCertificateFile :: FilePath,
|
||||
|
@ -103,7 +104,8 @@ defaultAgentConfig =
|
|||
helloTimeout = 2 * nominalDay,
|
||||
resubscriptionConcurrency = 16,
|
||||
ntfCron = 20, -- minutes
|
||||
ntfWorkerThrottle = 1000000, -- microseconds
|
||||
ntfWorkerDelay = 100000, -- microseconds
|
||||
ntfSMPWorkerDelay = 500000, -- microseconds
|
||||
ntfSubCheckInterval = nominalDay,
|
||||
ntfMaxMessages = 4,
|
||||
-- CA certificate private key is not needed for initialization
|
||||
|
|
|
@ -26,6 +26,7 @@ import Control.Monad.Reader
|
|||
import Data.Bifunctor (first)
|
||||
import Data.Fixed (Fixed (MkFixed), Pico)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Text (Text)
|
||||
import Data.Time (UTCTime, addUTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds)
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
|
@ -49,14 +50,17 @@ import qualified UnliftIO.Exception as E
|
|||
import UnliftIO.STM
|
||||
|
||||
runNtfSupervisor :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
runNtfSupervisor c = forever . handleError $ do
|
||||
runNtfSupervisor c = do
|
||||
ns <- asks ntfSupervisor
|
||||
cmd <- atomically . readTBQueue $ ntfSubQ ns
|
||||
runExceptT (processNtfSub c cmd) >>= \case
|
||||
Left e -> liftIO $ print e
|
||||
Right _ -> return ()
|
||||
forever . handleError $ do
|
||||
cmd <- atomically . readTBQueue $ ntfSubQ ns
|
||||
agentOperationBracket c AONtfNetwork $
|
||||
runExceptT (processNtfSub c cmd) >>= \case
|
||||
Left e -> liftIO $ print e
|
||||
Right _ -> return ()
|
||||
where
|
||||
handleError = E.handle $ \(e :: E.SomeException) -> logError $ "runNtfSupervisor error " <> tshow e
|
||||
handleError = E.handle $ \(e :: E.SomeException) -> do
|
||||
logError $ "runNtfSupervisor error " <> tshow e
|
||||
|
||||
processNtfSub :: forall m. AgentMonad m => AgentClient -> (ConnId, NtfSupervisorCommand) -> m ()
|
||||
processNtfSub c@AgentClient {subQ} (connId, cmd) = do
|
||||
|
@ -152,26 +156,24 @@ withNtfServer :: AgentMonad m => AgentClient -> (NtfServer -> m ()) -> m ()
|
|||
withNtfServer c action = getNtfServer c >>= mapM_ action
|
||||
|
||||
runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m ()
|
||||
runNtfWorker c srv doWork = forever $ do
|
||||
void . atomically $ readTMVar doWork
|
||||
nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv)
|
||||
logInfo $ "runNtfWorker, nextSub_ " <> tshow nextSub_
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just a@(NtfSubscription {connId}, _, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction a
|
||||
`catchError` ( \e -> do
|
||||
logInfo $ "runNtfWorker, error " <> tshow e
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
BROKER TIMEOUT -> loop
|
||||
_ -> ntfInternalError c connId (show e)
|
||||
)
|
||||
throttle <- asks $ ntfWorkerThrottle . config
|
||||
liftIO $ threadDelay throttle
|
||||
runNtfWorker c srv doWork = do
|
||||
delay <- asks $ ntfWorkerDelay . config
|
||||
forever $ do
|
||||
void . atomically $ readTMVar doWork
|
||||
agentOperationBracket c AONtfNetwork runNtfOperation
|
||||
threadDelay delay
|
||||
where
|
||||
runNtfOperation :: m ()
|
||||
runNtfOperation = do
|
||||
nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv)
|
||||
logInfo $ "runNtfWorker, nextSub_ " <> tshow nextSub_
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just a@(NtfSubscription {connId}, _, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction a
|
||||
`catchError` retryOnError c "NtfWorker" loop (ntfInternalError c connId . show)
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m ()
|
||||
processAction (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do
|
||||
|
@ -230,26 +232,23 @@ runNtfWorker c srv doWork = forever $ do
|
|||
updateNtfSubscription db sub {ntfSubStatus = toStatus} toAction actionTs'
|
||||
|
||||
runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m ()
|
||||
runNtfSMPWorker c srv doWork = forever $ do
|
||||
void . atomically $ readTMVar doWork
|
||||
nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv)
|
||||
logInfo $ "runNtfSMPWorker, nextSub_ " <> tshow nextSub_
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just a@(NtfSubscription {connId}, _, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction a
|
||||
`catchError` ( \e -> do
|
||||
logInfo $ "runNtfSMPWorker, error " <> tshow e
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
BROKER TIMEOUT -> loop
|
||||
_ -> ntfInternalError c connId (show e)
|
||||
)
|
||||
throttle <- asks $ ntfWorkerThrottle . config
|
||||
liftIO $ threadDelay throttle
|
||||
runNtfSMPWorker c srv doWork = do
|
||||
delay <- asks $ ntfSMPWorkerDelay . config
|
||||
forever $ do
|
||||
void . atomically $ readTMVar doWork
|
||||
agentOperationBracket c AONtfNetwork runNtfSMPOperation
|
||||
threadDelay delay
|
||||
where
|
||||
runNtfSMPOperation = do
|
||||
nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv)
|
||||
logInfo $ "runNtfSMPWorker, nextSub_ " <> tshow nextSub_
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just a@(NtfSubscription {connId}, _, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction a
|
||||
`catchError` retryOnError c "NtfSMPWorker" loop (ntfInternalError c connId . show)
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m ()
|
||||
processAction (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do
|
||||
|
@ -294,6 +293,19 @@ fromPico (MkFixed i) = i
|
|||
-- | Difference @a - b@ in whole microseconds.
-- The interval is taken as a 'Pico' number of seconds, its underlying integer
-- (a count of picoseconds) is converted to 'Int', and that value is then
-- integer-divided by 1000000. Note that the conversion to 'Int' happens
-- before the division.
diffInMicros :: UTCTime -> UTCTime -> Int
|
||||
diffInMicros a b = (`div` 1000000) . fromInteger . fromPico . nominalDiffTimeToSeconds $ diffUTCTime a b
|
||||
|
||||
-- | Error handler shared by the notification workers.
-- Logs the error prefixed with @name@, then:
--
-- * for @BROKER NETWORK@ and @BROKER TIMEOUT@, ends and re-begins the
--   'AONtfNetwork' operation and runs @loop@ (the retry continuation);
-- * for any other error, passes the error to @done@.
retryOnError :: AgentMonad m => AgentClient -> Text -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
|
||||
retryOnError c name loop done e = do
|
||||
logError $ name <> " error: " <> tshow e
|
||||
case e of
|
||||
BROKER NETWORK -> retryLoop
|
||||
BROKER TIMEOUT -> retryLoop
|
||||
_ -> done e
|
||||
where
|
||||
retryLoop = do
|
||||
-- The end and begin calls are two separate STM transactions.
-- NOTE(review): presumably this gap is what lets a pending agent suspension
-- take effect between retries - confirm against endAgentOperation/beginAgentOperation.
atomically $ endAgentOperation c AONtfNetwork
|
||||
atomically $ beginAgentOperation c AONtfNetwork
|
||||
loop
|
||||
|
||||
ntfInternalError :: AgentMonad m => AgentClient -> ConnId -> String -> m ()
|
||||
ntfInternalError c@AgentClient {subQ} connId internalErrStr = do
|
||||
withStore' c $ \db -> setNullNtfSubscriptionAction db connId
|
||||
|
|
|
@ -183,7 +183,8 @@ agentCfg =
|
|||
defaultTransport = (ntfTestPort, transport @TLS)
|
||||
},
|
||||
reconnectInterval = defaultReconnectInterval {initialInterval = 50_000},
|
||||
ntfWorkerThrottle = 1000,
|
||||
ntfWorkerDelay = 1000,
|
||||
ntfSMPWorkerDelay = 1000,
|
||||
caCertificateFile = "tests/fixtures/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/server.key",
|
||||
certificateFile = "tests/fixtures/server.crt"
|
||||
|
|
Reference in New Issue