ntf server error handling (#448)
* ntf server error handling * refactor * fix
This commit is contained in:
parent
238a2e7fe9
commit
6b6ea78eac
|
@ -422,7 +422,7 @@ protocolClientError :: (ErrorType -> AgentErrorType) -> ProtocolClientError -> A
|
|||
protocolClientError protocolError_ = \case
|
||||
PCEProtocolError e -> protocolError_ e
|
||||
PCEResponseError e -> BROKER $ RESPONSE e
|
||||
PCEUnexpectedResponse -> BROKER UNEXPECTED
|
||||
PCEUnexpectedResponse _ -> BROKER UNEXPECTED
|
||||
PCEResponseTimeout -> BROKER TIMEOUT
|
||||
PCENetworkError -> BROKER NETWORK
|
||||
PCETransportError e -> BROKER $ TRANSPORT e
|
||||
|
|
|
@ -197,7 +197,7 @@ runNtfWorker c srv doWork = forever $ do
|
|||
case ntfSubId of
|
||||
Just nSubId ->
|
||||
agentNtfCheckSubscription c nSubId tkn >>= \case
|
||||
NSSMPAuth -> do
|
||||
NSAuth -> do
|
||||
getNtfServer c >>= \case
|
||||
Just ntfServer -> do
|
||||
withStore' c $ \db ->
|
||||
|
|
|
@ -224,7 +224,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc
|
|||
Right r -> case protocolError r of
|
||||
Just e -> Left $ PCEProtocolError e
|
||||
_ -> Right r
|
||||
else Left PCEUnexpectedResponse
|
||||
else Left . PCEUnexpectedResponse $ bshow respOrErr
|
||||
where
|
||||
sendMsg :: QueueId -> Either ErrorType msg -> IO ()
|
||||
sendMsg qId = \case
|
||||
|
@ -248,7 +248,7 @@ data ProtocolClientError
|
|||
-- e.g. server should respond `IDS` or `ERR` to `NEW` command,
|
||||
-- other responses would result in this error.
|
||||
-- Forwarded to the agent client as `ERR BROKER UNEXPECTED`.
|
||||
PCEUnexpectedResponse
|
||||
PCEUnexpectedResponse ByteString
|
||||
| -- | Used for TCP connection and command response timeouts.
|
||||
-- Forwarded to the agent client as `ERR BROKER TIMEOUT`.
|
||||
PCEResponseTimeout
|
||||
|
@ -276,7 +276,7 @@ createSMPQueue ::
|
|||
createSMPQueue c rpKey rKey dhKey =
|
||||
sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey) >>= \case
|
||||
IDS qik -> pure qik
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
-- | Subscribe to the SMP queue.
|
||||
--
|
||||
|
@ -287,7 +287,7 @@ subscribeSMPQueue c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId =
|
|||
OK -> return ()
|
||||
cmd@MSG {} ->
|
||||
lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
-- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue
|
||||
--
|
||||
|
@ -299,7 +299,7 @@ getSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId =
|
|||
cmd@(MSG msgId msgTs msgFlags _) -> do
|
||||
lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ
|
||||
pure $ Just SMP.SMPMsgMeta {msgId, msgTs, msgFlags}
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
-- | Subscribe to the SMP queue notifications.
|
||||
--
|
||||
|
@ -320,7 +320,7 @@ enableSMPQueueNotifications :: SMPClient -> RcvPrivateSignKey -> RecipientId ->
|
|||
enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey =
|
||||
sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case
|
||||
NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey)
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
-- | Disable notifications for the queue for push notifications server.
|
||||
--
|
||||
|
@ -335,7 +335,7 @@ sendSMPMessage :: SMPClient -> Maybe SndPrivateSignKey -> SenderId -> MsgFlags -
|
|||
sendSMPMessage c spKey sId flags msg =
|
||||
sendSMPCommand c spKey sId (SEND flags msg) >>= \case
|
||||
OK -> pure ()
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
-- | Acknowledge message delivery (server deletes the message).
|
||||
--
|
||||
|
@ -346,7 +346,7 @@ ackSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId msgId
|
|||
OK -> return ()
|
||||
cmd@MSG {} ->
|
||||
lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
-- | Irreversibly suspend SMP queue.
|
||||
-- The existing messages from the queue will still be delivered.
|
||||
|
@ -365,7 +365,7 @@ okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateSignKey -> Queue
|
|||
okSMPCommand cmd c pKey qId =
|
||||
sendSMPCommand c (Just pKey) qId cmd >>= \case
|
||||
OK -> return ()
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
-- | Send SMP command
|
||||
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateSignKey -> QueueId -> Command p -> ExceptT ProtocolClientError IO BrokerMsg
|
||||
|
|
|
@ -10,6 +10,7 @@ import Data.Word (Word16)
|
|||
import Simplex.Messaging.Client
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Util (bshow)
|
||||
|
||||
type NtfClient = ProtocolClient NtfResponse
|
||||
|
||||
|
@ -17,7 +18,7 @@ ntfRegisterToken :: NtfClient -> C.APrivateSignKey -> NewNtfEntity 'Token -> Exc
|
|||
ntfRegisterToken c pKey newTkn =
|
||||
sendNtfCommand c (Just pKey) "" (TNEW newTkn) >>= \case
|
||||
NRTknId tknId dhKey -> pure (tknId, dhKey)
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
ntfVerifyToken :: NtfClient -> C.APrivateSignKey -> NtfTokenId -> NtfRegCode -> ExceptT ProtocolClientError IO ()
|
||||
ntfVerifyToken c pKey tknId code = okNtfCommand (TVFY code) c pKey tknId
|
||||
|
@ -26,7 +27,7 @@ ntfCheckToken :: NtfClient -> C.APrivateSignKey -> NtfTokenId -> ExceptT Protoco
|
|||
ntfCheckToken c pKey tknId =
|
||||
sendNtfCommand c (Just pKey) tknId TCHK >>= \case
|
||||
NRTkn stat -> pure stat
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
ntfReplaceToken :: NtfClient -> C.APrivateSignKey -> NtfTokenId -> DeviceToken -> ExceptT ProtocolClientError IO ()
|
||||
ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId
|
||||
|
@ -41,13 +42,13 @@ ntfCreateSubscription :: NtfClient -> C.APrivateSignKey -> NewNtfEntity 'Subscri
|
|||
ntfCreateSubscription c pKey newSub =
|
||||
sendNtfCommand c (Just pKey) "" (SNEW newSub) >>= \case
|
||||
NRSubId subId -> pure subId
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
ntfCheckSubscription :: NtfClient -> C.APrivateSignKey -> NtfSubscriptionId -> ExceptT ProtocolClientError IO NtfSubStatus
|
||||
ntfCheckSubscription c pKey subId =
|
||||
sendNtfCommand c (Just pKey) subId SCHK >>= \case
|
||||
NRSub stat -> pure stat
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
ntfDeleteSubscription :: NtfClient -> C.APrivateSignKey -> NtfSubscriptionId -> ExceptT ProtocolClientError IO ()
|
||||
ntfDeleteSubscription = okNtfCommand SDEL
|
||||
|
@ -60,4 +61,4 @@ okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateSignKey -
|
|||
okNtfCommand cmd c pKey entId =
|
||||
sendNtfCommand c (Just pKey) entId cmd >>= \case
|
||||
NROk -> return ()
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
|
|
|
@ -403,12 +403,12 @@ data NtfSubStatus
|
|||
NSActive
|
||||
| -- | disconnected/unsubscribed from SMP server
|
||||
NSInactive
|
||||
| -- | NEND received (we currently do not support it)
|
||||
| -- | END received
|
||||
NSEnd
|
||||
| -- | SMP AUTH error
|
||||
NSSMPAuth
|
||||
NSAuth
|
||||
| -- | SMP error other than AUTH
|
||||
NSSMPErr ErrorType
|
||||
NSErr ByteString
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance Encoding NtfSubStatus where
|
||||
|
@ -418,8 +418,8 @@ instance Encoding NtfSubStatus where
|
|||
NSActive -> "ACTIVE"
|
||||
NSInactive -> "INACTIVE"
|
||||
NSEnd -> "END"
|
||||
NSSMPAuth -> "SMP_AUTH"
|
||||
NSSMPErr err -> "SMP_ERR " <> smpEncode err
|
||||
NSAuth -> "AUTH"
|
||||
NSErr err -> "ERR " <> err
|
||||
smpP =
|
||||
A.takeTill (== ' ') >>= \case
|
||||
"NEW" -> pure NSNew
|
||||
|
@ -427,10 +427,8 @@ instance Encoding NtfSubStatus where
|
|||
"ACTIVE" -> pure NSActive
|
||||
"INACTIVE" -> pure NSInactive
|
||||
"END" -> pure NSEnd
|
||||
"SMP_AUTH" -> pure NSSMPAuth
|
||||
"SMP_ERR" -> do
|
||||
_ <- A.space
|
||||
NSSMPErr <$> smpP
|
||||
"AUTH" -> pure NSAuth
|
||||
"ERR" -> NSErr <$> (A.space *> A.takeByteString)
|
||||
_ -> fail "bad NtfSubStatus"
|
||||
|
||||
instance StrEncoding NtfSubStatus where
|
||||
|
|
|
@ -111,27 +111,20 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
|
|||
runSMPSubscriber :: SMPSubscriber -> m ()
|
||||
runSMPSubscriber SMPSubscriber {newSubQ = subscriberSubQ} =
|
||||
forever $
|
||||
atomically (peekTQueue subscriberSubQ) >>= \case
|
||||
NtfSub NtfSubData {smpQueue, notifierKey} -> do
|
||||
atomically (peekTQueue subscriberSubQ)
|
||||
>>= \(NtfSub NtfSubData {smpQueue, notifierKey}) -> do
|
||||
updateSubStatus smpQueue NSPending
|
||||
let SMPQueueNtf {smpServer, notifierId} = smpQueue
|
||||
liftIO (runExceptT $ subscribeQueue ca smpServer ((SPNotifier, notifierId), notifierKey)) >>= \case
|
||||
Right _ -> update smpQueue NSActive
|
||||
Left err -> case err of
|
||||
PCEProtocolError AUTH -> update smpQueue NSSMPAuth
|
||||
PCEProtocolError e -> update smpQueue $ NSSMPErr e
|
||||
PCEIOError e -> log' $ "IOError " <> T.pack (show e)
|
||||
PCEResponseError e -> log' $ "ResponseError " <> T.pack (show e)
|
||||
PCEUnexpectedResponse -> log' "UnexpectedResponse"
|
||||
PCETransportError e -> log' $ "TransportError " <> T.pack (show e)
|
||||
PCESignatureError e -> log' $ "SignatureError " <> T.pack (show e)
|
||||
PCEResponseTimeout -> pure ()
|
||||
PCENetworkError -> pure ()
|
||||
where
|
||||
update smpQueue status = do
|
||||
updateSubStatus smpQueue status
|
||||
void . atomically $ readTQueue subscriberSubQ
|
||||
log' e = logError $ "SMPSubscriber subscribeQueue " <> e
|
||||
Right _ -> do
|
||||
updateSubStatus smpQueue NSActive
|
||||
void . atomically $ readTQueue subscriberSubQ
|
||||
Left err -> do
|
||||
handleSubError smpQueue err
|
||||
case err of
|
||||
PCEResponseTimeout -> pure ()
|
||||
PCENetworkError -> pure ()
|
||||
_ -> void . atomically $ readTQueue subscriberSubQ
|
||||
|
||||
receiveSMP :: m ()
|
||||
receiveSMP = forever $ do
|
||||
|
@ -145,7 +138,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
|
|||
atomically $
|
||||
findNtfSubscriptionToken st smpQueue
|
||||
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}))
|
||||
SMP.END -> updateSubStatus smpQueue NSInactive
|
||||
SMP.END -> updateSubStatus smpQueue NSEnd
|
||||
_ -> pure ()
|
||||
pure ()
|
||||
|
||||
|
@ -162,20 +155,23 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
|
|||
let ntfId = snd sub
|
||||
smpQueue = SMPQueueNtf srv ntfId
|
||||
updateSubStatus smpQueue NSActive
|
||||
CASubError srv sub err -> do
|
||||
let ntfId = snd sub
|
||||
smpQueue = SMPQueueNtf srv ntfId
|
||||
case err of
|
||||
PCEProtocolError e -> case e of
|
||||
AUTH -> updateSubStatus smpQueue NSSMPAuth
|
||||
_ -> updateSubStatus smpQueue (NSSMPErr e)
|
||||
PCEResponseError e -> logErr e
|
||||
PCEUnexpectedResponse -> logErr err
|
||||
PCESignatureError e -> logErr e
|
||||
PCEIOError e -> logErr e
|
||||
_ -> pure ()
|
||||
where
|
||||
logErr e = logError $ "ntfSubscriber receiveAgent error: " <> T.pack (show e)
|
||||
CASubError srv (_, ntfId) err ->
|
||||
handleSubError (SMPQueueNtf srv ntfId) err
|
||||
|
||||
handleSubError :: SMPQueueNtf -> ProtocolClientError -> m ()
|
||||
handleSubError smpQueue = \case
|
||||
PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth
|
||||
PCEProtocolError e -> updateErr "SMP error " e
|
||||
PCEIOError e -> updateErr "IOError " e
|
||||
PCEResponseError e -> updateErr "ResponseError " e
|
||||
PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r
|
||||
PCETransportError e -> updateErr "TransportError " e
|
||||
PCESignatureError e -> updateErr "SignatureError " e
|
||||
PCEResponseTimeout -> pure ()
|
||||
PCENetworkError -> pure ()
|
||||
where
|
||||
updateErr :: Show e => ByteString -> e -> m ()
|
||||
updateErr errType e = updateSubStatus smpQueue . NSErr $ errType <> bshow e
|
||||
|
||||
updateSubStatus smpQueue status = do
|
||||
st <- asks store
|
||||
|
@ -370,7 +366,9 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
|
|||
| otherwise -> do
|
||||
logDebug "TVFY - incorrect code or token status"
|
||||
pure $ NRErr AUTH
|
||||
TCHK -> pure $ NRTkn status
|
||||
TCHK -> do
|
||||
logDebug "TCHK"
|
||||
pure $ NRTkn status
|
||||
TRPL token' -> do
|
||||
logDebug "TRPL - replace token"
|
||||
st <- asks store
|
||||
|
|
|
@ -628,7 +628,7 @@ transmissionP = do
|
|||
command <- A.takeByteString
|
||||
pure RawTransmission {signature, signed, sessId, corrId, entityId, command}
|
||||
|
||||
class (ProtocolEncoding msg, ProtocolEncoding (ProtocolCommand msg)) => Protocol msg where
|
||||
class (ProtocolEncoding msg, ProtocolEncoding (ProtocolCommand msg), Show msg) => Protocol msg where
|
||||
type ProtocolCommand msg = cmd | cmd -> msg
|
||||
protocolClientHandshake :: forall c. Transport c => c -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c)
|
||||
protocolPing :: ProtocolCommand msg
|
||||
|
|
Reference in New Issue