save and restore server stats on restart (#460)

This commit is contained in:
Evgeny Poberezkin 2022-07-04 10:45:35 +01:00 committed by GitHub
parent 4c0164c49e
commit bc26dc1d68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 39 deletions

View File

@ -103,6 +103,7 @@ smpServerCLIConfig =
else Nothing,
logStatsInterval = Just 86400, -- seconds
logStatsStartTime = 0, -- seconds from 00:00 UTC
serverStatsFile = Just $ combine logPath "smp-server-stats.log",
smpServerVRange = supportedSMPServerVRange
}
}

View File

@ -75,6 +75,7 @@ library
Simplex.Messaging.Server.MsgStore.STM
Simplex.Messaging.Server.QueueStore
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.Stats
Simplex.Messaging.Server.StoreLog
Simplex.Messaging.TMap
Simplex.Messaging.Transport

View File

@ -28,9 +28,13 @@ import qualified Data.ByteString.Char8 as B
import Data.Char (isAlphaNum)
import Data.Int (Int64)
import qualified Data.List.NonEmpty as L
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.System (SystemTime (..))
import Data.Time.Format.ISO8601
import Data.Word (Word16)
import Simplex.Messaging.Encoding
import Simplex.Messaging.Parsers (parseAll)
@ -101,14 +105,26 @@ instance StrEncoding Bool where
strP = smpP
{-# INLINE strP #-}
-- | 'Int' is encoded as its decimal representation.
--
-- The parser accepts an optional leading sign so that negative values,
-- which 'show' renders with a leading @-@, can be decoded again.
instance StrEncoding Int where
  strEncode = B.pack . show
  {-# INLINE strEncode #-}
  -- A.decimal alone only accepts unsigned digits
  strP = A.signed A.decimal
  {-# INLINE strP #-}
-- | 'Int64' is encoded as its decimal representation.
--
-- The parser accepts an optional leading sign so that negative values,
-- which 'show' renders with a leading @-@, can be decoded again.
instance StrEncoding Int64 where
  strEncode = B.pack . show
  {-# INLINE strEncode #-}
  -- A.decimal alone only accepts unsigned digits
  strP = A.signed A.decimal
  {-# INLINE strP #-}
-- | 'SystemTime' is encoded as whole seconds only: the nanoseconds part is
-- dropped when encoding and is set to 0 when parsing.
instance StrEncoding SystemTime where
  strEncode (MkSystemTime secs _) = strEncode secs
  strP = (`MkSystemTime` 0) <$> strP
-- | 'UTCTime' is encoded in ISO 8601 format.
--
-- Parsing consumes the input up to the first space or newline and fails with
-- "bad UTCTime" when that text is not a valid ISO 8601 timestamp.
instance StrEncoding UTCTime where
  strEncode ts = B.pack (iso8601Show ts)
  strP = parseTime <$?> A.takeTill isDelimiter
    where
      parseTime :: ByteString -> Either String UTCTime
      parseTime = maybe (Left "bad UTCTime") Right . iso8601ParseM . B.unpack
      isDelimiter c = c == ' ' || c == '\n'
-- lists encode/parse as comma-separated strings
-- | Encodes each item with 'strEncode' and joins the results with commas
-- (an empty list encodes as an empty string).
strEncodeList :: StrEncoding a => [a] -> ByteString
strEncodeList items = B.intercalate "," [strEncode item | item <- items]
@ -121,8 +137,12 @@ instance StrEncoding a => StrEncoding (L.NonEmpty a) where
strEncode = strEncodeList . L.toList
strP = L.fromList <$> listItem `A.sepBy1'` A.char ','
-- | A 'Set' is encoded as a comma-separated list of its elements, in the
-- order returned by 'S.toList'; parsing accepts zero or more items and
-- builds the set with 'S.fromList', so repeated items collapse into one.
instance (StrEncoding a, Ord a) => StrEncoding (Set a) where
  strEncode items = strEncodeList (S.toList items)
  strP = S.fromList <$> A.sepBy' listItem (A.char ',')
listItem :: StrEncoding a => Parser a
listItem = parseAll strP <$?> A.takeTill (== ',')
listItem = parseAll strP <$?> A.takeTill (\c -> c == ',' || c == ' ' || c == '\n')
instance (StrEncoding a, StrEncoding b) => StrEncoding (a, b) where
strEncode (a, b) = B.unwords [strEncode a, strEncode b]

View File

@ -50,10 +50,12 @@ import Data.Maybe (isNothing)
import Data.Set (Set)
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Calendar.Month.Compat (pattern MonthDay)
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Data.Type.Equality
import Network.Socket (ServiceName)
import qualified Simplex.Messaging.Crypto as C
@ -66,6 +68,7 @@ import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.STM (MsgQueue)
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM (QueueStore)
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
@ -98,13 +101,14 @@ smpServer :: forall m. (MonadUnliftIO m, MonadReader Env m) => TMVar Bool -> m (
smpServer started = do
s <- asks server
cfg@ServerConfig {transports} <- asks config
restoreServerStats
restoreServerMessages
raceAny_
( serverThread s subscribedQ subscribers subscriptions cancelSub :
serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) :
map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg
)
`finally` (withLog closeStoreLog >> saveServerMessages)
`finally` (withLog closeStoreLog >> saveServerMessages >> saveServerStats)
where
runServer :: (ServiceName, ATransport) -> m ()
runServer (tcpPort, ATransport t) = do
@ -182,7 +186,7 @@ smpServer started = do
MonthDay _ mDay = day
(dayMsgQueues', weekMsgQueues', monthMsgQueues') <-
atomically $ (,,) <$> periodCount 1 dayMsgQueues <*> periodCount wDay weekMsgQueues <*> periodCount mDay monthMsgQueues
logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show dayMsgQueues', weekMsgQueues', monthMsgQueues']
logInfo . T.pack $ intercalate "," [iso8601Show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show dayMsgQueues', weekMsgQueues', monthMsgQueues']
threadDelay interval
where
periodCount :: Int -> TVar (Set RecipientId) -> STM String
@ -638,33 +642,58 @@ randomId n = do
gVar <- asks idsDrg
atomically (C.pseudoRandomBytes n gVar)
saveServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m ()
saveServerMessages :: (MonadUnliftIO m, MonadReader Env m) => m ()
saveServerMessages = asks (storeMsgsFile . config) >>= mapM_ saveMessages
where
saveMessages f = do
liftIO $ putStrLn $ "saving messages to file " <> f
logInfo $ "saving messages to file " <> T.pack f
ms <- asks msgStore
liftIO . withFile f WriteMode $ \h ->
readTVarIO ms >>= mapM_ (saveQueueMsgs ms h) . M.keys
logInfo "messages saved"
where
saveQueueMsgs ms h rId =
atomically (flushMsgQueue ms rId)
>>= mapM_ (B.hPutStrLn h . strEncode . MsgLogRecord rId)
restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m ()
restoreServerMessages :: (MonadUnliftIO m, MonadReader Env m) => m ()
restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
where
restoreMessages f = whenM (doesFileExist f) $ do
liftIO $ putStrLn $ "restoring messages from file " <> f
logInfo $ "restoring messages from file " <> T.pack f
ms <- asks msgStore
quota <- asks $ msgQueueQuota . config
liftIO $ mapM_ (restoreMsg ms quota) . B.lines =<< B.readFile f
renameFile f $ f <> ".bak"
logInfo $ "messages restored"
where
restoreMsg ms quota s = case strDecode s of
Left e -> B.putStrLn $ "message parsing error (" <> B.pack e <> "): " <> B.take 100 s
Left e -> logError . decodeLatin1 $ "message parsing error (" <> B.pack e <> "): " <> B.take 100 s
Right (MsgLogRecord rId msg) -> do
full <- atomically $ do
q <- getMsgQueue ms rId quota
ifM (isFull q) (pure True) (writeMsg q msg $> False)
when full . B.putStrLn $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
-- | Writes the current server statistics to the configured stats file.
--
-- Does nothing when 'serverStatsFile' is not set in the server config.
saveServerStats :: (MonadUnliftIO m, MonadReader Env m) => m ()
saveServerStats = do
  statsFile_ <- asks $ serverStatsFile . config
  mapM_ saveStats statsFile_
  where
    saveStats path = do
      stats <- asks serverStats
      -- all values are read in a single STM transaction
      statsData <- atomically $ getServerStatsData stats
      liftIO $ do
        logInfo $ "saving server stats to file " <> T.pack path
        B.writeFile path $ strEncode statsData
        logInfo "server stats saved"
-- | Restores server statistics from the configured stats file.
--
-- Does nothing when 'serverStatsFile' is not set or the file does not exist.
-- After a successful restore the file is renamed to @<file>.bak@. When the
-- file cannot be parsed, the error is logged, the in-memory statistics are
-- left unchanged and the file is not renamed.
restoreServerStats :: (MonadUnliftIO m, MonadReader Env m) => m ()
restoreServerStats = asks (serverStatsFile . config) >>= mapM_ restoreStats
  where
    restoreStats f = whenM (doesFileExist f) $ do
      logInfo $ "restoring server stats from file " <> T.pack f
      liftIO (strDecode <$> B.readFile f) >>= \case
        Right d -> do
          s <- asks serverStats
          atomically $ setServerStatsData s d
          renameFile f $ f <> ".bak"
          logInfo "server stats restored"
        -- a failed restore is reported at error level, the same way
        -- restoreServerMessages reports message parsing errors
        Left e -> logError $ "error restoring server stats: " <> T.pack e

View File

@ -12,9 +12,7 @@ import Crypto.Random
import Data.ByteString.Char8 (ByteString)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Set (Set)
import qualified Data.Set as S
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.System (SystemTime)
import Data.X509.Validation (Fingerprint (..))
import Network.Socket (ServiceName)
@ -26,6 +24,7 @@ import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..))
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
@ -57,6 +56,8 @@ data ServerConfig = ServerConfig
-- | time of the day when the stats are logged first, to log at consistent times,
-- irrespective of when the server is started (seconds from 00:00 UTC)
logStatsStartTime :: Int,
-- | file to save and restore stats
serverStatsFile :: Maybe FilePath,
-- | CA certificate private key is not needed for initialization
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
@ -108,18 +109,6 @@ data Client = Client
activeAt :: TVar SystemTime
}
data ServerStats = ServerStats
{ qCreated :: TVar Int,
qSecured :: TVar Int,
qDeleted :: TVar Int,
msgSent :: TVar Int,
msgRecv :: TVar Int,
dayMsgQueues :: TVar (Set RecipientId),
weekMsgQueues :: TVar (Set RecipientId),
monthMsgQueues :: TVar (Set RecipientId),
fromTime :: TVar UTCTime
}
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) | ProhibitSub
data Sub = Sub
@ -145,19 +134,6 @@ newClient qSize sessionId ts = do
activeAt <- newTVar ts
return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, connected, activeAt}
newServerStats :: UTCTime -> STM ServerStats
newServerStats ts = do
qCreated <- newTVar 0
qSecured <- newTVar 0
qDeleted <- newTVar 0
msgSent <- newTVar 0
msgRecv <- newTVar 0
dayMsgQueues <- newTVar S.empty
weekMsgQueues <- newTVar S.empty
monthMsgQueues <- newTVar S.empty
fromTime <- newTVar ts
pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues, fromTime}
newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do
delivered <- newEmptyTMVar

View File

@ -0,0 +1,101 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Server.Stats where
import Control.Applicative (optional)
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Set (Set)
import qualified Data.Set as S
import Data.Time.Clock (UTCTime)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (RecipientId)
import UnliftIO.STM
-- | Mutable server statistics: every value is kept in its own 'TVar', so it
-- can be read and updated inside STM transactions.
--
-- NOTE(review): the meaning of the individual fields is inferred from their
-- names (queues created / secured / deleted, messages sent / received, and
-- sets of queue IDs per day / week / month) - confirm against the code that
-- updates them.
data ServerStats = ServerStats
{ fromTime :: TVar UTCTime,
qCreated :: TVar Int,
qSecured :: TVar Int,
qDeleted :: TVar Int,
msgSent :: TVar Int,
msgRecv :: TVar Int,
dayMsgQueues :: TVar (Set RecipientId),
weekMsgQueues :: TVar (Set RecipientId),
monthMsgQueues :: TVar (Set RecipientId)
}
-- | Immutable snapshot of 'ServerStats': the same fields, prefixed with an
-- underscore, holding plain values instead of 'TVar's.
--
-- This is the type that has the 'StrEncoding' instance, i.e. the form in
-- which the statistics are serialized and parsed.
data ServerStatsData = ServerStatsData
{ _fromTime :: UTCTime,
_qCreated :: Int,
_qSecured :: Int,
_qDeleted :: Int,
_msgSent :: Int,
_msgRecv :: Int,
_dayMsgQueues :: Set RecipientId,
_weekMsgQueues :: Set RecipientId,
_monthMsgQueues :: Set RecipientId
}
-- | Creates fresh statistics: 'fromTime' is set to the given time, all
-- counters start at 0 and all queue sets start empty.
newServerStats :: UTCTime -> STM ServerStats
newServerStats ts =
  -- arguments follow the field order of the 'ServerStats' record
  ServerStats
    <$> newTVar ts
    <*> counter -- qCreated
    <*> counter -- qSecured
    <*> counter -- qDeleted
    <*> counter -- msgSent
    <*> counter -- msgRecv
    <*> emptySet -- dayMsgQueues
    <*> emptySet -- weekMsgQueues
    <*> emptySet -- monthMsgQueues
  where
    counter :: STM (TVar Int)
    counter = newTVar 0
    emptySet :: STM (TVar (Set RecipientId))
    emptySet = newTVar S.empty
-- | Reads all statistics into an immutable 'ServerStatsData' snapshot.
--
-- The whole read is one STM action, so running it with a single
-- @atomically@ yields a consistent snapshot.
getServerStatsData :: ServerStats -> STM ServerStatsData
getServerStatsData s =
  -- arguments follow the field order of the 'ServerStatsData' record
  ServerStatsData
    <$> readTVar (fromTime s)
    <*> readTVar (qCreated s)
    <*> readTVar (qSecured s)
    <*> readTVar (qDeleted s)
    <*> readTVar (msgSent s)
    <*> readTVar (msgRecv s)
    <*> readTVar (dayMsgQueues s)
    <*> readTVar (weekMsgQueues s)
    <*> readTVar (monthMsgQueues s)
-- | Overwrites every statistic in 'ServerStats' with the corresponding value
-- from the given 'ServerStatsData' snapshot.
setServerStatsData :: ServerStats -> ServerStatsData -> STM ()
setServerStatsData s d = do
  set fromTime _fromTime
  set qCreated _qCreated
  set qSecured _qSecured
  set qDeleted _qDeleted
  set msgSent _msgSent
  set msgRecv _msgRecv
  set dayMsgQueues _dayMsgQueues
  set weekMsgQueues _weekMsgQueues
  set monthMsgQueues _monthMsgQueues
  where
    -- writes one snapshot field into the matching TVar
    set :: (ServerStats -> TVar a) -> (ServerStatsData -> a) -> STM ()
    set stat value = writeTVar (stat s) (value d)
-- | The statistics are serialized as one @key=value@ line per field, in the
-- order the fields are declared. The parser expects the same keys in the
-- same order; the newline after the last line is optional.
instance StrEncoding ServerStatsData where
  strEncode d =
    B.unlines
      [ line "fromTime=" (_fromTime d),
        line "qCreated=" (_qCreated d),
        line "qSecured=" (_qSecured d),
        line "qDeleted=" (_qDeleted d),
        line "msgSent=" (_msgSent d),
        line "msgRecv=" (_msgRecv d),
        line "dayMsgQueues=" (_dayMsgQueues d),
        line "weekMsgQueues=" (_weekMsgQueues d),
        line "monthMsgQueues=" (_monthMsgQueues d)
      ]
    where
      line :: StrEncoding a => B.ByteString -> a -> B.ByteString
      line key value = key <> strEncode value
  strP =
    -- arguments follow the field order of the 'ServerStatsData' record
    ServerStatsData
      <$> field "fromTime="
      <*> field "qCreated="
      <*> field "qSecured="
      <*> field "qDeleted="
      <*> field "msgSent="
      <*> field "msgRecv="
      <*> field "dayMsgQueues="
      <*> field "weekMsgQueues="
      <*> (A.string "monthMsgQueues=" *> strP <* optional A.endOfLine)
    where
      -- parses "key=value" followed by a mandatory end of line
      field :: StrEncoding a => B.ByteString -> A.Parser a
      field key = A.string key *> strP <* A.endOfLine

View File

@ -46,6 +46,9 @@ testStoreLogFile = "tests/tmp/smp-server-store.log"
-- | Path of the message store file used by the test server configurations
-- that set 'storeMsgsFile'.
testStoreMsgsFile :: FilePath
testStoreMsgsFile = "tests/tmp/smp-server-messages.log"
-- | Path of the server stats file used by the test server configurations
-- that set 'serverStatsFile'.
testServerStatsFile :: FilePath
testServerStatsFile = "tests/tmp/smp-server-stats.log"
testSMPClient :: (Transport c, MonadUnliftIO m) => (THandle c -> m a) -> m a
testSMPClient client =
runTransportClient testHost testPort (Just testKeyHash) (Just defaultKeepAliveOpts) $ \h ->
@ -69,6 +72,7 @@ cfg =
inactiveClientExpiration = Just defaultInactiveClientExpiration,
logStatsInterval = Nothing,
logStatsStartTime = 0,
serverStatsFile = Nothing,
caCertificateFile = "tests/fixtures/ca.crt",
privateKeyFile = "tests/fixtures/server.key",
certificateFile = "tests/fixtures/server.crt",
@ -76,10 +80,10 @@ cfg =
}
withSmpServerStoreMsgLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a
withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile}
withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, serverStatsFile = Just testServerStatsFile}
withSmpServerStoreLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a
withSmpServerStoreLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile}
withSmpServerStoreLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, serverStatsFile = Just testServerStatsFile}
withSmpServerConfigOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServerConfig -> ServiceName -> (ThreadId -> m a) -> m a
withSmpServerConfigOn t cfg' port' =