ios: synchronizing processing between app & nse (#398)

* ios: synchronizing processing between app & nse

* update rfc

* add sequence diagram

* manage agent phase

* track agent operation and phase changes

* update

* remove APInactive

* Update src/Simplex/Messaging/Agent/Protocol.hs

Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin 2022-06-14 10:27:45 +01:00 committed by GitHub
parent 62485b9367
commit 68c2682e70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 333 additions and 107 deletions

View File

@ -0,0 +1,75 @@
# DB access and processing messages for iOS notification service extension
## Problem
The only way to receive/process notificaitons is via a separate NSE process that requires a concurrent DB and network access.
SQLite concurenncy does not work, so we need to sync database access.
The problem is complex, as we do not directly control db access from the app, it can be triggered by the message arriving and it may fail to complete in case the app is suspended in the background.
So we need to prevent db access from starting when we know the app is about to be suspended.
The last problem is how to receive and process messages in NSE - should it use recently added GET command or should it subscribe to the connections that receive messages and process messages normally.
To summarize, 2 problems need to be solved:
1. sync db access between 2 (or more, if we add share extension) processes
2. prevent access from starting when the process is due to suspend, only complete operations.
3. Receiveing and processing agent messages in NSE
## Proposed solution
For problem 1, we can use Posix semaphores from our core code, in the same bracket that manages database connection - it would wait for semaphore to be free and unlock it once the db operation is complete.
For problem 2, we need to communicate from the app when it goes to the background to prevent database access starting and completing before the suspension. This would set some `aboutToSuspend` STM flag (or the opposite) that would prevent operations from progressing (using STM retry, that would block until the flag has the value allowing operation to progress).
Several possibilities can be considered:
- use this flag in the bracket that provides DB connections. While simple, it may lock some operations in the middle and may also lead to the situation when network operation succeeds but database access was blocked, and the database is not updated.
- use this flag to stop network operations that would require database updates - like sending messages, subscriptions and ACK - all these operations would require database access once they succeed.
- use two flags, for both cases above, but set them at different times since going to background - block new network operations as soon as the app goes to the background and block database access once the app is about to be suspended.
The last option seems more robust. To do it, there will be an api allowing the app to communicat its phase:
- app going to the background would trigger blocking new network operations and start a new background task - `background` phase.
- background task receiving completion warning, or, maybe, some time after it is started - probably 20 seconds - or whatever happens earlier - would trigger call blocking db access - `suspended` phase.
- app becoming active would trigger unblocking both flags - `active` phase.
`/_app phase <phase>` where `phase` can be one of the above values.
NSE would also use the same phases:
- sending `active` when it is started (the process starts as active, but it is possible that the new notification arrives to the same process, after the previous one sent background/suspension)
- sending `background + suspended` (or `suspended` should set both flags) once it is finished processing the notification, provided no new notification arrived and started processing - this should be tracked in NSE.
For problem 3, NSE can do one of the following:
- use SUB and process messages normally - the downside is that the app will have to resubscribe and it has to be tracked.
- use GET and process messages by pushing them through the processing function - the downside it that a rewiring of message processing is needed.
- use GET but deliver messages through the same queue as when they arrive normally (in which case getMessage agent function should not return the message, but will return a flag showing whether the message was received, or, possibly, or, possibly will return a message but the message would also be sent to the queue?).
- process messages in agent manually and in chat via the queue.
One of the downside of GET is that it requires calling GET again after ACK. We could have two variants of ACK (or additional ACK parameter) - one that never delivers a new message, and another one that does. In this case, if get needs to process the next message (when the current one has no notification flag), it can call ACK that delivers the next message. But, it is probably a premature optimization, and having general support of batched commands would add more value.
Additional problem is concurrency in NSE - if the new notification from the same queue arrives before the current one finishes processing in the same process one of the following can happen:
- the 2nd notification naively call GET and receives the same message.
- the 2nd notification waits until the first finished processing, in which case it can run out of time.
The problem is that the app won't know it's the same queue, as nId is encrypted, so the agent should handle this scenario when the new call to getMessage is made before the previous one finished processing, and differentiate between calls made for additional messages (possibly, getMessage should include previous message ID, if it is available) and the first call.
EDIT: GETs have to be sent from UI to chat and from chat to agent as function calls, but the agent will have to queue get calls to make sure they return different messages. GET call would return message flags (incl. notification flag), so that the UI can send the next GET if needed without waiting.
Considered alternative: include notification content in the message and have NSE only perform decryption, without any network IO. In this case notification content would be in SEND and in NMSG, e2e encrypted.
While promising, as it solves network coordination issues and makes GET unnecessary, it creates mutliple other problems, so it was rejected:
- message content is exposed to centralized ntf and apns servers, creating additional attack vector.
- it adds complexity in security critical parts of the stack - double ratchet encryption, as it requires either storing message keys and using different IVs for notifications, or initializing completely separate ratchet for notifications content.
- it reduces the size of the message.
- it makes user experience worse, as:
- it would not accelerate handshake for new contacts and for file delivery - this approach only works for content messages.
- it would open the app without the new messages - the users would have to wait until the messages are received. It is also bad for "security optics" - the users might think that the message content was exposed to notifications.

View File

@ -0,0 +1,61 @@
sequenceDiagram
participant M as iOS message<br>notification
participant S as iOS system
participant N as iOS NSE
participant U as iOS UI
participant C as Core chat
participant A as Core agent
M ->> N: notification
S ->> N: get app pref
note over N: ignore,<br>app is active
note over M, A: app going to background
S ->> U: phase: background<br>(possibly, "will" method)
U ->> S: set app pref "pausing"
U ->> C: /_app phase paused, result CRCmdOk
C ->> A: pauseAgent<br>(no new network IO)
M ->> N: notification
S ->> N: get app pref
note over N: wait/poll for<br>"paused"/"suspending"/"suspended"<br>event/pref
A ->> C: event "IO paused"<br>(after in-flight op completed)<br>PHASE PAUSED
C ->> U: event "IO paused" (CRAppPaused)
U ->> S: set shared pref "paused"
note over M, A: process notification
M ->> N: notification
S ->> N: get app pref<br>continue if<br>"paused"/"suspending"/"suspended"
N ->> S: set NSE pref "active"
N ->> C: /_get message
C ->> A: getMessage
A ->> C: msg flags
C ->> N: msg flags
note over N: get messages<br>until notification flag set
A ->> C: MSG/CONF/INFO
C ->> N: some event
N ->> S: set NSE pref "completed"
N ->> S: show notification
note over M, A: app about to be suspended<br>(or 15-20 sec after background)
S ->> U: background task notice
U ->> S: set app pref "suspending"
U ->> C: /_app phase suspended, response ok
C ->> A: suspendAgent<br>(no new DB)
A ->> C: event "DB paused"<br>(after in-flight op completed)<br>PHASE SUSPENDED
C ->> U: event "DB paused" (CRAppSuspended)
U ->> S: set app pref "suspended"
note over M, A: app about to be activated
S ->> U: phase: active<br>(or inactive?)<br><br>(possibly, "will" method)
S ->> U: get NSE pref
U ->> S: set app pref "activating"
alt nse active?
U ->> C: /_app phase inactive
note over U: poll/wait till NSE pref is "completed"
end
U ->> C: /_app phase active (response result)
C ->> A: activateAgent<br>(allow IO/DB)
A ->> C: result ()
C ->> U: CRCmdOk
U ->> S: set app pref "active"

View File

@ -56,6 +56,7 @@ module Simplex.Messaging.Agent
enableNtfCron,
checkNtfToken,
deleteNtfToken,
setAgentPhase,
logConnection,
)
where
@ -191,6 +192,9 @@ checkNtfToken c = withAgentEnv c . checkNtfToken' c
deleteNtfToken :: AgentErrorMonad m => AgentClient -> DeviceToken -> m ()
deleteNtfToken c = withAgentEnv c . deleteNtfToken' c
setAgentPhase :: AgentErrorMonad m => AgentClient -> AgentPhase -> m ()
setAgentPhase c = withAgentEnv c . setAgentPhase' c
withAgentEnv :: AgentClient -> ReaderT Env m a -> m a
withAgentEnv c = (`runReaderT` agentEnv c)
@ -239,7 +243,7 @@ newConn c connId cMode = do
g <- asks idsDrg
agentVersion <- asks $ smpAgentVersion . config
let cData = ConnData {connId, connAgentVersion = agentVersion, duplexHandshake = Nothing} -- connection mode is determined by the accepting agent
connId' <- withStore $ \st -> createRcvConn st g cData rq cMode
connId' <- withStore c $ \st -> createRcvConn st g cData rq cMode
addSubscription c rq connId'
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (rq, NSCCreate)
@ -249,7 +253,7 @@ newConn c connId cMode = do
SCMContact -> pure (connId', CRContactUri crData)
SCMInvitation -> do
(pk1, pk2, e2eRcvParams) <- liftIO $ CR.generateE2EParams CR.e2eEncryptVersion
withStore $ \st -> createRatchetX3dhKeys st connId' pk1 pk2
withStore c $ \st -> createRatchetX3dhKeys st connId' pk1 pk2
pure (connId', CRInvitationUri crData $ toVersionRangeT e2eRcvParams CR.e2eEncryptVRange)
joinConn :: AgentMonad m => AgentClient -> ConnId -> ConnectionRequestUri c -> ConnInfo -> m ConnId
@ -268,7 +272,7 @@ joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2
g <- asks idsDrg
let duplexHS = connAgentVersion /= 1
cData = ConnData {connId, connAgentVersion, duplexHandshake = Just duplexHS}
connId' <- withStore $ \st -> do
connId' <- withStore c $ \st -> do
connId' <- createSndConn st g cData sq
createRatchet st connId' rc
pure connId'
@ -279,7 +283,7 @@ joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2
pure connId'
Left e -> do
-- TODO recovery for failure on network timeout, see rfcs/2022-04-20-smp-conf-timeout-recovery.md
withStore (`deleteConn` connId')
withStore c (`deleteConn` connId')
throwError e
_ -> throwError $ AGENT A_VERSION
joinConn c connId (CRContactUri (ConnReqUriData _ agentVRange (qUri :| _))) cInfo = do
@ -301,16 +305,16 @@ createReplyQueue c connId = do
-- TODO reply queue version should be the same as send queue, ignoring it in v1
let qInfo = toVersionT qUri SMP.smpClientVersion
addSubscription c rq connId
withStore $ \st -> upgradeSndConnToDuplex st connId rq
withStore c $ \st -> upgradeSndConnToDuplex st connId rq
pure qInfo
-- | Approve confirmation (LET command) in Reader monad
allowConnection' :: AgentMonad m => AgentClient -> ConnId -> ConfirmationId -> ConnInfo -> m ()
allowConnection' c connId confId ownConnInfo = do
withStore (`getConn` connId) >>= \case
withStore c (`getConn` connId) >>= \case
SomeConn _ (RcvConnection cData rq) -> do
AcceptedConfirmation {senderConf, ratchetState} <- withStore $ \st -> acceptConfirmation st confId ownConnInfo
withStore $ \st -> createRatchet st connId ratchetState
AcceptedConfirmation {senderConf, ratchetState} <- withStore c $ \st -> acceptConfirmation st confId ownConnInfo
withStore c $ \st -> createRatchet st connId ratchetState
processConfirmation c rq senderConf
mapM_ (connectReplyQueues c cData ownConnInfo) (L.nonEmpty $ smpReplyQueues senderConf)
_ -> throwError $ CMD PROHIBITED
@ -318,29 +322,29 @@ allowConnection' c connId confId ownConnInfo = do
-- | Accept contact (ACPT command) in Reader monad
acceptContact' :: AgentMonad m => AgentClient -> ConnId -> InvitationId -> ConnInfo -> m ConnId
acceptContact' c connId invId ownConnInfo = do
Invitation {contactConnId, connReq} <- withStore (`getInvitation` invId)
withStore (`getConn` contactConnId) >>= \case
Invitation {contactConnId, connReq} <- withStore c (`getInvitation` invId)
withStore c (`getConn` contactConnId) >>= \case
SomeConn _ ContactConnection {} -> do
withStore $ \st -> acceptInvitation st invId ownConnInfo
withStore c $ \st -> acceptInvitation st invId ownConnInfo
joinConn c connId connReq ownConnInfo
_ -> throwError $ CMD PROHIBITED
-- | Reject contact (RJCT command) in Reader monad
rejectContact' :: AgentMonad m => AgentClient -> ConnId -> InvitationId -> m ()
rejectContact' _ contactConnId invId =
withStore $ \st -> deleteInvitation st contactConnId invId
rejectContact' c contactConnId invId =
withStore c $ \st -> deleteInvitation st contactConnId invId
processConfirmation :: AgentMonad m => AgentClient -> RcvQueue -> SMPConfirmation -> m ()
processConfirmation c rq@RcvQueue {e2ePrivKey} SMPConfirmation {senderKey, e2ePubKey} = do
let dhSecret = C.dh' e2ePubKey e2ePrivKey
withStore $ \st -> setRcvQueueConfirmedE2E st rq dhSecret
withStore c $ \st -> setRcvQueueConfirmedE2E st rq dhSecret
secureQueue c rq senderKey
withStore $ \st -> setRcvQueueStatus st rq Secured
withStore c $ \st -> setRcvQueueStatus st rq Secured
-- | Subscribe to receive connection messages (SUB command) in Reader monad
subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
subscribeConnection' c connId =
withStore (`getConn` connId) >>= \case
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection cData rq sq) -> do
resumeMsgDelivery c cData sq
subscribe rq
@ -368,7 +372,7 @@ resubscribeConnection' c connId =
-- | Send message to the connection (SEND command) in Reader monad
sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId
sendMessage' c connId msgFlags msg =
withStore (`getConn` connId) >>= \case
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection cData _ sq) -> enqueueMsg cData sq
SomeConn _ (SndConnection cData sq) -> enqueueMsg cData sq
_ -> throwError $ CONN SIMPLEX
@ -384,7 +388,7 @@ enqueueMessage c cData@ConnData {connId, connAgentVersion} sq msgFlags aMessage
pure $ unId msgId
where
storeSentMsg :: m InternalId
storeSentMsg = withStore $ \st -> do
storeSentMsg = withStore c $ \st -> do
internalTs <- liftIO getCurrentTime
(internalId, internalSndId, prevMsgHash) <- updateSndIds st connId
let privHeader = APrivHeader (unSndId internalSndId) prevMsgHash
@ -405,7 +409,7 @@ resumeMsgDelivery c cData@ConnData {connId} sq@SndQueue {server, sndId} = do
async (runSmpQueueMsgDelivery c cData sq)
>>= \a -> atomically (TM.insert qKey a $ smpQueueMsgDeliveries c)
unlessM connQueued $
withStore (`getPendingMsgs` connId)
withStore c (`getPendingMsgs` connId)
>>= queuePendingMsgs c connId sq
where
queueDelivering qKey = atomically $ TM.member qKey (smpQueueMsgDeliveries c)
@ -431,9 +435,11 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
mq <- atomically $ getPendingMsgQ c connId sq
ri <- asks $ reconnectInterval . config
forever $ do
atomically $ endAgentOperation c AONetwork
msgId <- atomically $ readTQueue mq
atomically $ beginAgentOperation c AONetwork
let mId = unId msgId
withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case
withStore c (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case
Left (e :: E.SomeException) ->
notify $ MERR mId (INTERNAL $ show e)
Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) ->
@ -448,7 +454,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
SMP SMP.QUOTA -> case msgType of
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE
_ -> loop
_ -> retrySending loop
SMP SMP.AUTH -> case msgType of
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE
@ -461,7 +467,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
currentTime <- liftIO getCurrentTime
if diffUTCTime currentTime internalTs > helloTimeout
then connErr
else loop
else retrySending loop
where
connErr = case rq_ of
-- party initiating connection
@ -472,17 +478,17 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
AM_A_MSG_ -> notifyDel msgId $ MERR mId e
SMP (SMP.CMD _) -> notifyDel msgId err
SMP SMP.LARGE_MSG -> notifyDel msgId err
SMP {} -> notify err >> loop
_ -> loop
SMP {} -> notify err >> retrySending loop
_ -> retrySending loop
Right () -> do
case msgType of
AM_CONN_INFO -> do
withStore $ \st -> setSndQueueStatus st sq Confirmed
when (isJust rq_) $ withStore (`removeConfirmations` connId)
withStore c $ \st -> setSndQueueStatus st sq Confirmed
when (isJust rq_) $ withStore c (`removeConfirmations` connId)
-- TODO possibly notification flag should be ON for one of the parties, to result in contact connected notification
unless (duplexHandshake == Just True) . void $ enqueueMessage c cData sq SMP.noMsgFlags HELLO
AM_HELLO_ -> do
withStore $ \st -> setSndQueueStatus st sq Active
withStore c $ \st -> setSndQueueStatus st sq Active
case rq_ of
-- party initiating connection (in v1)
Just RcvQueue {status} ->
@ -507,16 +513,21 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
delMsg msgId
where
delMsg :: InternalId -> m ()
delMsg msgId = withStore $ \st -> deleteMsg st connId msgId
delMsg msgId = withStore c $ \st -> deleteMsg st connId msgId
notify :: ACommand 'Agent -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", connId, cmd)
notifyDel :: InternalId -> ACommand 'Agent -> m ()
notifyDel msgId cmd = notify cmd >> delMsg msgId
connError msgId = notifyDel msgId . ERR . CONN
retrySending loop = do
atomically $ do
endAgentOperation c AONetwork
beginAgentOperation c AONetwork
loop
ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
ackMessage' c connId msgId = do
withStore (`getConn` connId) >>= \case
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> ack rq
SomeConn _ (RcvConnection _ rq) -> ack rq
_ -> throwError $ CONN SIMPLEX
@ -524,16 +535,16 @@ ackMessage' c connId msgId = do
ack :: RcvQueue -> m ()
ack rq = do
let mId = InternalId msgId
srvMsgId <- withStore $ \st -> setMsgUserAck st connId mId
srvMsgId <- withStore c $ \st -> setMsgUserAck st connId mId
sendAck c rq srvMsgId `catchError` \case
SMP SMP.NO_MSG -> pure ()
e -> throwError e
withStore $ \st -> deleteMsg st connId mId
withStore c $ \st -> deleteMsg st connId mId
-- | Suspend SMP agent connection (OFF command) in Reader monad
suspendConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
suspendConnection' c connId =
withStore (`getConn` connId) >>= \case
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> suspendQueue c rq
SomeConn _ (RcvConnection _ rq) -> suspendQueue c rq
_ -> throwError $ CONN SIMPLEX
@ -541,11 +552,11 @@ suspendConnection' c connId =
-- | Delete SMP agent connection (DEL command) in Reader monad
deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnection' c connId =
withStore (`getConn` connId) >>= \case
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> delete rq
SomeConn _ (RcvConnection _ rq) -> delete rq
SomeConn _ (ContactConnection _ rq) -> delete rq
SomeConn _ (SndConnection _ _) -> withStore (`deleteConn` connId)
SomeConn _ (SndConnection _ _) -> withStore c (`deleteConn` connId)
where
delete :: RcvQueue -> m ()
delete rq = do
@ -554,7 +565,7 @@ deleteConnection' c connId =
atomically $ do
removeSubscription c connId
sendNtfSubCommand ns (rq, NSCDelete)
withStore (`deleteConn` connId)
withStore c (`deleteConn` connId)
-- | Change servers to be used for creating new queues, in Reader monad
setSMPServers' :: AgentMonad m => AgentClient -> NonEmpty SMPServer -> m ()
@ -563,7 +574,7 @@ setSMPServers' c servers = do
registerNtfToken' :: forall m. AgentMonad m => AgentClient -> DeviceToken -> m NtfTknStatus
registerNtfToken' c deviceToken =
withStore (`getDeviceNtfToken` deviceToken) >>= \case
withStore c (`getDeviceNtfToken` deviceToken) >>= \case
(Just tkn@NtfToken {ntfTokenId, ntfTknStatus, ntfTknAction}, prevTokens) -> do
mapM_ (deleteToken_ c) prevTokens
ns <- asks ntfSupervisor
@ -583,7 +594,7 @@ registerNtfToken' c deviceToken =
-- agentNtfCheckToken c tknId tkn >>= \case
(Just tknId, Just NTADelete) -> do
agentNtfDeleteToken c tknId tkn
withStore $ \st -> removeNtfToken st tkn
withStore c $ \st -> removeNtfToken st tkn
atomically $ nsRemoveNtfToken ns
pure NTExpired
_ -> pure ntfTknStatus
@ -595,7 +606,7 @@ registerNtfToken' c deviceToken =
tknKeys <- liftIO $ C.generateSignatureKeyPair a
dhKeys <- liftIO C.generateKeyPair'
let tkn = newNtfToken deviceToken ntfServer tknKeys dhKeys
withStore $ \st -> createNtfToken st tkn
withStore c $ \st -> createNtfToken st tkn
registerToken tkn
pure NTRegistered
_ -> throwError $ CMD PROHIBITED
@ -605,14 +616,14 @@ registerNtfToken' c deviceToken =
registerToken tkn@NtfToken {ntfPubKey, ntfDhKeys = (pubDhKey, privDhKey)} = do
(tknId, srvPubDhKey) <- agentNtfRegisterToken c tkn ntfPubKey pubDhKey
let dhSecret = C.dh' srvPubDhKey privDhKey
withStore $ \st -> updateNtfTokenRegistration st tkn tknId dhSecret
withStore c $ \st -> updateNtfTokenRegistration st tkn tknId dhSecret
ns <- asks ntfSupervisor
atomically $ nsUpdateToken ns tkn
-- TODO decrypt verification code
verifyNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> ByteString -> C.CbNonce -> m ()
verifyNtfToken' c deviceToken code nonce =
withStore (`getDeviceNtfToken` deviceToken) >>= \case
withStore c (`getDeviceNtfToken` deviceToken) >>= \case
(Just tkn@NtfToken {ntfTokenId = Just tknId, ntfDhSecret = Just dhSecret}, _) -> do
code' <- liftEither . bimap cryptoError NtfRegCode $ C.cbDecrypt dhSecret nonce code
void . withToken c tkn (Just (NTConfirmed, NTAVerify code')) (NTActive, Just NTACheck) $ do
@ -622,7 +633,7 @@ verifyNtfToken' c deviceToken code nonce =
enableNtfCron' :: AgentMonad m => AgentClient -> DeviceToken -> Word16 -> m ()
enableNtfCron' c deviceToken interval = do
when (interval < 20) . throwError $ CMD PROHIBITED
withStore (`getDeviceNtfToken` deviceToken) >>= \case
withStore c (`getDeviceNtfToken` deviceToken) >>= \case
(Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive}, _) ->
void . withToken c tkn (Just (NTActive, NTACron interval)) (cronSuccess interval) $
agentNtfEnableCron c tknId tkn interval
@ -635,13 +646,13 @@ cronSuccess interval
checkNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> m NtfTknStatus
checkNtfToken' c deviceToken =
withStore (`getDeviceNtfToken` deviceToken) >>= \case
withStore c (`getDeviceNtfToken` deviceToken) >>= \case
(Just tkn@NtfToken {ntfTokenId = Just tknId}, _) -> agentNtfCheckToken c tknId tkn
_ -> throwError $ CMD PROHIBITED
deleteNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> m ()
deleteNtfToken' c deviceToken =
withStore (`getDeviceNtfToken` deviceToken) >>= \case
withStore c (`getDeviceNtfToken` deviceToken) >>= \case
(Just tkn, _) -> deleteToken_ c tkn
_ -> throwError $ CMD PROHIBITED
@ -650,30 +661,30 @@ deleteToken_ c tkn@NtfToken {ntfTokenId, ntfTknStatus} = do
ns <- asks ntfSupervisor
forM_ ntfTokenId $ \tknId -> do
let ntfTknAction = Just NTADelete
withStore $ \st -> updateNtfToken st tkn ntfTknStatus ntfTknAction
withStore c $ \st -> updateNtfToken st tkn ntfTknStatus ntfTknAction
atomically $ nsUpdateToken ns tkn {ntfTknStatus, ntfTknAction}
agentNtfDeleteToken c tknId tkn `catchError` \case
NTF AUTH -> pure ()
e -> throwError e
withStore $ \st -> removeNtfToken st tkn
withStore c $ \st -> removeNtfToken st tkn
atomically $ nsRemoveNtfToken ns
withToken :: AgentMonad m => AgentClient -> NtfToken -> Maybe (NtfTknStatus, NtfTknAction) -> (NtfTknStatus, Maybe NtfTknAction) -> m a -> m NtfTknStatus
withToken c tkn@NtfToken {deviceToken} from_ (toStatus, toAction_) f = do
ns <- asks ntfSupervisor
forM_ from_ $ \(status, action) -> do
withStore $ \st -> updateNtfToken st tkn status (Just action)
withStore c $ \st -> updateNtfToken st tkn status (Just action)
atomically $ nsUpdateToken ns tkn {ntfTknStatus = status, ntfTknAction = Just action}
tryError f >>= \case
Right _ -> do
withStore $ \st -> updateNtfToken st tkn toStatus toAction_
withStore c $ \st -> updateNtfToken st tkn toStatus toAction_
let updatedToken = tkn {ntfTknStatus = toStatus, ntfTknAction = toAction_}
if toStatus == NTActive
then initializeNtfSubQ c updatedToken
else atomically $ nsUpdateToken ns updatedToken
pure toStatus
Left e@(NTF AUTH) -> do
withStore $ \st -> removeNtfToken st tkn
withStore c $ \st -> removeNtfToken st tkn
atomically $ nsRemoveNtfToken ns
void $ registerNtfToken' c deviceToken
throwError e
@ -686,7 +697,7 @@ initializeNtfSubQ c tkn = do
nsUpdateToken ns tkn
getSubscriptions c
forM_ connIds $ \connId -> do
rq <- withStore $ \st -> getRcvQueue st connId
rq <- withStore c $ \st -> getRcvQueue st connId
atomically $ sendNtfSubCommand ns (rq, NSCCreate)
-- TODO
@ -695,8 +706,14 @@ initializeNtfSubQ c tkn = do
-- It is an optimization, but I am thinking how it would behave if a user were to flip on/off quickly several times.
setNtfServers' :: AgentMonad m => AgentClient -> [NtfServer] -> m ()
setNtfServers' c servers = do
atomically $ writeTVar (ntfServers c) servers
setNtfServers' c = atomically . writeTVar (ntfServers c)
setAgentPhase' :: AgentMonad m => AgentClient -> AgentPhase -> m ()
setAgentPhase' c p = do
aPhase <- asks agentPhase
atomically $ do
writeTVar aPhase (p, False)
notifyAgentPhaseChanged c
getSMPServer :: AgentMonad m => AgentClient -> m SMPServer
getSMPServer c = do
@ -710,14 +727,16 @@ getSMPServer c = do
subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
subscriber c@AgentClient {msgQ} = forever $ do
atomically $ endAgentOperation c AONetwork
t <- atomically $ readTBQueue msgQ
atomically $ beginAgentOperation c AONetwork
withAgentLock c (runExceptT $ processSMPTransmission c t) >>= \case
Left e -> liftIO $ print e
Right _ -> return ()
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m ()
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) = do
withStore (\st -> getRcvConn st srv rId) >>= \case
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) =
withStore c (\st -> getRcvConn st srv rId) >>= \case
SomeConn _ conn@(DuplexConnection cData rq _) -> processSMP conn cData rq
SomeConn _ conn@(RcvConnection cData rq) -> processSMP conn cData rq
SomeConn _ conn@(ContactConnection cData rq) -> processSMP conn cData rq
@ -746,19 +765,19 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
(SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) ->
tryError agentClientMsg >>= \case
Right (Just (msgId, msgMeta, aMessage)) -> case aMessage of
HELLO -> helloMsg >> ack >> withStore (\st -> deleteMsg st connId msgId)
REPLY cReq -> replyMsg cReq >> ack >> withStore (\st -> deleteMsg st connId msgId)
HELLO -> helloMsg >> ack >> withStore c (\st -> deleteMsg st connId msgId)
REPLY cReq -> replyMsg cReq >> ack >> withStore c (\st -> deleteMsg st connId msgId)
-- note that there is no ACK sent for A_MSG, it is sent with agent's user ACK command
A_MSG body -> do
logServer "<--" c srv rId "MSG <MSG>"
notify $ MSG msgMeta msgFlags body
Right _ -> prohibited >> ack
Left e@(AGENT A_DUPLICATE) -> do
withStore (\st -> getLastMsg st connId srvMsgId) >>= \case
withStore c (\st -> getLastMsg st connId srvMsgId) >>= \case
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
| userAck -> do
ack
withStore $ \st -> deleteMsg st connId internalId
withStore c $ \st -> deleteMsg st connId internalId
| otherwise -> do
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
AgentMessage _ (A_MSG body) -> do
@ -769,7 +788,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
Left e -> throwError e
where
agentClientMsg :: m (Maybe (InternalId, MsgMeta, AMessage))
agentClientMsg = withStore $ \st -> do
agentClientMsg = withStore c $ \st -> do
agentMsgBody <- agentRatchetDecrypt st connId encAgentMsg
liftEither (parse smpP (SEAgentError $ AGENT A_MESSAGE) agentMsgBody) >>= \case
agentMsg@(AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do
@ -843,7 +862,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
New -> case (conn, e2eEncryption) of
-- party initiating connection
(RcvConnection {}, Just e2eSndParams) -> do
(pk1, rcDHRs) <- withStore $ \st -> getRatchetX3dhKeys st connId
(pk1, rcDHRs) <- withStore c $ \st -> getRatchetX3dhKeys st connId
let rc = CR.initRcvRatchet rcDHRs $ CR.x3dhRcv pk1 rcDHRs e2eSndParams
(agentMsgBody_, rc', skipped) <- liftError cryptoError $ CR.rcDecrypt rc M.empty encConnInfo
case (agentMsgBody_, skipped) of
@ -858,14 +877,14 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
processConf connInfo senderConf duplexHS = do
let newConfirmation = NewConfirmation {connId, senderConf, ratchetState = rc'}
g <- asks idsDrg
confId <- withStore $ \st -> do
confId <- withStore c $ \st -> do
setHandshakeVersion st connId agentVersion duplexHS
createConfirmation st g newConfirmation
notify $ CONF confId connInfo
_ -> prohibited
-- party accepting connection
(DuplexConnection _ _ sq, Nothing) -> do
withStore (\st -> agentRatchetDecrypt st connId encConnInfo) >>= parseMessage >>= \case
withStore c (\st -> agentRatchetDecrypt st connId encConnInfo) >>= parseMessage >>= \case
AgentConnInfo connInfo -> do
notify $ INFO connInfo
processConfirmation c rq $ SMPConfirmation {senderKey, e2ePubKey, connInfo, smpReplyQueues = []}
@ -880,7 +899,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
case status of
Active -> prohibited
_ -> do
withStore $ \st -> setRcvQueueStatus st rq Active
withStore c $ \st -> setRcvQueueStatus st rq Active
case conn of
DuplexConnection _ _ sq@SndQueue {status = sndStatus}
-- `sndStatus == Active` when HELLO was previously sent, and this is the reply HELLO
@ -902,7 +921,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
Just True -> prohibited
_ -> case conn of
RcvConnection {} -> do
AcceptedConfirmation {ownConnInfo} <- withStore (`getAcceptedConfirmation` connId)
AcceptedConfirmation {ownConnInfo} <- withStore c (`getAcceptedConfirmation` connId)
connectReplyQueues c cData ownConnInfo smpQueues `catchError` (notify . ERR)
_ -> prohibited
@ -913,7 +932,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
ContactConnection {} -> do
g <- asks idsDrg
let newInv = NewInvitation {contactConnId = connId, connReq, recipientConnInfo = cInfo}
invId <- withStore $ \st -> createInvitation st g newInv
invId <- withStore c $ \st -> createInvitation st g newInv
notify $ REQ invId cInfo
_ -> prohibited
@ -933,7 +952,7 @@ connectReplyQueues c cData@ConnData {connId} ownConnInfo (qInfo :| _) = do
Nothing -> throwError $ AGENT A_VERSION
Just qInfo' -> do
sq <- newSndQueue qInfo'
withStore $ \st -> upgradeRcvConnToDuplex st connId sq
withStore c $ \st -> upgradeRcvConnToDuplex st connId sq
enqueueConfirmation c cData sq ownConnInfo Nothing
confirmQueue :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnId -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m ()
@ -941,10 +960,10 @@ confirmQueue (Compatible agentVersion) c connId sq connInfo e2eEncryption = do
aMessage <- mkAgentMessage agentVersion
msg <- mkConfirmation aMessage
sendConfirmation c sq msg
withStore $ \st -> setSndQueueStatus st sq Confirmed
withStore c $ \st -> setSndQueueStatus st sq Confirmed
where
mkConfirmation :: AgentMessage -> m MsgBody
mkConfirmation aMessage = withStore $ \st -> do
mkConfirmation aMessage = withStore c $ \st -> do
void $ updateSndIds st connId
encConnInfo <- agentRatchetEncrypt st connId (smpEncode aMessage) e2eEncConnInfoLength
pure . smpEncode $ AgentConfirmation {agentVersion, e2eEncryption, encConnInfo}
@ -961,7 +980,7 @@ enqueueConfirmation c cData@ConnData {connId, connAgentVersion} sq connInfo e2eE
queuePendingMsgs c connId sq [msgId]
where
storeConfirmation :: m InternalId
storeConfirmation = withStore $ \st -> do
storeConfirmation = withStore c $ \st -> do
internalTs <- liftIO getCurrentTime
(internalId, internalSndId, prevMsgHash) <- updateSndIds st connId
let agentMsg = AgentConnInfo connInfo

View File

@ -4,10 +4,12 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
@ -41,12 +43,16 @@ module Simplex.Messaging.Agent.Client
removeSubscription,
hasActiveSubscription,
agentDbPath,
beginAgentOperation,
endAgentOperation,
notifyAgentPhaseChanged,
withStore,
)
where
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (stateTVar)
import Control.Concurrent.STM (retry, stateTVar)
import Control.Logger.Simple
import Control.Monad.Except
import Control.Monad.IO.Unlift
@ -62,11 +68,12 @@ import Data.Maybe (catMaybes)
import Data.Set (Set)
import Data.Text.Encoding
import Data.Word (Word16)
import Database.SQLite.Simple (SQLError)
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..))
import Simplex.Messaging.Agent.Store.SQLite (AgentStoreMonad, SQLiteStore (..))
import Simplex.Messaging.Client
import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
@ -209,6 +216,9 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
n <- asks $ resubscriptionConcurrency . config
withAgentLock c . withClient c srv $ \smp -> do
cs <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSubscrSrvrs c)
-- TODO if any of the subscriptions fails here (e.g. because of timeout), it terminates the whole process for all subscriptions
-- instead it should only report successful subscriptions and schedule the next call to reconnectClient to subscribe for the remaining subscriptions
-- this way, for each DOWN event there can be several UP events
conns <- pooledForConcurrentlyN n (maybe [] M.toList cs) $ \sub@(connId, _) ->
ifM
(atomically $ hasActiveSubscription c connId)
@ -320,6 +330,7 @@ closeAgentClient c = liftIO $ do
clear smpQueueMsgQueues
where
clientTimeout sel = tcpTimeout . sel . config $ agentEnv c
clear :: (AgentClient -> TMap k a) -> IO ()
clear sel = atomically $ writeTVar (sel c) M.empty
closeProtocolServerClients :: Int -> TMap ProtocolServer (ClientVar msg) -> IO ()
@ -576,3 +587,51 @@ cryptoError = \case
C.CBDecryptError -> AGENT A_ENCRYPTION
C.CERatchetDuplicateMessage -> AGENT A_DUPLICATE
e -> INTERNAL $ show e
endAgentOperation :: AgentClient -> AgentOperation -> STM ()
endAgentOperation c@AgentClient {agentEnv = Env {agentOperations}} op = do
TM.alter (Just . maybe 0 (\n -> min 0 $ n - 1)) op agentOperations
notifyAgentPhaseChanged c
beginAgentOperation :: AgentClient -> AgentOperation -> STM ()
beginAgentOperation AgentClient {agentEnv = Env {agentPhase, agentOperations}} op = do
(p, _) <- readTVar agentPhase
when (op `elem` disallowedOperations p) retry
TM.alter (Just . maybe 1 (+ 1)) op agentOperations
notifyAgentPhaseChanged :: AgentClient -> STM ()
notifyAgentPhaseChanged AgentClient {subQ, agentEnv = Env {agentPhase, agentOperations}} = do
(p, notified) <- readTVar agentPhase
unless notified $ do
ops <- readTVar agentOperations
let opsPaused = all (maybe True (== 0) . (`M.lookup` ops)) $ disallowedOperations p
when opsPaused $ do
writeTBQueue subQ ("", "", PHASE p)
writeTVar agentPhase (p, True)
withStore :: AgentMonad m => AgentClient -> (forall m'. AgentStoreMonad m' => SQLiteStore -> m' a) -> m a
withStore c action = do
st <- asks store
atomically $ beginAgentOperation c AODatabase
r <- runExceptT (action st `E.catch` handleInternal)
atomically $ endAgentOperation c AODatabase
case r of
Right res -> pure res
Left e -> throwError $ storeError e
where
-- TODO when parsing exception happens in store, the agent hangs;
-- changing SQLError to SomeException does not help
handleInternal :: (MonadError StoreError m') => SQLError -> m' a
handleInternal e = throwError . SEInternal $ bshow e
storeError :: StoreError -> AgentErrorType
storeError = \case
SEConnNotFound -> CONN NOT_FOUND
SEConnDuplicate -> CONN DUPLICATE
SEBadConnType CRcv -> CONN SIMPLEX
SEBadConnType CSnd -> CONN SIMPLEX
SEInvitationNotFound -> CMD PROHIBITED
-- this error is never reported as store error,
-- it is used to wrap agent operations when "transaction-like" store access is needed
-- NOTE: network IO should NOT be used inside AgentStoreMonad
SEAgentError e -> e
e -> INTERNAL $ show e

View File

@ -17,10 +17,11 @@ module Simplex.Messaging.Agent.Env.SQLite
defaultAgentConfig,
defaultReconnectInterval,
Env (..),
AgentOperation (..),
disallowedOperations,
newSMPAgentEnv,
NtfSupervisor (..),
NtfSupervisorCommand (..),
withStore,
)
where
@ -30,12 +31,11 @@ import Control.Monad.Reader
import Crypto.Random
import Data.List.NonEmpty (NonEmpty)
import Data.Time.Clock (NominalDiffTime, nominalDay)
import Database.SQLite.Simple (SQLError)
import Network.Socket
import Numeric.Natural
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store (ConnType (..), RcvQueue, StoreError (..))
import Simplex.Messaging.Agent.Store (RcvQueue)
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client
@ -45,11 +45,9 @@ import Simplex.Messaging.Notifications.Client (NtfServer, NtfToken)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (TLS, Transport (..))
import Simplex.Messaging.Util (bshow)
import Simplex.Messaging.Version
import System.Random (StdGen, newStdGen)
import UnliftIO (Async)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
@ -122,17 +120,30 @@ data Env = Env
idsDrg :: TVar ChaChaDRG,
clientCounter :: TVar Int,
randomServer :: TVar StdGen,
agentPhase :: TVar (AgentPhase, Bool),
agentOperations :: TMap AgentOperation Int,
ntfSupervisor :: NtfSupervisor
}
data AgentOperation = AONetwork | AODatabase
deriving (Eq, Ord)
disallowedOperations :: AgentPhase -> [AgentOperation]
disallowedOperations = \case
APActive -> []
APPaused -> [AONetwork]
APSuspended -> [AONetwork, AODatabase]
newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
newSMPAgentEnv config@AgentConfig {dbFile, dbPoolSize, yesToMigrations} = do
idsDrg <- newTVarIO =<< drgNew
store <- liftIO $ createSQLiteStore dbFile dbPoolSize Migrations.app yesToMigrations
clientCounter <- newTVarIO 0
randomServer <- newTVarIO =<< liftIO newStdGen
agentPhase <- newTVarIO (APActive, True)
agentOperations <- atomically TM.empty
ntfSupervisor <- atomically . newNtfSubSupervisor $ tbqSize config
return Env {config, store, idsDrg, clientCounter, randomServer, ntfSupervisor}
return Env {config, store, idsDrg, clientCounter, randomServer, agentPhase, agentOperations, ntfSupervisor}
data NtfSupervisor = NtfSupervisor
{ ntfTkn :: TVar (Maybe NtfToken),
@ -150,27 +161,3 @@ newNtfSubSupervisor qSize = do
ntfWorkers <- TM.empty
ntfSMPWorkers <- TM.empty
pure NtfSupervisor {ntfTkn, ntfSubQ, ntfWorkers, ntfSMPWorkers}
withStore :: AgentMonad m => (forall m'. AgentStoreMonad m' => SQLiteStore -> m' a) -> m a
withStore action = do
st <- asks store
runExceptT (action st `E.catch` handleInternal) >>= \case
Right c -> return c
Left e -> throwError $ storeError e
where
-- TODO when parsing exception happens in store, the agent hangs;
-- changing SQLError to SomeException does not help
handleInternal :: (MonadError StoreError m') => SQLError -> m' a
handleInternal e = throwError . SEInternal $ bshow e
storeError :: StoreError -> AgentErrorType
storeError = \case
SEConnNotFound -> CONN NOT_FOUND
SEConnDuplicate -> CONN DUPLICATE
SEBadConnType CRcv -> CONN SIMPLEX
SEBadConnType CSnd -> CONN SIMPLEX
SEInvitationNotFound -> CMD PROHIBITED
-- this error is never reported as store error,
-- it is used to wrap agent operations when "transaction-like" store access is needed
-- NOTE: network IO should NOT be used inside AgentStoreMonad
SEAgentError e -> e
e -> INTERNAL $ show e

View File

@ -53,12 +53,12 @@ processNtfSub c (rcvQueue@RcvQueue {server = smpServer, rcvId}, cmd) = do
ntfToken_ <- readTVarIO $ ntfTkn ns
case cmd of
NSCCreate -> do
sub_ <- withStore $ \st -> getNtfSubscription st rcvQueue
sub_ <- withStore c $ \st -> getNtfSubscription st rcvQueue
case (sub_, ntfServer_, ntfToken_) of
(Nothing, Just ntfServer, Just tkn) -> do
currentTime <- liftIO getCurrentTime
let newSub = newNtfSubscription ntfServer tkn smpServer rcvId currentTime
withStore $ \st -> createNtfSubscription st newSub
withStore c $ \st -> createNtfSubscription st newSub
-- TODO optimize?
-- TODO - read action in getNtfSubscription and decide which worker to create
-- TODO - SMP worker can create Ntf worker on NKEY completion
@ -69,7 +69,7 @@ processNtfSub c (rcvQueue@RcvQueue {server = smpServer, rcvId}, cmd) = do
addNtfWorker ntfServer
_ -> pure ()
NSCDelete -> do
withStore $ \st -> markNtfSubscriptionForDeletion st rcvQueue
withStore c $ \st -> markNtfSubscriptionForDeletion st rcvQueue
case (ntfServer_, ntfToken_) of
(Just ntfServer, Just _) -> addNtfWorker ntfServer
_ -> pure ()
@ -91,9 +91,9 @@ processNtfSub c (rcvQueue@RcvQueue {server = smpServer, rcvId}, cmd) = do
Just (doWork, _) -> void . atomically $ tryPutTMVar doWork ()
runNtfWorker :: AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m ()
runNtfWorker _c srv doWork = forever $ do
runNtfWorker c srv doWork = forever $ do
void . atomically $ readTMVar doWork
withStore $ \st ->
withStore c $ \st ->
getNextNtfSubAction st srv >>= \case
Nothing -> void . atomically $ tryTakeTMVar doWork
Just (_sub, ntfSubAction) ->
@ -105,9 +105,9 @@ runNtfWorker _c srv doWork = forever $ do
liftIO $ threadDelay delay
runNtfSMPWorker :: AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m ()
runNtfSMPWorker _c srv doWork = forever $ do
runNtfSMPWorker c srv doWork = forever $ do
void . atomically $ readTMVar doWork
withStore $ \st ->
withStore c $ \st ->
getNextNtfSubSMPAction st srv >>= \case
Nothing -> void . atomically $ tryTakeTMVar doWork
Just (_sub, ntfSubAction) ->

View File

@ -82,6 +82,7 @@ module Simplex.Messaging.Agent.Protocol
QueueStatus (..),
ACorrId,
AgentMsgId,
AgentPhase (..),
-- * Encode/decode
serializeCommand,
@ -223,11 +224,34 @@ data ACommand (p :: AParty) where
DEL :: ACommand Client
OK :: ACommand Agent
ERR :: AgentErrorType -> ACommand Agent
PHASE :: AgentPhase -> ACommand Agent
deriving instance Eq (ACommand p)
deriving instance Show (ACommand p)
-- | Agent phase allows to have two agent processes concurrently working with the same database
data AgentPhase
= -- | agent is operating normally
APActive
| -- | agent is paused - no new send/receive operations will be started - they will STM-retry
APPaused
| -- | agent is suspended - no new send/receive/database operations will be started - they will STM-retry
APSuspended
deriving (Eq, Show)
instance StrEncoding AgentPhase where
strEncode = \case
APActive -> "ACTIVE"
APPaused -> "PAUSED"
APSuspended -> "SUSPENDED"
strP =
A.takeTill (== ' ') >>= \case
"ACTIVE" -> pure APActive
"PAUSED" -> pure APPaused
"SUSPENDED" -> pure APSuspended
_ -> fail "bad AgentPhase"
data ConnectionMode = CMInvitation | CMContact
deriving (Eq, Show)
@ -908,6 +932,7 @@ serializeCommand = \case
CON -> "CON"
ERR e -> "ERR " <> strEncode e
OK -> "OK"
PHASE p -> "PHASE " <> strEncode p
where
showTs :: UTCTime -> ByteString
showTs = B.pack . formatISO8601Millis