@@ -66,17 +66,17 @@ export default class PostgresMetaPublications {
6666 publish_update = false ,
6767 publish_delete = false ,
6868 publish_truncate = false ,
69- tables,
69+ tables = null ,
7070 } : {
7171 name : string
7272 publish_insert ?: boolean
7373 publish_update ?: boolean
7474 publish_delete ?: boolean
7575 publish_truncate ?: boolean
76- tables ?: string [ ]
76+ tables ?: string [ ] | null
7777 } ) : Promise < PostgresMetaResult < PostgresPublication > > {
7878 let tableClause : string
79- if ( tables === undefined ) {
79+ if ( tables === undefined || tables === null ) {
8080 tableClause = 'FOR ALL TABLES'
8181 } else if ( tables . length === 0 ) {
8282 tableClause = ''
@@ -127,87 +127,106 @@ CREATE PUBLICATION ${ident(name)} ${tableClause}
127127 publish_update ?: boolean
128128 publish_delete ?: boolean
129129 publish_truncate ?: boolean
130- tables ?: string [ ]
130+ tables ?: string [ ] | null
131131 }
132132 ) : Promise < PostgresMetaResult < PostgresPublication > > {
133- const { data : old , error } = await this . retrieve ( { id } )
134- if ( error ) {
135- return { data : null , error }
136- }
137-
138- // Need to work around the limitations of the SQL. Can't add/drop tables from
139- // a publication with FOR ALL TABLES. Can't use the SET TABLE clause without
140- // at least one table.
141- //
142- // new tables
143- //
144- // | undefined | string[] |
145- // ---------|-----------|-----------------|
146- // null | '' | 400 Bad Request |
147- // old tables ---------|-----------|-----------------|
148- // object[] | '' | See below |
149- //
150- // new tables
151- //
152- // | [] | [...] |
153- // ---------|-----------|-----------------|
154- // [] | '' | SET TABLE |
155- // old tables ---------|-----------|-----------------|
156- // [...] | DROP all | SET TABLE |
157- //
158- let tableSql : string
159- if ( tables === undefined ) {
160- tableSql = ''
161- } else if ( old ! . tables === null ) {
162- throw new Error ( 'Tables cannot be added to or dropped from FOR ALL TABLES publications' )
163- } else if ( tables . length > 0 ) {
164- tableSql = `ALTER PUBLICATION ${ ident ( old ! . name ) } SET TABLE ${ tables
165- . map ( ( t ) => {
166- if ( ! t . includes ( '.' ) ) {
167- return ident ( t )
168- }
169-
170- const [ schema , ...rest ] = t . split ( '.' )
171- const table = rest . join ( '.' )
172- return `${ ident ( schema ) } .${ ident ( table ) } `
173- } )
174- . join ( ',' ) } ;`
175- } else if ( old ! . tables . length === 0 ) {
176- tableSql = ''
177- } else {
178- tableSql = `ALTER PUBLICATION ${ ident ( old ! . name ) } DROP TABLE ${ old ! . tables
179- . map (
180- ( table : { schema : string ; name : string } ) => `${ ident ( table . schema ) } .${ ident ( table . name ) } `
133+ const sql = `
134+ do $$
135+ declare
136+ id oid := ${ literal ( id ) } ;
137+ old record;
138+ new_name text := ${ name === undefined ? null : literal ( name ) } ;
139+ new_owner text := ${ owner === undefined ? null : literal ( owner ) } ;
140+ new_publish_insert bool := ${ publish_insert ?? null } ;
141+ new_publish_update bool := ${ publish_update ?? null } ;
142+ new_publish_delete bool := ${ publish_delete ?? null } ;
143+ new_publish_truncate bool := ${ publish_truncate ?? null } ;
144+ new_tables text := ${
145+ tables === undefined
146+ ? null
147+ : literal (
148+ tables === null
149+ ? 'all tables'
150+ : tables
151+ . map ( ( t ) => {
152+ if ( ! t . includes ( '.' ) ) {
153+ return ident ( t )
154+ }
155+
156+ const [ schema , ...rest ] = t . split ( '.' )
157+ const table = rest . join ( '.' )
158+ return `${ ident ( schema ) } .${ ident ( table ) } `
159+ } )
160+ . join ( ',' )
181161 )
182- . join ( ',' ) } ;`
183- }
184-
185- let publishOps = [ ]
186- if ( publish_insert ?? old ! . publish_insert ) publishOps . push ( 'insert' )
187- if ( publish_update ?? old ! . publish_update ) publishOps . push ( 'update' )
188- if ( publish_delete ?? old ! . publish_delete ) publishOps . push ( 'delete' )
189- if ( publish_truncate ?? old ! . publish_truncate ) publishOps . push ( 'truncate' )
190- const publishSql = `ALTER PUBLICATION ${ ident ( old ! . name ) } SET (publish = '${ publishOps . join (
191- ','
192- ) } ');`
193-
194- const ownerSql =
195- owner === undefined ? '' : `ALTER PUBLICATION ${ ident ( old ! . name ) } OWNER TO ${ ident ( owner ) } ;`
196-
197- const nameSql =
198- name === undefined || name === old ! . name
199- ? ''
200- : `ALTER PUBLICATION ${ ident ( old ! . name ) } RENAME TO ${ ident ( name ) } ;`
201-
202- // nameSql must be last
203- const sql = `BEGIN; ${ tableSql } ${ publishSql } ${ ownerSql } ${ nameSql } COMMIT;`
204- {
205- const { error } = await this . query ( sql )
206- if ( error ) {
207- return { data : null , error }
208- }
209- }
210- return await this . retrieve ( { id } )
162+ } ;
163+ begin
164+ select * into old from pg_publication where oid = id;
165+ if old is null then
166+ raise exception 'Cannot find publication with id %', id;
167+ end if;
168+
169+ if new_tables is null then
170+ null;
171+ elsif new_tables = 'all tables' then
172+ if old.puballtables then
173+ null;
174+ else
175+ -- Need to recreate because going from list of tables <-> all tables with alter is not possible.
176+ execute(format('drop publication %1$I; create publication %1$I for all tables;', old.pubname));
177+ end if;
178+ else
179+ if old.puballtables then
180+ -- Need to recreate because going from list of tables <-> all tables with alter is not possible.
181+ execute(format('drop publication %1$I; create publication %1$I;', old.pubname));
182+ elsif exists(select from pg_publication_rel where prpubid = id) then
183+ execute(
184+ format(
185+ 'alter publication %I drop table %s',
186+ old.pubname,
187+ (select string_agg(prrelid::regclass::text, ', ') from pg_publication_rel where prpubid = id)
188+ )
189+ );
190+ end if;
191+
192+ -- At this point the publication must have no tables.
193+
194+ if new_tables != '' then
195+ execute(format('alter publication %I add table %s', old.pubname, new_tables));
196+ end if;
197+ end if;
198+
199+ execute(
200+ format(
201+ 'alter publication %I set (publish = %L);',
202+ old.pubname,
203+ concat_ws(
204+ ', ',
205+ case when coalesce(new_publish_insert, old.pubinsert) then 'insert' end,
206+ case when coalesce(new_publish_update, old.pubupdate) then 'update' end,
207+ case when coalesce(new_publish_delete, old.pubdelete) then 'delete' end,
208+ case when coalesce(new_publish_truncate, old.pubtruncate) then 'truncate' end
209+ )
210+ )
211+ );
212+
213+ execute(format('alter publication %I owner to %I;', old.pubname, coalesce(new_owner, old.pubowner::regrole::name)));
214+
215+ -- Using the same name in the rename clause gives an error, so only do it if the new name is different.
216+ if new_name is not null and new_name != old.pubname then
217+ execute(format('alter publication %I rename to %I;', old.pubname, coalesce(new_name, old.pubname)));
218+ end if;
219+
220+ -- We need to retrieve the publication later, so we need a way to uniquely identify which publication this is.
221+ -- We can't rely on id because it gets changed if it got recreated.
222+ -- We use a temp table to store the unique name - DO blocks can't return a value.
223+ create temp table pg_meta_publication_tmp (name) on commit drop as values (coalesce(new_name, old.pubname));
224+ end $$;
225+
226+ with publications as (${ publicationsSql } ) select * from publications where name = (select name from pg_meta_publication_tmp);
227+ `
228+ const { data, error } = await this . query ( sql )
229+ return { data : data [ 0 ] , error }
211230 }
212231
213232 async remove ( id : number ) : Promise < PostgresMetaResult < PostgresPublication > > {
0 commit comments