allow passing all network configuration to the agent (#488)

This commit is contained in:
Evgeny Poberezkin 2022-08-02 13:30:00 +01:00 committed by GitHub
parent b8c23ea8d5
commit e9db0a1162
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 76 additions and 55 deletions

View File

@ -1,4 +1,5 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
@ -8,6 +9,7 @@ import Control.Logger.Simple
import qualified Data.List.NonEmpty as L
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Server (runSMPAgent)
import Simplex.Messaging.Client (defaultNetworkConfig)
import Simplex.Messaging.Transport (TLS, Transport (..))
cfg :: AgentConfig
@ -18,7 +20,7 @@ servers =
InitialAgentServers
{ smp = L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"],
ntf = [],
netCfg = NetworkConfig {socksProxy = Nothing, tcpTimeout = 5000000}
netCfg = defaultNetworkConfig
}
logCfg :: LogConfig

View File

@ -214,8 +214,8 @@ setNetworkConfig c cfg' = do
cfg <- atomically $ do
swapTVar (useNetworkConfig c) cfg'
liftIO . when (socksProxy cfg /= socksProxy cfg') $ do
closeProtocolServerClients c smpCfg smpClients
closeProtocolServerClients c ntfCfg ntfClients
closeProtocolServerClients c smpClients
closeProtocolServerClients c ntfClients
getNetworkConfig :: AgentErrorMonad m => AgentClient -> m NetworkConfig
getNetworkConfig = readTVarIO . useNetworkConfig

View File

@ -223,7 +223,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
atomically (getClientVar srv smpClients)
>>= either
(newProtocolClient c srv smpClients connectClient reconnectClient)
(waitForProtocolClient smpCfg)
(waitForProtocolClient c)
where
connectClient :: m SMPClient
connectClient = do
@ -295,7 +295,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} srv = do
atomically (getClientVar srv ntfClients)
>>= either
(newProtocolClient c srv ntfClients connectClient $ pure ())
(waitForProtocolClient ntfCfg)
(waitForProtocolClient c)
where
connectClient :: m NtfClient
connectClient = do
@ -316,10 +316,10 @@ getClientVar srv clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.l
TM.insert srv var clients
pure var
waitForProtocolClient :: AgentMonad m => (AgentConfig -> ProtocolClientConfig) -> ClientVar msg -> m (ProtocolClient msg)
waitForProtocolClient clientConfig clientVar = do
ProtocolClientConfig {tcpTimeout} <- asks $ clientConfig . config
client_ <- liftIO $ tcpTimeout `timeout` atomically (readTMVar clientVar)
waitForProtocolClient :: AgentMonad m => AgentClient -> ClientVar msg -> m (ProtocolClient msg)
waitForProtocolClient c clientVar = do
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar clientVar)
liftEither $ case client_ of
Just (Right smpClient) -> Right smpClient
Just (Left e) -> Left e
@ -362,14 +362,14 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon
updateClientConfig :: AgentClient -> ProtocolClientConfig -> STM ProtocolClientConfig
updateClientConfig AgentClient {useNetworkConfig} cfg = do
NetworkConfig {socksProxy, tcpTimeout} <- readTVar useNetworkConfig
pure (cfg :: ProtocolClientConfig) {socksProxy, tcpTimeout}
networkConfig <- readTVar useNetworkConfig
pure cfg {networkConfig}
closeAgentClient :: MonadIO m => AgentClient -> m ()
closeAgentClient c = liftIO $ do
atomically $ writeTVar (active c) False
closeProtocolServerClients c smpCfg smpClients
closeProtocolServerClients c ntfCfg ntfClients
closeProtocolServerClients c smpClients
closeProtocolServerClients c ntfClients
cancelActions $ reconnections c
cancelActions $ asyncClients c
cancelActions $ smpQueueMsgDeliveries c
@ -383,14 +383,14 @@ closeAgentClient c = liftIO $ do
clear :: (AgentClient -> TMap k a) -> IO ()
clear sel = atomically $ writeTVar (sel c) M.empty
closeProtocolServerClients :: AgentClient -> (AgentConfig -> ProtocolClientConfig) -> (AgentClient -> TMap (ProtoServer msg) (ClientVar msg)) -> IO ()
closeProtocolServerClients c cfgSel clientsSel =
closeProtocolServerClients :: AgentClient -> (AgentClient -> TMap (ProtoServer msg) (ClientVar msg)) -> IO ()
closeProtocolServerClients c clientsSel =
readTVarIO cs >>= mapM_ (forkIO . closeClient) >> atomically (writeTVar cs M.empty)
where
cs = clientsSel c
ProtocolClientConfig {tcpTimeout} = cfgSel . config $ agentEnv c
closeClient cVar =
tcpTimeout `timeout` atomically (readTMVar cVar) >>= \case
closeClient cVar = do
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
tcpConnectTimeout `timeout` atomically (readTMVar cVar) >>= \case
Just (Right client) -> closeProtocolClient client `catchAll_` pure ()
_ -> pure ()

View File

@ -1,7 +1,5 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
@ -29,12 +27,9 @@ import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Aeson as J
import Data.List.NonEmpty (NonEmpty)
import Data.Time.Clock (NominalDiffTime, nominalDay)
import Data.Word (Word16)
import GHC.Generics (Generic)
import Network.Socket
import Numeric.Natural
import Simplex.Messaging.Agent.Protocol
@ -49,7 +44,7 @@ import Simplex.Messaging.Protocol (NtfServer)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (TLS, Transport (..))
import Simplex.Messaging.Transport.Client (SocksProxy, defaultSMPPort)
import Simplex.Messaging.Transport.Client (defaultSMPPort)
import Simplex.Messaging.Version
import System.Random (StdGen, newStdGen)
import UnliftIO (Async)
@ -64,16 +59,6 @@ data InitialAgentServers = InitialAgentServers
netCfg :: NetworkConfig
}
data NetworkConfig = NetworkConfig
{ socksProxy :: Maybe SocksProxy,
tcpTimeout :: Int
}
deriving (Show, Generic, FromJSON)
instance ToJSON NetworkConfig where
toJSON = J.genericToJSON J.defaultOptions {J.omitNothingFields = True}
toEncoding = J.genericToEncoding J.defaultOptions {J.omitNothingFields = True}
data AgentConfig = AgentConfig
{ tcpPort :: ServiceName,
cmdSignAlg :: C.SignAlg,

View File

@ -1,5 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
@ -50,7 +51,9 @@ module Simplex.Messaging.Client
-- * Supporting types and client configuration
ProtocolClientError (..),
ProtocolClientConfig (..),
NetworkConfig (..),
defaultClientConfig,
defaultNetworkConfig,
ServerTransmission,
)
where
@ -62,6 +65,8 @@ import Control.Exception
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Except
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Aeson as J
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (rights)
@ -69,6 +74,7 @@ import Data.Functor (($>))
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Maybe (fromMaybe)
import GHC.Generics (Generic)
import Network.Socket (ServiceName)
import Numeric.Natural
import qualified Simplex.Messaging.Crypto as C
@ -108,20 +114,43 @@ type ClientCommand msg = (Maybe C.APrivateSignKey, QueueId, ProtoCommand msg)
-- | Type synonym for transmission from some SPM server queue.
type ServerTransmission msg = (ProtoServer msg, Version, SessionId, QueueId, msg)
-- | network configuration for the client
data NetworkConfig = NetworkConfig
{ -- | use SOCKS5 proxy
socksProxy :: Maybe SocksProxy,
-- | timeout for the initial client TCP/TLS connection (microseconds)
tcpConnectTimeout :: Int,
-- | timeout of protocol commands (microseconds)
tcpTimeout :: Int,
-- | TCP keep-alive options, Nothing to skip enabling keep-alive
tcpKeepAlive :: Maybe KeepAliveOpts,
-- | period for SMP ping commands (microseconds)
smpPingInterval :: Int
}
deriving (Show, Generic, FromJSON)
instance ToJSON NetworkConfig where
toJSON = J.genericToJSON J.defaultOptions {J.omitNothingFields = True}
toEncoding = J.genericToEncoding J.defaultOptions {J.omitNothingFields = True}
defaultNetworkConfig :: NetworkConfig
defaultNetworkConfig =
NetworkConfig
{ socksProxy = Nothing,
tcpConnectTimeout = 7_500_000,
tcpTimeout = 5_000_000,
tcpKeepAlive = Just defaultKeepAliveOpts,
smpPingInterval = 600_000_000 -- 10min
}
-- | protocol client configuration.
data ProtocolClientConfig = ProtocolClientConfig
{ -- | size of TBQueue to use for server commands and responses
qSize :: Natural,
-- | default server port if port is not specified in ProtocolServer
defaultTransport :: (ServiceName, ATransport),
-- | timeout of TCP commands (microseconds)
tcpTimeout :: Int,
-- | TCP keep-alive options, Nothing to skip enabling keep-alive
tcpKeepAlive :: Maybe KeepAliveOpts,
-- | use SOCKS5 proxy
socksProxy :: Maybe SocksProxy,
-- | period for SMP ping commands (microseconds)
smpPing :: Int,
-- | network configuration
networkConfig :: NetworkConfig,
-- | SMP client-server protocol version range
smpServerVRange :: VersionRange
}
@ -132,10 +161,7 @@ defaultClientConfig =
ProtocolClientConfig
{ qSize = 64,
defaultTransport = ("443", transport @TLS),
tcpTimeout = 5_000_000,
tcpKeepAlive = Just defaultKeepAliveOpts,
socksProxy = Nothing,
smpPing = 600_000_000, -- 10min
networkConfig = defaultNetworkConfig,
smpServerVRange = supportedSMPServerVRange
}
@ -152,10 +178,11 @@ type Response msg = Either ProtocolClientError msg
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'SMPServerTransmission' includes server information.
getProtocolClient :: forall msg. Protocol msg => ProtoServer msg -> ProtocolClientConfig -> Maybe (TBQueue (ServerTransmission msg)) -> IO () -> IO (Either ProtocolClientError (ProtocolClient msg))
getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tcpKeepAlive, socksProxy, smpPing, smpServerVRange} msgQ disconnected =
getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, networkConfig, smpServerVRange} msgQ disconnected =
(atomically mkProtocolClient >>= runClient useTransport)
`catch` \(e :: IOException) -> pure . Left $ PCEIOError e
where
NetworkConfig {tcpConnectTimeout, tcpTimeout, tcpKeepAlive, socksProxy, smpPingInterval} = networkConfig
mkProtocolClient :: STM (ProtocolClient msg)
mkProtocolClient = do
connected <- newTVar False
@ -185,7 +212,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc
async $
runTransportClient socksProxy (host protocolServer) port' (Just $ keyHash protocolServer) tcpKeepAlive (client t c thVar)
`finally` atomically (putTMVar thVar $ Left PCENetworkError)
th_ <- tcpTimeout `timeout` atomically (takeTMVar thVar)
th_ <- tcpConnectTimeout `timeout` atomically (takeTMVar thVar)
pure $ case th_ of
Just (Right THandle {sessionId, thVersion}) -> Right c {action, sessionId, thVersion}
Just (Left e) -> Left e
@ -218,7 +245,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc
ping :: ProtocolClient msg -> IO ()
ping c = forever $ do
threadDelay smpPing
threadDelay smpPingInterval
runExceptT $ sendProtocolCommand c Nothing "" protocolPing
process :: ProtocolClient msg -> IO ()

View File

@ -125,8 +125,8 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv =
waitForSMPClient :: SMPClientVar -> ExceptT ProtocolClientError IO SMPClient
waitForSMPClient smpVar = do
let ProtocolClientConfig {tcpTimeout} = smpCfg agentCfg
smpClient_ <- liftIO $ tcpTimeout `timeout` atomically (readTMVar smpVar)
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar smpVar)
liftEither $ case smpClient_ of
Just (Right smpClient) -> Right smpClient
Just (Left e) -> Left e

View File

@ -1,10 +1,15 @@
{-# LANGUAGE CApiFFI #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE NamedFieldPuns #-}
module Simplex.Messaging.Transport.KeepAlive where
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Aeson as J
import Foreign.C (CInt (..))
import GHC.Generics (Generic)
import Network.Socket
data KeepAliveOpts = KeepAliveOpts
@ -12,7 +17,9 @@ data KeepAliveOpts = KeepAliveOpts
keepIntvl :: Int,
keepCnt :: Int
}
deriving (Show)
deriving (Show, Generic, FromJSON)
instance ToJSON KeepAliveOpts where toEncoding = J.genericToEncoding J.defaultOptions
defaultKeepAliveOpts :: KeepAliveOpts
defaultKeepAliveOpts =

View File

@ -25,7 +25,7 @@ import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Server (runSMPAgentBlocking)
import Simplex.Messaging.Client (ProtocolClientConfig (..), defaultClientConfig)
import Simplex.Messaging.Client (ProtocolClientConfig (..), defaultClientConfig, defaultNetworkConfig)
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Client
import Simplex.Messaging.Transport.KeepAlive
@ -166,7 +166,7 @@ initAgentServers =
InitialAgentServers
{ smp = L.fromList [testSMPServer],
ntf = ["ntf://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:6001"],
netCfg = NetworkConfig {socksProxy = Nothing, tcpTimeout = 5000000}
netCfg = defaultNetworkConfig {tcpTimeout = 500_000}
}
initAgentServers2 :: InitialAgentServers
@ -182,7 +182,7 @@ agentCfg =
defaultClientConfig
{ qSize = 1,
defaultTransport = (testPort, transport @TLS),
tcpTimeout = 500_000
networkConfig = defaultNetworkConfig {tcpTimeout = 500_000}
},
ntfCfg =
defaultClientConfig