Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251020_service_certs
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
Expand Down Expand Up @@ -217,12 +218,14 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251020_service_certs
Simplex.Messaging.Agent.Store.SQLite.Util
if flag(client_postgres) || flag(server_postgres)
exposed-modules:
Simplex.Messaging.Agent.Store.Postgres
Simplex.Messaging.Agent.Store.Postgres.Common
Simplex.Messaging.Agent.Store.Postgres.DB
Simplex.Messaging.Agent.Store.Postgres.Migrations
Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
Simplex.Messaging.Agent.Store.Postgres.Util
if !flag(client_library)
exposed-modules:
Expand Down
27 changes: 15 additions & 12 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ import Simplex.Messaging.Protocol
ErrorType (AUTH),
MsgBody,
MsgFlags (..),
IdsHash,
NtfServer,
ProtoServerWithAuth (..),
ProtocolServer (..),
Expand All @@ -222,6 +221,7 @@ import Simplex.Messaging.Protocol
SMPMsgMeta,
SParty (..),
SProtocolType (..),
ServiceSub (..),
SndPublicAuthKey,
SubscriptionMode (..),
UserProtocol,
Expand Down Expand Up @@ -500,7 +500,7 @@ resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either Agen
resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
{-# INLINE resubscribeConnections #-}

subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
{-# INLINE subscribeClientServices #-}

Expand Down Expand Up @@ -594,6 +594,7 @@ testProtocolServer c nm userId srv = withAgentEnv' c $ case protocolTypeI @p of
SPNTF -> runNTFServerTest c nm userId srv

-- | set SOCKS5 proxy on/off and optionally set TCP timeouts for fast network
-- TODO [certs rcv] should fail if any user is enabled to use services and per-connection isolation is chosen
setNetworkConfig :: AgentClient -> NetworkConfig -> IO ()
setNetworkConfig c@AgentClient {useNetworkConfig, proxySessTs} cfg' = do
ts <- getCurrentTime
Expand Down Expand Up @@ -771,6 +772,7 @@ deleteUser' c@AgentClient {smpServersStats, xftpServersStats} userId delSMPQueue
whenM (withStore' c (`deleteUserWithoutConns` userId)) . atomically $
writeTBQueue (subQ c) ("", "", AEvt SAENone $ DEL_USER userId)

-- TODO [certs rcv] should fail enabling if per-connection isolation is set
setUserService' :: AgentClient -> UserId -> Bool -> AM ()
setUserService' c userId enable = do
wasEnabled <- liftIO $ fromMaybe False <$> TM.lookupIO userId (useClientServices c)
Expand Down Expand Up @@ -1507,15 +1509,15 @@ resubscribeConnections' c connIds = do
[] -> pure True
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'

-- TODO [certs rcv] compare hash with lock
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
-- TODO [certs rcv] compare hash. possibly, it should return both expected and returned counts
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType ServiceSub))
subscribeClientServices' c userId =
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
where
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
subscribe = do
srvs <- withStore' c (`getClientServiceServers` userId)
lift $ M.fromList . zip srvs <$> mapConcurrently (tryAllErrors' . subscribeClientService c userId) srvs
lift $ M.fromList <$> mapConcurrently (\(srv, ServiceSub _ n idsHash) -> fmap (srv,) $ tryAllErrors' $ subscribeClientService c userId srv n idsHash) srvs

-- requesting messages sequentially, to reduce memory usage
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
Expand Down Expand Up @@ -2829,12 +2831,13 @@ processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion Err
processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId, ts) = do
upConnIds <- newTVarIO []
forM_ ts $ \(entId, t) -> case t of
STEvent msgOrErr ->
withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of
Right msg -> runProcessSMP rq conn (toConnData conn) msg
Left e -> lift $ do
processClientNotice rq e
notifyErr connId e
STEvent msgOrErr
| entId == SMP.NoEntity -> pure () -- TODO [certs rcv] process SALL
| otherwise -> withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of
Right msg -> runProcessSMP rq conn (toConnData conn) msg
Left e -> lift $ do
processClientNotice rq e
notifyErr connId e
STResponse (Cmd SRecipient cmd) respOrErr ->
withRcvConn entId $ \rq conn -> case cmd of
SMP.SUB -> case respOrErr of
Expand Down Expand Up @@ -2870,7 +2873,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
processSubOk :: RcvQueue -> TVar [ConnId] -> IO ()
processSubOk rq@RcvQueue {connId} upConnIds =
atomically . whenM (isPendingSub rq) $ do
SS.addActiveSub tSess sessId (rcvQueueSub rq) $ currentSubs c
SS.addActiveSub tSess sessId rq $ currentSubs c
modifyTVar' upConnIds (connId :)
processSubErr :: RcvQueue -> SMPClientError -> AM' ()
processSubErr rq@RcvQueue {connId} e = do
Expand Down
Loading
Loading