agent functions to get/set network configuration (#484)
* agent functions to get/set network configuration * fix condition
This commit is contained in:
parent
d788c3ca95
commit
fcaddb7848
|
@ -18,8 +18,7 @@ servers =
|
|||
InitialAgentServers
|
||||
{ smp = L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"],
|
||||
ntf = [],
|
||||
socksProxy = Nothing,
|
||||
tcpTimeout = 5000000
|
||||
netCfg = NetworkConfig {socksProxy = Nothing, tcpTimeout = 5000000}
|
||||
}
|
||||
|
||||
logCfg :: LogConfig
|
||||
|
|
|
@ -57,6 +57,8 @@ module Simplex.Messaging.Agent
|
|||
getConnectionServers,
|
||||
setSMPServers,
|
||||
setNtfServers,
|
||||
setNetworkConfig,
|
||||
getNetworkConfig,
|
||||
registerNtfToken,
|
||||
verifyNtfToken,
|
||||
checkNtfToken,
|
||||
|
@ -206,6 +208,18 @@ setSMPServers c = withAgentEnv c . setSMPServers' c
|
|||
setNtfServers :: AgentErrorMonad m => AgentClient -> [NtfServer] -> m ()
|
||||
setNtfServers c = withAgentEnv c . setNtfServers' c
|
||||
|
||||
-- | set SOCKS5 proxy on/off and optionally set TCP timeout
|
||||
setNetworkConfig :: AgentErrorMonad m => AgentClient -> NetworkConfig -> m ()
|
||||
setNetworkConfig c cfg' = do
|
||||
cfg <- atomically $ do
|
||||
swapTVar (useNetworkConfig c) cfg'
|
||||
liftIO . when (socksProxy cfg /= socksProxy cfg') $ do
|
||||
closeProtocolServerClients c smpCfg smpClients
|
||||
closeProtocolServerClients c ntfCfg ntfClients
|
||||
|
||||
getNetworkConfig :: AgentErrorMonad m => AgentClient -> m NetworkConfig
|
||||
getNetworkConfig = readTVarIO . useNetworkConfig
|
||||
|
||||
-- | Register device notifications token
|
||||
registerNtfToken :: AgentErrorMonad m => AgentClient -> DeviceToken -> NotificationsMode -> m NtfTknStatus
|
||||
registerNtfToken c = withAgentEnv c .: registerNtfToken' c
|
||||
|
|
|
@ -19,6 +19,7 @@ module Simplex.Messaging.Agent.Client
|
|||
newAgentClient,
|
||||
withAgentLock,
|
||||
closeAgentClient,
|
||||
closeProtocolServerClients,
|
||||
newRcvQueue,
|
||||
subscribeQueue,
|
||||
subscribeQueues,
|
||||
|
@ -91,7 +92,6 @@ import Data.Text.Encoding
|
|||
import Data.Tuple (swap)
|
||||
import Data.Word (Word16)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Network.Socks5 (SocksConf)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
|
@ -131,8 +131,7 @@ data AgentClient = AgentClient
|
|||
smpClients :: TMap SMPServer SMPClientVar,
|
||||
ntfServers :: TVar [NtfServer],
|
||||
ntfClients :: TMap NtfServer NtfClientVar,
|
||||
useSocksProxy :: TVar (Maybe SocksConf),
|
||||
useTcpTimeout :: TVar (Int),
|
||||
useNetworkConfig :: TVar NetworkConfig,
|
||||
subscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue),
|
||||
pendingSubscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue),
|
||||
subscrConns :: TMap ConnId SMPServer,
|
||||
|
@ -173,7 +172,7 @@ data AgentState = ASActive | ASSuspending | ASSuspended
|
|||
deriving (Eq, Show)
|
||||
|
||||
newAgentClient :: InitialAgentServers -> Env -> STM AgentClient
|
||||
newAgentClient InitialAgentServers {smp, ntf, socksProxy, tcpTimeout} agentEnv = do
|
||||
newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
|
||||
let qSize = tbqSize $ config agentEnv
|
||||
active <- newTVar True
|
||||
rcvQ <- newTBQueue qSize
|
||||
|
@ -183,8 +182,7 @@ newAgentClient InitialAgentServers {smp, ntf, socksProxy, tcpTimeout} agentEnv =
|
|||
smpClients <- TM.empty
|
||||
ntfServers <- newTVar ntf
|
||||
ntfClients <- TM.empty
|
||||
useSocksProxy <- newTVar socksProxy
|
||||
useTcpTimeout <- newTVar tcpTimeout
|
||||
useNetworkConfig <- newTVar netCfg
|
||||
subscrSrvrs <- TM.empty
|
||||
pendingSubscrSrvrs <- TM.empty
|
||||
subscrConns <- TM.empty
|
||||
|
@ -202,7 +200,7 @@ newAgentClient InitialAgentServers {smp, ntf, socksProxy, tcpTimeout} agentEnv =
|
|||
asyncClients <- newTVar []
|
||||
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
|
||||
lock <- newTMVar ()
|
||||
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useSocksProxy, useTcpTimeout, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock}
|
||||
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock}
|
||||
|
||||
agentDbPath :: AgentClient -> FilePath
|
||||
agentDbPath AgentClient {agentEnv = Env {store = SQLiteStore {dbFilePath}}} = dbFilePath
|
||||
|
@ -363,16 +361,15 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon
|
|||
withRetryInterval ri $ \loop -> void $ tryConnectClient (const reconnectClient) loop
|
||||
|
||||
updateClientConfig :: AgentClient -> ProtocolClientConfig -> STM ProtocolClientConfig
|
||||
updateClientConfig AgentClient {useSocksProxy, useTcpTimeout} cfg = do
|
||||
socksProxy <- readTVar useSocksProxy
|
||||
tcpTimeout <- readTVar useTcpTimeout
|
||||
updateClientConfig AgentClient {useNetworkConfig} cfg = do
|
||||
NetworkConfig {socksProxy, tcpTimeout} <- readTVar useNetworkConfig
|
||||
pure (cfg :: ProtocolClientConfig) {socksProxy, tcpTimeout}
|
||||
|
||||
closeAgentClient :: MonadIO m => AgentClient -> m ()
|
||||
closeAgentClient c = liftIO $ do
|
||||
atomically $ writeTVar (active c) False
|
||||
closeProtocolServerClients (clientTimeout smpCfg) $ smpClients c
|
||||
closeProtocolServerClients (clientTimeout ntfCfg) $ ntfClients c
|
||||
closeProtocolServerClients c smpCfg smpClients
|
||||
closeProtocolServerClients c ntfCfg ntfClients
|
||||
cancelActions $ reconnections c
|
||||
cancelActions $ asyncClients c
|
||||
cancelActions $ smpQueueMsgDeliveries c
|
||||
|
@ -383,13 +380,15 @@ closeAgentClient c = liftIO $ do
|
|||
clear smpQueueMsgQueues
|
||||
clear getMsgLocks
|
||||
where
|
||||
clientTimeout sel = (tcpTimeout :: ProtocolClientConfig -> Int) . sel . config $ agentEnv c
|
||||
clear :: (AgentClient -> TMap k a) -> IO ()
|
||||
clear sel = atomically $ writeTVar (sel c) M.empty
|
||||
|
||||
closeProtocolServerClients :: Int -> TMap (ProtoServer msg) (ClientVar msg) -> IO ()
|
||||
closeProtocolServerClients tcpTimeout cs = readTVarIO cs >>= mapM_ (forkIO . closeClient) >> atomically (writeTVar cs M.empty)
|
||||
closeProtocolServerClients :: AgentClient -> (AgentConfig -> ProtocolClientConfig) -> (AgentClient -> TMap (ProtoServer msg) (ClientVar msg)) -> IO ()
|
||||
closeProtocolServerClients c cfgSel clientsSel =
|
||||
readTVarIO cs >>= mapM_ (forkIO . closeClient) >> atomically (writeTVar cs M.empty)
|
||||
where
|
||||
cs = clientsSel c
|
||||
ProtocolClientConfig {tcpTimeout} = cfgSel . config $ agentEnv c
|
||||
closeClient cVar =
|
||||
tcpTimeout `timeout` atomically (readTMVar cVar) >>= \case
|
||||
Just (Right client) -> closeProtocolClient client `catchAll_` pure ()
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
{-# LANGUAGE ConstraintKinds #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
|
@ -13,6 +15,7 @@ module Simplex.Messaging.Agent.Env.SQLite
|
|||
( AgentMonad,
|
||||
AgentConfig (..),
|
||||
InitialAgentServers (..),
|
||||
NetworkConfig (..),
|
||||
defaultAgentConfig,
|
||||
defaultReconnectInterval,
|
||||
Env (..),
|
||||
|
@ -26,11 +29,13 @@ import Control.Monad.Except
|
|||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import qualified Data.Aeson as J
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Time.Clock (NominalDiffTime, nominalDay)
|
||||
import Data.Word (Word16)
|
||||
import GHC.Generics (Generic)
|
||||
import Network.Socket
|
||||
import Network.Socks5 (SocksConf)
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
|
@ -44,7 +49,7 @@ import Simplex.Messaging.Protocol (NtfServer)
|
|||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport (TLS, Transport (..))
|
||||
import Simplex.Messaging.Transport.Client (defaultSMPPort)
|
||||
import Simplex.Messaging.Transport.Client (SocksProxy, defaultSMPPort)
|
||||
import Simplex.Messaging.Version
|
||||
import System.Random (StdGen, newStdGen)
|
||||
import UnliftIO (Async)
|
||||
|
@ -56,9 +61,18 @@ type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorTy
|
|||
data InitialAgentServers = InitialAgentServers
|
||||
{ smp :: NonEmpty SMPServer,
|
||||
ntf :: [NtfServer],
|
||||
socksProxy :: Maybe SocksConf,
|
||||
netCfg :: NetworkConfig
|
||||
}
|
||||
|
||||
data NetworkConfig = NetworkConfig
|
||||
{ socksProxy :: Maybe SocksProxy,
|
||||
tcpTimeout :: Int
|
||||
}
|
||||
deriving (Show, Generic, FromJSON)
|
||||
|
||||
instance ToJSON NetworkConfig where
|
||||
toJSON = J.genericToJSON J.defaultOptions {J.omitNothingFields = True}
|
||||
toEncoding = J.genericToEncoding J.defaultOptions {J.omitNothingFields = True}
|
||||
|
||||
data AgentConfig = AgentConfig
|
||||
{ tcpPort :: ServiceName,
|
||||
|
|
|
@ -70,14 +70,13 @@ import Data.List.NonEmpty (NonEmpty)
|
|||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Network.Socket (ServiceName)
|
||||
import Network.Socks5 (SocksConf)
|
||||
import Numeric.Natural
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.Client (runTransportClient)
|
||||
import Simplex.Messaging.Transport.Client (SocksProxy, runTransportClient)
|
||||
import Simplex.Messaging.Transport.KeepAlive
|
||||
import Simplex.Messaging.Transport.WebSockets (WS)
|
||||
import Simplex.Messaging.Util (bshow, liftError, raceAny_)
|
||||
|
@ -120,7 +119,7 @@ data ProtocolClientConfig = ProtocolClientConfig
|
|||
-- | TCP keep-alive options, Nothing to skip enabling keep-alive
|
||||
tcpKeepAlive :: Maybe KeepAliveOpts,
|
||||
-- | use SOCKS5 proxy
|
||||
socksProxy :: Maybe SocksConf,
|
||||
socksProxy :: Maybe SocksProxy,
|
||||
-- | period for SMP ping commands (microseconds)
|
||||
smpPing :: Int,
|
||||
-- | SMP client-server protocol version range
|
||||
|
|
|
@ -5,11 +5,16 @@ module Simplex.Messaging.Transport.Client
|
|||
runTLSTransportClient,
|
||||
smpClientHandshake,
|
||||
defaultSMPPort,
|
||||
defaultSocksProxy,
|
||||
SocksProxy,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative (optional)
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Default (def)
|
||||
|
@ -23,6 +28,7 @@ import Network.Socket
|
|||
import Network.Socks5
|
||||
import qualified Network.TLS as T
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.KeepAlive
|
||||
import System.IO.Error
|
||||
|
@ -31,13 +37,13 @@ import UnliftIO.Exception (IOException)
|
|||
import qualified UnliftIO.Exception as E
|
||||
|
||||
-- | Connect to passed TCP host:port and pass handle to the client.
|
||||
runTransportClient :: (Transport c, MonadUnliftIO m) => Maybe SocksConf -> HostName -> ServiceName -> Maybe C.KeyHash -> Maybe KeepAliveOpts -> (c -> m a) -> m a
|
||||
runTransportClient :: (Transport c, MonadUnliftIO m) => Maybe SocksProxy -> HostName -> ServiceName -> Maybe C.KeyHash -> Maybe KeepAliveOpts -> (c -> m a) -> m a
|
||||
runTransportClient = runTLSTransportClient supportedParameters Nothing
|
||||
|
||||
runTLSTransportClient :: (Transport c, MonadUnliftIO m) => T.Supported -> Maybe XS.CertificateStore -> Maybe SocksConf -> HostName -> ServiceName -> Maybe C.KeyHash -> Maybe KeepAliveOpts -> (c -> m a) -> m a
|
||||
runTLSTransportClient tlsParams caStore_ socksConf_ host port keyHash keepAliveOpts client = do
|
||||
runTLSTransportClient :: (Transport c, MonadUnliftIO m) => T.Supported -> Maybe XS.CertificateStore -> Maybe SocksProxy -> HostName -> ServiceName -> Maybe C.KeyHash -> Maybe KeepAliveOpts -> (c -> m a) -> m a
|
||||
runTLSTransportClient tlsParams caStore_ socksProxy_ host port keyHash keepAliveOpts client = do
|
||||
let clientParams = mkTLSClientParams tlsParams caStore_ host port keyHash
|
||||
connectTCP = maybe connectTCPClient connectSocksClient socksConf_
|
||||
connectTCP = maybe connectTCPClient connectSocksClient socksProxy_
|
||||
c <- liftIO $ connectTLSClient connectTCP host port clientParams keepAliveOpts
|
||||
client c `E.finally` liftIO (closeConnection c)
|
||||
|
||||
|
@ -73,10 +79,38 @@ connectTCPClient host port = withSocketsDo $ resolve >>= tryOpen err
|
|||
defaultSMPPort :: PortNumber
|
||||
defaultSMPPort = 5223
|
||||
|
||||
connectSocksClient :: SocksConf -> HostName -> ServiceName -> IO Socket
|
||||
connectSocksClient socksProxy host _port = do
|
||||
connectSocksClient :: SocksProxy -> HostName -> ServiceName -> IO Socket
|
||||
connectSocksClient (SocksProxy addr) host _port = do
|
||||
let port = if null _port then defaultSMPPort else fromMaybe defaultSMPPort $ readMaybe _port
|
||||
fst <$> socksConnect socksProxy (SocksAddress (SocksAddrDomainName $ B.pack host) port)
|
||||
fst <$> socksConnect (defaultSocksConf addr) (SocksAddress (SocksAddrDomainName $ B.pack host) port)
|
||||
|
||||
defaultSocksHost :: HostAddress
|
||||
defaultSocksHost = tupleToHostAddress (127, 0, 0, 1)
|
||||
|
||||
defaultSocksProxy :: SocksProxy
|
||||
defaultSocksProxy = SocksProxy $ SockAddrInet 9050 defaultSocksHost
|
||||
|
||||
newtype SocksProxy = SocksProxy SockAddr
|
||||
deriving (Eq)
|
||||
|
||||
instance Show SocksProxy where show (SocksProxy addr) = show addr
|
||||
|
||||
instance StrEncoding SocksProxy where
|
||||
strEncode = B.pack . show
|
||||
strP = do
|
||||
host <- maybe defaultSocksHost tupleToHostAddress <$> optional ipv4P
|
||||
port <- fromMaybe 9050 <$> optional (A.char ':' *> (fromInteger <$> A.decimal))
|
||||
pure . SocksProxy $ SockAddrInet port host
|
||||
where
|
||||
ipv4P = (,,,) <$> ipNum <*> ipNum <*> ipNum <*> A.decimal
|
||||
ipNum = A.decimal <* A.char '.'
|
||||
|
||||
instance ToJSON SocksProxy where
|
||||
toJSON = strToJSON
|
||||
toEncoding = strToJEncoding
|
||||
|
||||
instance FromJSON SocksProxy where
|
||||
parseJSON = strParseJSON "SocksProxy"
|
||||
|
||||
mkTLSClientParams :: T.Supported -> Maybe XS.CertificateStore -> HostName -> ServiceName -> Maybe C.KeyHash -> T.ClientParams
|
||||
mkTLSClientParams supported caStore_ host port keyHash_ = do
|
||||
|
|
|
@ -166,8 +166,7 @@ initAgentServers =
|
|||
InitialAgentServers
|
||||
{ smp = L.fromList [testSMPServer],
|
||||
ntf = ["ntf://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:6001"],
|
||||
socksProxy = Nothing,
|
||||
tcpTimeout = 5000000
|
||||
netCfg = NetworkConfig {socksProxy = Nothing, tcpTimeout = 5000000}
|
||||
}
|
||||
|
||||
initAgentServers2 :: InitialAgentServers
|
||||
|
|
Reference in New Issue