Skip to content

Commit e696510

Browse files
committed
service subscriptions in the client
1 parent 7295b73 commit e696510

File tree

6 files changed

+174
-65
lines changed

6 files changed

+174
-65
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,7 @@ testProtocolServer c nm userId srv = withAgentEnv' c $ case protocolTypeI @p of
594594
SPNTF -> runNTFServerTest c nm userId srv
595595

596596
-- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network
597+
-- TODO [certs rcv] should fail if any user is enabled to use services and per-connection isolation is chosen
597598
setNetworkConfig :: AgentClient -> NetworkConfig -> IO ()
598599
setNetworkConfig c@AgentClient {useNetworkConfig, proxySessTs} cfg' = do
599600
ts <- getCurrentTime
@@ -771,6 +772,7 @@ deleteUser' c@AgentClient {smpServersStats, xftpServersStats} userId delSMPQueue
771772
whenM (withStore' c (`deleteUserWithoutConns` userId)) . atomically $
772773
writeTBQueue (subQ c) ("", "", AEvt SAENone $ DEL_USER userId)
773774

775+
-- TODO [certs rcv] should fail enabling if per-connection isolation is set
774776
setUserService' :: AgentClient -> UserId -> Bool -> AM ()
775777
setUserService' c userId enable = do
776778
wasEnabled <- liftIO $ fromMaybe False <$> TM.lookupIO userId (useClientServices c)
@@ -2870,7 +2872,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
28702872
processSubOk :: RcvQueue -> TVar [ConnId] -> IO ()
28712873
processSubOk rq@RcvQueue {connId} upConnIds =
28722874
atomically . whenM (isPendingSub rq) $ do
2873-
SS.addActiveSub tSess sessId (rcvQueueSub rq) $ currentSubs c
2875+
SS.addActiveSub tSess sessId rq $ currentSubs c
28742876
modifyTVar' upConnIds (connId :)
28752877
processSubErr :: RcvQueue -> SMPClientError -> AM' ()
28762878
processSubErr rq@RcvQueue {connId} e = do

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 80 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ import Simplex.Messaging.Agent.RetryInterval
241241
import Simplex.Messaging.Agent.Stats
242242
import Simplex.Messaging.Agent.Store
243243
import Simplex.Messaging.Agent.Store.AgentStore
244-
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
244+
import Simplex.Messaging.Agent.Store.Common (DBStore)
245245
import qualified Simplex.Messaging.Agent.Store.DB as DB
246246
import Simplex.Messaging.Agent.Store.Entity
247247
import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs)
@@ -279,7 +279,7 @@ import Simplex.Messaging.Protocol
279279
RcvNtfPublicDhKey,
280280
SMPMsgMeta (..),
281281
SProtocolType (..),
282-
ServiceSub,
282+
ServiceSub (..),
283283
SndPublicAuthKey,
284284
SubscriptionMode (..),
285285
NewNtfCreds (..),
@@ -500,6 +500,7 @@ data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther
500500
deriving (Eq, Show)
501501

502502
-- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's.
503+
-- TODO [certs rcv] should fail if both per-connection isolation is set and any users use services
503504
newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> Env -> IO AgentClient
504505
newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices agentEnv = do
505506
let cfg = config agentEnv
@@ -745,9 +746,11 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
745746
smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
746747
ts <- readTVarIO proxySessTs
747748
ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
749+
-- TODO [certs rcv] add service to SS, possibly combine with SS.setSessionId
748750
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
749751
updateClientService service smp
750752
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
753+
-- TODO [certs rcv] this should differentiate between service ID just set and service ID changed, and in the latter case disassociate the queue
751754
updateClientService service smp = case (service, smpClientService smp) of
752755
(Just (_, serviceId_), Just THClientService {serviceId})
753756
| serviceId_ /= Just serviceId -> withStore' c $ \db -> setClientServiceId db userId srv serviceId
@@ -764,32 +767,34 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess
764767
-- we make active subscriptions pending only if the client for tSess was current (in the map) and active,
765768
-- because we can have a race condition when a new current client could have already
766769
-- made subscriptions active, and the old client would be processing diconnection later.
767-
removeClientAndSubs :: IO ([RcvQueueSub], [ConnId])
770+
removeClientAndSubs :: IO ([RcvQueueSub], [ConnId], Maybe ServiceSub)
768771
removeClientAndSubs = atomically $ do
769772
removeSessVar v tSess smpClients
770-
ifM (readTVar active) removeSubs (pure ([], []))
773+
ifM (readTVar active) removeSubs (pure ([], [], Nothing))
771774
where
772775
sessId = sessionId $ thParams client
773776
removeSubs = do
774777
mode <- getSessionMode c
775-
subs <- SS.setSubsPending mode tSess sessId $ currentSubs c
778+
(subs, serviceSub_) <- SS.setSubsPending mode tSess sessId $ currentSubs c
776779
let qs = M.elems subs
777780
cs = nubOrd $ map qConnId qs
778781
-- this removes proxied relays that this client created sessions to
779782
destSrvs <- M.keys <$> readTVar prs
780783
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, cId) smpProxiedRelays
781-
pure (qs, cs)
784+
pure (qs, cs, serviceSub_)
782785

783-
serverDown :: ([RcvQueueSub], [ConnId]) -> IO ()
784-
serverDown (qs, conns) = whenM (readTVarIO active) $ do
786+
serverDown :: ([RcvQueueSub], [ConnId], Maybe ServiceSub) -> IO ()
787+
serverDown (qs, conns, serviceSub_) = whenM (readTVarIO active) $ do
785788
notifySub c $ hostEvent' DISCONNECT client
786789
unless (null conns) $ notifySub c $ DOWN srv conns
787-
unless (null qs) $ do
790+
unless (null qs && isNothing serviceSub_) $ do
788791
releaseGetLocksIO c qs
789792
mode <- getSessionModeIO c
790793
let resubscribe
791794
| (mode == TSMEntity) == isJust cId = resubscribeSMPSession c tSess
792-
| otherwise = void $ subscribeQueues c True qs
795+
| otherwise = do
796+
mapM_ (runExceptT . resubscribeClientService c tSess) serviceSub_
797+
unless (null qs) $ void $ subscribeQueues c True qs
793798
runReaderT resubscribe env
794799

795800
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
@@ -808,11 +813,12 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
808813
runSubWorker = do
809814
ri <- asks $ reconnectInterval . config
810815
withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do
811-
pending <- atomically $ SS.getPendingSubs tSess $ currentSubs c
812-
unless (M.null pending) $ do
816+
(pendingSubs, pendingSS) <- atomically $ SS.getPendingSubs tSess $ currentSubs c
817+
unless (M.null pendingSubs && isNothing pendingSS) $ do
813818
liftIO $ waitUntilForeground c
814819
liftIO $ waitForUserNetwork c
815-
handleNotify $ resubscribeSessQueues c tSess $ M.elems pending
820+
mapM_ (handleNotify . void . runExceptT . resubscribeClientService c tSess) pendingSS
821+
unless (M.null pendingSubs) $ handleNotify $ resubscribeSessQueues c tSess $ M.elems pendingSubs
816822
loop
817823
isForeground = (ASForeground ==) <$> readTVar (agentState c)
818824
cleanup :: SessionVar (Async ()) -> STM ()
@@ -1509,25 +1515,25 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
15091515
newErr :: String -> AM (Maybe ShortLinkCreds)
15101516
newErr = throwE . BROKER (B.unpack $ strEncode srv) . UNEXPECTED . ("Create queue: " <>)
15111517

1512-
processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM [(RcvQueueSub, Maybe ClientNotice)]
1513-
processSubResults c tSess@(userId, srv, _) sessId rs = do
1514-
pendingSubs <- SS.getPendingSubs tSess $ currentSubs c
1515-
let (failed, subscribed, notices, ignored) = foldr (partitionResults pendingSubs) (M.empty, [], [], 0) rs
1518+
processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> Maybe ServiceId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ([RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)])
1519+
processSubResults c tSess@(userId, srv, _) sessId smpServiceId rs = do
1520+
pending <- SS.getPendingSubs tSess $ currentSubs c
1521+
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pending) (M.empty, ([], []), [], 0) rs
15161522
unless (M.null failed) $ do
15171523
incSMPServerStat' c userId srv connSubErrs $ M.size failed
15181524
failSubscriptions c tSess failed
1519-
unless (null subscribed) $ do
1520-
incSMPServerStat' c userId srv connSubscribed $ length subscribed
1525+
unless (null qs && null sQs) $ do
1526+
incSMPServerStat' c userId srv connSubscribed $ length qs + length sQs
15211527
SS.batchAddActiveSubs tSess sessId subscribed $ currentSubs c
15221528
unless (ignored == 0) $ incSMPServerStat' c userId srv connSubIgnored ignored
1523-
pure notices
1529+
pure (sQs, notices)
15241530
where
15251531
partitionResults ::
1526-
Map SMP.RecipientId RcvQueueSub ->
1532+
(Map SMP.RecipientId RcvQueueSub, Maybe ServiceSub) ->
15271533
(RcvQueueSub, Either SMPClientError (Maybe ServiceId)) ->
1528-
(Map SMP.RecipientId SMPClientError, [RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)], Int) ->
1529-
(Map SMP.RecipientId SMPClientError, [RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)], Int)
1530-
partitionResults pendingSubs (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed, notices, ignored) = case r of
1534+
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int) ->
1535+
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int)
1536+
partitionResults (pendingSubs, pendingSS) (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of
15311537
Left e -> case smpErrorClientNotice e of
15321538
Just notice_ -> (failed', subscribed, (rq, notice_) : notices, ignored)
15331539
where
@@ -1537,8 +1543,12 @@ processSubResults c tSess@(userId, srv, _) sessId rs = do
15371543
| otherwise -> (failed', subscribed, notices, ignored)
15381544
where
15391545
failed' = M.insert rcvId e failed
1540-
Right _serviceId -- TODO [certs rcv] store association with the service
1541-
| rcvId `M.member` pendingSubs -> (failed, rq : subscribed, notices', ignored)
1546+
Right serviceId_
1547+
| rcvId `M.member` pendingSubs ->
1548+
let subscribed' = case (smpServiceId, serviceId_, pendingSS) of
1549+
(Just sId, Just sId', Just ServiceSub {serviceId}) | sId == sId' && sId == serviceId -> (qs, rq : sQs)
1550+
_ -> (rq : qs, sQs)
1551+
in (failed, subscribed', notices', ignored)
15421552
| otherwise -> (failed, subscribed, notices', ignored + 1)
15431553
where
15441554
notices' = if isJust clientNoticeId then (rq, Nothing) : notices else notices
@@ -1577,6 +1587,7 @@ serverHostError = \case
15771587

15781588
-- | Batch by transport session and subscribe queues. The list of results can have a different order.
15791589
subscribeQueues :: AgentClient -> Bool -> [RcvQueueSub] -> AM' [(RcvQueueSub, Either AgentErrorType (Maybe ServiceId))]
1590+
subscribeQueues _ _ [] = pure []
15801591
subscribeQueues c withEvents qs = do
15811592
(errs, qs') <- checkQueues c qs
15821593
atomically $ modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
@@ -1633,6 +1644,7 @@ checkQueues c = fmap partitionEithers . mapM checkQueue
16331644
-- This function expects that all queues belong to one transport session,
16341645
-- and that they are already added to pending subscriptions.
16351646
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
1647+
resubscribeSessQueues _ _ [] = pure ()
16361648
resubscribeSessQueues c tSess qs = do
16371649
(errs, qs_) <- checkQueues c qs
16381650
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c True (tSess, qs')
@@ -1651,13 +1663,15 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
16511663
then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c)
16521664
else pure Nothing
16531665
active <- E.uninterruptibleMask_ $ do
1654-
(active, notices) <- atomically $ do
1655-
r@(_, notices) <- ifM
1666+
(active, (serviceQs, notices)) <- atomically $ do
1667+
r@(_, (_, notices)) <- ifM
16561668
(activeClientSession c tSess sessId)
1657-
((True,) <$> processSubResults c tSess sessId rs)
1658-
((False, []) <$ incSMPServerStat' c userId srv connSubIgnored (length rs))
1669+
((True,) <$> processSubResults c tSess sessId smpServiceId rs)
1670+
((False, ([], [])) <$ incSMPServerStat' c userId srv connSubIgnored (length rs))
16591671
unless (null notices) $ takeTMVar $ clientNoticesLock c
16601672
pure r
1673+
unless (null serviceQs) $ void $
1674+
processRcvServiceAssocs c serviceQs `runReaderT` agentEnv c
16611675
unless (null notices) $ void $
16621676
(processClientNotices c tSess notices `runReaderT` agentEnv c)
16631677
`E.finally` atomically (putTMVar (clientNoticesLock c) ())
@@ -1678,6 +1692,13 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
16781692
where
16791693
tSess = transportSession' smp
16801694
sessId = sessionId $ thParams smp
1695+
smpServiceId = (\THClientService {serviceId} -> serviceId) <$> smpClientService smp
1696+
1697+
processRcvServiceAssocs :: AgentClient -> [RcvQueueSub] -> AM' ()
1698+
processRcvServiceAssocs c serviceQs =
1699+
withStore' c (`setRcvServiceAssocs` serviceQs) `catchAllErrors'` \e -> do
1700+
logError $ "processClientNotices error: " <> tshow e
1701+
notifySub' c "" $ ERR e
16811702

16821703
processClientNotices :: AgentClient -> SMPTransportSession -> [(RcvQueueSub, Maybe ClientNotice)] -> AM' ()
16831704
processClientNotices c@AgentClient {presetServers} tSess notices = do
@@ -1690,10 +1711,35 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
16901711
logError $ "processClientNotices error: " <> tshow e
16911712
notifySub' c "" $ ERR e
16921713

1714+
resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSub
1715+
resubscribeClientService c tSess (ServiceSub _ n idsHash) =
1716+
withServiceClient c tSess $ \smp _ -> do
1717+
subscribeClientService_ c tSess smp n idsHash
1718+
16931719
subscribeClientService :: AgentClient -> UserId -> SMPServer -> Int64 -> IdsHash -> AM ServiceSub
16941720
subscribeClientService c userId srv n idsHash =
1695-
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
1696-
subscribeService smp SMP.SRecipientService n idsHash
1721+
withServiceClient c tSess $ \smp smpServiceId -> do
1722+
let serviceSub = ServiceSub smpServiceId n idsHash
1723+
atomically $ SS.setPendingServiceSub tSess serviceSub $ currentSubs c
1724+
subscribeClientService_ c tSess smp n idsHash
1725+
where
1726+
tSess = (userId, srv, Nothing)
1727+
1728+
withServiceClient :: AgentClient -> SMPTransportSession -> (SMPClient -> ServiceId -> ExceptT SMPClientError IO a) -> AM a
1729+
withServiceClient c tSess action =
1730+
withLogClient c NRMBackground tSess B.empty "SUBS" $ \(SMPConnectedClient smp _) ->
1731+
case (\THClientService {serviceId} -> serviceId) <$> smpClientService smp of
1732+
Just smpServiceId -> action smp smpServiceId
1733+
Nothing -> throwE PCEServiceUnavailable
1734+
1735+
subscribeClientService_ :: AgentClient -> SMPTransportSession -> SMPClient -> Int64 -> IdsHash -> ExceptT SMPClientError IO ServiceSub
1736+
subscribeClientService_ c tSess smp n idsHash = do
1737+
-- TODO [certs rcv] handle error
1738+
serviceSub' <- subscribeService smp SMP.SRecipientService n idsHash
1739+
let sessId = sessionId $ thParams smp
1740+
atomically $ whenM (activeClientSession c tSess sessId) $
1741+
SS.setActiveServiceSub tSess sessId serviceSub' $ currentSubs c
1742+
pure serviceSub'
16971743

16981744
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
16991745
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)
@@ -1763,7 +1809,7 @@ addNewQueueSubscription c rq' tSess sessId = do
17631809
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
17641810
active <- activeClientSession c tSess sessId
17651811
if active
1766-
then SS.addActiveSub tSess sessId rq $ currentSubs c
1812+
then SS.addActiveSub tSess sessId rq' $ currentSubs c
17671813
else SS.addPendingSub tSess rq $ currentSubs c
17681814
pure active
17691815
unless same $ resubscribeSMPSession c tSess
@@ -1952,6 +1998,7 @@ releaseGetLock c rq =
19521998
{-# INLINE releaseGetLock #-}
19531999

19542000
releaseGetLocksIO :: SomeRcvQueue q => AgentClient -> [q] -> IO ()
2001+
releaseGetLocksIO _ [] = pure ()
19552002
releaseGetLocksIO c rqs = do
19562003
locks <- readTVarIO $ getMsgLocks c
19572004
forM_ rqs $ \rq ->

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
5353
getSubscriptionServers,
5454
getUserServerRcvQueueSubs,
5555
unsetQueuesToSubscribe,
56+
setRcvServiceAssocs,
5657
getConnIds,
5758
getConn,
5859
getDeletedConn,
@@ -2249,6 +2250,14 @@ getUserServerRcvQueueSubs db userId srv onlyNeeded =
22492250
unsetQueuesToSubscribe :: DB.Connection -> IO ()
22502251
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"
22512252

2253+
setRcvServiceAssocs :: DB.Connection -> [RcvQueueSub] -> IO ()
2254+
setRcvServiceAssocs db rqs =
2255+
#if defined(dbPostgres)
2256+
DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN " $ Only $ In (map queueId rqs)
2257+
#else
2258+
DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = " $ map (Only . queueId) rqs
2259+
#endif
2260+
22522261
-- * getConn helpers
22532262

22542263
getConnIds :: DB.Connection -> IO [ConnId]

0 commit comments

Comments
 (0)