use batch commands to resubscribe agent on server re-connection (#473)
This commit is contained in:
parent
a6f401041a
commit
6bbe1dfc66
|
@ -85,7 +85,7 @@ import Data.List.NonEmpty (NonEmpty)
|
|||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes)
|
||||
import Data.Maybe (listToMaybe)
|
||||
import Data.Set (Set)
|
||||
import Data.Text.Encoding
|
||||
import Data.Tuple (swap)
|
||||
|
@ -111,7 +111,7 @@ import qualified Simplex.Messaging.TMap as TM
|
|||
import Simplex.Messaging.Util
|
||||
import Simplex.Messaging.Version
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (async, pooledForConcurrentlyN)
|
||||
import UnliftIO (async)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
|
@ -270,34 +270,18 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
|
|||
reconnectClient `catchError` const loop
|
||||
|
||||
reconnectClient :: m ()
|
||||
reconnectClient = do
|
||||
n <- asks $ resubscriptionConcurrency . config
|
||||
withAgentLock c . withClient c srv $ \smp -> do
|
||||
cs <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSubscrSrvrs c)
|
||||
-- TODO if any of the subscriptions fails here (e.g. because of timeout), it terminates the whole process for all subscriptions
|
||||
-- instead it should only report successful subscriptions and schedule the next call to reconnectClient to subscribe for the remaining subscriptions
|
||||
-- this way, for each DOWN event there can be several UP events
|
||||
conns <- pooledForConcurrentlyN n (maybe [] M.toList cs) $ \sub@(connId, _) ->
|
||||
ifM
|
||||
(atomically $ hasActiveSubscription c connId)
|
||||
(pure $ Just connId)
|
||||
(subscribe_ smp sub `catchError` handleError connId)
|
||||
liftIO . unless (null conns) . notifySub "" . UP srv $ catMaybes conns
|
||||
reconnectClient =
|
||||
withAgentLock c $
|
||||
atomically (TM.lookup srv (pendingSubscrSrvrs c) >>= mapM readTVar)
|
||||
>>= mapM_ resubscribe
|
||||
where
|
||||
subscribe_ :: SMPClient -> (ConnId, RcvQueue) -> ExceptT ProtocolClientError IO (Maybe ConnId)
|
||||
subscribe_ smp (connId, rq@RcvQueue {rcvPrivateKey, rcvId}) = do
|
||||
subscribeSMPQueue smp rcvPrivateKey rcvId
|
||||
addSubscription c rq connId
|
||||
pure $ Just connId
|
||||
|
||||
handleError :: ConnId -> ProtocolClientError -> ExceptT ProtocolClientError IO (Maybe ConnId)
|
||||
handleError connId = \case
|
||||
e@PCEResponseTimeout -> throwError e
|
||||
e@PCENetworkError -> throwError e
|
||||
e -> do
|
||||
liftIO . notifySub connId . ERR $ protocolClientError SMP e
|
||||
atomically $ removePendingSubscription c srv connId
|
||||
pure Nothing
|
||||
resubscribe :: Map ConnId RcvQueue -> m ()
|
||||
resubscribe qs = do
|
||||
(errs, oks) <- M.mapEither id <$> subscribeQueues c srv qs
|
||||
liftIO . unless (M.null oks) . notifySub "" . UP srv $ M.keys oks
|
||||
let (tempErrs, finalErrs) = M.partition temporaryAgentError errs
|
||||
liftIO . mapM_ (\(connId, e) -> notifySub connId $ ERR e) $ M.assocs finalErrs
|
||||
mapM_ throwError . listToMaybe $ M.elems tempErrs
|
||||
|
||||
notifySub :: ConnId -> ACommand 'Agent -> IO ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd)
|
||||
|
@ -489,11 +473,23 @@ processSubResult :: AgentClient -> RcvQueue -> ConnId -> Either ProtocolClientEr
|
|||
processSubResult c rq@RcvQueue {server} connId r = do
|
||||
case r of
|
||||
Left e ->
|
||||
atomically . when (e /= PCENetworkError && e /= PCEResponseTimeout) $
|
||||
atomically . unless (temporaryClientError e) $
|
||||
removePendingSubscription c server connId
|
||||
_ -> addSubscription c rq connId
|
||||
pure r
|
||||
|
||||
-- | Classifies a 'ProtocolClientError' as temporary or not.
--
-- Returns 'True' only for 'PCENetworkError' and 'PCEResponseTimeout';
-- every other error yields 'False'. 'processSubResult' uses this predicate
-- to decide whether a failed subscription is removed from the pending
-- subscriptions: it is removed only when this function returns 'False'.
temporaryClientError :: ProtocolClientError -> Bool
|
||||
temporaryClientError = \case
|
||||
PCENetworkError -> True
|
||||
PCEResponseTimeout -> True
|
||||
_ -> False
|
||||
|
||||
-- | Classifies an 'AgentErrorType' as temporary or not.
--
-- Returns 'True' only for @BROKER NETWORK@ and @BROKER TIMEOUT@; every other
-- error yields 'False'. The @resubscribe@ helper partitions subscription
-- errors with this predicate: errors for which it returns 'False' are
-- reported per connection as @ERR@, while the first error for which it
-- returns 'True' (if any) is re-thrown.
temporaryAgentError :: AgentErrorType -> Bool
|
||||
temporaryAgentError = \case
|
||||
BROKER NETWORK -> True
|
||||
BROKER TIMEOUT -> True
|
||||
_ -> False
|
||||
|
||||
-- | subscribe multiple queues - all passed queues should be on the same server
|
||||
subscribeQueues :: AgentMonad m => AgentClient -> SMPServer -> Map ConnId RcvQueue -> m (Map ConnId (Either AgentErrorType ()))
|
||||
subscribeQueues c srv qs = do
|
||||
|
|
|
@ -67,7 +67,6 @@ data AgentConfig = AgentConfig
|
|||
ntfCfg :: ProtocolClientConfig,
|
||||
reconnectInterval :: RetryInterval,
|
||||
helloTimeout :: NominalDiffTime,
|
||||
resubscriptionConcurrency :: Int,
|
||||
ntfCron :: Word16,
|
||||
ntfWorkerDelay :: Int,
|
||||
ntfSMPWorkerDelay :: Int,
|
||||
|
@ -103,7 +102,6 @@ defaultAgentConfig =
|
|||
ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)},
|
||||
reconnectInterval = defaultReconnectInterval,
|
||||
helloTimeout = 2 * nominalDay,
|
||||
resubscriptionConcurrency = 16,
|
||||
ntfCron = 20, -- minutes
|
||||
ntfWorkerDelay = 100000, -- microseconds
|
||||
ntfSMPWorkerDelay = 500000, -- microseconds
|
||||
|
|
|
@ -80,6 +80,7 @@ cfg =
|
|||
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
||||
logStatsInterval = Nothing,
|
||||
logStatsStartTime = 0,
|
||||
serverStatsLogFile = "tests/smp-server-stats.daily.log",
|
||||
serverStatsBackupFile = Nothing,
|
||||
caCertificateFile = "tests/fixtures/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/server.key",
|
||||
|
|
Reference in New Issue