SMP v3: encrypt message timestamp and flags together with the body between server and recipient (#457)

* SMP v3: encrypt message timestamp and flags together with the body between server and recipient

* v3 tests

* update protocol doc

* add test for max size message

* delay in notifications test

* simplify v3

* encrypt server message to the recipient when sent

* refactor

* exit on error restoring the messages

* refactor, increase test timeout

* style

* add prints to the test

* remove error from unsafeMaxLenBS

* update protocol

* lint, improve test, change func param
This commit is contained in:
Evgeny Poberezkin 2022-07-05 21:08:05 +01:00 committed by GitHub
parent e3d2d6fc91
commit 9c1b43791c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 618 additions and 248 deletions

View File

@ -28,7 +28,7 @@ dependencies:
- asn1-types == 0.3.*
- async == 2.2.*
- attoparsec == 0.14.*
- base >= 4.7 && < 5
- base >= 4.14 && < 5
- base64-bytestring >= 1.0 && < 1.3
- bytestring == 0.10.*
- case-insensitive == 1.2.*

View File

@ -147,61 +147,61 @@ To create and start using a simplex queue Alice and Bob follow these steps:
1. Alice creates a simplex queue on the server:
1. Decides which SMP server to use (can be the same or different server that Alice uses for other queues) and opens secure encrypted transport connection to the chosen SMP server (see [Appendix A](#appendix-a)).
1. Decides which SMP server to use (can be the same or different server that Alice uses for other queues) and opens secure encrypted transport connection to the chosen SMP server (see [Appendix A](#appendix-a)).
2. Generates a new random public/private key pair (encryption key - `EK`) that she did not use before for Bob to encrypt the messages.
2. Generates a new random public/private key pair (encryption key - `EK`) that she did not use before for Bob to encrypt the messages.
3. Generates another new random public/private key pair (recipient key - `RK`) that she did not use before for her to sign commands and to decrypt the transmissions received from the server.
3. Generates another new random public/private key pair (recipient key - `RK`) that she did not use before for her to sign commands and to decrypt the transmissions received from the server.
4. Generates one more random key pair (recipient DH key - `RDHK`) to negotiate symmetric key that will be used by the server to encrypt message bodies delivered to Alice (to avoid shared cipher-text inside transport connection).
4. Generates one more random key pair (recipient DH key - `RDHK`) to negotiate symmetric key that will be used by the server to encrypt message bodies delivered to Alice (to avoid shared cipher-text inside transport connection).
5. Sends `"NEW"` command to the server to create a simplex queue (see `create` in [Create queue command](#create-queue-command)). This command contains previously generated unique "public" keys `RK` and `RDHK`. `RK` will be used to verify the following commands related to the same queue signed by its private counterpart, for example to subscribe to the messages received to this queue or to update the queue, e.g. by setting the key required to send the messages (initially Alice creates the queue that accepts unsigned messages, so anybody could send the message via this queue if they knew the queue sender's ID and server address).
5. Sends `"NEW"` command to the server to create a simplex queue (see `create` in [Create queue command](#create-queue-command)). This command contains previously generated unique "public" keys `RK` and `RDHK`. `RK` will be used to verify the following commands related to the same queue signed by its private counterpart, for example to subscribe to the messages received to this queue or to update the queue, e.g. by setting the key required to send the messages (initially Alice creates the queue that accepts unsigned messages, so anybody could send the message via this queue if they knew the queue sender's ID and server address).
6. The server sends `"IDS"` response with queue IDs (`queueIds`):
6. The server sends `"IDS"` response with queue IDs (`queueIds`):
- Recipient ID `RID` for Alice to manage the queue and to receive the messages.
- Recipient ID `RID` for Alice to manage the queue and to receive the messages.
- Sender ID `SID` for Bob to send messages to the queue.
- Sender ID `SID` for Bob to send messages to the queue.
- Server public DH key (`SDHK`) to negotiate a shared secret for message body encryption, that Alice uses to derive a shared secret with the server `SS`.
- Server public DH key (`SDHK`) to negotiate a shared secret for message body encryption, that Alice uses to derive a shared secret with the server `SS`.
2. Alice sends an out-of-band message to Bob via the alternative channel that both Alice and Bob trust (see [protocol abstract](#simplex-messaging-protocol-abstract)). The message must include:
- Unique "public" key (`EK`) that Bob must use for E2E key agreement.
- Unique "public" key (`EK`) that Bob must use for E2E key agreement.
- SMP server hostname and information to open secure encrypted transport connection (see [Appendix A](#appendix-a)).
- SMP server hostname and information to open secure encrypted transport connection (see [Appendix A](#appendix-a)).
- Sender queue ID `SID` for Bob to use.
- Sender queue ID `SID` for Bob to use.
3. Bob, having received the out-of-band message from Alice, connects to the queue:
1. Generates a new random public/private key pair (sender key - `SK`) that he did not use before for him to sign messages sent to Alice's server.
1. Generates a new random public/private key pair (sender key - `SK`) that he did not use before for him to sign messages sent to Alice's server.
2. Prepares the confirmation message for Alice to secure the queue. This message includes:
2. Prepares the confirmation message for Alice to secure the queue. This message includes:
- Previously generated "public" key `SK` that will be used by Alice's server to authenticate Bob's messages, once the queue is secured.
- Previously generated "public" key `SK` that will be used by Alice's server to authenticate Bob's messages, once the queue is secured.
- Optionally, any additional information (application specific, e.g. Bob's profile name and details).
- Optionally, any additional information (application specific, e.g. Bob's profile name and details).
3. Encrypts the confirmation body with the "public" key `EK` (that Alice provided via the out-of-band message).
3. Encrypts the confirmation body with the "public" key `EK` (that Alice provided via the out-of-band message).
4. Sends the encrypted message to the server with queue ID `SID` (see `send` in [Send message](#send-message)). This initial message to the queue must not be signed - signed messages will be rejected until Alice secures the queue (below).
4. Sends the encrypted message to the server with queue ID `SID` (see `send` in [Send message](#send-message)). This initial message to the queue must not be signed - signed messages will be rejected until Alice secures the queue (below).
4. Alice receives Bob's message from the server using recipient queue ID `RID` (possibly, via the same transport connection she already has opened - see `message` in [Deliver queue message](#deliver-queue-message)):
1. She decrypts received message body using the secret `SS`.
1. She decrypts received message body using the secret `SS`.
2. She decrypts received message with [key agreed with sender using] "private" key `EK`.
2. She decrypts received message with [key agreed with sender using] "private" key `EK`.
3. Even though anybody could have sent the message to the queue with ID `SID` before it is secured (e.g. if communication is compromised), Alice would ignore all messages until the decryption succeeds (i.e. the result contains the expected message format). Optionally, in the client application, she also may identify Bob using the information provided, but it is out of scope of SMP protocol.
3. Even though anybody could have sent the message to the queue with ID `SID` before it is secured (e.g. if communication is compromised), Alice would ignore all messages until the decryption succeeds (i.e. the result contains the expected message format). Optionally, in the client application, she also may identify Bob using the information provided, but it is out of scope of SMP protocol.
5. Alice secures the queue `RID` with `"KEY"` command so only Bob can send messages to it (see [Secure queue command](#secure-queue-command)):
1. She sends the `KEY` command with `RID` signed with "private" key `RK` to update the queue to only accept requests signed by "private" key `SK` provided by Bob. This command contains unique "public" key `SK` previously generated by Bob.
1. She sends the `KEY` command with `RID` signed with "private" key `RK` to update the queue to only accept requests signed by "private" key `SK` provided by Bob. This command contains unique "public" key `SK` previously generated by Bob.
2. From this moment the server will accept only signed commands to `SID`, so only Bob will be able to send messages to the queue `SID` (corresponding to `RID` that Alice has).
2. From this moment the server will accept only signed commands to `SID`, so only Bob will be able to send messages to the queue `SID` (corresponding to `RID` that Alice has).
3. Once queue is secured, Alice deletes `SID` and `SK` - even if Alice's client is compromised in the future, the attacker would not be able to send messages pretending to be Bob.
3. Once queue is secured, Alice deletes `SID` and `SK` - even if Alice's client is compromised in the future, the attacker would not be able to send messages pretending to be Bob.
6. The simplex queue `RID` is now ready to be used.
@ -215,21 +215,21 @@ Bob now can securely send messages to Alice:
1. Bob sends the message:
1. He encrypts the message to Alice with "public" key `EK` (provided by Alice, only known to Bob, used only for one simplex queue).
1. He encrypts the message to Alice with "public" key `EK` (provided by Alice, only known to Bob, used only for one simplex queue).
2. He signs `"SEND"` command to the server queue `SID` using the "private" key `SK` (that only he knows, used only for this queue).
2. He signs `"SEND"` command to the server queue `SID` using the "private" key `SK` (that only he knows, used only for this queue).
3. He sends the command to the server (see `send` in [Send message](#send-message)), that the server will authenticate using the "public" key `SK` (that Alice earlier received from Bob and provided to the server via `"KEY"` command).
3. He sends the command to the server (see `send` in [Send message](#send-message)), that the server will authenticate using the "public" key `SK` (that Alice earlier received from Bob and provided to the server via `"KEY"` command).
2. Alice receives the message(s):
1. She signs `"SUB"` command to the server to subscribe to the queue `RID` with the "private" key `RK` (see `subscribe` in [Subscribe to queue](#subscribe-to-queue)).
1. She signs `"SUB"` command to the server to subscribe to the queue `RID` with the "private" key `RK` (see `subscribe` in [Subscribe to queue](#subscribe-to-queue)).
2. The server, having authenticated Alice's command with the "public" key `RK` that she provided, delivers Bob's message(s) (see `message` in [Deliver queue message](#deliver-queue-message)).
2. The server, having authenticated Alice's command with the "public" key `RK` that she provided, delivers Bob's message(s) (see `message` in [Deliver queue message](#deliver-queue-message)).
3. She decrypts Bob's message(s) with the "private" key `EK` (that only she has).
3. She decrypts Bob's message(s) with the "private" key `EK` (that only she has).
4. She acknowledges the message reception to the server with `"ACK"` so that the server can delete the message and deliver the next messages.
4. She acknowledges the message reception to the server with `"ACK"` so that the server can delete the message and deliver the next messages.
This flow is show on sequence diagram below.
@ -360,6 +360,8 @@ The clients can optionally instruct a dedicated push notification server to subs
- `subscribeNotifications` (`"NSUB"`) - see [Subscribe to queue notifications](#subscribe-to-queue-notifications).
- `messageNotification` (`"NMSG"`) - see [Deliver message notification](#deliver-message-notification).
[`SEND` command](#send-message) includes the notification flag to instruct SMP server whether to send the notification - this flag is forwarded to the recepient inside encrypted envelope, together with the timestamp and the message body, so even if TLS is compromised this flag cannot be used for traffic correlation.
## SMP Transmission structure
Each transport block (SMP transmission) has a fixed size of 16384 bytes for traffic uniformity.
@ -587,7 +589,9 @@ Currently SMP defines only one command that can be used by senders - `send` mess
This command is sent to the server by the sender both to confirm the queue after the sender received out-of-band message from the recipient and to send messages after the queue is secured:
```abnf
send = %s"SEND " smpEncMessage
send = %s"SEND " msgFlags SP smpEncMessage
msgFlags = notificationFlag reserved
notificationFlag = %s"T" / %s"F"
smpEncMessage = smpPubHeader sentMsgBody ; message up to 16088 bytes
smpPubHeader = smpClientVersion ("1" senderPublicDhKey / "0")
smpClientVersion = word16
@ -729,9 +733,11 @@ See its syntax in [Create queue command](#create-queue-command)
The server must deliver messages to all subscribed simplex queues on the currently open transport connection. The syntax for the message delivery is:
```abnf
message = %s"MSG " msgId SP timestamp SP encryptedMsgBody
message = %s"MSG " msgId encryptedRcvMsgBody
encryptedMsgBody = <encrypt paddedSentMsgBody> ; server-encrypted padded sent msgBody
paddedSentMsgBody = <padded(sentMsgBody, maxMessageLength + 2)> ; maxMessageLength = 16088
encryptedRcvMsgBody = <encrypt rcvMsgBody> ; server-encrypted meta-data and padded sent msgBody
rcvMsgBody = timestamp msgFlags SP paddedSentMsgBody
msgId = length 24*24OCTET
timestamp = 8*8OCTET
```
@ -823,6 +829,7 @@ ok = %s"OK"
Both the recipient and the sender can use TCP or some other, possibly higher level, transport protocol to communicate with the server. The default TCP port for SMP server is 5223.
For scenarios when meta-data privacy is critical, it is recommended that clients:
- communicating over Tor network,
- establish a separate connection for each SMP queue,
- send noise traffic (using PING command).
@ -830,12 +837,14 @@ For scenarios when meta-data privacy is critical, it is recommended that clients
In addition to that, the servers can be deployed as Tor onion services.
The transport protocol should provide the following:
- server authentication (by matching server certificate hash with `serverIdentity`),
- forward secrecy (by encrypting the traffic using ephemeral keys agreed during transport handshake),
- integrity (preventing data modification by the attacker without detection),
- unique channel binding (`sessionIdentifier`) to include in the signed part of SMP transmissions.
By default, the client and server communicate using [TLS 1.3 protocol][13] restricted to:
- TLS_CHACHA20_POLY1305_SHA256 cipher suite (for better performance on mobile devices),
- ed25519 and ed448 EdDSA algorithms for signatures,
- x25519 and x448 ECDHE groups for key exchange.

View File

@ -101,7 +101,7 @@ library
, asn1-types ==0.3.*
, async ==2.2.*
, attoparsec ==0.14.*
, base >=4.7 && <5
, base >=4.14 && <5
, base64-bytestring >=1.0 && <1.3
, bytestring ==0.10.*
, case-insensitive ==1.2.*
@ -161,7 +161,7 @@ executable ntf-server
, asn1-types ==0.3.*
, async ==2.2.*
, attoparsec ==0.14.*
, base >=4.7 && <5
, base >=4.14 && <5
, base64-bytestring >=1.0 && <1.3
, bytestring ==0.10.*
, case-insensitive ==1.2.*
@ -222,7 +222,7 @@ executable smp-agent
, asn1-types ==0.3.*
, async ==2.2.*
, attoparsec ==0.14.*
, base >=4.7 && <5
, base >=4.14 && <5
, base64-bytestring >=1.0 && <1.3
, bytestring ==0.10.*
, case-insensitive ==1.2.*
@ -283,7 +283,7 @@ executable smp-server
, asn1-types ==0.3.*
, async ==2.2.*
, attoparsec ==0.14.*
, base >=4.7 && <5
, base >=4.14 && <5
, base64-bytestring >=1.0 && <1.3
, bytestring ==0.10.*
, case-insensitive ==1.2.*
@ -361,7 +361,7 @@ test-suite smp-server-test
, asn1-types ==0.3.*
, async ==2.2.*
, attoparsec ==0.14.*
, base >=4.7 && <5
, base >=4.14 && <5
, base64-bytestring >=1.0 && <1.3
, bytestring ==0.10.*
, case-insensitive ==1.2.*

View File

@ -851,7 +851,7 @@ subscriber c@AgentClient {msgQ} = forever $ do
Right _ -> return ()
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m ()
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) =
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) =
withStore c (\db -> getRcvConn db srv rId) >>= \case
SomeConn _ conn@(DuplexConnection cData rq _) -> processSMP conn cData rq
SomeConn _ conn@(RcvConnection cData rq) -> processSMP conn cData rq
@ -859,10 +859,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd)
_ -> atomically $ writeTBQueue subQ ("", "", ERR $ CONN NOT_FOUND)
where
processSMP :: Connection c -> ConnData -> RcvQueue -> m ()
processSMP conn cData@ConnData {connId, duplexHandshake} rq@RcvQueue {rcvDhSecret, e2ePrivKey, e2eDhSecret, status} =
processSMP conn cData@ConnData {connId, duplexHandshake} rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} =
case cmd of
SMP.MSG srvMsgId srvTs msgFlags msgBody' -> handleNotifyAck $ do
msgBody <- agentCbDecrypt rcvDhSecret (C.cbNonce srvMsgId) msgBody'
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> handleNotifyAck $ do
SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} <- decryptSMPMessage v rq msg
clientMsg@SMP.ClientMsgEnvelope {cmHeader = SMP.PubHeader phVer e2ePubKey_} <-
parseMessage msgBody
unless (phVer `isCompatible` SMP.smpClientVRange) . throwError $ AGENT A_VERSION

View File

@ -11,6 +11,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Simplex.Messaging.Agent.Client
@ -21,6 +22,7 @@ module Simplex.Messaging.Agent.Client
newRcvQueue,
subscribeQueue,
getQueueMessage,
decryptSMPMessage,
addSubscription,
getSubscriptions,
sendConfirmation,
@ -83,7 +85,6 @@ import Data.Set (Set)
import Data.Text.Encoding
import Data.Word (Word16)
import qualified Database.SQLite.Simple as DB
-- import GHC.Conc (unsafeIOToSTM)
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
@ -96,7 +97,8 @@ import Simplex.Messaging.Encoding
import Simplex.Messaging.Notifications.Client
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), MsgId, NotifierId, NtfPrivateSignKey, NtfPublicVerifyKey, ProtocolServer (..), QueueId, QueueIdsKeys (..), RcvNtfPublicDhKey, SMPMsgMeta, SndPublicVerifyKey)
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), MsgId, NotifierId, NtfPrivateSignKey, NtfPublicVerifyKey, ProtocolServer (..), QueueId, QueueIdsKeys (..), RcvMessage (..), RcvNtfPublicDhKey, SMPMsgMeta (..), SndPublicVerifyKey)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
@ -547,11 +549,13 @@ sendInvitation c (Compatible SMPQueueInfo {smpServer, senderId, dhPublicKey}) (C
SMP.ClientMessage SMP.PHEmpty $ smpEncode agentEnvelope
getQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> m (Maybe SMPMsgMeta)
getQueueMessage c RcvQueue {server, rcvId, rcvPrivateKey} = do
getQueueMessage c rq@RcvQueue {server, rcvId, rcvPrivateKey} = do
atomically createTakeGetLock
withLogClient c server rcvId "GET" $ \smp ->
getSMPMessage smp rcvPrivateKey rcvId
(v, msg_) <- withLogClient c server rcvId "GET" $ \smp ->
(thVersion smp,) <$> getSMPMessage smp rcvPrivateKey rcvId
mapM (decryptMeta v) msg_
where
decryptMeta v msg@SMP.RcvMessage {msgId} = SMP.rcvMessageMeta msgId <$> decryptSMPMessage v rq msg
createTakeGetLock = TM.alterF takeLock (server, rcvId) $ getMsgLocks c
where
takeLock l_ = do
@ -559,6 +563,13 @@ getQueueMessage c RcvQueue {server, rcvId, rcvPrivateKey} = do
takeTMVar l
pure $ Just l
decryptSMPMessage :: AgentMonad m => Version -> RcvQueue -> SMP.RcvMessage -> m SMP.ClientRcvMsgBody
decryptSMPMessage v rq SMP.RcvMessage {msgId, msgTs, msgFlags, msgBody = SMP.EncRcvMsgBody body}
| v == 1 || v == 2 = SMP.ClientRcvMsgBody msgTs msgFlags <$> decrypt body
| otherwise = liftEither . parse SMP.clientRcvMsgBodyP (AGENT A_MESSAGE) =<< decrypt body
where
decrypt = agentCbDecrypt (rcvDhSecret rq) (C.cbNonce msgId)
secureQueue :: AgentMonad m => AgentClient -> RcvQueue -> SndPublicVerifyKey -> m ()
secureQueue c RcvQueue {server, rcvId, rcvPrivateKey} senderKey =
withLogClient c server rcvId "KEY <key>" $ \smp ->
@ -582,7 +593,7 @@ sendAck c rq@RcvQueue {server, rcvId, rcvPrivateKey} msgId = do
releaseGetLock :: AgentClient -> RcvQueue -> STM ()
releaseGetLock c RcvQueue {server, rcvId} =
TM.lookup (server, rcvId) (getMsgLocks c) >>= mapM_ (void . (`tryPutTMVar` ()))
TM.lookup (server, rcvId) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ())
suspendQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
suspendQueue c RcvQueue {server, rcvId, rcvPrivateKey} =
@ -734,7 +745,7 @@ agentOperationBracket c op action =
E.bracket
(atomically $ beginAgentOperation c op)
(\_ -> atomically $ endAgentOperation c op)
(\_ -> action)
(const action)
withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a
withStore' c action = withStore c $ fmap Right . action

View File

@ -903,7 +903,7 @@ commandP =
msgErrResp = ACmd SAgent .: MERR <$> A.decimal <* A.space <*> strP
message = ACmd SAgent .:. MSG <$> msgMetaP <* A.space <*> smpP <* A.space <*> A.takeByteString
ackCmd = ACmd SClient . ACK <$> A.decimal
connections = strP `A.sepBy'` (A.char ',')
connections = strP `A.sepBy'` A.char ','
msgMetaP = do
integrity <- strP
recipient <- " R=" *> partyMeta A.decimal

View File

@ -24,7 +24,7 @@
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
module Simplex.Messaging.Client
( -- * Connect (disconnect) client to (from) SMP server
ProtocolClient (sessionId),
ProtocolClient (thVersion, sessionId),
SMPClient,
getProtocolClient,
closeProtocolClient,
@ -56,7 +56,7 @@ import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Trans.Class
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Except
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
@ -95,7 +95,7 @@ data ProtocolClient msg = ProtocolClient
type SMPClient = ProtocolClient SMP.BrokerMsg
-- | Type synonym for transmission from some SPM server queue.
type ServerTransmission msg = (ProtocolServer, SessionId, QueueId, msg)
type ServerTransmission msg = (ProtocolServer, Version, SessionId, QueueId, msg)
-- | protocol client configuration.
data ProtocolClientConfig = ProtocolClientConfig
@ -208,7 +208,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc
runExceptT $ sendProtocolCommand c Nothing "" protocolPing
process :: ProtocolClient msg -> IO ()
process ProtocolClient {sessionId, rcvQ, sentCommands} = forever $ do
process c@ProtocolClient {rcvQ, sentCommands} = forever $ do
(_, _, (corrId, qId, respOrErr)) <- atomically $ readTBQueue rcvQ
if B.null $ bs corrId
then sendMsg qId respOrErr
@ -228,7 +228,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc
where
sendMsg :: QueueId -> Either ErrorType msg -> IO ()
sendMsg qId = \case
Right cmd -> atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, qId, cmd)) msgQ
Right msg -> atomically $ mapM_ (`writeTBQueue` serverTransmission c qId msg) msgQ
-- TODO send everything else to errQ and log in agent
_ -> return ()
@ -282,23 +282,30 @@ createSMPQueue c rpKey rKey dhKey =
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#subscribe-to-queue
subscribeSMPQueue :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT ProtocolClientError IO ()
subscribeSMPQueue c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId =
subscribeSMPQueue c rpKey rId =
sendSMPCommand c (Just rpKey) rId SUB >>= \case
OK -> return ()
cmd@MSG {} ->
lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ
cmd@MSG {} -> writeSMPMessage c rId cmd
r -> throwE . PCEUnexpectedResponse $ bshow r
writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> ExceptT ProtocolClientError IO ()
writeSMPMessage c rId msg =
liftIO . atomically $ mapM_ (`writeTBQueue` serverTransmission c rId msg) (msgQ c)
serverTransmission :: ProtocolClient msg -> RecipientId -> msg -> ServerTransmission msg
serverTransmission ProtocolClient {protocolServer, thVersion, sessionId} entityId message =
(protocolServer, thVersion, sessionId, entityId, message)
-- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue
--
-- https://github.covm/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#receive-a-message-from-the-queue
getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT ProtocolClientError IO (Maybe SMP.SMPMsgMeta)
getSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId =
getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT ProtocolClientError IO (Maybe RcvMessage)
getSMPMessage c rpKey rId =
sendSMPCommand c (Just rpKey) rId GET >>= \case
OK -> pure Nothing
cmd@(MSG msgId msgTs msgFlags _) -> do
lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ
pure $ Just SMP.SMPMsgMeta {msgId, msgTs, msgFlags}
cmd@(MSG msg) -> do
writeSMPMessage c rId cmd
pure $ Just msg
r -> throwE . PCEUnexpectedResponse $ bshow r
-- | Subscribe to the SMP queue notifications.
@ -341,11 +348,10 @@ sendSMPMessage c spKey sId flags msg =
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#acknowledge-message-delivery
ackSMPMessage :: SMPClient -> RcvPrivateSignKey -> QueueId -> MsgId -> ExceptT ProtocolClientError IO ()
ackSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId msgId =
ackSMPMessage c rpKey rId msgId =
sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case
OK -> return ()
cmd@MSG {} ->
lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ
cmd@MSG {} -> writeSMPMessage c rId cmd
r -> throwE . PCEUnexpectedResponse $ bshow r
-- | Irreversibly suspend SMP queue.

View File

@ -8,6 +8,7 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
@ -101,6 +102,7 @@ module Simplex.Messaging.Crypto
-- * NaCl crypto_box
CbNonce (unCbNonce),
cbEncrypt,
cbEncryptMaxLenBS,
cbDecrypt,
cbNonce,
randomCbNonce,
@ -118,6 +120,13 @@ module Simplex.Messaging.Crypto
-- * Cryptography error type
CryptoError (..),
-- * Limited size ByteStrings
MaxLenBS,
pattern MaxLenBS,
maxLenBS,
unsafeMaxLenBS,
appendMaxLenBS,
)
where
@ -153,11 +162,11 @@ import Data.Constraint (Dict (..))
import Data.Kind (Constraint, Type)
import Data.String
import Data.Type.Equality
import Data.Typeable (Typeable)
import Data.Typeable (Proxy (Proxy), Typeable)
import Data.X509
import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
import GHC.TypeLits (ErrorMessage (..), TypeError)
import GHC.TypeLits (ErrorMessage (..), KnownNat, Nat, TypeError, natVal, type (+))
import Network.Transport.Internal (decodeWord16, encodeWord16)
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@ -809,6 +818,41 @@ unPad padded
(lenWrd, rest) = B.splitAt 2 padded
len = fromIntegral $ decodeWord16 lenWrd
newtype MaxLenBS (i :: Nat) = MLBS {unMaxLenBS :: ByteString}
pattern MaxLenBS :: ByteString -> MaxLenBS i
pattern MaxLenBS s <- MLBS s
{-# COMPLETE MaxLenBS #-}
instance KnownNat i => Encoding (MaxLenBS i) where
smpEncode (MLBS s) = smpEncode s
smpP = first show . maxLenBS <$?> smpP
instance KnownNat i => StrEncoding (MaxLenBS i) where
strEncode (MLBS s) = strEncode s
strP = first show . maxLenBS <$?> strP
maxLenBS :: forall i. KnownNat i => ByteString -> Either CryptoError (MaxLenBS i)
maxLenBS s
| B.length s > maxLength @i = Left CryptoLargeMsgError
| otherwise = Right $ MLBS s
unsafeMaxLenBS :: forall i. KnownNat i => ByteString -> MaxLenBS i
unsafeMaxLenBS = MLBS
padMaxLenBS :: forall i. KnownNat i => MaxLenBS i -> MaxLenBS (i + 2)
padMaxLenBS (MLBS msg) = MLBS $ encodeWord16 (fromIntegral len) <> msg <> B.replicate padLen '#'
where
len = B.length msg
padLen = maxLength @i - len
appendMaxLenBS :: (KnownNat i, KnownNat j) => MaxLenBS i -> MaxLenBS j -> MaxLenBS (i + j)
appendMaxLenBS (MLBS s1) (MLBS s2) = MLBS $ s1 <> s2
maxLength :: forall i. KnownNat i => Int
maxLength = fromIntegral (natVal $ Proxy @i)
initAEAD :: forall c. AES.BlockCipher c => Key -> IV -> ExceptT CryptoError IO (AES.AEAD c)
initAEAD (Key aesKey) (IV ivBytes) = do
iv <- makeIV @c ivBytes
@ -864,12 +908,17 @@ dh' (PublicKeyX448 k) (PrivateKeyX448 pk _) = DhSecretX448 $ X448.dh k pk
-- | NaCl @crypto_box@ encrypt with a shared DH secret and 192-bit nonce.
cbEncrypt :: DhSecret X25519 -> CbNonce -> ByteString -> Int -> Either CryptoError ByteString
cbEncrypt secret (CbNonce nonce) msg paddedLen = cryptoBox <$> pad msg paddedLen
cbEncrypt secret (CbNonce nonce) msg paddedLen = cryptoBox secret nonce <$> pad msg paddedLen
-- | NaCl @crypto_box@ encrypt with a shared DH secret and 192-bit nonce.
cbEncryptMaxLenBS :: KnownNat i => DhSecret X25519 -> CbNonce -> MaxLenBS i -> ByteString
cbEncryptMaxLenBS secret (CbNonce nonce) = cryptoBox secret nonce . unMaxLenBS . padMaxLenBS
cryptoBox :: DhSecret 'X25519 -> ByteString -> ByteString -> ByteString
cryptoBox secret nonce s = BA.convert tag <> c
where
cryptoBox s = BA.convert tag <> c
where
(rs, c) = xSalsa20 secret nonce s
tag = Poly1305.auth rs c
(rs, c) = xSalsa20 secret nonce s
tag = Poly1305.auth rs c
-- | NaCl @crypto_box@ decrypt with a shared DH secret and 192-bit nonce.
cbDecrypt :: DhSecret X25519 -> CbNonce -> ByteString -> Either CryptoError ByteString

View File

@ -197,3 +197,9 @@ instance (Encoding a, Encoding b, Encoding c, Encoding d, Encoding e, Encoding f
{-# INLINE smpEncode #-}
smpP = (,,,,,,) <$> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP
{-# INLINE smpP #-}
instance (Encoding a, Encoding b, Encoding c, Encoding d, Encoding e, Encoding f, Encoding g, Encoding h) => Encoding (a, b, c, d, e, f, g, h) where
smpEncode (a, b, c, d, e, f, g, h) = smpEncode a <> smpEncode b <> smpEncode c <> smpEncode d <> smpEncode e <> smpEncode f <> smpEncode g <> smpEncode h
{-# INLINE smpEncode #-}
smpP = (,,,,,,,) <$> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP
{-# INLINE smpP #-}

View File

@ -128,7 +128,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
receiveSMP :: m ()
receiveSMP = forever $ do
(srv, _sessId, ntfId, msg) <- atomically $ readTBQueue msgQ
(srv, _, _, ntfId, msg) <- atomically $ readTBQueue msgQ
let smpQueue = SMPQueueNtf srv ntfId
case msg of
SMP.NMSG nmsgNonce encNMsgMeta -> do

View File

@ -2,7 +2,6 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
module Simplex.Messaging.Notifications.Server.Env where

View File

@ -16,9 +16,13 @@
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
{-# HLINT ignore "Use newtype instead of data" #-}
-- |
-- Module : Simplex.Messaging.ProtocolEncoding
-- Copyright : (c) simplex.chat
@ -78,12 +82,20 @@ module Simplex.Messaging.Protocol
NtfPublicVerifyKey,
RcvNtfPublicDhKey,
RcvNtfDhSecret,
Message (..),
RcvMessage (..),
MsgId,
MsgBody,
MaxMessageLen,
MaxRcvMessageLen,
EncRcvMsgBody (..),
RcvMsgBody (..),
ClientRcvMsgBody (..),
EncNMsgMeta,
SMPMsgMeta (..),
NMsgMeta (..),
MsgFlags (..),
rcvMessageMeta,
noMsgFlags,
-- * Parse and serialize
@ -92,6 +104,8 @@ module Simplex.Messaging.Protocol
encodeTransmission,
transmissionP,
_smpP,
encodeRcvMsgBody,
clientRcvMsgBodyP,
-- * TCP transport functions
tPut,
@ -114,9 +128,10 @@ import qualified Data.ByteString.Char8 as B
import Data.Kind
import Data.Maybe (isNothing)
import Data.String
import Data.Time.Clock.System (SystemTime)
import Data.Time.Clock.System (SystemTime (..))
import Data.Type.Equality
import GHC.Generics (Generic)
import GHC.TypeLits (type (+))
import Generic.Random (genericArbitraryU)
import Network.Socket (HostName, ServiceName)
import qualified Simplex.Messaging.Crypto as C
@ -137,6 +152,11 @@ smpClientVRange = mkVersionRange 1 smpClientVersion
maxMessageLength :: Int
maxMessageLength = 16088
type MaxMessageLen = 16088
-- 16 extra bytes: 8 for timestamp and 8 for flags (7 flags and the space, only 1 flag is currently used)
type MaxRcvMessageLen = MaxMessageLen + 16 -- 16104, the padded size is 16106
-- it is shorter to allow per-queue e2e encryption DH key in the "public" header
e2eEncConfirmationLength :: Int
e2eEncConfirmationLength = 15936
@ -243,9 +263,10 @@ deriving instance Eq (Command p)
data BrokerMsg where
-- SMP broker messages (responses, client messages, notifications)
IDS :: QueueIdsKeys -> BrokerMsg
-- MSG v1 has to be supported for encoding/decoding
-- MSG :: MsgId -> SystemTime -> MsgBody -> BrokerMsg
MSG :: MsgId -> SystemTime -> MsgFlags -> MsgBody -> BrokerMsg
-- MSG v1/2 has to be supported for encoding/decoding
-- v1: MSG :: MsgId -> SystemTime -> MsgBody -> BrokerMsg
-- v2: MsgId -> SystemTime -> MsgFlags -> MsgBody -> BrokerMsg
MSG :: RcvMessage -> BrokerMsg
NID :: NotifierId -> RcvNtfPublicDhKey -> BrokerMsg
NMSG :: C.CbNonce -> EncNMsgMeta -> BrokerMsg
END :: BrokerMsg
@ -254,6 +275,79 @@ data BrokerMsg where
PONG :: BrokerMsg
deriving (Eq, Show)
data RcvMessage = RcvMessage
{ msgId :: MsgId,
msgTs :: SystemTime,
msgFlags :: MsgFlags,
msgBody :: EncRcvMsgBody -- e2e encrypted, with extra encryption for recipient
}
deriving (Eq, Show)
-- | received message without server/recipient encryption
data Message = Message
{ msgId :: MsgId,
msgTs :: SystemTime,
msgFlags :: MsgFlags,
msgBody :: C.MaxLenBS MaxMessageLen
}
instance StrEncoding RcvMessage where
strEncode RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} =
B.unwords
[ strEncode msgId,
strEncode msgTs,
"flags=" <> strEncode msgFlags,
strEncode body
]
strP = do
msgId <- strP_
msgTs <- strP_
msgFlags <- ("flags=" *> strP_) <|> pure noMsgFlags
msgBody <- EncRcvMsgBody <$> strP
pure RcvMessage {msgId, msgTs, msgFlags, msgBody}
newtype EncRcvMsgBody = EncRcvMsgBody ByteString
deriving (Eq, Show)
data RcvMsgBody = RcvMsgBody
{ msgTs :: SystemTime,
msgFlags :: MsgFlags,
msgBody :: C.MaxLenBS MaxMessageLen
}
encodeRcvMsgBody :: RcvMsgBody -> C.MaxLenBS MaxRcvMessageLen
encodeRcvMsgBody RcvMsgBody {msgTs, msgFlags, msgBody} =
let rcvMeta :: C.MaxLenBS 16 = C.unsafeMaxLenBS $ smpEncode (msgTs, msgFlags, ' ')
in C.appendMaxLenBS rcvMeta msgBody
data ClientRcvMsgBody = ClientRcvMsgBody
{ msgTs :: SystemTime,
msgFlags :: MsgFlags,
msgBody :: ByteString
}
clientRcvMsgBodyP :: Parser ClientRcvMsgBody
clientRcvMsgBodyP = do
msgTs <- smpP
msgFlags <- smpP
Tail msgBody <- _smpP
pure ClientRcvMsgBody {msgTs, msgFlags, msgBody}
instance StrEncoding Message where
strEncode Message {msgId, msgTs, msgFlags, msgBody} =
B.unwords
[ strEncode msgId,
strEncode msgTs,
"flags=" <> strEncode msgFlags,
strEncode msgBody
]
strP = do
msgId <- strP_
msgTs <- strP_
msgFlags <- ("flags=" *> strP_) <|> pure noMsgFlags
msgBody <- strP
pure Message {msgId, msgTs, msgFlags, msgBody}
type EncNMsgMeta = ByteString
data SMPMsgMeta = SMPMsgMeta
@ -263,6 +357,9 @@ data SMPMsgMeta = SMPMsgMeta
}
deriving (Show)
rcvMessageMeta :: MsgId -> ClientRcvMsgBody -> SMPMsgMeta
rcvMessageMeta msgId ClientRcvMsgBody {msgTs, msgFlags} = SMPMsgMeta {msgId, msgTs, msgFlags}
data NMsgMeta = NMsgMeta
{ msgId :: MsgId,
msgTs :: SystemTime
@ -277,11 +374,13 @@ instance Encoding NMsgMeta where
(msgId, msgTs, Tail _) <- smpP
pure NMsgMeta {msgId, msgTs}
-- it must be data for correct JSON encoding
data MsgFlags = MsgFlags {notification :: Bool}
deriving (Eq, Show, Generic)
instance ToJSON MsgFlags where toEncoding = J.genericToEncoding J.defaultOptions
-- this encoding should not become bigger than 7 bytes (currently it is 1 byte)
instance Encoding MsgFlags where
smpEncode MsgFlags {notification} = smpEncode notification
smpP = do
@ -572,7 +671,7 @@ data ErrorType
QUOTA
| -- | ACK command is sent without message to be acknowledged
NO_MSG
| -- | sent message is too large (> maxMessageLength = 16078 bytes)
| -- | sent message is too large (> maxMessageLength = 16088 bytes)
LARGE_MSG
| -- | internal server error
INTERNAL
@ -724,9 +823,10 @@ instance ProtocolEncoding BrokerMsg where
type Tag BrokerMsg = BrokerMsgTag
encodeProtocol v = \case
IDS (QIK rcvId sndId srvDh) -> e (IDS_, ' ', rcvId, sndId, srvDh)
MSG msgId ts flags msgBody
| v == 1 -> e (MSG_, ' ', msgId, ts, Tail msgBody)
| otherwise -> e (MSG_, ' ', msgId, ts, flags, ' ', Tail msgBody)
MSG RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body}
| v == 1 -> e (MSG_, ' ', msgId, msgTs, Tail body)
| v == 2 -> e (MSG_, ' ', msgId, msgTs, msgFlags, ' ', Tail body)
| otherwise -> e (MSG_, ' ', msgId, Tail body)
NID nId srvNtfDh -> e (NID_, ' ', nId, srvNtfDh)
NMSG nmsgNonce encNMsgMeta -> e (NMSG_, ' ', nmsgNonce, encNMsgMeta)
END -> e END_
@ -738,9 +838,14 @@ instance ProtocolEncoding BrokerMsg where
e = smpEncode
protocolP v = \case
MSG_
| v == 1 -> MSG <$> _smpP <*> smpP <*> pure noMsgFlags <*> (unTail <$> smpP)
| otherwise -> MSG <$> _smpP <*> smpP <*> smpP <*> (unTail <$> _smpP)
MSG_ -> do
msgId <- _smpP
MSG <$> case v of
1 -> RcvMessage msgId <$> smpP <*> pure noMsgFlags <*> bodyP
2 -> RcvMessage msgId <$> smpP <*> smpP <*> (A.space *> bodyP)
_ -> RcvMessage msgId (MkSystemTime 0 0) noMsgFlags <$> bodyP
where
bodyP = EncRcvMsgBody . unTail <$> smpP
IDS_ -> IDS <$> (QIK <$> _smpP <*> smpP <*> smpP)
NID_ -> NID <$> _smpP <*> smpP
NMSG_ -> NMSG <$> _smpP <*> smpP

View File

@ -40,6 +40,7 @@ import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.Bifunctor (first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (fromRight)
@ -57,6 +58,7 @@ import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Data.Type.Equality
import GHC.TypeLits (KnownNat)
import Network.Socket (ServiceName)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding (Encoding (smpEncode))
@ -75,6 +77,7 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util
import System.Exit (exitFailure)
import System.Mem.Weak (deRefWeak)
import UnliftIO.Concurrent
import UnliftIO.Directory (doesFileExist, renameFile)
@ -169,7 +172,7 @@ smpServer started = do
logServerStats :: Int -> Int -> m ()
logServerStats startAt logInterval = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
logInfo $ "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,dayMsgQueues,weekMsgQueues,monthMsgQueues"
logInfo "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,dayMsgQueues,weekMsgQueues,monthMsgQueues"
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues} <- asks serverStats
let interval = 1000000 * logInterval
@ -202,10 +205,10 @@ smpServer started = do
Left _ -> pure ()
runClientTransport :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> m ()
runClientTransport th@THandle {sessionId} = do
runClientTransport th@THandle {thVersion, sessionId} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
c <- atomically $ newClient q sessionId ts
c <- atomically $ newClient q thVersion sessionId ts
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
raceAny_ ([send th c, client c s, receive th c] <> disconnectThread_ c expCfg)
@ -311,7 +314,7 @@ dummyKeyEd448 :: C.PublicKey 'C.Ed448
dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/XopbOSaq9qyLhrgJWKOLyNrQPNVvpMA"
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
forever $
atomically (readTBQueue rcvQ)
>>= processCommand
@ -333,9 +336,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
(asks $ allowNewQueues . config)
(createQueue st rKey dhKey)
(pure (corrId, queueId, ERR AUTH))
SUB -> subscribeQueue queueId
GET -> getMessage
ACK msgId -> acknowledgeMsg msgId
SUB -> subscribeQueue st queueId
GET -> getMessage st
ACK msgId -> acknowledgeMsg st msgId
KEY sKey -> secureQueue_ st sKey
NKEY nKey dhKey -> addQueueNotifier_ st nKey dhKey
NDEL -> deleteQueueNotifier_ st
@ -372,7 +375,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
withLog (`logCreateById` rId)
stats <- asks serverStats
atomically $ modifyTVar (qCreated stats) (+ 1)
subscribeQueue rId $> IDS (qik ids)
subscribeQueue st rId $> IDS (qik ids)
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
logCreateById s rId =
@ -420,8 +423,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
withLog (`logDeleteQueue` queueId)
okResp <$> atomically (suspendQueue st queueId)
subscribeQueue :: RecipientId -> m (Transmission BrokerMsg)
subscribeQueue rId =
subscribeQueue :: QueueStore -> RecipientId -> m (Transmission BrokerMsg)
subscribeQueue st rId =
atomically (TM.lookup rId subscriptions) >>= \case
Nothing ->
atomically newSub >>= deliver
@ -443,10 +446,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
deliver sub = do
q <- getStoreMsgQueue rId
msg_ <- atomically $ tryPeekMsg q
deliverMessage rId sub q msg_
deliverMessage st rId sub q msg_
getMessage :: m (Transmission BrokerMsg)
getMessage =
getMessage :: QueueStore -> m (Transmission BrokerMsg)
getMessage st =
atomically (TM.lookup queueId subscriptions) >>= \case
Nothing ->
atomically newSub >>= getMessage_
@ -465,13 +468,21 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
TM.insert queueId sub subscriptions
pure s
getMessage_ :: Sub -> m (Transmission BrokerMsg)
getMessage_ s = do
getMessage_ s = withRcvQueue st queueId $ \qr -> do
q <- getStoreMsgQueue queueId
atomically $
tryPeekMsg q >>= \case
Just msg -> setDelivered s msg $> (corrId, queueId, msgCmd msg)
Just msg ->
let encMsg = encryptMsg qr msg
in setDelivered s msg $> (corrId, queueId, MSG encMsg)
_ -> pure (corrId, queueId, OK)
withRcvQueue :: QueueStore -> RecipientId -> (QueueRec -> m (Transmission BrokerMsg)) -> m (Transmission BrokerMsg)
withRcvQueue st rId action =
atomically (getQueue st SRecipient rId) >>= \case
Left e -> pure (corrId, rId, ERR e)
Right qr -> action qr
subscribeNotifications :: m (Transmission BrokerMsg)
subscribeNotifications = atomically $ do
unlessM (TM.member queueId ntfSubscriptions) $ do
@ -479,8 +490,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
TM.insert queueId () ntfSubscriptions
pure ok
acknowledgeMsg :: MsgId -> m (Transmission BrokerMsg)
acknowledgeMsg msgId = do
acknowledgeMsg :: QueueStore -> MsgId -> m (Transmission BrokerMsg)
acknowledgeMsg st msgId = do
atomically (TM.lookup queueId subscriptions) >>= \case
Nothing -> pure $ err NO_MSG
Just sub ->
@ -495,7 +506,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
_ -> do
(msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId
when msgDeleted updateStats
deliverMessage queueId sub q msg_
deliverMessage st queueId sub q msg_
_ -> pure $ err NO_MSG
where
getDelivered :: TVar Sub -> STM (Maybe Sub)
@ -520,7 +531,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
updatePeriod pSel = modifyTVar (pSel stats) (S.insert qId)
sendMessage :: QueueStore -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg)
sendMessage st flags msgBody
sendMessage st msgFlags msgBody
| B.length msgBody > maxMessageLength = pure $ err LARGE_MSG
| otherwise = do
qr <- atomically $ getQueue st SSender queueId
@ -530,7 +541,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
storeMessage qr = case status qr of
QueueOff -> return $ err AUTH
QueueActive ->
mkMessage >>= \case
mapM mkMessage (C.maxLenBS msgBody) >>= \case
Left _ -> pure $ err LARGE_MSG
Right msg -> do
ms <- asks msgStore
@ -541,7 +552,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
q <- getMsgQueue ms (recipientId qr) msgQueueQuota
mapM_ (deleteExpiredMsgs q) old
ifM (isFull q) (pure $ err QUOTA) $ do
when (notification flags) $ trySendNotification msg ntfNonceDrg
when (notification msgFlags) $ trySendNotification msg ntfNonceDrg
writeMsg q msg
pure ok
when (sent == OK) $ do
@ -550,12 +561,11 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
atomically $ updateActiveQueues stats $ recipientId qr
pure resp
where
mkMessage :: m (Either C.CryptoError Message)
mkMessage = do
mkMessage :: C.MaxLenBS MaxMessageLen -> m Message
mkMessage body = do
msgId <- randomId =<< asks (msgIdBytes . config)
ts <- liftIO getSystemTime
let c = C.cbEncrypt (rcvDhSecret qr) (C.cbNonce msgId) msgBody (maxMessageLength + 2)
pure $ Message msgId ts flags <$> c
msgTs <- liftIO getSystemTime
pure $ Message msgId msgTs msgFlags body
trySendNotification :: Message -> TVar ChaChaDRG -> STM ()
trySendNotification msg ntfNonceDrg =
@ -569,36 +579,48 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
writeTBQueue q (CorrId "", nId, NMSG nmsgNonce encNMsgMeta)
mkMessageNotification :: Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> STM (C.CbNonce, EncNMsgMeta)
mkMessageNotification Message {msgId, ts} rcvNtfDhSecret ntfNonceDrg = do
mkMessageNotification Message {msgId, msgTs} rcvNtfDhSecret ntfNonceDrg = do
cbNonce <- C.pseudoRandomCbNonce ntfNonceDrg
let msgMeta = NMsgMeta {msgId, msgTs = ts}
let msgMeta = NMsgMeta {msgId, msgTs}
encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128
pure . (cbNonce,) $ fromRight "" encNMsgMeta
deliverMessage :: RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg)
deliverMessage rId sub q msg_ =
deliverMessage :: QueueStore -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg)
deliverMessage st rId sub q msg_ = withRcvQueue st rId $ \qr -> do
readTVarIO sub >>= \case
s@Sub {subThread = NoSub} ->
case msg_ of
Just msg -> atomically (setDelivered s msg) $> (corrId, rId, msgCmd msg)
_ -> forkSub $> ok
Just msg ->
let encMsg = encryptMsg qr msg
in atomically (setDelivered s msg) $> (corrId, rId, MSG encMsg)
_ -> forkSub qr $> ok
_ -> pure ok
where
forkSub :: m ()
forkSub = do
forkSub :: QueueRec -> m ()
forkSub qr = do
atomically . modifyTVar sub $ \s -> s {subThread = SubPending}
t <- mkWeakThreadId =<< forkIO subscriber
atomically . modifyTVar sub $ \case
s@Sub {subThread = SubPending} -> s {subThread = SubThread t}
s -> s
where
subscriber = atomically $ do
msg <- peekMsg q
let encMsg = encryptMsg qr msg
writeTBQueue sndQ (CorrId "", rId, MSG encMsg)
s <- readTVar sub
void $ setDelivered s msg
writeTVar sub s {subThread = NoSub}
subscriber :: m ()
subscriber = atomically $ do
msg <- peekMsg q
writeTBQueue sndQ (CorrId "", rId, msgCmd msg)
s <- readTVar sub
void $ setDelivered s msg
writeTVar sub s {subThread = NoSub}
encryptMsg :: QueueRec -> Message -> RcvMessage
encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody}
| thVersion == 1 || thVersion == 2 = encrypt msgBody
| otherwise = encrypt $ encodeRcvMsgBody RcvMsgBody {msgTs, msgFlags, msgBody}
where
encrypt :: KnownNat i => C.MaxLenBS i -> RcvMessage
encrypt body =
let encBody = EncRcvMsgBody $ C.cbEncryptMaxLenBS (rcvDhSecret qr) (C.cbNonce msgId) body
in RcvMessage msgId msgTs msgFlags encBody
setDelivered :: Sub -> Message -> STM Bool
setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId
@ -609,9 +631,6 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
quota <- asks $ msgQueueQuota . config
atomically $ getMsgQueue ms rId quota
msgCmd :: Message -> BrokerMsg
msgCmd Message {msgId, ts, msgFlags, msgBody} = MSG msgId ts msgFlags msgBody
delQueueAndMsgs :: QueueStore -> m (Transmission BrokerMsg)
delQueueAndMsgs st = do
withLog (`logDeleteQueue` queueId)
@ -654,26 +673,44 @@ saveServerMessages = asks (storeMsgsFile . config) >>= mapM_ saveMessages
where
saveQueueMsgs ms h rId =
atomically (flushMsgQueue ms rId)
>>= mapM_ (B.hPutStrLn h . strEncode . MsgLogRecord rId)
>>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId)
restoreServerMessages :: (MonadUnliftIO m, MonadReader Env m) => m ()
restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m ()
restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
where
restoreMessages f = whenM (doesFileExist f) $ do
logInfo $ "restoring messages from file " <> T.pack f
st <- asks queueStore
ms <- asks msgStore
quota <- asks $ msgQueueQuota . config
liftIO $ mapM_ (restoreMsg ms quota) . B.lines =<< B.readFile f
renameFile f $ f <> ".bak"
logInfo $ "messages restored"
runExceptT (liftIO (B.readFile f) >>= mapM_ (restoreMsg st ms quota) . B.lines) >>= \case
Left e -> do
logError . T.pack $ "error restoring messages: " <> e
liftIO exitFailure
_ -> do
renameFile f $ f <> ".bak"
logInfo "messages restored"
where
restoreMsg ms quota s = case strDecode s of
Left e -> logError . decodeLatin1 $ "message parsing error (" <> B.pack e <> "): " <> B.take 100 s
Right (MsgLogRecord rId msg) -> do
full <- atomically $ do
q <- getMsgQueue ms rId quota
ifM (isFull q) (pure True) (writeMsg q msg $> False)
when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
restoreMsg st ms quota s = do
r <- liftEither . first (msgErr "parsing") $ strDecode s
case r of
MLRv3 rId msg -> addToMsgQueue rId msg
MLRv1 rId encMsg -> do
qr <- liftEitherError (msgErr "queue unknown") . atomically $ getQueue st SRecipient rId
msg' <- updateMsgV1toV3 qr encMsg
addToMsgQueue rId msg'
where
addToMsgQueue rId msg = do
full <- atomically $ do
q <- getMsgQueue ms rId quota
ifM (isFull q) (pure True) (writeMsg q msg $> False)
when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
updateMsgV1toV3 QueueRec {rcvDhSecret} RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} = do
let nonce = C.cbNonce msgId
msgBody <- liftEither . first (msgErr "v1 message decryption") $ C.maxLenBS =<< C.cbDecrypt rcvDhSecret nonce body
pure Message {msgId, msgTs, msgFlags, msgBody}
msgErr :: Show e => String -> e -> String
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)
saveServerStats :: (MonadUnliftIO m, MonadReader Env m) => m ()
saveServerStats =
@ -683,7 +720,7 @@ saveServerStats =
saveStats f stats = do
logInfo $ "saving server stats to file " <> T.pack f
B.writeFile f $ strEncode stats
logInfo $ "server stats saved"
logInfo "server stats saved"
restoreServerStats :: (MonadUnliftIO m, MonadReader Env m) => m ()
restoreServerStats = asks (serverStatsFile . config) >>= mapM_ restoreStats

View File

@ -2,7 +2,6 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.Server.Env.STM where
@ -104,6 +103,7 @@ data Client = Client
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (Transmission Cmd),
sndQ :: TBQueue (Transmission BrokerMsg),
thVersion :: Version,
sessionId :: ByteString,
connected :: TVar Bool,
activeAt :: TVar SystemTime
@ -124,15 +124,15 @@ newServer qSize = do
notifiers <- TM.empty
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers}
newClient :: Natural -> ByteString -> SystemTime -> STM Client
newClient qSize sessionId ts = do
newClient :: Natural -> Version -> ByteString -> SystemTime -> STM Client
newClient qSize thVersion sessionId ts = do
subscriptions <- TM.empty
ntfSubscriptions <- TM.empty
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
connected <- newTVar True
activeAt <- newTVar ts
return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, connected, activeAt}
return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, thVersion, sessionId, connected, activeAt}
newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do

View File

@ -1,44 +1,22 @@
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Server.MsgStore where
import Control.Applicative ((<|>))
import qualified Data.ByteString.Char8 as B
import Data.Int (Int64)
import Data.Time.Clock.System (SystemTime)
import Numeric.Natural
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (MsgBody, MsgFlags, MsgId, RecipientId, noMsgFlags)
import Simplex.Messaging.Protocol (Message (..), MsgId, RcvMessage (..), RecipientId)
data Message = Message
{ msgId :: MsgId,
ts :: SystemTime,
msgFlags :: MsgFlags,
msgBody :: MsgBody
}
instance StrEncoding Message where
strEncode Message {msgId, ts, msgFlags, msgBody} =
B.unwords
[ strEncode msgId,
strEncode ts,
"flags=" <> strEncode msgFlags,
strEncode msgBody
]
strP = do
msgId <- strP_
ts <- strP_
msgFlags <- ("flags=" *> strP_) <|> pure noMsgFlags
msgBody <- strP
pure Message {msgId, ts, msgFlags, msgBody}
data MsgLogRecord = MsgLogRecord RecipientId Message
data MsgLogRecord = MLRv3 RecipientId Message | MLRv1 RecipientId RcvMessage
instance StrEncoding MsgLogRecord where
strEncode (MsgLogRecord rId msg) = strEncode (rId, msg)
strP = MsgLogRecord <$> strP_ <*> strP
strEncode = \case
MLRv3 rId msg -> strEncode (Str "v3", rId, msg)
MLRv1 rId msg -> strEncode (rId, msg)
strP = "v3 " *> (MLRv3 <$> strP_ <*> strP) <|> MLRv1 <$> strP_ <*> strP
class MonadMsgStore s q m | s -> q where
getMsgQueue :: s -> RecipientId -> Natural -> m q

View File

@ -16,7 +16,7 @@ import Data.Functor (($>))
import Data.Int (Int64)
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Numeric.Natural
import Simplex.Messaging.Protocol (MsgId, RecipientId)
import Simplex.Messaging.Protocol (Message (..), MsgId, RecipientId)
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
@ -78,6 +78,6 @@ instance MonadMsgQueue MsgQueue STM where
deleteExpiredMsgs (MsgQueue q) old = loop
where
loop = tryPeekTBQueue q >>= mapM_ delOldMsg
delOldMsg Message {ts} =
when (systemSeconds ts < old) $
delOldMsg Message {msgTs} =
when (systemSeconds msgTs < old) $
tryReadTBQueue q >> loop

View File

@ -93,7 +93,7 @@ smpBlockSize :: Int
smpBlockSize = 16384
supportedSMPServerVRange :: VersionRange
supportedSMPServerVRange = mkVersionRange 1 2
supportedSMPServerVRange = mkVersionRange 1 3
simplexMQVersion :: String
simplexMQVersion = "3.0.0-beta.4"

View File

@ -1,4 +1,3 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Simplex.Messaging.Transport.Client

View File

@ -325,11 +325,11 @@ testServerConnectionAfterError t _ = do
bob #: ("1", "alice", "SUB") #> ("1", "alice", ERR (BROKER NETWORK))
alice #: ("1", "bob", "SUB") #> ("1", "bob", ERR (BROKER NETWORK))
withServer $ do
alice <# ("", "bob", SENT 4)
alice <#= \case ("", "bob", SENT 4) -> True; ("", "", UP s ["bob"]) -> s == server; _ -> False
alice <#= \case ("", "bob", SENT 4) -> True; ("", "", UP s ["bob"]) -> s == server; _ -> False
bob <# ("", "", UP server ["alice"])
bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False
bob #: ("2", "alice", "ACK 4") #> ("2", "alice", OK)
alice <# ("", "", UP server ["bob"])
alice #: ("1", "bob", "SEND F 11\nhello again") #> ("1", "bob", MID 5)
alice <# ("", "bob", SENT 5)
bob <#= \case ("", "alice", Msg "hello again") -> True; _ -> False

View File

@ -469,7 +469,7 @@ testSuspendingAgentCompleteSending t = do
Right () <- withSmpServerStoreLogOn t testPort $ \_ -> runExceptT $ do
get b =##> \case ("", c, SENT 5) -> c == aId; ("", "", UP {}) -> True; _ -> False
get b =##> \case ("", c, SENT 5) -> c == aId; ("", "", UP {}) -> True; _ -> False
get b =##> \case ("", c, SENT 6) -> c == aId; _ -> False
get b =##> \case ("", c, SENT 6) -> c == aId; ("", "", UP {}) -> True; _ -> False
("", "", SUSPENDED) <- get b
("", "", UP {}) <- get a

View File

@ -268,25 +268,31 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
(bobId, qInfo) <- createConnection alice SCMInvitation
liftIO $ threadDelay 500000
aliceId <- joinConnection bob qInfo "bob's connInfo"
liftIO $ print 0
void $ messageNotification apnsQ
("", _, CONF confId "bob's connInfo") <- get alice
liftIO $ threadDelay 500000
allowConnection alice bobId confId "alice's connInfo"
liftIO $ print 1
void $ messageNotification apnsQ
get bob ##> ("", aliceId, INFO "alice's connInfo")
liftIO $ print 2
void $ messageNotification apnsQ
get alice ##> ("", bobId, CON)
liftIO $ print 3
void $ messageNotification apnsQ
get bob ##> ("", aliceId, CON)
-- bob sends message
1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello"
get bob ##> ("", aliceId, SENT $ baseId + 1)
liftIO $ print 4
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 1
-- alice sends message
2 <- msgId <$> sendMessage alice bobId (SMP.MsgFlags True) "hey there"
get alice ##> ("", bobId, SENT $ baseId + 2)
liftIO $ print 5
void $ messageNotification apnsQ
get bob =##> \case ("", c, Msg "hey there") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 2
@ -448,7 +454,7 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
messageNotification apnsQ = do
500000 `timeout` atomically (readTBQueue apnsQ) >>= \case
1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case
Nothing -> error "no notification"
Just APNSMockRequest {notification = APNSNotification {aps = APNSMutableContent {}, notificationData = Just ntfData}, sendApnsResponse} -> do
nonce <- C.cbNonce <$> ntfData .-> "nonce"

View File

@ -13,6 +13,7 @@ import Control.Concurrent (threadDelay)
import Control.Monad.Except (runExceptT)
import qualified Data.Aeson as J
import qualified Data.Aeson.Types as JT
import Data.Bifunctor (first)
import qualified Data.ByteString.Base64.URL as U
import Data.ByteString.Char8 (ByteString)
import Data.Text.Encoding (encodeUtf8)
@ -35,7 +36,7 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Push.APNS
import qualified Simplex.Messaging.Notifications.Server.Push.APNS as APNS
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Parsers (parse, parseAll)
import Simplex.Messaging.Protocol hiding (notification)
import Simplex.Messaging.Transport
import Test.Hspec
@ -131,11 +132,11 @@ testNotificationSubscription (ATransport t) =
notifierId `shouldBe` nId
send' APNSRespOk
-- receive message
Resp "" _ (MSG mId1 mTs _ msg1) <- tGet rh
Resp "" _ (MSG RcvMessage {msgId = mId1, msgBody = EncRcvMsgBody body}) <- tGet rh
Right ClientRcvMsgBody {msgTs = mTs, msgBody} <- pure $ parseAll clientRcvMsgBodyP =<< first show (C.cbDecrypt rcvDhSecret (C.cbNonce mId1) body)
mId1 `shouldBe` msgId
mTs `shouldBe` msgTs
let decryptedMsg = C.cbDecrypt rcvDhSecret (C.cbNonce mId1) msg1
(decryptedMsg, Right "hello") #== "delivered from queue"
(msgBody, "hello") #== "delivered from queue"
Resp "6" _ OK <- signSendRecv rh rKey ("6", rId, ACK mId1)
pure ()
-- replace token

View File

@ -22,6 +22,7 @@ import Simplex.Messaging.Server.Env.STM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Client
import Simplex.Messaging.Transport.KeepAlive
import Simplex.Messaging.Version
import Test.Hspec
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
@ -56,6 +57,9 @@ testSMPClient client =
Right th -> client th
Left e -> error $ show e
cfgV2 :: ServerConfig
cfgV2 = cfg {smpServerVRange = mkVersionRange 1 2}
cfg :: ServerConfig
cfg =
ServerConfig
@ -79,6 +83,9 @@ cfg =
smpServerVRange = supportedSMPServerVRange
}
withSmpServerStoreMsgLogOnV2 :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a
withSmpServerStoreMsgLogOnV2 t = withSmpServerConfigOn t cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile}
withSmpServerStoreMsgLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a
withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, serverStatsFile = Just testServerStatsFile}

View File

@ -1,4 +1,5 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
@ -13,6 +14,7 @@ import Control.Concurrent (ThreadId, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, try)
import Control.Monad.Except (forM, forM_, runExceptT)
import Data.Bifunctor (first)
import Data.ByteString.Base64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
@ -20,6 +22,7 @@ import SMPClient
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM (ServerConfig (..))
import Simplex.Messaging.Server.Expiration
@ -34,7 +37,8 @@ serverTests :: ATransport -> Spec
serverTests t@(ATransport t') = do
describe "SMP syntax" $ syntaxTests t
describe "SMP queues" $ do
describe "NEW and KEY commands, SEND messages" $ testCreateSecure t
describe "NEW and KEY commands, SEND messages (v2)" $ testCreateSecureV2 t'
describe "NEW and KEY commands, SEND messages (v3)" $ testCreateSecure t
describe "NEW, OFF and DEL commands, SEND messages" $ testCreateDelete t
describe "Stress test" $ stressTest t
describe "allowNewQueues setting" $ testAllowNewQueues t'
@ -45,6 +49,7 @@ serverTests t@(ATransport t') = do
describe "GET & SUB commands" $ testGetSubCommands t'
describe "Store log" $ testWithStoreLog t
describe "Restore messages" $ testRestoreMessages t
describe "Restore messages (v2)" $ testRestoreMessagesV2 t
describe "Timing of AUTH error" $ testTiming t
describe "Message notifications" $ testMessageNotifications t
describe "Message expiration" $ do
@ -58,6 +63,9 @@ pattern Resp corrId queueId command <- (_, _, (corrId, queueId, Right command))
pattern Ids :: RecipientId -> SenderId -> RcvPublicDhKey -> BrokerMsg
pattern Ids rId sId srvDh <- IDS (QIK rId sId srvDh)
pattern Msg :: MsgId -> MsgBody -> BrokerMsg
pattern Msg msgId body <- MSG RcvMessage {msgId, msgBody = EncRcvMsgBody body}
sendRecv :: forall c p. (Transport c, PartyI p) => THandle c -> (Maybe C.ASignature, ByteString, ByteString, Command p) -> IO (SignedTransmission BrokerMsg)
sendRecv h@THandle {thVersion, sessionId} (sgn, corrId, qId, cmd) = do
let t = encodeTransmission thVersion sessionId (CorrId corrId, qId, cmd)
@ -80,21 +88,29 @@ _SEND = SEND noMsgFlags
_SEND' :: MsgBody -> Command 'Sender
_SEND' = SEND MsgFlags {notification = True}
testCreateSecure :: ATransport -> Spec
testCreateSecure (ATransport t) =
decryptMsgV2 :: C.DhSecret 'C.X25519 -> ByteString -> ByteString -> Either C.CryptoError ByteString
decryptMsgV2 dhShared = C.cbDecrypt dhShared . C.cbNonce
decryptMsgV3 :: C.DhSecret 'C.X25519 -> ByteString -> ByteString -> Either String MsgBody
decryptMsgV3 dhShared nonce body = do
ClientRcvMsgBody {msgBody} <- parseAll clientRcvMsgBodyP =<< first show (C.cbDecrypt dhShared (C.cbNonce nonce) body)
pure msgBody
testCreateSecureV2 :: forall c. Transport c => TProxy c -> Spec
testCreateSecureV2 _ =
it "should create (NEW) and secure (KEY) queue" $
smpTest t $ \h -> do
withSmpServerConfigOn (transport @c) cfgV2 testPort $ \_ -> testSMPClient @c $ \h -> do
(rPub, rKey) <- C.generateSignatureKeyPair C.SEd448
(dhPub, dhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair'
Resp "abcd" rId1 (Ids rId sId srvDh) <- signSendRecv h rKey ("abcd", "", NEW rPub dhPub)
let dec nonce = C.cbDecrypt (C.dh' srvDh dhPriv) (C.cbNonce nonce)
let dec = decryptMsgV2 $ C.dh' srvDh dhPriv
(rId1, "") #== "creates queue"
Resp "bcda" sId1 ok1 <- sendRecv h ("", "bcda", sId, _SEND "hello")
(ok1, OK) #== "accepts unsigned SEND"
(sId1, sId) #== "same queue ID in response 1"
Resp "" _ (MSG mId1 _ _ msg1) <- tGet h
Resp "" _ (Msg mId1 msg1) <- tGet h
(dec mId1 msg1, Right "hello") #== "delivers message"
Resp "cdab" _ ok4 <- signSendRecv h rKey ("cdab", rId, ACK mId1)
@ -124,7 +140,7 @@ testCreateSecure (ATransport t) =
Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, _SEND "hello again")
(ok3, OK) #== "accepts signed SEND"
Resp "" _ (MSG mId2 _ _ msg2) <- tGet h
Resp "" _ (Msg mId2 msg2) <- tGet h
(dec mId2 msg2, Right "hello again") #== "delivers message 2"
Resp "cdab" _ ok5 <- signSendRecv h rKey ("cdab", rId, ACK mId2)
@ -133,6 +149,77 @@ testCreateSecure (ATransport t) =
Resp "dabc" _ err5 <- sendRecv h ("", "dabc", sId, _SEND "hello")
(err5, ERR AUTH) #== "rejects unsigned SEND"
let maxAllowedMessage = B.replicate maxMessageLength '-'
Resp "bcda" _ OK <- signSendRecv h sKey ("bcda", sId, _SEND maxAllowedMessage)
Resp "" _ (Msg mId3 msg3) <- tGet h
(dec mId3 msg3, Right maxAllowedMessage) #== "delivers message of max size"
let biggerMessage = B.replicate (maxMessageLength + 1) '-'
Resp "bcda" _ (ERR LARGE_MSG) <- signSendRecv h sKey ("bcda", sId, _SEND biggerMessage)
pure ()
testCreateSecure :: ATransport -> Spec
testCreateSecure (ATransport t) =
it "should create (NEW) and secure (KEY) queue" $
smpTest t $ \h -> do
(rPub, rKey) <- C.generateSignatureKeyPair C.SEd448
(dhPub, dhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair'
Resp "abcd" rId1 (Ids rId sId srvDh) <- signSendRecv h rKey ("abcd", "", NEW rPub dhPub)
let dec = decryptMsgV3 $ C.dh' srvDh dhPriv
(rId1, "") #== "creates queue"
Resp "bcda" sId1 ok1 <- sendRecv h ("", "bcda", sId, _SEND "hello")
(ok1, OK) #== "accepts unsigned SEND"
(sId1, sId) #== "same queue ID in response 1"
Resp "" _ (Msg mId1 msg1) <- tGet h
(dec mId1 msg1, Right "hello") #== "delivers message"
Resp "cdab" _ ok4 <- signSendRecv h rKey ("cdab", rId, ACK mId1)
(ok4, OK) #== "replies OK when message acknowledged if no more messages"
Resp "dabc" _ err6 <- signSendRecv h rKey ("dabc", rId, ACK mId1)
(err6, ERR NO_MSG) #== "replies ERR when message acknowledged without messages"
(sPub, sKey) <- C.generateSignatureKeyPair C.SEd448
Resp "abcd" sId2 err1 <- signSendRecv h sKey ("abcd", sId, _SEND "hello")
(err1, ERR AUTH) #== "rejects signed SEND"
(sId2, sId) #== "same queue ID in response 2"
Resp "bcda" _ err2 <- sendRecv h (sampleSig, "bcda", rId, KEY sPub)
(err2, ERR AUTH) #== "rejects KEY with wrong signature"
Resp "cdab" _ err3 <- signSendRecv h rKey ("cdab", sId, KEY sPub)
(err3, ERR AUTH) #== "rejects KEY with sender's ID"
Resp "dabc" rId2 ok2 <- signSendRecv h rKey ("dabc", rId, KEY sPub)
(ok2, OK) #== "secures queue"
(rId2, rId) #== "same queue ID in response 3"
Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub)
(err4, ERR AUTH) #== "rejects KEY if already secured"
Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, _SEND "hello again")
(ok3, OK) #== "accepts signed SEND"
Resp "" _ (Msg mId2 msg2) <- tGet h
(dec mId2 msg2, Right "hello again") #== "delivers message 2"
Resp "cdab" _ ok5 <- signSendRecv h rKey ("cdab", rId, ACK mId2)
(ok5, OK) #== "replies OK when message acknowledged 2"
Resp "dabc" _ err5 <- sendRecv h ("", "dabc", sId, _SEND "hello")
(err5, ERR AUTH) #== "rejects unsigned SEND"
let maxAllowedMessage = B.replicate maxMessageLength '-'
Resp "bcda" _ OK <- signSendRecv h sKey ("bcda", sId, _SEND maxAllowedMessage)
Resp "" _ (Msg mId3 msg3) <- tGet h
(dec mId3 msg3, Right maxAllowedMessage) #== "delivers message of max size"
let biggerMessage = B.replicate (maxMessageLength + 1) '-'
Resp "bcda" _ (ERR LARGE_MSG) <- signSendRecv h sKey ("bcda", sId, _SEND biggerMessage)
pure ()
testCreateDelete :: ATransport -> Spec
testCreateDelete (ATransport t) =
it "should create (NEW), suspend (OFF) and delete (DEL) queue" $
@ -140,7 +227,7 @@ testCreateDelete (ATransport t) =
(rPub, rKey) <- C.generateSignatureKeyPair C.SEd25519
(dhPub, dhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair'
Resp "abcd" rId1 (Ids rId sId srvDh) <- signSendRecv rh rKey ("abcd", "", NEW rPub dhPub)
let dec nonce = C.cbDecrypt (C.dh' srvDh dhPriv) (C.cbNonce nonce)
let dec = decryptMsgV3 $ C.dh' srvDh dhPriv
(rId1, "") #== "creates queue"
(sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519
@ -153,7 +240,7 @@ testCreateDelete (ATransport t) =
Resp "dabc" _ ok7 <- signSendRecv sh sKey ("dabc", sId, _SEND "hello 2")
(ok7, OK) #== "accepts signed SEND 2 - this message is not delivered because the first is not ACKed"
Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh
Resp "" _ (Msg mId1 msg1) <- tGet rh
(dec mId1 msg1, Right "hello") #== "delivers message"
Resp "abcd" _ err1 <- sendRecv rh (sampleSig, "abcd", rId, OFF)
@ -175,7 +262,7 @@ testCreateDelete (ATransport t) =
Resp "bcda" _ ok4 <- signSendRecv rh rKey ("bcda", rId, OFF)
(ok4, OK) #== "accepts OFF when suspended"
Resp "cdab" _ (MSG mId2 _ _ msg2) <- signSendRecv rh rKey ("cdab", rId, SUB)
Resp "cdab" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("cdab", rId, SUB)
(dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)"
Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL)
@ -237,14 +324,14 @@ testDuplex (ATransport t) =
(arPub, arKey) <- C.generateSignatureKeyPair C.SEd448
(aDhPub, aDhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair'
Resp "abcd" _ (Ids aRcv aSnd aSrvDh) <- signSendRecv alice arKey ("abcd", "", NEW arPub aDhPub)
let aDec nonce = C.cbDecrypt (C.dh' aSrvDh aDhPriv) (C.cbNonce nonce)
let aDec = decryptMsgV3 $ C.dh' aSrvDh aDhPriv
-- aSnd ID is passed to Bob out-of-band
(bsPub, bsKey) <- C.generateSignatureKeyPair C.SEd448
Resp "bcda" _ OK <- sendRecv bob ("", "bcda", aSnd, _SEND $ "key " <> strEncode bsPub)
-- "key ..." is ad-hoc, not a part of SMP protocol
Resp "" _ (MSG mId1 _ _ msg1) <- tGet alice
Resp "" _ (Msg mId1 msg1) <- tGet alice
Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK mId1)
Right ["key", bobKey] <- pure $ B.words <$> aDec mId1 msg1
(bobKey, strEncode bsPub) #== "key received from Bob"
@ -253,11 +340,11 @@ testDuplex (ATransport t) =
(brPub, brKey) <- C.generateSignatureKeyPair C.SEd448
(bDhPub, bDhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair'
Resp "abcd" _ (Ids bRcv bSnd bSrvDh) <- signSendRecv bob brKey ("abcd", "", NEW brPub bDhPub)
let bDec nonce = C.cbDecrypt (C.dh' bSrvDh bDhPriv) (C.cbNonce nonce)
let bDec = decryptMsgV3 $ C.dh' bSrvDh bDhPriv
Resp "bcda" _ OK <- signSendRecv bob bsKey ("bcda", aSnd, _SEND $ "reply_id " <> encode bSnd)
-- "reply_id ..." is ad-hoc, not a part of SMP protocol
Resp "" _ (MSG mId2 _ _ msg2) <- tGet alice
Resp "" _ (Msg mId2 msg2) <- tGet alice
Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK mId2)
Right ["reply_id", bId] <- pure $ B.words <$> aDec mId2 msg2
(bId, encode bSnd) #== "reply queue ID received from Bob"
@ -266,7 +353,7 @@ testDuplex (ATransport t) =
Resp "dabc" _ OK <- sendRecv alice ("", "dabc", bSnd, _SEND $ "key " <> strEncode asPub)
-- "key ..." is ad-hoc, not a part of SMP protocol
Resp "" _ (MSG mId3 _ _ msg3) <- tGet bob
Resp "" _ (Msg mId3 msg3) <- tGet bob
Resp "abcd" _ OK <- signSendRecv bob brKey ("abcd", bRcv, ACK mId3)
Right ["key", aliceKey] <- pure $ B.words <$> bDec mId3 msg3
(aliceKey, strEncode asPub) #== "key received from Alice"
@ -274,13 +361,13 @@ testDuplex (ATransport t) =
Resp "cdab" _ OK <- signSendRecv bob bsKey ("cdab", aSnd, _SEND "hi alice")
Resp "" _ (MSG mId4 _ _ msg4) <- tGet alice
Resp "" _ (Msg mId4 msg4) <- tGet alice
Resp "dabc" _ OK <- signSendRecv alice arKey ("dabc", aRcv, ACK mId4)
(aDec mId4 msg4, Right "hi alice") #== "message received from Bob"
Resp "abcd" _ OK <- signSendRecv alice asKey ("abcd", bSnd, _SEND "how are you bob")
Resp "" _ (MSG mId5 _ _ msg5) <- tGet bob
Resp "" _ (Msg mId5 msg5) <- tGet bob
Resp "bcda" _ OK <- signSendRecv bob brKey ("bcda", bRcv, ACK mId5)
(bDec mId5 msg5, Right "how are you bob") #== "message received from alice"
@ -291,18 +378,18 @@ testSwitchSub (ATransport t) =
(rPub, rKey) <- C.generateSignatureKeyPair C.SEd448
(dhPub, dhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair'
Resp "abcd" _ (Ids rId sId srvDh) <- signSendRecv rh1 rKey ("abcd", "", NEW rPub dhPub)
let dec nonce = C.cbDecrypt (C.dh' srvDh dhPriv) (C.cbNonce nonce)
let dec = decryptMsgV3 $ C.dh' srvDh dhPriv
Resp "bcda" _ ok1 <- sendRecv sh ("", "bcda", sId, _SEND "test1")
(ok1, OK) #== "sent test message 1"
Resp "cdab" _ ok2 <- sendRecv sh ("", "cdab", sId, _SEND "test2, no ACK")
(ok2, OK) #== "sent test message 2"
Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh1
Resp "" _ (Msg mId1 msg1) <- tGet rh1
(dec mId1 msg1, Right "test1") #== "test message 1 delivered to the 1st TCP connection"
Resp "abcd" _ (MSG mId2 _ _ msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK mId1)
Resp "abcd" _ (Msg mId2 msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK mId1)
(dec mId2 msg2, Right "test2, no ACK") #== "test message 2 delivered, no ACK"
Resp "bcda" _ (MSG mId2' _ _ msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB)
Resp "bcda" _ (Msg mId2' msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB)
(dec mId2' msg2', Right "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)"
Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK mId2')
@ -311,7 +398,7 @@ testSwitchSub (ATransport t) =
Resp "dabc" _ OK <- sendRecv sh ("", "dabc", sId, _SEND "test3")
Resp "" _ (MSG mId3 _ _ msg3) <- tGet rh2
Resp "" _ (Msg mId3 msg3) <- tGet rh2
(dec mId3 msg3, Right "test3") #== "delivered to the 2nd TCP connection"
Resp "abcd" _ err <- signSendRecv rh1 rKey ("abcd", rId, ACK mId3)
@ -334,9 +421,9 @@ testGetCommand t =
atomically . putTMVar queue =<< createAndSecureQueue rh sPub
testSMPClient @c $ \rh -> do
(sId, rId, rKey, dhShared) <- atomically $ takeTMVar queue
let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce)
let dec = decryptMsgV3 dhShared
Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello")
Resp "2" _ (MSG mId1 _ _ msg1) <- signSendRecv rh rKey ("2", rId, GET)
Resp "2" _ (Msg mId1 msg1) <- signSendRecv rh rKey ("2", rId, GET)
(dec mId1 msg1, Right "hello") #== "retrieved from queue"
Resp "3" _ OK <- signSendRecv rh rKey ("3", rId, ACK mId1)
Resp "4" _ OK <- signSendRecv rh rKey ("4", rId, GET)
@ -348,14 +435,14 @@ testGetSubCommands t =
(sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519
smpTest3 t $ \rh1 rh2 sh -> do
(sId, rId, rKey, dhShared) <- createAndSecureQueue rh1 sPub
let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce)
let dec = decryptMsgV3 dhShared
Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello 1")
Resp "1a" _ OK <- signSendRecv sh sKey ("1a", sId, _SEND "hello 2")
Resp "1b" _ OK <- signSendRecv sh sKey ("1b", sId, _SEND "hello 3")
Resp "1c" _ OK <- signSendRecv sh sKey ("1c", sId, _SEND "hello 4")
-- both get the same if not ACK'd
Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh1
Resp "2" _ (MSG mId1' _ _ msg1') <- signSendRecv rh2 rKey ("2", rId, GET)
Resp "" _ (Msg mId1 msg1) <- tGet rh1
Resp "2" _ (Msg mId1' msg1') <- signSendRecv rh2 rKey ("2", rId, GET)
(dec mId1 msg1, Right "hello 1") #== "received from queue via SUB"
(dec mId1' msg1', Right "hello 1") #== "retrieved from queue with GET"
mId1 `shouldBe` mId1'
@ -364,27 +451,27 @@ testGetSubCommands t =
Resp "3" _ (ERR (CMD PROHIBITED)) <- signSendRecv rh1 rKey ("3", rId, GET)
Resp "3a" _ (ERR (CMD PROHIBITED)) <- signSendRecv rh2 rKey ("3a", rId, SUB)
-- ACK for SUB delivers the next message
Resp "4" _ (MSG mId2 _ _ msg2) <- signSendRecv rh1 rKey ("4", rId, ACK mId1)
Resp "4" _ (Msg mId2 msg2) <- signSendRecv rh1 rKey ("4", rId, ACK mId1)
(dec mId2 msg2, Right "hello 2") #== "received from queue via SUB"
-- bad msgId returns error
Resp "5" _ (ERR NO_MSG) <- signSendRecv rh2 rKey ("5", rId, ACK "1234")
-- already ACK'd by subscriber, but still returns OK when msgId matches
Resp "5a" _ OK <- signSendRecv rh2 rKey ("5a", rId, ACK mId1)
-- msg2 is not lost - even if subscriber does not ACK it, it is delivered to getter
Resp "6" _ (MSG mId2' _ _ msg2') <- signSendRecv rh2 rKey ("6", rId, GET)
Resp "6" _ (Msg mId2' msg2') <- signSendRecv rh2 rKey ("6", rId, GET)
(dec mId2' msg2', Right "hello 2") #== "retrieved from queue with GET"
mId2 `shouldBe` mId2'
msg2 `shouldBe` msg2'
-- getter ACK returns OK, even though there is the next message
Resp "7" _ OK <- signSendRecv rh2 rKey ("7", rId, ACK mId2')
Resp "8" _ (MSG mId3 _ _ msg3) <- signSendRecv rh2 rKey ("8", rId, GET)
Resp "8" _ (Msg mId3 msg3) <- signSendRecv rh2 rKey ("8", rId, GET)
(dec mId3 msg3, Right "hello 3") #== "retrieved from queue with GET"
-- subscriber ACK does not lose message
Resp "9" _ (MSG mId3' _ _ msg3') <- signSendRecv rh1 rKey ("9", rId, ACK mId2')
Resp "9" _ (Msg mId3' msg3') <- signSendRecv rh1 rKey ("9", rId, ACK mId2')
(dec mId3' msg3', Right "hello 3") #== "retrieved from queue with GET"
mId3 `shouldBe` mId3'
msg3 `shouldBe` msg3'
Resp "10" _ (MSG mId4 _ _ msg4) <- signSendRecv rh1 rKey ("10", rId, ACK mId3)
Resp "10" _ (Msg mId4 msg4) <- signSendRecv rh1 rKey ("10", rId, ACK mId3)
(dec mId4 msg4, Right "hello 4") #== "retrieved from queue with GET"
Resp "11" _ OK <- signSendRecv rh1 rKey ("11", rId, ACK mId4)
-- no more messages for getter too
@ -416,15 +503,15 @@ testWithStoreLog at@(ATransport t) =
writeTVar notifierId nId
Resp "dabc" _ OK <- signSendRecv h1 nKey ("dabc", nId, NSUB)
Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND' "hello")
Resp "" _ (MSG mId1 _ _ msg1) <- tGet h
(C.cbDecrypt dhShared (C.cbNonce mId1) msg1, Right "hello") #== "delivered from queue 1"
Resp "" _ (Msg mId1 msg1) <- tGet h
(decryptMsgV3 dhShared mId1 msg1, Right "hello") #== "delivered from queue 1"
Resp "" _ (NMSG _ _) <- tGet h1
(sId2, rId2, rKey2, dhShared2) <- createAndSecureQueue h sPub2
atomically $ writeTVar senderId2 sId2
Resp "cdab" _ OK <- signSendRecv h sKey2 ("cdab", sId2, _SEND "hello too")
Resp "" _ (MSG mId2 _ _ msg2) <- tGet h
(C.cbDecrypt dhShared2 (C.cbNonce mId2) msg2, Right "hello too") #== "delivered from queue 2"
Resp "" _ (Msg mId2 msg2) <- tGet h
(decryptMsgV3 dhShared2 mId2 msg2, Right "hello too") #== "delivered from queue 2"
Resp "dabc" _ OK <- signSendRecv h rKey2 ("dabc", rId2, DEL)
pure ()
@ -446,8 +533,8 @@ testWithStoreLog at@(ATransport t) =
nId <- readTVarIO notifierId
Resp "dabc" _ OK <- signSendRecv h1 nKey ("dabc", nId, NSUB)
Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND' "hello")
Resp "cdab" _ (MSG mId3 _ _ msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB)
(C.cbDecrypt dh1 (C.cbNonce mId3) msg3, Right "hello") #== "delivered from restored queue"
Resp "cdab" _ (Msg mId3 msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB)
(decryptMsgV3 dh1 mId3 msg3, Right "hello") #== "delivered from restored queue"
Resp "" _ (NMSG _ _) <- tGet h1
-- this queue is removed - not restored
sId2 <- readTVarIO senderId2
@ -489,9 +576,9 @@ testRestoreMessages at@(ATransport t) =
writeTVar dhShared $ Just dh
writeTVar senderId sId
Resp "1" _ OK <- signSendRecv h sKey ("1", sId, _SEND "hello")
Resp "" _ (MSG mId1 _ _ msg1) <- tGet h1
Resp "" _ (Msg mId1 msg1) <- tGet h1
Resp "1a" _ OK <- signSendRecv h1 rKey ("1a", rId, ACK mId1)
(C.cbDecrypt dh (C.cbNonce mId1) msg1, Right "hello") #== "message delivered"
(decryptMsgV3 dh mId1 msg1, Right "hello") #== "message delivered"
-- messages below are delivered after server restart
sId <- readTVarIO senderId
Resp "2" _ OK <- signSendRecv h sKey ("2", sId, _SEND "hello 2")
@ -506,12 +593,13 @@ testRestoreMessages at@(ATransport t) =
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
Resp "2" _ (MSG mId2 _ _ msg2) <- signSendRecv h rKey ("2", rId, SUB)
(C.cbDecrypt dh (C.cbNonce mId2) msg2, Right "hello 2") #== "restored message delivered"
Resp "3" _ (MSG mId3 _ _ msg3) <- signSendRecv h rKey ("3", rId, ACK mId2)
(C.cbDecrypt dh (C.cbNonce mId3) msg3, Right "hello 3") #== "restored message delivered"
Resp "4" _ (MSG mId4 _ _ msg4) <- signSendRecv h rKey ("4", rId, ACK mId3)
(C.cbDecrypt dh (C.cbNonce mId4) msg4, Right "hello 4") #== "restored message delivered"
let dec = decryptMsgV3 dh
Resp "2" _ (Msg mId2 msg2) <- signSendRecv h rKey ("2", rId, SUB)
(dec mId2 msg2, Right "hello 2") #== "restored message delivered"
Resp "3" _ (Msg mId3 msg3) <- signSendRecv h rKey ("3", rId, ACK mId2)
(dec mId3 msg3, Right "hello 3") #== "restored message delivered"
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, ACK mId3)
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
@ -521,9 +609,78 @@ testRestoreMessages at@(ATransport t) =
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
Resp "4" _ (MSG mId4 _ _ msg4) <- signSendRecv h rKey ("4", rId, SUB)
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB)
Resp "5" _ OK <- signSendRecv h rKey ("5", rId, ACK mId4)
(C.cbDecrypt dh (C.cbNonce mId4) msg4, Right "hello 4") #== "restored message delivered"
(decryptMsgV3 dh mId4 msg4, Right "hello 4") #== "restored message delivered"
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
removeFile testStoreLogFile
removeFile testStoreMsgsFile
where
runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation
runTest _ test' server = do
testSMPClient test' `shouldReturn` ()
killThread server
runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation
runClient _ test' = testSMPClient test' `shouldReturn` ()
testRestoreMessagesV2 :: ATransport -> Spec
testRestoreMessagesV2 at@(ATransport t) =
it "should store messages on exit and restore on start" $ do
(sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519
recipientId <- newTVarIO ""
recipientKey <- newTVarIO Nothing
dhShared <- newTVarIO Nothing
senderId <- newTVarIO ""
withSmpServerStoreMsgLogOnV2 at testPort . runTest t $ \h -> do
runClient t $ \h1 -> do
(sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub
atomically $ do
writeTVar recipientId rId
writeTVar recipientKey $ Just rKey
writeTVar dhShared $ Just dh
writeTVar senderId sId
Resp "1" _ OK <- signSendRecv h sKey ("1", sId, _SEND "hello")
Resp "" _ (Msg mId1 msg1) <- tGet h1
Resp "1a" _ OK <- signSendRecv h1 rKey ("1a", rId, ACK mId1)
(decryptMsgV2 dh mId1 msg1, Right "hello") #== "message delivered"
-- messages below are delivered after server restart
sId <- readTVarIO senderId
Resp "2" _ OK <- signSendRecv h sKey ("2", sId, _SEND "hello 2")
Resp "3" _ OK <- signSendRecv h sKey ("3", sId, _SEND "hello 3")
Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4")
pure ()
logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 3
withSmpServerStoreMsgLogOnV2 at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
let dec = decryptMsgV2 dh
Resp "2" _ (Msg mId2 msg2) <- signSendRecv h rKey ("2", rId, SUB)
(dec mId2 msg2, Right "hello 2") #== "restored message delivered"
Resp "3" _ (Msg mId3 msg3) <- signSendRecv h rKey ("3", rId, ACK mId2)
(dec mId3 msg3, Right "hello 3") #== "restored message delivered"
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, ACK mId3)
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 1
withSmpServerStoreMsgLogOnV2 at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB)
Resp "5" _ OK <- signSendRecv h rKey ("5", rId, ACK mId4)
(decryptMsgV2 dh mId4 msg4, Right "hello 4") #== "restored message delivered"
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
@ -568,7 +725,7 @@ testTiming (ATransport t) =
(rPub, rKey) <- generateKeys goodKeySize
(dhPub, dhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair'
Resp "abcd" "" (Ids rId sId srvDh) <- signSendRecv rh rKey ("abcd", "", NEW rPub dhPub)
let dec nonce = C.cbDecrypt (C.dh' srvDh dhPriv) (C.cbNonce nonce)
let dec = decryptMsgV3 $ C.dh' srvDh dhPriv
Resp "cdab" _ OK <- signSendRecv rh rKey ("cdab", rId, SUB)
(_, badKey) <- generateKeys badKeySize
@ -578,7 +735,7 @@ testTiming (ATransport t) =
Resp "dabc" _ OK <- signSendRecv rh rKey ("dabc", rId, KEY sPub)
Resp "bcda" _ OK <- signSendRecv sh sKey ("bcda", sId, _SEND "hello")
Resp "" _ (MSG mId _ _ msg) <- tGet rh
Resp "" _ (Msg mId msg) <- tGet rh
(dec mId msg, Right "hello") #== "delivered from queue"
runTimingTest sh badKey sId $ _SEND "hello"
@ -610,21 +767,21 @@ testMessageNotifications (ATransport t) =
(nPub, nKey) <- C.generateSignatureKeyPair C.SEd25519
smpTest4 t $ \rh sh nh1 nh2 -> do
(sId, rId, rKey, dhShared) <- createAndSecureQueue rh sPub
let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce)
let dec = decryptMsgV3 dhShared
(rcvNtfPubDhKey, _) <- C.generateKeyPair'
Resp "1" _ (NID nId' _) <- signSendRecv rh rKey ("1", rId, NKEY nPub rcvNtfPubDhKey)
Resp "1a" _ (NID nId _) <- signSendRecv rh rKey ("1a", rId, NKEY nPub rcvNtfPubDhKey)
nId' `shouldNotBe` nId
Resp "2" _ OK <- signSendRecv nh1 nKey ("2", nId, NSUB)
Resp "3" _ OK <- signSendRecv sh sKey ("3", sId, _SEND' "hello")
Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh
Resp "" _ (Msg mId1 msg1) <- tGet rh
(dec mId1 msg1, Right "hello") #== "delivered from queue"
Resp "3a" _ OK <- signSendRecv rh rKey ("3a", rId, ACK mId1)
Resp "" _ (NMSG _ _) <- tGet nh1
Resp "4" _ OK <- signSendRecv nh2 nKey ("4", nId, NSUB)
Resp "" _ END <- tGet nh1
Resp "5" _ OK <- signSendRecv sh sKey ("5", sId, _SEND' "hello again")
Resp "" _ (MSG mId2 _ _ msg2) <- tGet rh
Resp "" _ (Msg mId2 msg2) <- tGet rh
Resp "5a" _ OK <- signSendRecv rh rKey ("5a", rId, ACK mId2)
(dec mId2 msg2, Right "hello again") #== "delivered from queue again"
Resp "" _ (NMSG _ _) <- tGet nh2
@ -633,7 +790,7 @@ testMessageNotifications (ATransport t) =
Just _ -> error "nothing else should be delivered to the 1st notifier's TCP connection"
Resp "6" _ OK <- signSendRecv rh rKey ("6", rId, NDEL)
Resp "7" _ OK <- signSendRecv sh sKey ("7", sId, _SEND' "hello there")
Resp "" _ (MSG mId3 _ _ msg3) <- tGet rh
Resp "" _ (Msg mId3 msg3) <- tGet rh
(dec mId3 msg3, Right "hello there") #== "delivered from queue again"
1000 `timeout` tGet @BrokerMsg nh2 >>= \case
Nothing -> pure ()
@ -647,12 +804,12 @@ testMsgExpireOnSend t =
withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ ->
testSMPClient @c $ \sh -> do
(sId, rId, rKey, dhShared) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub
let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce)
let dec = decryptMsgV3 dhShared
Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should expire)")
threadDelay 2500000
Resp "2" _ OK <- signSendRecv sh sKey ("2", sId, _SEND "hello (should NOT expire)")
testSMPClient @c $ \rh -> do
Resp "3" _ (MSG mId _ _ msg) <- signSendRecv rh rKey ("3", rId, SUB)
Resp "3" _ (Msg mId msg) <- signSendRecv rh rKey ("3", rId, SUB)
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
1000 `timeout` tGet @BrokerMsg rh >>= \case
Nothing -> return ()
@ -682,11 +839,11 @@ testMsgNOTExpireOnInterval t =
withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ ->
testSMPClient @c $ \sh -> do
(sId, rId, rKey, dhShared) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub
let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce)
let dec = decryptMsgV3 dhShared
Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should NOT expire)")
threadDelay 2500000
testSMPClient @c $ \rh -> do
Resp "2" _ (MSG mId _ _ msg) <- signSendRecv rh rKey ("2", rId, SUB)
Resp "2" _ (Msg mId msg) <- signSendRecv rh rKey ("2", rId, SUB)
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
1000 `timeout` tGet @BrokerMsg rh >>= \case
Nothing -> return ()