Skip to content

Commit 7295b73

Browse files
committed
agent database triggers
1 parent a0646c4 commit 7295b73

File tree

6 files changed

+234
-31
lines changed

6 files changed

+234
-31
lines changed

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2302,7 +2302,8 @@ withStore c action = do
23022302
[ E.Handler $ \(e :: SQL.SQLError) ->
23032303
let se = SQL.sqlError e
23042304
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
2305-
in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ bshow se,
2305+
err = tshow se <> ": " <> SQL.sqlErrorDetails e <> ", " <> SQL.sqlErrorContext e
2306+
in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ encodeUtf8 err,
23062307
E.Handler $ \(E.SomeException e) -> pure . Left $ SEInternal $ bshow e
23072308
]
23082309
#endif

src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251020_service_certs.hs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,84 @@ CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(user_i
2727
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);
2828

2929
ALTER TABLE rcv_queues ADD COLUMN rcv_service_assoc SMALLINT NOT NULL DEFAULT 0;
30+
31+
CREATE FUNCTION update_aggregates(p_conn_id BYTEA, p_host TEXT, p_port TEXT, p_change BIGINT, p_rcv_id BYTEA) RETURNS VOID
32+
LANGUAGE plpgsql
33+
AS $$
34+
DECLARE q_user_id BIGINT;
35+
BEGIN
36+
SELECT user_id INTO q_user_id FROM connections WHERE conn_id = p_conn_id;
37+
UPDATE client_services
38+
SET service_queue_count = service_queue_count + p_change,
39+
service_queue_ids_hash = xor_combine(service_queue_ids_hash, public.digest(p_rcv_id, 'md5'))
40+
WHERE user_id = q_user_id AND host = p_host AND port = p_port;
41+
END;
42+
$$;
43+
44+
CREATE FUNCTION on_rcv_queue_insert() RETURNS TRIGGER
45+
LANGUAGE plpgsql
46+
AS $$
47+
BEGIN
48+
IF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
49+
PERFORM update_aggregates(NEW.conn_id, NEW.host, NEW.port, 1, NEW.rcv_id);
50+
END IF;
51+
RETURN NEW;
52+
END;
53+
$$;
54+
55+
CREATE FUNCTION on_rcv_queue_delete() RETURNS TRIGGER
56+
LANGUAGE plpgsql
57+
AS $$
58+
BEGIN
59+
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
60+
PERFORM update_aggregates(OLD.conn_id, OLD.host, OLD.port, -1, OLD.rcv_id);
61+
END IF;
62+
RETURN OLD;
63+
END;
64+
$$;
65+
66+
CREATE FUNCTION on_rcv_queue_update() RETURNS TRIGGER
67+
LANGUAGE plpgsql
68+
AS $$
69+
BEGIN
70+
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
71+
IF NOT (NEW.rcv_service_assoc != 0 AND NEW.deleted = 0) THEN
72+
PERFORM update_aggregates(OLD.conn_id, OLD.host, OLD.port, -1, OLD.rcv_id);
73+
END IF;
74+
ELSIF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
75+
PERFORM update_aggregates(NEW.conn_id, NEW.host, NEW.port, 1, NEW.rcv_id);
76+
END IF;
77+
RETURN NEW;
78+
END;
79+
$$;
80+
81+
CREATE TRIGGER tr_rcv_queue_insert
82+
AFTER INSERT ON rcv_queues
83+
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_insert();
84+
85+
CREATE TRIGGER tr_rcv_queue_delete
86+
AFTER DELETE ON rcv_queues
87+
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_delete();
88+
89+
CREATE TRIGGER tr_rcv_queue_update
90+
AFTER UPDATE ON rcv_queues
91+
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_update();
3092
|]
3193

3294
down_m20251020_service_certs :: Text
3395
down_m20251020_service_certs =
3496
T.pack
3597
[r|
98+
DROP TRIGGER tr_rcv_queue_insert ON rcv_queues;
99+
DROP TRIGGER tr_rcv_queue_delete ON rcv_queues;
100+
DROP TRIGGER tr_rcv_queue_update ON rcv_queues;
101+
102+
DROP FUNCTION on_rcv_queue_insert;
103+
DROP FUNCTION on_rcv_queue_delete;
104+
DROP FUNCTION on_rcv_queue_update;
105+
106+
DROP FUNCTION update_aggregates;
107+
36108
ALTER TABLE rcv_queues DROP COLUMN rcv_service_assoc;
37109

38110
DROP INDEX idx_server_certs_host_port;

src/Simplex/Messaging/Agent/Store/Postgres/Migrations/agent_postgres_schema.sql

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ CREATE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_delete() RETURNS tri
2121
AS $$
2222
BEGIN
2323
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
24-
PERFORM update_aggregates(OLD.user_id, OLD.host, OLD.port, -1, OLD.rcv_id);
24+
PERFORM update_aggregates(OLD.conn_id, OLD.host, OLD.port, -1, OLD.rcv_id);
2525
END IF;
2626
RETURN OLD;
2727
END;
@@ -34,7 +34,7 @@ CREATE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_insert() RETURNS tri
3434
AS $$
3535
BEGIN
3636
IF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
37-
PERFORM update_aggregates(NEW.user_id, NEW.host, NEW.port, 1, NEW.rcv_id);
37+
PERFORM update_aggregates(NEW.conn_id, NEW.host, NEW.port, 1, NEW.rcv_id);
3838
END IF;
3939
RETURN NEW;
4040
END;
@@ -48,58 +48,57 @@ CREATE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_update() RETURNS tri
4848
BEGIN
4949
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
5050
IF NOT (NEW.rcv_service_assoc != 0 AND NEW.deleted = 0) THEN
51-
PERFORM update_aggregates(OLD.user_id, OLD.host, OLD.port, -1, OLD.rcv_id);
51+
PERFORM update_aggregates(OLD.conn_id, OLD.host, OLD.port, -1, OLD.rcv_id);
5252
END IF;
5353
ELSIF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
54-
PERFORM update_aggregates(NEW.user_id, NEW.host, NEW.port, 1, NEW.rcv_id);
54+
PERFORM update_aggregates(NEW.conn_id, NEW.host, NEW.port, 1, NEW.rcv_id);
5555
END IF;
5656
RETURN NEW;
5757
END;
5858
$$;
5959

6060

6161

62-
CREATE FUNCTION smp_agent_test_protocol_schema.update_aggregates(p_user_id bigint, p_host text, p_port text, p_change bigint, p_rcv_id bytea) RETURNS void
62+
CREATE FUNCTION smp_agent_test_protocol_schema.update_aggregates(p_conn_id bytea, p_host text, p_port text, p_change bigint, p_rcv_id bytea) RETURNS void
6363
LANGUAGE plpgsql
6464
AS $$
65+
DECLARE q_user_id BIGINT;
6566
BEGIN
67+
SELECT user_id INTO q_user_id FROM connections WHERE conn_id = p_conn_id;
6668
UPDATE client_services
6769
SET service_queue_count = service_queue_count + p_change,
6870
service_queue_ids_hash = xor_combine(service_queue_ids_hash, public.digest(p_rcv_id, 'md5'))
69-
WHERE user_id = p_user_id AND host = p_host AND port = p_port;
71+
WHERE user_id = q_user_id AND host = p_host AND port = p_port;
7072
END;
7173
$$;
7274

7375

76+
SET default_table_access_method = heap;
7477

75-
CREATE FUNCTION smp_agent_test_protocol_schema.xor_combine(state bytea, value bytea) RETURNS bytea
76-
LANGUAGE plpgsql IMMUTABLE STRICT
77-
AS $$
78-
DECLARE
79-
result BYTEA := state;
80-
i INTEGER;
81-
len INTEGER := octet_length(value);
82-
BEGIN
83-
IF octet_length(state) != len THEN
84-
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
85-
END IF;
86-
FOR i IN 0..len-1 LOOP
87-
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
88-
END LOOP;
89-
RETURN result;
90-
END;
91-
$$;
9278

79+
CREATE TABLE smp_agent_test_protocol_schema.client_notices (
80+
client_notice_id bigint NOT NULL,
81+
protocol text NOT NULL,
82+
host text NOT NULL,
83+
port text NOT NULL,
84+
entity_id bytea NOT NULL,
85+
server_key_hash bytea,
86+
notice_ttl bigint,
87+
created_at bigint NOT NULL,
88+
updated_at bigint NOT NULL
89+
);
9390

9491

95-
CREATE AGGREGATE smp_agent_test_protocol_schema.xor_aggregate(bytea) (
96-
SFUNC = smp_agent_test_protocol_schema.xor_combine,
97-
STYPE = bytea,
98-
INITCOND = '\x00000000000000000000000000000000'
99-
);
10092

93+
ALTER TABLE smp_agent_test_protocol_schema.client_notices ALTER COLUMN client_notice_id ADD GENERATED ALWAYS AS IDENTITY (
94+
SEQUENCE NAME smp_agent_test_protocol_schema.client_notices_client_notice_id_seq
95+
START WITH 1
96+
INCREMENT BY 1
97+
NO MINVALUE
98+
NO MAXVALUE
99+
CACHE 1
100+
);
101101

102-
SET default_table_access_method = heap;
103102

104103

105104
CREATE TABLE smp_agent_test_protocol_schema.client_services (
@@ -535,6 +534,8 @@ CREATE TABLE smp_agent_test_protocol_schema.rcv_queues (
535534
link_priv_sig_key bytea,
536535
link_enc_fixed_data bytea,
537536
queue_mode text,
537+
to_subscribe smallint DEFAULT 0 NOT NULL,
538+
client_notice_id bigint,
538539
rcv_service_assoc smallint DEFAULT 0 NOT NULL
539540
);
540541

@@ -816,6 +817,11 @@ ALTER TABLE smp_agent_test_protocol_schema.xftp_servers ALTER COLUMN xftp_server
816817

817818

818819

820+
ALTER TABLE ONLY smp_agent_test_protocol_schema.client_notices
821+
ADD CONSTRAINT client_notices_pkey PRIMARY KEY (client_notice_id);
822+
823+
824+
819825
ALTER TABLE ONLY smp_agent_test_protocol_schema.commands
820826
ADD CONSTRAINT commands_pkey PRIMARY KEY (command_id);
821827

@@ -996,6 +1002,10 @@ ALTER TABLE ONLY smp_agent_test_protocol_schema.xftp_servers
9961002

9971003

9981004

1005+
CREATE UNIQUE INDEX idx_client_notices_entity ON smp_agent_test_protocol_schema.client_notices USING btree (protocol, host, port, entity_id);
1006+
1007+
1008+
9991009
CREATE INDEX idx_commands_conn_id ON smp_agent_test_protocol_schema.commands USING btree (conn_id);
10001010

10011011

@@ -1124,6 +1134,10 @@ CREATE UNIQUE INDEX idx_rcv_queue_id ON smp_agent_test_protocol_schema.rcv_queue
11241134

11251135

11261136

1137+
CREATE INDEX idx_rcv_queues_client_notice_id ON smp_agent_test_protocol_schema.rcv_queues USING btree (client_notice_id);
1138+
1139+
1140+
11271141
CREATE UNIQUE INDEX idx_rcv_queues_link_id ON smp_agent_test_protocol_schema.rcv_queues USING btree (host, port, link_id);
11281142

11291143

@@ -1132,6 +1146,10 @@ CREATE UNIQUE INDEX idx_rcv_queues_ntf ON smp_agent_test_protocol_schema.rcv_que
11321146

11331147

11341148

1149+
CREATE INDEX idx_rcv_queues_to_subscribe ON smp_agent_test_protocol_schema.rcv_queues USING btree (to_subscribe);
1150+
1151+
1152+
11351153
CREATE INDEX idx_server_certs_host_port ON smp_agent_test_protocol_schema.client_services USING btree (host, port);
11361154

11371155

@@ -1345,6 +1363,11 @@ ALTER TABLE ONLY smp_agent_test_protocol_schema.rcv_messages
13451363

13461364

13471365

1366+
ALTER TABLE ONLY smp_agent_test_protocol_schema.rcv_queues
1367+
ADD CONSTRAINT rcv_queues_client_notice_id_fkey FOREIGN KEY (client_notice_id) REFERENCES smp_agent_test_protocol_schema.client_notices(client_notice_id) ON UPDATE RESTRICT ON DELETE SET NULL;
1368+
1369+
1370+
13481371
ALTER TABLE ONLY smp_agent_test_protocol_schema.rcv_queues
13491372
ADD CONSTRAINT rcv_queues_conn_id_fkey FOREIGN KEY (conn_id) REFERENCES smp_agent_test_protocol_schema.connections(conn_id) ON DELETE CASCADE;
13501373

src/Simplex/Messaging/Agent/Store/SQLite.hs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,31 @@ module Simplex.Messaging.Agent.Store.SQLite
4343
where
4444

4545
import Control.Monad
46+
import Data.Bits (xor)
4647
import Data.ByteArray (ScrubbedBytes)
4748
import qualified Data.ByteArray as BA
49+
import Data.ByteString (ByteString)
50+
import qualified Data.ByteString as B
4851
import Data.Functor (($>))
4952
import Data.IORef
5053
import Data.Maybe (fromMaybe)
5154
import Data.Text (Text)
5255
import qualified Data.Text as T
5356
import Database.SQLite.Simple (Query (..))
5457
import qualified Database.SQLite.Simple as SQL
58+
import Database.SQLite.Simple.Function
5559
import Database.SQLite.Simple.QQ (sql)
5660
import qualified Database.SQLite3 as SQLite3
5761
import Simplex.Messaging.Agent.Store.Migrations (DBMigrate (..), sharedMigrateSchema)
5862
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
5963
import Simplex.Messaging.Agent.Store.SQLite.Common
6064
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
6165
import Simplex.Messaging.Agent.Store.Shared (Migration (..), MigrationConfig (..), MigrationError (..))
66+
import qualified Simplex.Messaging.Crypto as C
6267
import Simplex.Messaging.Util (ifM, safeDecodeUtf8)
6368
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
6469
import System.FilePath (takeDirectory, takeFileName, (</>))
65-
import UnliftIO.Exception (bracketOnError, onException)
70+
import UnliftIO.Exception (bracketOnError, onException, throwIO)
6671
import UnliftIO.MVar
6772
import UnliftIO.STM
6873

@@ -119,6 +124,11 @@ connectDB path key track = do
119124
PRAGMA secure_delete = ON;
120125
PRAGMA auto_vacuum = FULL;
121126
|]
127+
createFunction (DB.conn db) "simplex_xor_md5_combine" xorMd5Combine
128+
>>= either (throwIO . userError . show) pure
129+
130+
xorMd5Combine :: ByteString -> ByteString -> ByteString
131+
xorMd5Combine idsHash rId = B.packZipWith xor idsHash $ C.md5Hash rId
122132

123133
closeDBStore :: DBStore -> IO ()
124134
closeDBStore st@DBStore {dbClosed} =

src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251020_service_certs.hs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,64 @@ CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(user_i
2525
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);
2626

2727
ALTER TABLE rcv_queues ADD COLUMN rcv_service_assoc INTEGER NOT NULL DEFAULT 0;
28+
29+
CREATE TRIGGER tr_rcv_queue_insert
30+
AFTER INSERT ON rcv_queues
31+
FOR EACH ROW
32+
WHEN NEW.rcv_service_assoc != 0 AND NEW.deleted = 0
33+
BEGIN
34+
UPDATE client_services
35+
SET service_queue_count = service_queue_count + 1,
36+
service_queue_ids_hash = simplex_xor_md5_combine(service_queue_ids_hash, NEW.rcv_id)
37+
WHERE user_id = (SELECT user_id FROM connections WHERE conn_id = NEW.conn_id)
38+
AND host = NEW.host AND port = NEW.port;
39+
END;
40+
41+
CREATE TRIGGER tr_rcv_queue_delete
42+
AFTER DELETE ON rcv_queues
43+
FOR EACH ROW
44+
WHEN OLD.rcv_service_assoc != 0 AND OLD.deleted = 0
45+
BEGIN
46+
UPDATE client_services
47+
SET service_queue_count = service_queue_count - 1,
48+
service_queue_ids_hash = simplex_xor_md5_combine(service_queue_ids_hash, OLD.rcv_id)
49+
WHERE user_id = (SELECT user_id FROM connections WHERE conn_id = OLD.conn_id)
50+
AND host = OLD.host AND port = OLD.port;
51+
END;
52+
53+
CREATE TRIGGER tr_rcv_queue_update_remove
54+
AFTER UPDATE ON rcv_queues
55+
FOR EACH ROW
56+
WHEN OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 AND NOT (NEW.rcv_service_assoc != 0 AND NEW.deleted = 0)
57+
BEGIN
58+
UPDATE client_services
59+
SET service_queue_count = service_queue_count - 1,
60+
service_queue_ids_hash = simplex_xor_md5_combine(service_queue_ids_hash, OLD.rcv_id)
61+
WHERE user_id = (SELECT user_id FROM connections WHERE conn_id = OLD.conn_id)
62+
AND host = OLD.host AND port = OLD.port;
63+
END;
64+
65+
CREATE TRIGGER tr_rcv_queue_update_add
66+
AFTER UPDATE ON rcv_queues
67+
FOR EACH ROW
68+
WHEN NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 AND NOT (OLD.rcv_service_assoc != 0 AND OLD.deleted = 0)
69+
BEGIN
70+
UPDATE client_services
71+
SET service_queue_count = service_queue_count + 1,
72+
service_queue_ids_hash = simplex_xor_md5_combine(service_queue_ids_hash, NEW.rcv_id)
73+
WHERE user_id = (SELECT user_id FROM connections WHERE conn_id = NEW.conn_id)
74+
AND host = NEW.host AND port = NEW.port;
75+
END;
2876
|]
2977

3078
down_m20251020_service_certs :: Query
3179
down_m20251020_service_certs =
3280
[sql|
81+
DROP TRIGGER tr_rcv_queue_insert;
82+
DROP TRIGGER tr_rcv_queue_delete;
83+
DROP TRIGGER tr_rcv_queue_update_remove;
84+
DROP TRIGGER tr_rcv_queue_update_add;
85+
3386
ALTER TABLE rcv_queues DROP COLUMN rcv_service_assoc;
3487

3588
DROP INDEX idx_server_certs_host_port;

0 commit comments

Comments
 (0)