ntf: server stats (#487)

* nts: server stats

* ntf: collect stats, refactor

* rename property

* fixes
This commit is contained in:
Evgeny Poberezkin 2022-08-01 08:42:23 +01:00 committed by GitHub
parent fcaddb7848
commit b76ef03dbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 376 additions and 86 deletions

View File

@ -1,9 +1,12 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Logger.Simple
import Data.Functor (($>))
import Data.Ini (lookupValue)
import Simplex.Messaging.Client.Agent (defaultSMPClientAgentConfig)
import Simplex.Messaging.Notifications.Server (runNtfServer)
import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..))
@ -50,28 +53,36 @@ ntfServerCLIConfig =
\# This option enables saving memory to append only log,\n\
\# and restoring it when the server is started.\n\
\# Log is compacted on start (deleted objects are removed).\n\
\# The messages are not logged.\n"
<> ("enable: " <> (if enableStoreLog then "on" else "off") <> "\n\n")
<> "[TRANSPORT]\n\
\enable: "
<> (if enableStoreLog then "on" else "off")
<> "\n\
\log_stats: off\n\n\
\[TRANSPORT]\n\
\port: "
<> defaultServerPort
<> "\n\
\websockets: off\n",
mkServerConfig = \storeLogFile transports _ ->
NtfServerConfig
{ transports,
subIdBytes = 24,
regCodeBytes = 32,
clientQSize = 16,
subQSize = 64,
pushQSize = 128,
smpAgentCfg = defaultSMPClientAgentConfig,
apnsConfig = defaultAPNSPushClientConfig,
inactiveClientExpiration = Nothing,
storeLogFile,
resubscribeDelay = 50000, -- 50ms
caCertificateFile = caCrtFile,
privateKeyFile = serverKeyFile,
certificateFile = serverCrtFile
}
mkServerConfig = \storeLogFile transports ini ->
let settingIsOn section name = if lookupValue section name ini == Right "on" then Just () else Nothing
logStats = settingIsOn "STORE_LOG" "log_stats"
in NtfServerConfig
{ transports,
subIdBytes = 24,
regCodeBytes = 32,
clientQSize = 16,
subQSize = 64,
pushQSize = 128,
smpAgentCfg = defaultSMPClientAgentConfig,
apnsConfig = defaultAPNSPushClientConfig,
inactiveClientExpiration = Nothing,
storeLogFile,
resubscribeDelay = 50000, -- 50ms
caCertificateFile = caCrtFile,
privateKeyFile = serverKeyFile,
certificateFile = serverCrtFile,
logStatsInterval = logStats $> 86400, -- seconds
logStatsStartTime = 0, -- seconds from 00:00 UTC
serverStatsLogFile = combine logPath "ntf-server-stats.daily.log",
serverStatsBackupFile = logStats $> combine logPath "ntf-server-stats.log"
}
}

View File

@ -2,7 +2,6 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
module Main where

View File

@ -61,6 +61,7 @@ library
Simplex.Messaging.Notifications.Server
Simplex.Messaging.Notifications.Server.Env
Simplex.Messaging.Notifications.Server.Push.APNS
Simplex.Messaging.Notifications.Server.Stats
Simplex.Messaging.Notifications.Server.Store
Simplex.Messaging.Notifications.Server.StoreLog
Simplex.Messaging.Notifications.Transport

View File

@ -4,8 +4,10 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
@ -18,30 +20,40 @@ import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Reader
import Crypto.Random (MonadRandom)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.List (intercalate)
import Data.Map.Strict (Map)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Network.Socket (ServiceName)
import Simplex.Messaging.Client (ProtocolClientError (..))
import Simplex.Messaging.Client.Agent
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Env
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..), PushNotification (..), PushProviderError (..))
import Simplex.Messaging.Notifications.Server.Stats
import Simplex.Messaging.Notifications.Server.Store
import Simplex.Messaging.Notifications.Server.StoreLog
import Simplex.Messaging.Notifications.Transport
import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Server
import Simplex.Messaging.Server.Stats
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TProxy, Transport (..))
import Simplex.Messaging.Transport.Server (runTransportServer)
import Simplex.Messaging.Util
import System.Exit (exitFailure)
import System.IO (BufferMode (..), hPutStrLn, hSetBuffering)
import System.Mem.Weak (deRefWeak)
import UnliftIO (IOMode (..), async, uninterruptibleCancel)
import UnliftIO (IOMode (..), async, uninterruptibleCancel, withFile)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId, threadDelay)
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.STM
@ -54,12 +66,13 @@ runNtfServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> NtfSer
runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg
ntfServer :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerConfig -> TMVar Bool -> m ()
ntfServer NtfServerConfig {transports} started = do
ntfServer cfg@NtfServerConfig {transports} started = do
restoreServerStats
s <- asks subscriber
ps <- asks pushServer
subs <- readTVarIO =<< asks (subscriptions . store)
void . forkIO $ resubscribe s subs
raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) `finally` stopServer
raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports <> serverStatsThread_ cfg) `finally` stopServer
where
runServer :: (ServiceName, ATransport) -> m ()
runServer (tcpPort, ATransport t) = do
@ -76,8 +89,55 @@ ntfServer NtfServerConfig {transports} started = do
stopServer :: m ()
stopServer = do
withNtfLog closeStoreLog
saveServerStats
asks (smpSubscribers . subscriber) >>= readTVarIO >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (liftIO . deRefWeak >=> mapM_ killThread))
serverStatsThread_ :: NtfServerConfig -> [m ()]
serverStatsThread_ NtfServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} =
[logServerStats logStatsStartTime interval serverStatsLogFile]
serverStatsThread_ _ = []
logServerStats :: Int -> Int -> FilePath -> m ()
logServerStats startAt logInterval statsFilePath = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, subCreated, subDeleted, ntfReceived, ntfDelivered, activeTokens, activeSubs} <- asks serverStats
let interval = 1000000 * logInterval
withFile statsFilePath AppendMode $ \h -> liftIO $ do
hSetBuffering h LineBuffering
forever $ do
ts <- getCurrentTime
fromTime' <- atomically $ swapTVar fromTime ts
tknCreated' <- atomically $ swapTVar tknCreated 0
tknVerified' <- atomically $ swapTVar tknVerified 0
tknDeleted' <- atomically $ swapTVar tknDeleted 0
subCreated' <- atomically $ swapTVar subCreated 0
subDeleted' <- atomically $ swapTVar subDeleted 0
ntfReceived' <- atomically $ swapTVar ntfReceived 0
ntfDelivered' <- atomically $ swapTVar ntfDelivered 0
tkn <- atomically $ periodStatCounts activeTokens ts
sub <- atomically $ periodStatCounts activeSubs ts
hPutStrLn h $
intercalate
","
[ iso8601Show $ utctDay fromTime',
show tknCreated',
show tknVerified',
show tknDeleted',
show subCreated',
show subDeleted',
show ntfReceived',
show ntfDelivered',
dayCount tkn,
weekCount tkn,
monthCount tkn,
dayCount sub,
weekCount sub,
monthCount sub
]
threadDelay interval
resubscribe :: (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> m ()
resubscribe NtfSubscriber {newSubQ} subs = do
d <- asks $ resubscribeDelay . config
@ -137,9 +197,12 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
ntfTs <- liftIO getSystemTime
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
stats <- asks serverStats
atomically $ updatePeriodStats (activeSubs stats) ntfId
atomically $
findNtfSubscriptionToken st smpQueue
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}))
incNtfStat ntfReceived
SMP.END -> updateSubStatus smpQueue NSEnd
_ -> pure ()
pure ()
@ -193,18 +256,21 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
status <- readTVarIO tknStatus
case (status, ntf) of
(_, PNVerification _) -> do
(_, PNVerification _) ->
-- TODO check token status
deliverNotification pp tkn ntf >>= \case
Right _ -> do
status_ <- atomically $ stateTVar tknStatus $ \status' -> if status' == NTActive then (Nothing, NTActive) else (Just NTConfirmed, NTConfirmed)
forM_ status_ $ \status' -> withNtfLog $ \sl -> logTokenStatus sl ntfTknId status'
_ -> pure ()
(NTActive, PNCheckMessages) -> do
(NTActive, PNCheckMessages) ->
void $ deliverNotification pp tkn ntf
(NTActive, PNMessage {}) -> do
stats <- asks serverStats
atomically $ updatePeriodStats (activeTokens stats) ntfTknId
void $ deliverNotification pp tkn ntf
_ -> do
incNtfStat ntfDelivered
_ ->
liftIO $ logError "bad notification token status"
where
deliverNotification :: PushProvider -> NtfTknData -> PushNotification -> m (Either PushProviderError ())
@ -347,6 +413,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
atomically $ addNtfToken st tknId tkn
atomically $ writeTBQueue pushQ (tkn, PNVerification regCode)
withNtfLog (`logCreateToken` tkn)
incNtfStat tknCreated
pure (corrId, "", NRTknId tknId srvDhPubKey)
NtfReqCmd SToken (NtfTkn tkn@NtfTknData {ntfTknId, tknStatus, tknRegCode, tknDhSecret, tknDhKeys = (srvDhPubKey, srvDhPrivKey), tknCronInterval}) (corrId, tknId, cmd) -> do
status <- readTVarIO tknStatus
@ -368,6 +435,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
tIds <- atomically $ removeInactiveTokenRegistrations st tkn
forM_ tIds cancelInvervalNotifications
withNtfLog $ \s -> logTokenStatus s tknId NTActive
incNtfStat tknVerified
pure NROk
| otherwise -> do
logDebug "TVFY - incorrect code or token status"
@ -386,6 +454,8 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
addNtfToken st tknId tkn'
writeTBQueue pushQ (tkn', PNVerification regCode)
withNtfLog $ \s -> logUpdateToken s tknId token' regCode
incNtfStat tknDeleted
incNtfStat tknCreated
pure NROk
TDEL -> do
logDebug "TDEL"
@ -395,6 +465,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
cancelInvervalNotifications tknId
withNtfLog (`logDeleteToken` tknId)
incNtfStat tknDeleted
pure NROk
TCRN 0 -> do
logDebug "TCRN 0"
@ -434,6 +505,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
Just _ -> atomically (writeTBQueue newSubQ $ NtfSub sub) $> NRSubId subId
_ -> pure $ NRErr AUTH
withNtfLog (`logCreateSubscription` sub)
incNtfStat subCreated
pure (corrId, "", resp)
NtfReqCmd SSubscription (NtfSub NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, notifierKey = registeredNKey, subStatus}) (corrId, subId, cmd) -> do
status <- readTVarIO subStatus
@ -454,6 +526,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
atomically $ deleteNtfSubscription st subId
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
withNtfLog (`logDeleteSubscription` subId)
incNtfStat subDeleted
pure NROk
PING -> pure NRPong
getId :: m NtfEntityId
@ -471,3 +544,33 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
withNtfLog :: (MonadUnliftIO m, MonadReader NtfEnv m) => (StoreLog 'WriteMode -> IO a) -> m ()
withNtfLog action = liftIO . mapM_ action =<< asks storeLog
incNtfStat :: (MonadUnliftIO m, MonadReader NtfEnv m) => (NtfServerStats -> TVar Int) -> m ()
incNtfStat statSel = do
stats <- asks serverStats
atomically $ modifyTVar (statSel stats) (+ 1)
saveServerStats :: (MonadUnliftIO m, MonadReader NtfEnv m) => m ()
saveServerStats =
asks (serverStatsBackupFile . config)
>>= mapM_ (\f -> asks serverStats >>= atomically . getNtfServerStatsData >>= liftIO . saveStats f)
where
saveStats f stats = do
logInfo $ "saving server stats to file " <> T.pack f
B.writeFile f $ strEncode stats
logInfo "server stats saved"
restoreServerStats :: (MonadUnliftIO m, MonadReader NtfEnv m) => m ()
restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
where
restoreStats f = whenM (doesFileExist f) $ do
logInfo $ "restoring server stats from file " <> T.pack f
liftIO (strDecode <$> B.readFile f) >>= \case
Right d -> do
s <- asks serverStats
atomically $ setNtfServerStats s d
renameFile f $ f <> ".bak"
logInfo "server stats restored"
Left e -> do
logInfo $ "error restoring server stats: " <> T.pack e
liftIO exitFailure

View File

@ -11,6 +11,7 @@ import Control.Concurrent.Async (Async)
import Control.Monad.IO.Unlift
import Crypto.Random
import Data.ByteString.Char8 (ByteString)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.System (SystemTime)
import Data.Word (Word16)
import Data.X509.Validation (Fingerprint (..))
@ -21,6 +22,7 @@ import Simplex.Messaging.Client.Agent
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Push.APNS
import Simplex.Messaging.Notifications.Server.Stats
import Simplex.Messaging.Notifications.Server.Store
import Simplex.Messaging.Notifications.Server.StoreLog
import Simplex.Messaging.Protocol (CorrId, SMPServer, Transmission)
@ -48,7 +50,12 @@ data NtfServerConfig = NtfServerConfig
-- CA certificate private key is not needed for initialization
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
certificateFile :: FilePath
certificateFile :: FilePath,
-- stats config - see SMP server config
logStatsInterval :: Maybe Int,
logStatsStartTime :: Int,
serverStatsLogFile :: FilePath,
serverStatsBackupFile :: Maybe FilePath
}
defaultInactiveClientExpiration :: ExpirationConfig
@ -67,7 +74,8 @@ data NtfEnv = NtfEnv
idsDrg :: TVar ChaChaDRG,
serverIdentity :: C.KeyHash,
tlsServerParams :: T.ServerParams,
serverIdentity :: C.KeyHash
serverIdentity :: C.KeyHash,
serverStats :: NtfServerStats
}
newNtfServerEnv :: (MonadUnliftIO m, MonadRandom m) => NtfServerConfig -> m NtfEnv
@ -79,7 +87,8 @@ newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsCo
pushServer <- atomically $ newNtfPushServer pushQSize apnsConfig
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile
Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile
pure NtfEnv {config, subscriber, pushServer, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp}
serverStats <- atomically . newNtfServerStats =<< liftIO getCurrentTime
pure NtfEnv {config, subscriber, pushServer, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp, serverStats}
data NtfSubscriber = NtfSubscriber
{ smpSubscribers :: TMap SMPServer SMPSubscriber,

View File

@ -0,0 +1,113 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Notifications.Server.Stats where
import Control.Applicative (optional)
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Time.Clock (UTCTime)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol (NtfTokenId)
import Simplex.Messaging.Protocol (NotifierId)
import Simplex.Messaging.Server.Stats
import UnliftIO.STM
data NtfServerStats = NtfServerStats
{ fromTime :: TVar UTCTime,
tknCreated :: TVar Int,
tknVerified :: TVar Int,
tknDeleted :: TVar Int,
subCreated :: TVar Int,
subDeleted :: TVar Int,
ntfReceived :: TVar Int,
ntfDelivered :: TVar Int,
activeTokens :: PeriodStats NtfTokenId,
activeSubs :: PeriodStats NotifierId
}
data NtfServerStatsData = NtfServerStatsData
{ _fromTime :: UTCTime,
_tknCreated :: Int,
_tknVerified :: Int,
_tknDeleted :: Int,
_subCreated :: Int,
_subDeleted :: Int,
_ntfReceived :: Int,
_ntfDelivered :: Int,
_activeTokens :: PeriodStatsData NtfTokenId,
_activeSubs :: PeriodStatsData NotifierId
}
newNtfServerStats :: UTCTime -> STM NtfServerStats
newNtfServerStats ts = do
fromTime <- newTVar ts
tknCreated <- newTVar 0
tknVerified <- newTVar 0
tknDeleted <- newTVar 0
subCreated <- newTVar 0
subDeleted <- newTVar 0
ntfReceived <- newTVar 0
ntfDelivered <- newTVar 0
activeTokens <- newPeriodStats
activeSubs <- newPeriodStats
pure NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, subCreated, subDeleted, ntfReceived, ntfDelivered, activeTokens, activeSubs}
getNtfServerStatsData :: NtfServerStats -> STM NtfServerStatsData
getNtfServerStatsData s = do
_fromTime <- readTVar $ fromTime (s :: NtfServerStats)
_tknCreated <- readTVar $ tknCreated s
_tknVerified <- readTVar $ tknVerified s
_tknDeleted <- readTVar $ tknDeleted s
_subCreated <- readTVar $ subCreated s
_subDeleted <- readTVar $ subDeleted s
_ntfReceived <- readTVar $ ntfReceived s
_ntfDelivered <- readTVar $ ntfDelivered s
_activeTokens <- getPeriodStatsData $ activeTokens s
_activeSubs <- getPeriodStatsData $ activeSubs s
pure NtfServerStatsData {_fromTime, _tknCreated, _tknVerified, _tknDeleted, _subCreated, _subDeleted, _ntfReceived, _ntfDelivered, _activeTokens, _activeSubs}
setNtfServerStats :: NtfServerStats -> NtfServerStatsData -> STM ()
setNtfServerStats s d = do
writeTVar (fromTime (s :: NtfServerStats)) (_fromTime (d :: NtfServerStatsData))
writeTVar (tknCreated s) (_tknCreated d)
writeTVar (tknVerified s) (_tknVerified d)
writeTVar (tknDeleted s) (_tknDeleted d)
writeTVar (subCreated s) (_subCreated d)
writeTVar (subDeleted s) (_subDeleted d)
writeTVar (ntfReceived s) (_ntfReceived d)
writeTVar (ntfDelivered s) (_ntfDelivered d)
setPeriodStats (activeTokens s) (_activeTokens d)
setPeriodStats (activeSubs s) (_activeSubs d)
instance StrEncoding NtfServerStatsData where
strEncode NtfServerStatsData {_fromTime, _tknCreated, _tknVerified, _tknDeleted, _subCreated, _subDeleted, _ntfReceived, _ntfDelivered, _activeTokens, _activeSubs} =
B.unlines
[ "fromTime=" <> strEncode _fromTime,
"tknCreated=" <> strEncode _tknCreated,
"tknVerified=" <> strEncode _tknVerified,
"tknDeleted=" <> strEncode _tknDeleted,
"subCreated=" <> strEncode _subCreated,
"subDeleted=" <> strEncode _subDeleted,
"ntfReceived=" <> strEncode _ntfReceived,
"ntfDelivered=" <> strEncode _ntfDelivered,
"activeTokens:",
strEncode _activeTokens,
"activeSubs:",
strEncode _activeSubs
]
strP = do
_fromTime <- "fromTime=" *> strP <* A.endOfLine
_tknCreated <- "tknCreated=" *> strP <* A.endOfLine
_tknVerified <- "tknVerified=" *> strP <* A.endOfLine
_tknDeleted <- "tknDeleted=" *> strP <* A.endOfLine
_subCreated <- "subCreated=" *> strP <* A.endOfLine
_subDeleted <- "subDeleted=" *> strP <* A.endOfLine
_ntfReceived <- "ntfReceived=" *> strP <* A.endOfLine
_ntfDelivered <- "ntfDelivered=" *> strP <* A.endOfLine
_ <- "activeTokens:" <* A.endOfLine
_activeTokens <- strP <* A.endOfLine
_ <- "activeSubs:" <* A.endOfLine
_activeSubs <- strP <* optional A.endOfLine
pure NtfServerStatsData {_fromTime, _tknCreated, _tknVerified, _tknDeleted, _subCreated, _subDeleted, _ntfReceived, _ntfDelivered, _activeTokens, _activeSubs}

View File

@ -50,12 +50,8 @@ import Data.List (intercalate)
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (isNothing)
import Data.Set (Set)
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Calendar.Month.Compat (pattern MonthDay)
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
@ -177,7 +173,7 @@ smpServer started = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues} <- asks serverStats
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} <- asks serverStats
let interval = 1000000 * logInterval
withFile statsFilePath AppendMode $ \h -> liftIO $ do
hSetBuffering h LineBuffering
@ -189,17 +185,9 @@ smpServer started = do
qDeleted' <- atomically $ swapTVar qDeleted 0
msgSent' <- atomically $ swapTVar msgSent 0
msgRecv' <- atomically $ swapTVar msgRecv 0
let day = utctDay ts
(_, wDay) = mondayStartWeek day
MonthDay _ mDay = day
(dayMsgQueues', weekMsgQueues', monthMsgQueues') <-
atomically $ (,,) <$> periodCount 1 dayMsgQueues <*> periodCount wDay weekMsgQueues <*> periodCount mDay monthMsgQueues
hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayMsgQueues', weekMsgQueues', monthMsgQueues']
ps <- atomically $ periodStatCounts activeQueues ts
hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayCount ps, weekCount ps, monthCount ps]
threadDelay interval
where
periodCount :: Int -> TVar (Set RecipientId) -> STM String
periodCount 1 pVar = show . S.size <$> swapTVar pVar S.empty
periodCount _ _ = pure ""
runClient :: Transport c => TProxy c -> c -> m ()
runClient _ h = do
@ -538,15 +526,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
updateStats = do
stats <- asks serverStats
atomically $ modifyTVar (msgRecv stats) (+ 1)
atomically $ updateActiveQueues stats queueId
updateActiveQueues :: ServerStats -> RecipientId -> STM ()
updateActiveQueues stats qId = do
updatePeriod dayMsgQueues
updatePeriod weekMsgQueues
updatePeriod monthMsgQueues
where
updatePeriod pSel = modifyTVar (pSel stats) (S.insert qId)
atomically $ updatePeriodStats (activeQueues stats) queueId
sendMessage :: QueueRec -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg)
sendMessage qr msgFlags msgBody
@ -571,7 +551,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
when (sent == OK) $ do
stats <- asks serverStats
atomically $ modifyTVar (msgSent stats) (+ 1)
atomically $ updateActiveQueues stats $ recipientId qr
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
pure resp
where
mkMessage :: C.MaxLenBS MaxMessageLen -> m Message
@ -743,7 +723,9 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat
liftIO (strDecode <$> B.readFile f) >>= \case
Right d -> do
s <- asks serverStats
atomically $ setServerStatsData s d
atomically $ setServerStats s d
renameFile f $ f <> ".bak"
logInfo "server stats restored"
Left e -> logInfo $ "error restoring server stats: " <> T.pack e
Left e -> do
logInfo $ "error restoring server stats: " <> T.pack e
liftIO exitFailure

View File

@ -1,5 +1,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Simplex.Messaging.Server.Stats where
@ -8,7 +10,9 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Set (Set)
import qualified Data.Set as S
import Data.Time.Clock (UTCTime)
import Data.Time.Calendar.Month.Compat (pattern MonthDay)
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
import Data.Time.Clock (UTCTime (..))
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (RecipientId)
import UnliftIO.STM
@ -20,9 +24,7 @@ data ServerStats = ServerStats
qDeleted :: TVar Int,
msgSent :: TVar Int,
msgRecv :: TVar Int,
dayMsgQueues :: TVar (Set RecipientId),
weekMsgQueues :: TVar (Set RecipientId),
monthMsgQueues :: TVar (Set RecipientId)
activeQueues :: PeriodStats RecipientId
}
data ServerStatsData = ServerStatsData
@ -32,9 +34,7 @@ data ServerStatsData = ServerStatsData
_qDeleted :: Int,
_msgSent :: Int,
_msgRecv :: Int,
_dayMsgQueues :: Set RecipientId,
_weekMsgQueues :: Set RecipientId,
_monthMsgQueues :: Set RecipientId
_activeQueues :: PeriodStatsData RecipientId
}
newServerStats :: UTCTime -> STM ServerStats
@ -45,10 +45,8 @@ newServerStats ts = do
qDeleted <- newTVar 0
msgSent <- newTVar 0
msgRecv <- newTVar 0
dayMsgQueues <- newTVar S.empty
weekMsgQueues <- newTVar S.empty
monthMsgQueues <- newTVar S.empty
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, dayMsgQueues, weekMsgQueues, monthMsgQueues}
activeQueues <- newPeriodStats
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues}
getServerStatsData :: ServerStats -> STM ServerStatsData
getServerStatsData s = do
@ -58,25 +56,21 @@ getServerStatsData s = do
_qDeleted <- readTVar $ qDeleted s
_msgSent <- readTVar $ msgSent s
_msgRecv <- readTVar $ msgRecv s
_dayMsgQueues <- readTVar $ dayMsgQueues s
_weekMsgQueues <- readTVar $ weekMsgQueues s
_monthMsgQueues <- readTVar $ monthMsgQueues s
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues}
_activeQueues <- getPeriodStatsData $ activeQueues s
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues}
setServerStatsData :: ServerStats -> ServerStatsData -> STM ()
setServerStatsData s d = do
setServerStats :: ServerStats -> ServerStatsData -> STM ()
setServerStats s d = do
writeTVar (fromTime s) (_fromTime d)
writeTVar (qCreated s) (_qCreated d)
writeTVar (qSecured s) (_qSecured d)
writeTVar (qDeleted s) (_qDeleted d)
writeTVar (msgSent s) (_msgSent d)
writeTVar (msgRecv s) (_msgRecv d)
writeTVar (dayMsgQueues s) (_dayMsgQueues d)
writeTVar (weekMsgQueues s) (_weekMsgQueues d)
writeTVar (monthMsgQueues s) (_monthMsgQueues d)
setPeriodStats (activeQueues s) (_activeQueues d)
instance StrEncoding ServerStatsData where
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues} =
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} =
B.unlines
[ "fromTime=" <> strEncode _fromTime,
"qCreated=" <> strEncode _qCreated,
@ -84,9 +78,8 @@ instance StrEncoding ServerStatsData where
"qDeleted=" <> strEncode _qDeleted,
"msgSent=" <> strEncode _msgSent,
"msgRecv=" <> strEncode _msgRecv,
"dayMsgQueues=" <> strEncode _dayMsgQueues,
"weekMsgQueues=" <> strEncode _weekMsgQueues,
"monthMsgQueues=" <> strEncode _monthMsgQueues
"activeQueues:",
strEncode _activeQueues
]
strP = do
_fromTime <- "fromTime=" *> strP <* A.endOfLine
@ -95,7 +88,81 @@ instance StrEncoding ServerStatsData where
_qDeleted <- "qDeleted=" *> strP <* A.endOfLine
_msgSent <- "msgSent=" *> strP <* A.endOfLine
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
_dayMsgQueues <- "dayMsgQueues=" *> strP <* A.endOfLine
_weekMsgQueues <- "weekMsgQueues=" *> strP <* A.endOfLine
_monthMsgQueues <- "monthMsgQueues=" *> strP <* optional A.endOfLine
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _dayMsgQueues, _weekMsgQueues, _monthMsgQueues}
r <- optional ("activeQueues:" <* A.endOfLine)
_activeQueues <- case r of
Just _ -> strP <* optional A.endOfLine
_ -> do
_day <- "dayMsgQueues=" *> strP <* A.endOfLine
_week <- "weekMsgQueues=" *> strP <* A.endOfLine
_month <- "monthMsgQueues=" *> strP <* optional A.endOfLine
pure PeriodStatsData {_day, _week, _month}
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues}
data PeriodStats a = PeriodStats
{ day :: TVar (Set a),
week :: TVar (Set a),
month :: TVar (Set a)
}
newPeriodStats :: STM (PeriodStats a)
newPeriodStats = do
day <- newTVar S.empty
week <- newTVar S.empty
month <- newTVar S.empty
pure PeriodStats {day, week, month}
data PeriodStatsData a = PeriodStatsData
{ _day :: Set a,
_week :: Set a,
_month :: Set a
}
getPeriodStatsData :: PeriodStats a -> STM (PeriodStatsData a)
getPeriodStatsData s = do
_day <- readTVar $ day s
_week <- readTVar $ week s
_month <- readTVar $ month s
pure PeriodStatsData {_day, _week, _month}
setPeriodStats :: PeriodStats a -> PeriodStatsData a -> STM ()
setPeriodStats s d = do
writeTVar (day s) (_day d)
writeTVar (week s) (_week d)
writeTVar (month s) (_month d)
instance (Ord a, StrEncoding a) => StrEncoding (PeriodStatsData a) where
strEncode PeriodStatsData {_day, _week, _month} =
"day=" <> strEncode _day <> "\nweek=" <> strEncode _week <> "\nmonth=" <> strEncode _month
strP = do
_day <- "day=" *> strP <* A.endOfLine
_week <- "week=" *> strP <* A.endOfLine
_month <- "month=" *> strP
pure PeriodStatsData {_day, _week, _month}
data PeriodStatCounts = PeriodStatCounts
{ dayCount :: String,
weekCount :: String,
monthCount :: String
}
periodStatCounts :: forall a. PeriodStats a -> UTCTime -> STM PeriodStatCounts
periodStatCounts ps ts = do
let d = utctDay ts
(_, wDay) = mondayStartWeek d
MonthDay _ mDay = d
dayCount <- periodCount 1 $ day ps
weekCount <- periodCount wDay $ week ps
monthCount <- periodCount mDay $ month ps
pure PeriodStatCounts {dayCount, weekCount, monthCount}
where
periodCount :: Int -> TVar (Set a) -> STM String
periodCount 1 pVar = show . S.size <$> swapTVar pVar S.empty
periodCount _ _ = pure ""
updatePeriodStats :: Ord a => PeriodStats a -> a -> STM ()
updatePeriodStats stats pId = do
updatePeriod day
updatePeriod week
updatePeriod month
where
updatePeriod pSel = modifyTVar (pSel stats) (S.insert pId)

View File

@ -94,7 +94,12 @@ ntfServerCfg =
-- CA certificate private key is not needed for initialization
caCertificateFile = "tests/fixtures/ca.crt",
privateKeyFile = "tests/fixtures/server.key",
certificateFile = "tests/fixtures/server.crt"
certificateFile = "tests/fixtures/server.crt",
-- stats config
logStatsInterval = Nothing,
logStatsStartTime = 0,
serverStatsLogFile = "tests/ntf-server-stats.daily.log",
serverStatsBackupFile = Nothing
}
withNtfServerStoreLog :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ThreadId -> m a) -> m a