ntf server: fix resubscribing to SMP server after it was restarted, test (#465)
This commit is contained in:
parent
991548b64d
commit
7a238812b7
|
@ -59,6 +59,7 @@ module Simplex.Messaging.Agent
|
|||
checkNtfToken,
|
||||
deleteNtfToken,
|
||||
getNtfToken,
|
||||
getNtfTokenData,
|
||||
deleteNtfSub,
|
||||
activateAgent,
|
||||
suspendAgent,
|
||||
|
@ -207,6 +208,9 @@ deleteNtfToken c = withAgentEnv c . deleteNtfToken' c
|
|||
getNtfToken :: AgentErrorMonad m => AgentClient -> m (DeviceToken, NtfTknStatus, NotificationsMode)
|
||||
getNtfToken c = withAgentEnv c $ getNtfToken' c
|
||||
|
||||
getNtfTokenData :: AgentErrorMonad m => AgentClient -> m NtfToken
|
||||
getNtfTokenData c = withAgentEnv c $ getNtfTokenData' c
|
||||
|
||||
-- | Delete notification subscription for connection
|
||||
deleteNtfSub :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
|
||||
deleteNtfSub c = withAgentEnv c . deleteNtfSub' c
|
||||
|
@ -745,6 +749,12 @@ getNtfToken' c =
|
|||
Just NtfToken {deviceToken, ntfTknStatus, ntfMode} -> pure (deviceToken, ntfTknStatus, ntfMode)
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
getNtfTokenData' :: AgentMonad m => AgentClient -> m NtfToken
|
||||
getNtfTokenData' c =
|
||||
withStore' c getSavedNtfToken >>= \case
|
||||
Just tkn -> pure tkn
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
-- | Delete notification subscription for connection, in Reader monad
|
||||
deleteNtfSub' :: AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
deleteNtfSub' _c connId = do
|
||||
|
|
|
@ -30,7 +30,7 @@ import Simplex.Messaging.Protocol (BrokerMsg, ProtocolServer (..), QueueId, SMPS
|
|||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Util (catchAll_, tryE, whenM, ($>>=))
|
||||
import Simplex.Messaging.Util (catchAll_, tryE, unlessM, ($>>=))
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (async, forConcurrently_)
|
||||
import UnliftIO.Exception (Exception)
|
||||
|
@ -47,7 +47,7 @@ data SMPClientAgentEvent
|
|||
| CASubError SMPServer SMPSub ProtocolClientError
|
||||
|
||||
data SMPSubParty = SPRecipient | SPNotifier
|
||||
deriving (Eq, Ord)
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
type SMPSub = (SMPSubParty, QueueId)
|
||||
|
||||
|
@ -203,7 +203,7 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv =
|
|||
notify $ CAReconnected srv
|
||||
cs <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSrvSubs ca)
|
||||
forConcurrently_ (maybe [] M.assocs cs) $ \sub@(s, _) ->
|
||||
whenM (atomically $ hasSub (srvSubs ca) srv s) $
|
||||
unlessM (atomically $ hasSub (srvSubs ca) srv s) $
|
||||
subscribe_ smp sub `catchE` handleError s
|
||||
where
|
||||
subscribe_ :: SMPClient -> (SMPSub, C.APrivateSignKey) -> ExceptT ProtocolClientError IO ()
|
||||
|
|
|
@ -31,7 +31,7 @@ import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..), Pus
|
|||
import Simplex.Messaging.Notifications.Server.Store
|
||||
import Simplex.Messaging.Notifications.Server.StoreLog
|
||||
import Simplex.Messaging.Notifications.Transport
|
||||
import Simplex.Messaging.Protocol (ErrorType (..), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut)
|
||||
import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Server
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
|
@ -147,16 +147,19 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
|
|||
forever $
|
||||
atomically (readTBQueue agentQ) >>= \case
|
||||
CAConnected _ -> pure ()
|
||||
CADisconnected srv subs ->
|
||||
CADisconnected srv subs -> do
|
||||
logInfo . T.pack $ "SMP server disconnected " <> host srv <> " (" <> show (length subs) <> ") subscriptions"
|
||||
forM_ subs $ \(_, ntfId) -> do
|
||||
let smpQueue = SMPQueueNtf srv ntfId
|
||||
updateSubStatus smpQueue NSInactive
|
||||
CAReconnected _ -> pure ()
|
||||
CAReconnected srv ->
|
||||
logInfo $ "SMP server reconnected " <> T.pack (host srv)
|
||||
CAResubscribed srv sub -> do
|
||||
let ntfId = snd sub
|
||||
smpQueue = SMPQueueNtf srv ntfId
|
||||
updateSubStatus smpQueue NSActive
|
||||
CASubError srv (_, ntfId) err ->
|
||||
CASubError srv (_, ntfId) err -> do
|
||||
logError . T.pack $ "SMP subscription error on server " <> host srv <> ": " <> show err
|
||||
handleSubError (SMPQueueNtf srv ntfId) err
|
||||
|
||||
handleSubError :: SMPQueueNtf -> ProtocolClientError -> m ()
|
||||
|
|
|
@ -99,6 +99,7 @@ data PushNotification
|
|||
| PNMessage PNMessageData
|
||||
| PNAlert Text
|
||||
| PNCheckMessages
|
||||
deriving (Show)
|
||||
|
||||
data PNMessageData = PNMessageData
|
||||
{ smpQueue :: SMPQueueNtf,
|
||||
|
@ -106,6 +107,7 @@ data PNMessageData = PNMessageData
|
|||
nmsgNonce :: C.CbNonce,
|
||||
encNMsgMeta :: EncNMsgMeta
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
instance StrEncoding PNMessageData where
|
||||
strEncode PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} =
|
||||
|
|
|
@ -14,19 +14,22 @@ import Control.Concurrent (killThread, threadDelay)
|
|||
import Control.Monad.Except
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.Aeson.Types as JT
|
||||
import Data.Bifunctor (bimap)
|
||||
import Data.Bifunctor (bimap, first)
|
||||
import qualified Data.ByteString.Base64.URL as U
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import NtfClient
|
||||
import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2)
|
||||
import SMPClient (withSmpServer)
|
||||
import SMPClient (testPort, withSmpServer, withSmpServerStoreLogOn)
|
||||
import Simplex.Messaging.Agent
|
||||
import Simplex.Messaging.Agent.Client (AgentClient)
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..))
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Notifications.Server.Push.APNS
|
||||
import Simplex.Messaging.Notifications.Types (NtfToken (..))
|
||||
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), SMPMsgMeta (..))
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Transport (ATransport)
|
||||
|
@ -73,11 +76,15 @@ notificationTests t =
|
|||
withSmpServer t $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testChangeToken apns
|
||||
describe "Notifications server store log" $ do
|
||||
describe "Notifications server store log" $
|
||||
it "should save and restore tokens and subscriptions" $ \_ ->
|
||||
withSmpServer t $
|
||||
withAPNSMockServer $ \apns ->
|
||||
testNotificationsStoreLog t apns
|
||||
describe "Notifications after SMP server restart" $
|
||||
it "should resume subscriptions after SMP server is restarted" $ \_ ->
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testNotificationsSMPRestart t apns
|
||||
|
||||
testNotificationToken :: APNSMockServer -> IO ()
|
||||
testNotificationToken APNSMockServer {apnsQ} = do
|
||||
|
@ -430,8 +437,8 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do
|
|||
alice <- getSMPAgentClient agentCfg initAgentServers
|
||||
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
|
||||
Right (aliceId, bobId) <- withNtfServerStoreLog t $ \threadId -> runExceptT $ do
|
||||
_ <- registerTestToken alice "abcd" NMInstant apnsQ
|
||||
(aliceId, bobId) <- makeConnection alice bob
|
||||
_ <- registerTestToken alice "abcd" NMInstant apnsQ
|
||||
liftIO $ threadDelay 250000
|
||||
4 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello"
|
||||
get bob ##> ("", aliceId, SENT 4)
|
||||
|
@ -452,6 +459,37 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do
|
|||
liftIO $ killThread threadId
|
||||
pure ()
|
||||
|
||||
testNotificationsSMPRestart :: ATransport -> APNSMockServer -> IO ()
|
||||
testNotificationsSMPRestart t APNSMockServer {apnsQ} = do
|
||||
alice <- getSMPAgentClient agentCfg initAgentServers
|
||||
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
|
||||
Right (aliceId, bobId) <- withSmpServerStoreLogOn t testPort $ \threadId -> runExceptT $ do
|
||||
(aliceId, bobId) <- makeConnection alice bob
|
||||
_ <- registerTestToken alice "abcd" NMInstant apnsQ
|
||||
liftIO $ threadDelay 250000
|
||||
4 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello"
|
||||
get bob ##> ("", aliceId, SENT 4)
|
||||
void $ messageNotification apnsQ
|
||||
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId 4
|
||||
liftIO $ killThread threadId
|
||||
pure (aliceId, bobId)
|
||||
|
||||
Right () <- runExceptT $ do
|
||||
get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
get bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False
|
||||
|
||||
Right () <- withSmpServerStoreLogOn t testPort $ \threadId -> runExceptT $ do
|
||||
get alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False
|
||||
get bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False
|
||||
liftIO $ threadDelay 1000000
|
||||
5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again"
|
||||
get bob ##> ("", aliceId, SENT 5)
|
||||
_ <- messageNotificationData alice apnsQ
|
||||
get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
|
||||
liftIO $ killThread threadId
|
||||
pure ()
|
||||
|
||||
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
|
||||
messageNotification apnsQ = do
|
||||
1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
|
@ -463,6 +501,13 @@ messageNotification apnsQ = do
|
|||
pure (nonce, message)
|
||||
_ -> error "bad notification"
|
||||
|
||||
messageNotificationData :: AgentClient -> TBQueue APNSMockRequest -> ExceptT AgentErrorType IO PNMessageData
|
||||
messageNotificationData c apnsQ = do
|
||||
(nonce, message) <- messageNotification apnsQ
|
||||
NtfToken {ntfDhSecret = Just dhSecret} <- getNtfTokenData c
|
||||
Right pnMsgData <- liftEither . first INTERNAL $ Right . strDecode =<< first show (C.cbDecrypt dhSecret nonce message)
|
||||
pure pnMsgData
|
||||
|
||||
noNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO ()
|
||||
noNotification apnsQ = do
|
||||
500000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
|
|
Reference in New Issue