diff --git a/lib/realtime/tenants/repo/migrations/20240423161054_use_fully_qualified_names.ex b/lib/realtime/tenants/repo/migrations/20240423161054_use_fully_qualified_names.ex new file mode 100644 index 000000000..00d4f3b8f --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20240423161054_use_fully_qualified_names.ex @@ -0,0 +1,325 @@ +defmodule Realtime.Tenants.Migrations.UseFullyQualifiedNames do + @moduledoc false + + use Ecto.Migration + + def change do + execute(" +-- build_prepared_statement_sql function + +CREATE OR REPLACE FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) +RETURNS text +LANGUAGE sql +SET search_path = '' +AS $function$ + /* + Builds a sql string that, if executed, creates a prepared statement to + tests retrive a row from *entity* by its primary key columns. + Example + select realtime.build_prepared_statement_sql('public.notes', '{\"id\"}'::text[], '{\"bigint\"}'::text[]) + */ + select + 'prepare ' || prepared_statement_name || ' as + select + exists( + select + 1 + from + ' || entity || ' + where + ' || pg_catalog.string_agg(pg_catalog.quote_ident(pkc.name) || '=' || pg_catalog.quote_nullable(pkc.value #>> '{}') , ' and ') || ' + )' + from + pg_catalog.unnest(columns) pkc + where + pkc.is_pkey + group by + entity; +$function$; + +-- cast function + +CREATE OR REPLACE FUNCTION realtime.\"cast\"(val text, type_ regtype) + RETURNS jsonb + LANGUAGE plpgsql + SET search_path = '' + IMMUTABLE +AS $function$ + declare + res jsonb; + begin + execute pg_catalog.format('select to_jsonb(%L::'|| type_::text || ')', val) into res; + return res; + end +$function$; + +-- channel_name function + +CREATE OR REPLACE FUNCTION realtime.channel_name() + RETURNS text + SET search_path = '' + LANGUAGE sql + STABLE +AS $function$ + select nullif(pg_catalog.current_setting('realtime.channel_name', true), '')::text; +$function$; + +-- check_equality_op function + +CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) + RETURNS boolean + LANGUAGE plpgsql + SET search_path = '' + IMMUTABLE +AS $function$ + /* + Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness + */ + declare + op_symbol text = ( + case + when op = 'eq' then '=' + when op = 'neq' then '!=' + when op = 'lt' then '<' + when op = 'lte' then '<=' + when op = 'gt' then '>' + when op = 'gte' then '>=' + when op = 'in' then '= any' + else 'UNKNOWN OP' + end + ); + res boolean; + begin + execute pg_catalog.format( + 'select %L::'|| type_::text || ' ' || op_symbol + || ' ( %L::' + || ( + case + when op = 'in' then type_::text || '[]' + else type_::text end + ) + || ')', val_1, val_2) into res; + return res; + end; + $function$; + +-- is_visible_through_filters function + +CREATE OR REPLACE FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) +RETURNS boolean +LANGUAGE sql +SET search_path = '' +IMMUTABLE +AS $function$ + /* + Should the record be visible (true) or filtered out (false) after *filters* are applied + */ + select + -- Default to allowed when no filters present + $2 is null -- no filters. this should not happen because subscriptions has a default + or pg_catalog.array_length($2, 1) is null -- array length of an empty array is null + or bool_and( + coalesce( + realtime.check_equality_op( + op:=f.op, + type_:=coalesce( + col.type_oid::regtype, -- null when wal2json version <= 2.4 + col.type_name::regtype + ), + -- cast jsonb to text + val_1:=col.value #>> '{}', + val_2:=f.value + ), + false -- if null, filter does not match + ) + ) + from + pg_catalog.unnest(filters) f + join pg_catalog.unnest(columns) col + on f.column_name = col.name; +$function$; + +-- quote_wal2json function +CREATE OR REPLACE FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) + RETURNS SETOF realtime.wal_rls + LANGUAGE sql + SET search_path = '' + SET log_min_messages TO 'fatal' +AS $function$ + with pub as ( + select + concat_ws( + ',', + case when bool_or(pubinsert) then 'insert' else null end, + case when bool_or(pubupdate) then 'update' else null end, + case when bool_or(pubdelete) then 'delete' else null end + ) as w2j_actions, + coalesce( + string_agg( + realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), + ',' + ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'), + '' + ) w2j_add_tables + from + pg_publication pp + left join pg_publication_tables ppt + on pp.pubname = ppt.pubname + where + pp.pubname = publication + group by + pp.pubname + limit 1 + ), + w2j as ( + select + x.*, pub.w2j_add_tables + from + pub, + pg_catalog.pg_logical_slot_get_changes( + slot_name, null, max_changes, + 'include-pk', 'true', + 'include-transaction', 'false', + 'include-timestamp', 'true', + 'include-type-oids', 'true', + 'format-version', '2', + 'actions', pub.w2j_actions, + 'add-tables', pub.w2j_add_tables + ) x + ) + select + xyz.wal, + xyz.is_rls_enabled, + xyz.subscription_ids, + xyz.errors + from + w2j, + realtime.apply_rls( + wal := w2j.data::jsonb, + max_record_bytes := max_record_bytes + ) xyz(wal, is_rls_enabled, subscription_ids, errors) + where + w2j.w2j_add_tables <> '' + and xyz.subscription_ids[1] is not null + $function$; + +-- quote_wal2json function + +CREATE OR REPLACE FUNCTION realtime.quote_wal2json(entity regclass) +RETURNS text +LANGUAGE sql +SET search_path = '' +IMMUTABLE STRICT +AS $function$ + select + ( + select pg_catalog.string_agg('' || ch,'') + from pg_catalog.unnest(pg_catalog.string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx) + where + not (x.idx = 1 and x.ch = '\"') + and not ( + x.idx = pg_catalog.array_length(pg_catalog.string_to_array(nsp.nspname::text, null), 1) + and x.ch = '\"' + ) + ) + || '.' + || ( + select string_agg('' || ch,'') + from pg_catalog.unnest(pg_catalog.string_to_array(pc.relname::text, null)) with ordinality x(ch, idx) + where + not (x.idx = 1 and x.ch = '\"') + and not ( + x.idx = pg_catalog.array_length(pg_catalog.string_to_array(nsp.nspname::text, null), 1) + and x.ch = '\"' + ) + ) + from + pg_class pc + join pg_namespace nsp + on pc.relnamespace = nsp.oid + where + pc.oid = entity + $function$; + +-- subscription_check_filters function + +CREATE OR REPLACE FUNCTION realtime.subscription_check_filters() +RETURNS trigger +SET search_path = '' +LANGUAGE plpgsql +AS $function$ + /* + Validates that the user defined filters for a subscription: + - refer to valid columns that the claimed role may access + - values are coercable to the correct column type + */ + declare + col_names text[] = coalesce( + pg_catalog.array_agg(c.column_name order by c.ordinal_position), + '{}'::text[] + ) + from + information_schema.columns c + where + pg_catalog.format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity + and pg_catalog.has_column_privilege( + (new.claims ->> 'role'), + pg_catalog.format('%I.%I', c.table_schema, c.table_name)::regclass, + c.column_name, + 'SELECT' + ); + filter realtime.user_defined_filter; + col_type regtype; + + in_val jsonb; + begin + for filter in select * from pg_catalog.unnest(new.filters) loop + -- Filtered column is valid + if not filter.column_name = any(col_names) then + raise exception 'invalid column for filter %', filter.column_name; + end if; + + -- Type is sanitized and safe for string interpolation + col_type = ( + select atttypid::regtype + from pg_catalog.pg_attribute + where attrelid = new.entity + and attname = filter.column_name + ); + if col_type is null then + raise exception 'failed to lookup type for column %', filter.column_name; + end if; + + -- Set maximum number of entries for in filter + if filter.op = 'in'::realtime.equality_op then + in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype); + if coalesce(pg_catalog.jsonb_array_length(in_val), 0) > 100 then + raise exception 'too many values for `in` filter. Maximum 100'; + end if; + else + -- raises an exception if value is not coercable to type + perform realtime.cast(filter.value, col_type); + end if; + + end loop; + + -- Apply consistent order to filters so the unique constraint on + -- (subscription_id, entity, filters) can't be tricked by a different filter order + new.filters = coalesce( + pg_catalog.array_agg(f order by f.column_name, f.op, f.value), + '{}' + ) from pg_catalog.unnest(new.filters) f; + + return new; + end; + $function$; + +CREATE OR REPLACE FUNCTION realtime.to_regrole(role_name text) +RETURNS regrole +SET search_path = '' +LANGUAGE sql +IMMUTABLE +AS $function$ select role_name::regrole $function$; + ") + end + end diff --git a/mix.exs b/mix.exs index bf60890ca..61af66e95 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.29.14", + version: "2.29.15", elixir: "~> 1.16.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod,