make smp servers configurable for running agent (#326)

This commit is contained in:
John Roberts 2022-03-10 10:49:22 +04:00 committed by GitHub
parent 7a19ab224b
commit 5c6ec96d64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 29 additions and 11 deletions

View File

@ -11,7 +11,7 @@ import Simplex.Messaging.Agent.Server (runSMPAgent)
import Simplex.Messaging.Transport (TLS, Transport (..))
cfg :: AgentConfig
cfg = defaultAgentConfig {smpServers = L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"]}
cfg = defaultAgentConfig {initialSMPServers = L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"]}
logCfg :: LogConfig
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}

View File

@ -47,6 +47,7 @@ module Simplex.Messaging.Agent
ackMessage,
suspendConnection,
deleteConnection,
setSMPServers,
logConnection,
)
where
@ -143,6 +144,10 @@ suspendConnection c = withAgentEnv c . suspendConnection' c
deleteConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
deleteConnection c = withAgentEnv c . deleteConnection' c
-- | Change servers to be used for creating new queues
setSMPServers :: AgentErrorMonad m => AgentClient -> NonEmpty SMPServer -> m ()
setSMPServers c = withAgentEnv c . setSMPServers' c
withAgentEnv :: AgentClient -> ReaderT Env m a -> m a
withAgentEnv c = (`runReaderT` agentEnv c)
@ -209,7 +214,7 @@ processCommand c (connId, cmd) = case cmd of
newConn :: AgentMonad m => AgentClient -> ConnId -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c)
newConn c connId cMode = do
srv <- getSMPServer
srv <- getSMPServer c
(rq, qUri) <- newRcvQueue c srv
g <- asks idsDrg
let cData = ConnData {connId}
@ -262,7 +267,7 @@ joinConn c connId (CRContactUri (ConnReqUriData _ agentVRange (qUri :| _))) cInf
createReplyQueue :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> m ()
createReplyQueue c connId sq = do
srv <- getSMPServer
srv <- getSMPServer c
(rq, qUri) <- newRcvQueue c srv
-- TODO reply queue version should be the same as send queue, ignoring it in v1
let qInfo = toVersionT qUri SMP.smpClientVersion
@ -488,9 +493,15 @@ deleteConnection' c connId =
removeSubscription c connId
withStore (`deleteConn` connId)
getSMPServer :: AgentMonad m => m SMPServer
getSMPServer =
asks (smpServers . config) >>= \case
-- | Change servers to be used for creating new queues, in Reader monad
setSMPServers' :: forall m. AgentMonad m => AgentClient -> NonEmpty SMPServer -> m ()
setSMPServers' c servers = do
atomically $ writeTVar (smpServers c) servers
getSMPServer :: AgentMonad m => AgentClient -> m SMPServer
getSMPServer c = do
smpServers <- readTVarIO $ smpServers c
case smpServers of
srv :| [] -> pure srv
servers -> do
gen <- asks randomServer

View File

@ -47,6 +47,7 @@ import Data.Bifunctor (first)
import Data.ByteString.Base64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isNothing)
@ -76,6 +77,7 @@ data AgentClient = AgentClient
{ rcvQ :: TBQueue (ATransmission 'Client),
subQ :: TBQueue (ATransmission 'Agent),
msgQ :: TBQueue SMPServerTransmission,
smpServers :: TVar (NonEmpty SMPServer),
smpClients :: TVar (Map SMPServer SMPClientVar),
subscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)),
pendingSubscrSrvrs :: TVar (Map SMPServer (Map ConnId RcvQueue)),
@ -97,6 +99,7 @@ newAgentClient agentEnv = do
rcvQ <- newTBQueue qSize
subQ <- newTBQueue qSize
msgQ <- newTBQueue qSize
smpServers <- newTVar $ initialSMPServers (config agentEnv)
smpClients <- newTVar M.empty
subscrSrvrs <- newTVar M.empty
pendingSubscrSrvrs <- newTVar M.empty
@ -108,7 +111,7 @@ newAgentClient agentEnv = do
asyncClients <- newTVar []
clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1)
lock <- newTMVar ()
return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, asyncClients, clientId, agentEnv, smpSubscriber = undefined, lock}
return AgentClient {rcvQ, subQ, msgQ, smpServers, smpClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, asyncClients, clientId, agentEnv, smpSubscriber = undefined, lock}
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m)

View File

@ -29,7 +29,7 @@ import UnliftIO.STM
data AgentConfig = AgentConfig
{ tcpPort :: ServiceName,
smpServers :: NonEmpty SMPServer,
initialSMPServers :: NonEmpty SMPServer,
cmdSignAlg :: C.SignAlg,
connIdBytes :: Int,
tbqSize :: Natural,
@ -48,7 +48,7 @@ defaultAgentConfig :: AgentConfig
defaultAgentConfig =
AgentConfig
{ tcpPort = "5224",
smpServers = undefined, -- TODO move it elsewhere?
initialSMPServers = undefined, -- TODO move it elsewhere?
cmdSignAlg = C.SignAlg C.SEd448,
connIdBytes = 12,
tbqSize = 64,

View File

@ -399,6 +399,10 @@ instance StrEncoding SMPServer where
SrvLoc host port <- strP
pure SMPServer {host, port, keyHash}
instance ToJSON SMPServer where
toJSON = strToJSON
toEncoding = strToJEncoding
data SrvLoc = SrvLoc HostName ServiceName
deriving (Eq, Ord, Show)

View File

@ -157,7 +157,7 @@ cfg :: AgentConfig
cfg =
defaultAgentConfig
{ tcpPort = agentTestPort,
smpServers = L.fromList ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001"],
initialSMPServers = L.fromList ["smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001"],
tbqSize = 1,
dbFile = testDB,
smpCfg =
@ -174,7 +174,7 @@ cfg =
withSmpAgentThreadOn_ :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ServiceName, ServiceName, String) -> m () -> (ThreadId -> m a) -> m a
withSmpAgentThreadOn_ t (port', smpPort', db') afterProcess =
let cfg' = cfg {tcpPort = port', dbFile = db', smpServers = L.fromList [SMPServer "localhost" smpPort' testKeyHash]}
let cfg' = cfg {tcpPort = port', dbFile = db', initialSMPServers = L.fromList [SMPServer "localhost" smpPort' testKeyHash]}
in serverBracket
(\started -> runSMPAgentBlocking t started cfg')
afterProcess