v2.3.0: save and restore messages when the server is restarted (#395)

* save and restore messages when server is restarted

* read file line by line

* add import

* optmize restoring messages by reading the whole file

* update version to 2.3.0

* update scripts

* add script

* update readme
This commit is contained in:
Evgeny Poberezkin 2022-06-12 15:59:14 +01:00 committed by GitHub
parent 7736ef8576
commit 0a71822dd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 191 additions and 19 deletions

View File

@ -1,3 +1,9 @@
# 2.3.0
SMP server:
- Save and restore undelivered messages, to avoid losing them. To save messages the server has to be stopped with SIGINT signal, if it is stopped with SIGTERM undelivered messages would not be saved.
# 2.2.0
SMP server:

View File

@ -35,6 +35,8 @@ SMP server uses in-memory persistence with an optional append-only log of create
To enable store log, initialize server using `smp-server -l` command, or modify `smp-server.ini` created during initialization (uncomment `enable: on` option in the store log section). Use `smp-server --help` for other usage tips.
Starting from version 2.3.0, when store log is enabled, the server would also enable saving undelivered messages on exit and restoring them on start. This can be disabled via a separate setting `restore_messages` in `smp-server.ini` file. Saving messages would only work if the server is stopped with SIGINT signal (keyboard interrupt), if it is stopped with SIGTERM signal the messages would not be saved.
> **Please note:** On initialization SMP server creates a chain of two certificates: a self-signed CA certificate ("offline") and a server certificate used for TLS handshake ("online"). **You should store CA certificate private key securely and delete it from the server. If server TLS credential is compromised this key can be used to sign a new one, keeping the same server identity and established connections.** CA private key location by default is `/etc/opt/simplex/ca.key`.
SMP server implements [SMP protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md).
@ -61,6 +63,7 @@ Now `openssl version` should be saying "OpenSSL". You can now run `smp-server in
### SMP client library
[SMP client](https://github.com/simplex-chat/simplexmq/blob/master/src/Simplex/Messaging/Client.hs) is a Haskell library to connect to SMP servers that allows to:
- execute commands with a functional API.
- receive messages and other notifications via STM queue.
- automatically send keep-alive commands.
@ -118,11 +121,11 @@ Deployment on Linode is performed via StackScripts, which serve as recipes for L
- Create a Linode account or login with an already existing one.
- Open [SMP server StackScript](https://cloud.linode.com/stackscripts/748014) and click "Deploy New Linode".
- You can optionally configure the following parameters:
- SMP Server store log flag for queue persistence on server restart, recommended.
- [Linode API token](https://www.linode.com/docs/guides/getting-started-with-the-linode-api#get-an-access-token) to attach server address etc. as tags to Linode and to add A record to your 2nd level domain (e.g. `example.com` [domain should be created](https://cloud.linode.com/domains/create) in your account prior to deployment). The API token access scopes:
- read/write for "linodes"
- read/write for "domains"
- Domain name to use instead of Linode IP address, e.g. `smp1.example.com`.
- SMP Server store log flag for queue persistence on server restart, recommended.
- [Linode API token](https://www.linode.com/docs/guides/getting-started-with-the-linode-api#get-an-access-token) to attach server address etc. as tags to Linode and to add A record to your 2nd level domain (e.g. `example.com` [domain should be created](https://cloud.linode.com/domains/create) in your account prior to deployment). The API token access scopes:
- read/write for "linodes"
- read/write for "domains"
- Domain name to use instead of Linode IP address, e.g. `smp1.example.com`.
- Choose the region and plan, Shared CPU Nanode with 1Gb is sufficient.
- Provide ssh key to be able to connect to your Linode via ssh. If you haven't provided a Linode API token this step is required to login to your Linode and get the server's fingerprint either from the welcome message or from the file `/etc/opt/simplex/fingerprint` after server starts. See [Linode's guide on ssh](https://www.linode.com/docs/guides/use-public-key-authentication-with-ssh/) .
- Deploy your Linode. After it starts wait for SMP server to start and for tags to appear (if a Linode API token was provided). It may take up to 5 minutes depending on the connection speed on the Linode. Connecting Linode IP address to provided domain name may take some additional time.

View File

@ -7,6 +7,7 @@
module Main where
import Control.Logger.Simple
import Data.Functor (($>))
import Data.Ini (lookupValue)
import Simplex.Messaging.Server (runSMPServer)
import Simplex.Messaging.Server.CLI (ServerCLIConfig (..), protocolServerCLI, readStrictIni)
@ -57,9 +58,11 @@ smpServerCLIConfig =
\# that will be lost on restart (e.g., as with redis).\n\
\# This option enables saving memory to append only log,\n\
\# and restoring it when the server is started.\n\
\# Log is compacted on start (deleted objects are removed).\n\
\# The messages are not logged.\n"
<> ("enable: " <> (if enableStoreLog then "on" else "off # on") <> "\n\n")
\# Log is compacted on start (deleted objects are removed).\n"
<> ("enable: " <> (if enableStoreLog then "on" else "off # on") <> "\n")
<> "# The messages are optionally saved and restored when the server restarts,\n\
\# they are deleted after restarting.\n"
<> ("restore_messages: " <> (if enableStoreLog then "on" else "off # on") <> "\n\n")
<> "[TRANSPORT]\n"
<> ("port: " <> defaultServerPort <> "\n")
<> "websockets: off\n\n"
@ -80,6 +83,13 @@ smpServerCLIConfig =
privateKeyFile = serverKeyFile,
certificateFile = serverCrtFile,
storeLogFile,
storeMsgsFile =
let messagesPath = combine logPath "smp-server-messages.log"
in case lookupValue "STORE_LOG" "restore_messages" ini of
Right "on" -> Just messagesPath
Right _ -> Nothing
-- if the setting is not set, it is enabled when store log is enabled
_ -> storeLogFile $> messagesPath,
allowNewQueues = True,
messageExpiration = Just defaultMessageExpiration,
inactiveClientExpiration =

View File

@ -1,5 +1,5 @@
name: simplexmq
version: 2.2.1
version: 2.3.0
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,

View File

@ -157,6 +157,7 @@ Description=SMP server
[Service]
Type=simple
ExecStart=/bin/sh -c "exec $binary start >> /var/opt/simplex/smp-server.log 2>&1"
KillSignal=SIGINT
Restart=always
RestartSec=10
LimitNOFILE=1000000

12
scripts/update-smp-server.sh Executable file
View File

@ -0,0 +1,12 @@
#!/bin/bash
# systemd has to be configured to use SIGINT to save and restore undelivered messages after restart.
# Add this to [Service] section:
# KillSignal=SIGINT
curl -L -o /opt/simplex/bin/smp-server-new https://github.com/simplex-chat/simplexmq/releases/latest/download/smp-server-ubuntu-20_04-x86-64
systemctl stop smp-server
cp /var/opt/simplex/smp-server-store.log /var/opt/simplex/smp-server-store.log.bak
mv /opt/simplex/bin/smp-server /opt/simplex/bin/smp-server-old
mv /opt/simplex/bin/smp-server-new /opt/simplex/bin/smp-server
chmod +x /opt/simplex/bin/smp-server
systemctl start smp-server

View File

@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 2.2.1
version: 2.3.0
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@ -26,9 +26,11 @@ import qualified Data.ByteString.Base64.URL as U
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Char (isAlphaNum)
import Data.Int (Int64)
import qualified Data.List.NonEmpty as L
import Data.Text (Text)
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
import Data.Time.Clock.System (SystemTime (..))
import Data.Word (Word16)
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Util ((<$?>))
@ -82,6 +84,14 @@ instance StrEncoding Word16 where
strEncode = B.pack . show
strP = A.decimal
instance StrEncoding Int64 where
strEncode = B.pack . show
strP = A.decimal
instance StrEncoding SystemTime where
strEncode = strEncode . systemSeconds
strP = MkSystemTime <$> strP <*> pure 0
-- lists encode/parse as comma-separated strings
strEncodeList :: StrEncoding a => [a] -> ByteString
strEncodeList = B.intercalate "," . map strEncode

View File

@ -52,6 +52,7 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Type.Equality
import Network.Socket (ServiceName)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM
import Simplex.Messaging.Server.Expiration
@ -67,6 +68,7 @@ import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util
import System.Mem.Weak (deRefWeak)
import UnliftIO.Concurrent
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM
@ -90,12 +92,13 @@ smpServer :: forall m. (MonadUnliftIO m, MonadReader Env m) => TMVar Bool -> m (
smpServer started = do
s <- asks server
cfg@ServerConfig {transports} <- asks config
restoreServerMessages
raceAny_
( serverThread s subscribedQ subscribers subscriptions cancelSub :
serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) :
map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg
)
`finally` withLog closeStoreLog
`finally` (withLog closeStoreLog >> saveServerMessages)
where
runServer :: (ServiceName, ATransport) -> m ()
runServer (tcpPort, ATransport t) = do
@ -532,3 +535,34 @@ randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m ByteString
randomId n = do
gVar <- asks idsDrg
atomically (C.pseudoRandomBytes n gVar)
saveServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m ()
saveServerMessages = asks (storeMsgsFile . config) >>= mapM_ saveMessages
where
saveMessages f = do
liftIO $ putStrLn $ "saving messages to file " <> f
ms <- asks msgStore
liftIO . withFile f WriteMode $ \h ->
readTVarIO ms >>= mapM_ (saveQueueMsgs ms h) . M.keys
where
saveQueueMsgs ms h rId =
atomically (flushMsgQueue ms rId)
>>= mapM_ (B.hPutStrLn h . strEncode . MsgLogRecord rId)
restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m ()
restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
where
restoreMessages f = whenM (doesFileExist f) $ do
liftIO $ putStrLn $ "restoring messages from file " <> f
ms <- asks msgStore
quota <- asks $ msgQueueQuota . config
liftIO $ mapM_ (restoreMsg ms quota) . B.lines =<< B.readFile f
renameFile f $ f <> ".bak"
where
restoreMsg ms quota s = case strDecode s of
Left e -> B.putStrLn $ "message parsing error (" <> B.pack e <> "): " <> B.take 100 s
Right (MsgLogRecord rId msg) -> do
full <- atomically $ do
q <- getMsgQueue ms rId quota
ifM (isFull q) (pure True) (writeMsg q msg $> False)
when full . B.putStrLn $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId msg)

View File

@ -43,6 +43,7 @@ data ServerConfig = ServerConfig
queueIdBytes :: Int,
msgIdBytes :: Int,
storeLogFile :: Maybe FilePath,
storeMsgsFile :: Maybe FilePath,
-- | set to False to prohibit creating new queues
allowNewQueues :: Bool,
-- | time after which the messages can be removed from the queues and check interval, seconds

View File

@ -1,10 +1,12 @@
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE NamedFieldPuns #-}
module Simplex.Messaging.Server.MsgStore where
import Data.Int (Int64)
import Data.Time.Clock.System (SystemTime)
import Numeric.Natural
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (MsgBody, MsgId, RecipientId)
data Message = Message
@ -13,9 +15,22 @@ data Message = Message
msgBody :: MsgBody
}
instance StrEncoding Message where
strEncode Message {msgId, ts, msgBody} = strEncode (msgId, ts, msgBody)
strP = do
(msgId, ts, msgBody) <- strP
pure Message {msgId, ts, msgBody}
data MsgLogRecord = MsgLogRecord RecipientId Message
instance StrEncoding MsgLogRecord where
strEncode (MsgLogRecord rId msg) = strEncode (rId, msg)
strP = MsgLogRecord <$> strP_ <*> strP
class MonadMsgStore s q m | s -> q where
getMsgQueue :: s -> RecipientId -> Natural -> m q
delMsgQueue :: s -> RecipientId -> m ()
flushMsgQueue :: s -> RecipientId -> m [Message]
class MonadMsgQueue q m where
isFull :: q -> m Bool

View File

@ -7,6 +7,7 @@
module Simplex.Messaging.Server.MsgStore.STM where
import Control.Concurrent.STM.TBQueue (flushTBQueue)
import Control.Monad (when)
import Data.Int (Int64)
import Data.Time.Clock.System (SystemTime (systemSeconds))
@ -36,6 +37,9 @@ instance MonadMsgStore STMMsgStore MsgQueue STM where
delMsgQueue :: STMMsgStore -> RecipientId -> STM ()
delMsgQueue st rId = TM.delete rId st
flushMsgQueue :: STMMsgStore -> RecipientId -> STM [Message]
flushMsgQueue st rId = TM.lookup rId st >>= maybe (pure []) (flushTBQueue . msgQueue)
instance MonadMsgQueue MsgQueue STM where
isFull :: MsgQueue -> STM Bool
isFull = isFullTBQueue . msgQueue

View File

@ -96,7 +96,7 @@ supportedSMPVersions :: VersionRange
supportedSMPVersions = mkVersionRange 1 1
simplexMQVersion :: String
simplexMQVersion = "2.2.1"
simplexMQVersion = "2.3.0"
-- * Transport connection class

View File

@ -43,6 +43,9 @@ testKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI="
testStoreLogFile :: FilePath
testStoreLogFile = "tests/tmp/smp-server-store.log"
testStoreMsgsFile :: FilePath
testStoreMsgsFile = "tests/tmp/smp-server-messages.log"
testSMPClient :: (Transport c, MonadUnliftIO m) => (THandle c -> m a) -> m a
testSMPClient client =
runTransportClient testHost testPort (Just testKeyHash) (Just defaultKeepAliveOpts) $ \h ->
@ -60,6 +63,7 @@ cfg =
queueIdBytes = 24,
msgIdBytes = 24,
storeLogFile = Nothing,
storeMsgsFile = Nothing,
allowNewQueues = True,
messageExpiration = Just defaultMessageExpiration,
inactiveClientExpiration = Just defaultInactiveClientExpiration,
@ -70,6 +74,9 @@ cfg =
certificateFile = "tests/fixtures/server.crt"
}
withSmpServerStoreMsgLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a
withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile}
withSmpServerStoreLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a
withSmpServerStoreLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile}

View File

@ -42,6 +42,7 @@ serverTests t@(ATransport t') = do
describe "duplex communication over 2 SMP connections" $ testDuplex t
describe "switch subscription to another TCP connection" $ testSwitchSub t
describe "Store log" $ testWithStoreLog t
describe "Restore messages" $ testRestoreMessages t
describe "Timing of AUTH error" $ testTiming t
describe "Message notifications" $ testMessageNotifications t
describe "Message expiration" $ do
@ -352,7 +353,7 @@ testWithStoreLog at@(ATransport t) =
Resp "dabc" _ OK <- signSendRecv h rKey2 ("dabc", rId2, DEL)
pure ()
logSize `shouldReturn` 6
logSize testStoreLogFile `shouldReturn` 6
withSmpServerThreadOn at testPort . runTest t $ \h -> do
sId1 <- readTVarIO senderId1
@ -377,7 +378,7 @@ testWithStoreLog at@(ATransport t) =
Resp "cdab" _ (ERR AUTH) <- signSendRecv h sKey2 ("cdab", sId2, SEND "hello too")
pure ()
logSize `shouldReturn` 1
logSize testStoreLogFile `shouldReturn` 1
removeFile testStoreLogFile
where
runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation
@ -388,11 +389,79 @@ testWithStoreLog at@(ATransport t) =
runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation
runClient _ test' = testSMPClient test' `shouldReturn` ()
logSize :: IO Int
logSize =
try (length . B.lines <$> B.readFile testStoreLogFile) >>= \case
Right l -> pure l
Left (_ :: SomeException) -> logSize
logSize :: FilePath -> IO Int
logSize f =
try (length . B.lines <$> B.readFile f) >>= \case
Right l -> pure l
Left (_ :: SomeException) -> logSize f
testRestoreMessages :: ATransport -> Spec
testRestoreMessages at@(ATransport t) =
it "should store messages on exit and restore on start" $ do
(sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519
recipientId <- newTVarIO ""
recipientKey <- newTVarIO Nothing
dhShared <- newTVarIO Nothing
senderId <- newTVarIO ""
withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do
runClient t $ \h1 -> do
(sId, rId, rKey, dh) <- createAndSecureQueue h1 sPub
atomically $ do
writeTVar recipientId rId
writeTVar recipientKey $ Just rKey
writeTVar dhShared $ Just dh
writeTVar senderId sId
Resp "1" _ OK <- signSendRecv h sKey ("1", sId, SEND "hello")
Resp "" _ (MSG mId1 _ msg1) <- tGet h1
Resp "1a" _ OK <- signSendRecv h1 rKey ("1a", rId, ACK)
(C.cbDecrypt dh (C.cbNonce mId1) msg1, Right "hello") #== "message delivered"
-- messages below are delivered after server restart
sId <- readTVarIO senderId
Resp "2" _ OK <- signSendRecv h sKey ("2", sId, SEND "hello 2")
Resp "3" _ OK <- signSendRecv h sKey ("3", sId, SEND "hello 3")
Resp "4" _ OK <- signSendRecv h sKey ("4", sId, SEND "hello 4")
pure ()
logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 3
withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
Resp "2" _ (MSG mId2 _ msg2) <- signSendRecv h rKey ("2", rId, SUB)
(C.cbDecrypt dh (C.cbNonce mId2) msg2, Right "hello 2") #== "restored message delivered"
Resp "3" _ (MSG mId3 _ msg3) <- signSendRecv h rKey ("3", rId, ACK)
(C.cbDecrypt dh (C.cbNonce mId3) msg3, Right "hello 3") #== "restored message delivered"
Resp "4" _ (MSG mId4 _ msg4) <- signSendRecv h rKey ("4", rId, ACK)
(C.cbDecrypt dh (C.cbNonce mId4) msg4, Right "hello 4") #== "restored message delivered"
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 1
withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
Resp "4" _ (MSG mId4 _ msg4) <- signSendRecv h rKey ("4", rId, SUB)
Resp "5" _ OK <- signSendRecv h rKey ("5", rId, ACK)
(C.cbDecrypt dh (C.cbNonce mId4) msg4, Right "hello 4") #== "restored message delivered"
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
removeFile testStoreLogFile
removeFile testStoreMsgsFile
where
runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation
runTest _ test' server = do
testSMPClient test' `shouldReturn` ()
killThread server
runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation
runClient _ test' = testSMPClient test' `shouldReturn` ()
createAndSecureQueue :: Transport c => THandle c -> SndPublicVerifyKey -> IO (SenderId, RecipientId, RcvPrivateSignKey, RcvDhSecret)
createAndSecureQueue h sPub = do