ntf: notify client about ntf supervisor internal errors (#455)
Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
parent
f9cd7e5416
commit
57d83ae42d
|
@ -49,21 +49,24 @@ import UnliftIO.Concurrent (forkIO, threadDelay)
|
|||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
runNtfSupervisor :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
runNtfSupervisor :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m ()
|
||||
runNtfSupervisor c = do
|
||||
ns <- asks ntfSupervisor
|
||||
forever . handleError $ do
|
||||
cmd <- atomically . readTBQueue $ ntfSubQ ns
|
||||
agentOperationBracket c AONtfNetwork $
|
||||
forever $ do
|
||||
cmd@(connId, _) <- atomically . readTBQueue $ ntfSubQ ns
|
||||
handleError connId . agentOperationBracket c AONtfNetwork $
|
||||
runExceptT (processNtfSub c cmd) >>= \case
|
||||
Left e -> liftIO $ print e
|
||||
Left e -> notifyErr connId e
|
||||
Right _ -> return ()
|
||||
where
|
||||
handleError = E.handle $ \(e :: E.SomeException) -> do
|
||||
handleError :: ConnId -> m () -> m ()
|
||||
handleError connId = E.handle $ \(e :: E.SomeException) -> do
|
||||
logError $ "runNtfSupervisor error " <> tshow e
|
||||
notifyErr connId e
|
||||
notifyErr connId e = notifyInternalError c connId $ "runNtfSupervisor error " <> show e
|
||||
|
||||
processNtfSub :: forall m. AgentMonad m => AgentClient -> (ConnId, NtfSupervisorCommand) -> m ()
|
||||
processNtfSub c@AgentClient {subQ} (connId, cmd) = do
|
||||
processNtfSub c (connId, cmd) = do
|
||||
logInfo $ "processNtfSub - connId = " <> tshow connId <> " - cmd = " <> tshow cmd
|
||||
case cmd of
|
||||
NSCCreate -> do
|
||||
|
@ -127,7 +130,7 @@ processNtfSub c@AgentClient {subQ} (connId, cmd) = do
|
|||
ts <- liftIO getCurrentTime
|
||||
withStore' c $ \db -> supervisorUpdateNtfSubAction db connId (NtfSubSMPAction NSASmpDelete) ts
|
||||
addNtfSMPWorker smpServer
|
||||
_ -> err "NSCSmpDelete - no rcv queue"
|
||||
_ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue"
|
||||
NSCNtfWorker ntfServer ->
|
||||
addNtfNTFWorker ntfServer
|
||||
NSCNtfSMPWorker smpServer ->
|
||||
|
@ -149,8 +152,6 @@ processNtfSub c@AgentClient {subQ} (connId, cmd) = do
|
|||
atomically $ TM.insert srv (doWork, worker) ws
|
||||
Just (doWork, _) ->
|
||||
void . atomically $ tryPutTMVar doWork ()
|
||||
err :: String -> m ()
|
||||
err internalErrStr = atomically $ writeTBQueue subQ ("", connId, AP.ERR $ AP.INTERNAL internalErrStr)
|
||||
|
||||
withNtfServer :: AgentMonad m => AgentClient -> (NtfServer -> m ()) -> m ()
|
||||
withNtfServer c action = getNtfServer c >>= mapM_ action
|
||||
|
@ -173,7 +174,7 @@ runNtfWorker c srv doWork = do
|
|||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction a
|
||||
`catchError` retryOnError c "NtfWorker" loop (ntfInternalError c connId . show)
|
||||
`catchError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show)
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m ()
|
||||
processAction (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do
|
||||
|
@ -191,8 +192,8 @@ runNtfWorker c srv doWork = do
|
|||
let actionTs' = addUTCTime 30 ts
|
||||
withStore' c $ \db ->
|
||||
updateNtfSubscription db sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew} (NtfSubNTFAction NSACheck) actionTs'
|
||||
_ -> ntfInternalError c connId "NSACreate - no notifier queue credentials"
|
||||
_ -> ntfInternalError c connId "NSACreate - no active token"
|
||||
_ -> workerInternalError c connId "NSACreate - no notifier queue credentials"
|
||||
_ -> workerInternalError c connId "NSACreate - no active token"
|
||||
NSACheck ->
|
||||
getNtfToken >>= \case
|
||||
Just tkn ->
|
||||
|
@ -206,10 +207,10 @@ runNtfWorker c srv doWork = do
|
|||
updateNtfSubscription db sub {ntfServer, ntfQueueId = Nothing, ntfSubId = Nothing, ntfSubStatus = NASNew} (NtfSubSMPAction NSASmpKey) ts
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer)
|
||||
_ -> ntfInternalError c connId "NSACheck - failed to reset subscription, notification server not configured"
|
||||
_ -> workerInternalError c connId "NSACheck - failed to reset subscription, notification server not configured"
|
||||
status -> updateSubNextCheck ts status
|
||||
Nothing -> ntfInternalError c connId "NSACheck - no subscription ID"
|
||||
_ -> ntfInternalError c connId "NSACheck - no active token"
|
||||
Nothing -> workerInternalError c connId "NSACheck - no subscription ID"
|
||||
_ -> workerInternalError c connId "NSACheck - no active token"
|
||||
NSADelete -> case ntfSubId of
|
||||
Just nSubId ->
|
||||
(getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId)
|
||||
|
@ -248,7 +249,7 @@ runNtfSMPWorker c srv doWork = do
|
|||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction a
|
||||
`catchError` retryOnError c "NtfSMPWorker" loop (ntfInternalError c connId . show)
|
||||
`catchError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show)
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m ()
|
||||
processAction (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do
|
||||
|
@ -269,7 +270,7 @@ runNtfSMPWorker c srv doWork = do
|
|||
updateNtfSubscription db sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NtfSubNTFAction NSACreate) ts
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer)
|
||||
_ -> ntfInternalError c connId "NSASmpKey - no active token"
|
||||
_ -> workerInternalError c connId "NSASmpKey - no active token"
|
||||
NSASmpDelete -> do
|
||||
rq_ <- withStore' c $ \db -> do
|
||||
setRcvQueueNtfCreds db connId Nothing
|
||||
|
@ -306,10 +307,13 @@ retryOnError c name loop done e = do
|
|||
atomically $ beginAgentOperation c AONtfNetwork
|
||||
loop
|
||||
|
||||
ntfInternalError :: AgentMonad m => AgentClient -> ConnId -> String -> m ()
|
||||
ntfInternalError c@AgentClient {subQ} connId internalErrStr = do
|
||||
workerInternalError :: AgentMonad m => AgentClient -> ConnId -> String -> m ()
|
||||
workerInternalError c connId internalErrStr = do
|
||||
withStore' c $ \db -> setNullNtfSubscriptionAction db connId
|
||||
atomically $ writeTBQueue subQ ("", connId, AP.ERR $ AP.INTERNAL internalErrStr)
|
||||
notifyInternalError c connId internalErrStr
|
||||
|
||||
notifyInternalError :: (MonadUnliftIO m) => AgentClient -> ConnId -> String -> m ()
|
||||
notifyInternalError AgentClient {subQ} connId internalErrStr = atomically $ writeTBQueue subQ ("", connId, AP.ERR $ AP.INTERNAL internalErrStr)
|
||||
|
||||
getNtfToken :: AgentMonad m => m (Maybe NtfToken)
|
||||
getNtfToken = do
|
||||
|
|
Reference in New Issue