@@ -5,40 +5,38 @@ do $$
 begin
 set search_path = '{{.Schema}}';

-select count(seq)
+create temporary table transactions_ids as
+select row_number() over (order by transactions.seq) as row_number,
+moves.seq as moves_seq, transactions.id, transactions.seq as transactions_seq
 from moves
-where transactions_id is null
+join transactions on transactions.seq = moves.transactions_seq
+where transactions_id is null;
+
+create index transactions_ids_rows on transactions_ids(row_number) include (moves_seq, transactions_seq, id);
+
+analyze transactions_ids;
+
+select count(*)
+from transactions_ids
 into _max;

 perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _max);
-loop
-
-with _outdated_moves as (
+for i in 1.._max by _batch_size loop
+with _rows as (
 select *
-from moves
-where transactions_id is null
-limit _batch_size
+from transactions_ids
+where row_number >= i and row_number < i + _batch_size
 )
 update moves
-set transactions_id = (
-select id
-from transactions
-where seq = moves.transactions_seq
-)
-from _outdated_moves
-where moves.seq in (_outdated_moves.seq);
+set transactions_id = _rows.id
+from _rows
+where seq = _rows.moves_seq;

-exit when not found;
+commit;

 perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
-
-commit ;
 end loop;

-alter table moves
-add constraint transactions_id_not_null
-check (transactions_id is not null)
-not valid;
 end
 $$
 language plpgsql;
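
Note: most of the migrations touched by this PR converge on the same batching scheme: materialise the rows to fix into a temporary work table keyed by a dense row_number, index that key, then walk it in fixed-size ranges with a commit per batch. The following is a minimal sketch of that pattern with hypothetical table and column names (some_table, needs_fix, fixed); it is an illustration, not code from the PR.

    do $$
    declare
        _batch_size integer := 1000;
        _max integer;
    begin
        -- materialise the work list once, keyed by a dense integer
        create temporary table work_items as
        select row_number() over (order by t.seq) as row_number, t.seq
        from some_table t
        where t.needs_fix;

        -- covering index so each batch is a plain index range scan
        create index on work_items (row_number) include (seq);
        analyze work_items;

        select count(*) into _max from work_items;

        for i in 1.._max by _batch_size loop
            with _rows as (
                select seq
                from work_items
                where row_number >= i and row_number < i + _batch_size
            )
            update some_table
            set fixed = true
            from _rows
            where some_table.seq = _rows.seq;

            -- transaction control in a do block needs PostgreSQL 11+ and no surrounding transaction
            commit;
        end loop;
    end
    $$ language plpgsql;

Compared with the previous loop ... exit when not found form, which re-selected "where transactions_id is null limit _batch_size" on every pass, each batch here reads a disjoint slice of the work table, so the cost per batch stays flat and progress no longer depends on re-scanning rows that were already fixed.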
@@ -3,7 +3,6 @@ do $$
 _ledger record;
 _vsql text;
 _batch_size integer := 1000;
-_date timestamp without time zone;
 _count integer := 0;
 begin
 set search_path = '{{.Schema}}';
@@ -19,17 +18,12 @@ do $$
 execute _vsql;
 end loop;

--- select the date where the "11-make-stateless" migration has been applied
-select tstamp into _date
-from goose_db_version
-where version_id = 12;
-
 create temporary table logs_transactions as
-select id, ledger, date, (data->'transaction'->>'id')::bigint as transaction_id
+select row_number() over (order by ledger, id) as row_number, ledger, date, (data->'transaction'->>'id')::bigint as transaction_id
 from logs
-where date <= _date;
+where type = 'NEW_TRANSACTION' or type = 'REVERTED_TRANSACTION';

-create index on logs_transactions (ledger, transaction_id) include (id, date);
+create index on logs_transactions (row_number) include (ledger, date, transaction_id);

 select count(*) into _count
 from logs_transactions;
@@ -40,14 +34,12 @@ do $$
 with _rows as (
 select *
 from logs_transactions
-order by ledger, transaction_id
-offset i
-limit _batch_size
+where row_number > i and row_number <= i + _batch_size
 )
 update transactions
 set inserted_at = _rows.date
 from _rows
-where transactions.ledger = _rows.ledger and transactions.id = _rows.transaction_id;
+where transactions.ledger = _rows.ledger and transactions.id = _rows.transaction_id and inserted_at is null;

 commit;

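Note: the dropped "order by ... offset i limit _batch_size" form has to walk and discard the first i rows on every iteration, so later batches get slower and the total work grows roughly quadratically with the table size; the replacement predicate is a range scan on the row_number index created just above, so every batch costs about the same. Illustration only, against the temporary table built earlier in this migration:

    -- old shape: sorts and skips i rows on every iteration
    select * from logs_transactions order by ledger, transaction_id offset 100000 limit 1000;
    -- new shape: one index range scan per batch
    select * from logs_transactions where row_number > 100000 and row_number <= 101000;

The added "and inserted_at is null" guard means rows populated by batches that already committed are not rewritten if the migration is re-run after an interruption.
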
@@ -42,7 +42,7 @@ do $$
 select transactions_seq, volumes
 from moves_view
 -- play better than offset/limit
-where transactions_seq >= _offset and transactions_seq < _offset + _batch_size
+where transactions_seq > _offset and transactions_seq <= _offset + _batch_size
 )
 update transactions
 set post_commit_volumes = data.volumes
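
Note: this hunk only changes the window bounds from a closed-open interval [_offset, _offset + _batch_size) to the open-closed (_offset, _offset + _batch_size] convention used by the row_number loops elsewhere in this PR. Assuming _offset starts at 0, advances by _batch_size after each batch and _batch_size is 1000 (the loop shape visible in the superseded migration below), the windows line up as follows, and every transactions_seq value, starting at 1, still falls in exactly one window:

    -- batch 1: _offset = 0    -> transactions_seq in (0, 1000],    i.e. 1 .. 1000
    -- batch 2: _offset = 1000 -> transactions_seq in (1000, 2000], i.e. 1001 .. 2000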
internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql (63 changes: 1 addition & 62 deletions)
@@ -1,67 +1,6 @@
 do $$
-declare
-_offset integer := 0;
-_batch_size integer := 1000;
 begin
-set search_path = '{{ .Schema }}';
-
-drop table if exists moves_view;
-
-create table moves_view as
-select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
-from (
-select transactions_seq, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
-from (
-SELECT DISTINCT ON (moves.transactions_seq, accounts_address, asset) moves.transactions_seq, accounts_address, asset,
-first_value(post_commit_volumes) OVER (
-PARTITION BY moves.transactions_seq, accounts_address, asset
-ORDER BY seq DESC
-) AS post_commit_volumes
-FROM moves
-where insertion_date < (
-select tstamp from goose_db_version where version_id = 12
-)
-) moves
-group by transactions_seq, accounts_address
-) data
-group by transactions_seq;
-
-create index moves_view_idx on moves_view(transactions_seq);
--- speed up hash join when updating rows later
-alter table moves_view add foreign key(transactions_seq) references transactions(seq);
-
-if (select count(*) from moves_view) = 0 then
-drop table moves_view;
-return;
-end if;
-
-perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from moves_view));
-
-loop
-with data as (
-select transactions_seq, volumes
-from moves_view
--- play better than offset/limit
-where transactions_seq >= _offset and transactions_seq < _offset + _batch_size
-)
-update transactions
-set post_commit_volumes = data.volumes
-from data
-where transactions.seq = data.transactions_seq;
-
-if not found then
-drop table moves_view;
-exit;
-end if;
-
-_offset = _offset + _batch_size;
-
-perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
-
-commit;
-end loop;
-
-drop table if exists moves_view;
+raise notice 'Migration superseded by next migration';
 end
 $$;

@@ -8,7 +8,8 @@ do $$
 drop table if exists moves_view;

 create table moves_view as
-select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
+select row_number() over (order by transactions_seq) as row_number,
+transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
 from (
 select transactions_seq, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
 from (
@@ -26,7 +27,7 @@ do $$
 ) data
 group by transactions_seq;

-create index moves_view_idx on moves_view(transactions_seq);
+create index moves_view_idx on moves_view(row_number) include (transactions_seq, volumes);
 -- speed up hash join when updating rows later
 alter table moves_view add foreign key(transactions_seq) references transactions(seq);

@@ -41,8 +42,7 @@ do $$
 with data as (
 select transactions_seq, volumes
 from moves_view
-offset _offset
-limit _batch_size
+where row_number > _offset and row_number <= _offset + _batch_size
 )
 update transactions
 set post_commit_volumes = data.volumes
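
Note: the index swapped in above is a covering index: include (transactions_seq, volumes) stores the columns the update needs directly in the index entries, which lets the planner answer each batch's row_number range scan with an index-only scan instead of re-sorting or paging through the table. Illustration only, the shape of the query each loop iteration now runs against moves_view (assuming _batch_size = 1000, first batch):

    select transactions_seq, volumes
    from moves_view
    where row_number > 0 and row_number <= 1000;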
@@ -17,7 +17,7 @@ do $$
 from logs
 where type = 'REVERTED_TRANSACTION'
 )
-select reversed.ledger, reversed.reversedTransactionID, reversed.revertedTransactionID, reversed.revertedAt
+select row_number() over (order by transactions.seq) as row_number, reversed.ledger, reversed.reversedTransactionID, reversed.revertedTransactionID, reversed.revertedAt
 from transactions
 join reversed on
 reversed.reversedTransactionID = transactions.id and
@@ -37,9 +37,7 @@ do $$
 with data as (
 select ledger, reversedTransactionID, revertedTransactionID, revertedAt
 from txs_view
-order by ledger, reversedTransactionID, revertedTransactionID
-offset _offset
-limit _batch_size
+where row_number > _offset and row_number <= _offset + _batch_size
 )
 update transactions
 set
@@ -8,7 +8,7 @@ do $$
 drop table if exists txs_view;

 create table txs_view as
-select *
+select row_number() over (order by transactions.seq) as row_number, *
 from transactions
 where updated_at is null;

@@ -26,9 +26,7 @@ do $$
 with data as (
 select *
 from txs_view
-order by seq
-offset _offset
-limit _batch_size
+where row_number > _offset and row_number <= _offset+_batch_size
 )
 update transactions
 set updated_at = transactions.inserted_at
@@ -16,13 +16,14 @@ do $$
 from logs
 where type = 'REVERTED_TRANSACTION' and data->>'revertedTransactionID' is not null
 )
-select reversed.id as log_id, transactions.*
+select row_number() over (order by transactions.seq) as row_number,
+reversed.id as log_id, transactions.*
 from transactions
 join reversed on
 reversed.revertedTransactionID = transactions.id and
 reversed.ledger = transactions.ledger;

-create index txs_view_idx on txs_view(log_id, id);
+create index txs_view_idx on txs_view(row_number);

 if (select count(*) from txs_view) = 0 then
 return;
@@ -34,9 +35,7 @@ do $$
 with data as (
 select *
 from txs_view
-order by ledger, log_id, id
-offset _offset
-limit _batch_size
+where row_number > _offset and row_number <= _offset + _batch_size
 )
 update logs
 set data = data || jsonb_build_object('revertedTransaction', jsonb_build_object(
@@ -16,7 +16,7 @@ do $$
 from logs
 where type = 'REVERTED_TRANSACTION' and data->>'revertedTransactionID' is not null
 )
-select reversed.id as log_id, transactions.*
+select row_number() over (order by transactions.seq) as row_number, reversed.id as log_id, transactions.*
 from transactions
 join reversed on
 reversed.revertedTransactionID = transactions.id and
@@ -34,9 +34,7 @@ do $$
 with data as (
 select *
 from txs_view
-order by ledger, log_id, id
-offset _offset
-limit _batch_size
+where row_number > _offset and row_number <= _offset + _batch_size
 )
 update logs
 set data = data || jsonb_build_object('revertedTransaction', jsonb_build_object(
@@ -15,7 +15,7 @@ do $$
 with data as (
 select *
 from logs
-where seq >= _offset and seq < _offset + _batch_size
+where seq > _offset and seq <= _offset + _batch_size
 order by seq
 )
 update logs