Skip to content

Commit e4000c5

Browse files
committed
WIP
1 parent 55bac25 commit e4000c5

File tree

6 files changed

+718
-30
lines changed

6 files changed

+718
-30
lines changed

lib/extensions/postgres_cdc_rls/subscriptions.ex

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
88

99
@type conn() :: Postgrex.conn()
1010
@type filter :: {binary, binary, binary}
11-
@type subscription_params :: {binary, binary, [filter]}
11+
@type subscription_params :: {binary, binary, binary, [filter]}
1212
@type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]
1313

1414
@filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
@@ -36,26 +36,32 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
3636
subscription_id,
3737
entity,
3838
filters,
39-
claims
39+
claims,
40+
action_filter
4041
)
4142
select
4243
$4::text::uuid,
4344
sub_tables.entity,
4445
$6,
45-
$5
46+
$5,
47+
$7
4648
from
4749
sub_tables
4850
on conflict
49-
(subscription_id, entity, filters)
51+
(subscription_id, entity, filters, action_filter)
5052
do update set
5153
claims = excluded.claims,
5254
created_at = now()
5355
returning
5456
id"
5557

5658
transaction(conn, fn conn ->
57-
Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params = {schema, table, filters}} ->
58-
case query(conn, sql, [publication, schema, table, id, claims, filters]) do
59+
Enum.map(subscription_list, fn %{
60+
id: id,
61+
claims: claims,
62+
subscription_params: params = {action_filter, schema, table, filters}
63+
} ->
64+
case query(conn, sql, [publication, schema, table, id, claims, filters, action_filter]) do
5965
{:ok, %{num_rows: num} = result} when num > 0 ->
6066
send(manager, {:subscribed, {caller, id}})
6167
result
@@ -80,8 +86,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
8086
:exit, reason -> {:error, {:exit, reason}}
8187
end
8288

83-
defp params_to_log({schema, table, filters}) do
84-
[schema: schema, table: table, filters: filters]
89+
defp params_to_log({action_filter, schema, table, filters}) do
90+
[event: action_filter, schema: schema, table: table, filters: filters]
8591
|> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
8692
end
8793

@@ -168,27 +174,27 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
168174
## Examples
169175
170176
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
171-
{:ok, {"public", "messages", [{"subject", "eq", "hey"}]}}
177+
{:ok, {"*", "public", "messages", [{"subject", "eq", "hey"}]}}
172178
173179
`in` filter:
174180
175181
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
176-
{:ok, {"public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
182+
{:ok, {"*", "public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
177183
178184
no filter:
179185
180186
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
181-
{:ok, {"public", "messages", []}}
187+
{:ok, {"*", "public", "messages", []}}
182188
183189
only schema:
184190
185191
iex> parse_subscription_params(%{"schema" => "public"})
186-
{:ok, {"public", "*", []}}
192+
{:ok, {"*", "public", "*", []}}
187193
188194
only table:
189195
190196
iex> parse_subscription_params(%{"table" => "messages"})
191-
{:ok, {"public", "messages", []}}
197+
{:ok, {"*", "public", "messages", []}}
192198
193199
An unsupported filter will respond with an error tuple:
194200
@@ -209,13 +215,15 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
209215

210216
@spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
211217
def parse_subscription_params(params) do
218+
action_filter = action_filter(params)
219+
212220
case params do
213221
%{"schema" => schema, "table" => table, "filter" => filter} ->
214222
with [col, rest] <- String.split(filter, "=", parts: 2),
215223
[filter_type, value] when filter_type in @filter_types <-
216224
String.split(rest, ".", parts: 2),
217225
{:ok, formatted_value} <- format_filter_value(filter_type, value) do
218-
{:ok, {schema, table, [{col, filter_type, formatted_value}]}}
226+
{:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}}
219227
else
220228
{:error, msg} ->
221229
{:error, "Error parsing `filter` params: #{msg}"}
@@ -225,13 +233,13 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
225233
end
226234

227235
%{"schema" => schema, "table" => table} ->
228-
{:ok, {schema, table, []}}
236+
{:ok, {action_filter, schema, table, []}}
229237

230238
%{"schema" => schema} ->
231-
{:ok, {schema, "*", []}}
239+
{:ok, {action_filter, schema, "*", []}}
232240

233241
%{"table" => table} ->
234-
{:ok, {"public", table, []}}
242+
{:ok, {action_filter, "public", table, []}}
235243

236244
map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
237245
{:error,
@@ -243,6 +251,19 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
243251
end
244252
end
245253

254+
defp action_filter(%{"event" => "*"}), do: "*"
255+
256+
defp action_filter(%{"event" => event}) when is_binary(event) do
257+
case String.upcase(event) do
258+
"INSERT" -> "INSERT"
259+
"UPDATE" -> "UPDATE"
260+
"DELETE" -> "DELETE"
261+
_ -> "*"
262+
end
263+
end
264+
265+
defp action_filter(_), do: "*"
266+
246267
defp format_filter_value(filter, value) do
247268
case filter do
248269
"in" ->

lib/realtime/tenants/migrations.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ defmodule Realtime.Tenants.Migrations do
7777
RunSubscriptionIndexBridgingDisabled,
7878
BroadcastSendErrorLogging,
7979
CreateMessagesReplayIndex,
80-
BroadcastSendIncludePayloadId
80+
BroadcastSendIncludePayloadId,
81+
AddActionToSubscriptions,
82+
FilterActionPostgresChanges
8183
}
8284

8385
@migrations [
@@ -145,7 +147,9 @@ defmodule Realtime.Tenants.Migrations do
145147
{20_250_523_164_012, RunSubscriptionIndexBridgingDisabled},
146148
{20_250_714_121_412, BroadcastSendErrorLogging},
147149
{20_250_905_041_441, CreateMessagesReplayIndex},
148-
{20_251_103_001_201, BroadcastSendIncludePayloadId}
150+
{20_251_103_001_201, BroadcastSendIncludePayloadId},
151+
{20_251_120_212_548, AddActionToSubscriptions},
152+
{20_251_120_215_549, FilterActionPostgresChanges}
149153
]
150154

151155
defstruct [:tenant_external_id, :settings]
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Realtime.Tenants.Migrations.AddActionToSubscriptions do
2+
@moduledoc false
3+
use Ecto.Migration
4+
5+
def up do
6+
execute("""
7+
ALTER TABLE realtime.subscription
8+
ADD COLUMN action_filter text DEFAULT '*' CHECK (action_filter IN ('*', 'INSERT', 'UPDATE', 'DELETE'));
9+
""")
10+
11+
execute("""
12+
DROP INDEX IF EXISTS "realtime"."subscription_subscription_id_entity_filters_key";
13+
""")
14+
15+
execute("""
16+
CREATE UNIQUE INDEX subscription_subscription_id_entity_filters_action_filter_key on realtime.subscription (subscription_id, entity, filters, action_filter);
17+
""")
18+
19+
# FIXME create index
20+
end
21+
22+
def down do
23+
execute("""
24+
ALTER TABLE realtime.subscription DROP COLUMN action_filter;
25+
""")
26+
end
27+
end

0 commit comments

Comments
 (0)