Skip to content

Commit fbcf1a8

Browse files
authored
[Backport stable-25-3] initial scd2 (#28802)
2 parents bd273f2 + 0efca16 commit fbcf1a8

File tree

3 files changed

+329
-1
lines changed

3 files changed

+329
-1
lines changed

ydb/docs/ru/core/analyst/practical-guides/scd/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,5 @@
3232
* С использованием связки [Change Data Capture (CDC)](../../../concepts/cdc.md) и [Transfer](../../../concepts/transfer.md) для автоматической потоковой репликации изменений из таблиц-источников.
3333
* [{#T}](scd1-transfer.md)
3434
* [{#T}](scd2-transfer.md)
35+
* С помощью периодических YQL-запросов, которые обрабатывают пакеты изменений из промежуточной таблицы и подмерживают их в основную SCD-таблицу.
36+
* [{#T}](scd2-merge.md)
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
# Использование процесса подмерживания изменения для реализации SCD2 в {{ ydb-full-name }}
2+
3+
В этой статье описывается реализация паттерна [Slowly Changing Dimensions Type 2 (SCD2)](./index.md#scd2) в {{ ydb-full-name }} с использованием процесса подмерживания изменений.
4+
5+
## Используемые инструменты
6+
7+
Для поставки данных в SCD2 таблицу в данной статье будет использоваться следующая комбинация из доступной в {{ ydb-short-name }} функциональности:
8+
9+
1. Таблица-источник `dimension_scd_changes`, содержащая информацию об атрибутах, их значениях и моментах изменений данных.
10+
1. Таблица-приёмник `dimension_scd2_final` для хранения результирующих данных.
11+
1. Периодически внешнее приложение должно вызывать запрос, который будет подмерживать изменения данных, накопившиеся в таблице `dimension_scd_changes`, в таблицу `dimension_scd2_final`.
12+
1. Для поставки данных из строковых таблиц для хранения их в формате SCD2 удобно использовать встроенный в {{ydb-short-name}} механизм [трансфера](../../../concepts/transfer.md).
13+
14+
{% note info %}
15+
16+
Таблицы `dimension_scd_changes`, `dimension_scd2_final` приведены для иллюстрации. Для реальных запросов вам нужно скорректировать структуру таблиц и их атрибутов.
17+
18+
{% endnote %}
19+
20+
## Создание таблицы для приёма всех изменений `dimension_scd_changes`
21+
22+
```sql
23+
CREATE TABLE dimension_scd_changes (
24+
id Utf8 NOT NULL, -- Бизнес-ключ
25+
attribute1 Utf8, -- Атрибут данных
26+
attribute2 Utf8, -- Атрибут данных
27+
change_time Timestamp NOT NULL, -- Момент изменения данных
28+
operation Utf8, -- Тип изменений данных
29+
PRIMARY KEY (change_time, id)
30+
)
31+
PARTITION BY HASH(change_time, id)
32+
WITH (
33+
STORE=COLUMN
34+
)
35+
```
36+
37+
Описание полей таблицы:
38+
39+
- `id` — бизнес-ключ записи;
40+
- `attribute1`, `attribute2` — атрибуты измерения;
41+
- `change_time` — момент времени изменения данных;
42+
- `operation` — тип изменения данных: `CREATE`, `UPDATE`, `DELETE`.
43+
44+
Первичный ключ создается как `PRIMARY KEY (change_time, id)`, так как по одному и тому же бизнес-ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять. Подробнее про выбор первичного ключа и ключа партиционирования, можно прочесть в документации - [выбор первичного ключа](../../../dev/primary-key/column-oriented.md##vybor-pervichnogo-klyucha), [выбор ключа партиционирования](../../../dev/primary-key/column-oriented.md##vybor-klyucha-particionirovaniya)
45+
46+
## Создание финальной SCD2 таблицы `dimension_scd2_final`
47+
48+
```sql
49+
CREATE TABLE dimension_scd2_final (
50+
id Utf8 NOT NULL, -- Бизнес-ключ данных
51+
attribute1 Utf8, -- Атрибут данных
52+
attribute2 Utf8, -- Атрибут данных
53+
valid_from Timestamp NOT NULL, -- Момент времени, с которого данные актуальны
54+
valid_to Timestamp, -- Момент времени, до которого данные актуальны.
55+
-- Если данные актуальны прямо сейчас, то в valid_to находится NULL
56+
is_current Uint8, -- Признак, что данные актуальны прямо сейчас.
57+
is_deleted Uint8, -- Признак, что данные были удалены. Если данные были удалены, то is_current = FALSE
58+
PRIMARY KEY (valid_from, id)
59+
)
60+
PARTITION BY HASH(valid_from, id)
61+
WITH(
62+
STORE=COLUMN
63+
)
64+
```
65+
66+
Описание полей таблицы:
67+
68+
- `id` — бизнес-ключ записи;
69+
- `attribute1`, `attribute2` — атрибуты измерения;
70+
- `valid_from` — момент времени, с которого запись становится актуальной;
71+
- `valid_to` — момент времени, до которого запись была актуальной, или `NULL` для текущих записей;
72+
- `is_current` — флаг, указывающий, является ли запись текущей (1 - текущая запись) или (0 - запись историческая);
73+
- `is_deleted` — флаг, указывающий, была ли запись удалена (1 - запись была удалена) или (0 - запись не была удалена).
74+
75+
Первичный ключ создается как `PRIMARY KEY (valid_from, id)`, так как по одному и тому же ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять.
76+
77+
## Загрузка данных в таблицу изменений
78+
79+
Для загрузки данных в таблицу изменений можно использовать любой способ загрузки данных и автоматическую поставку изменений с помощью механизма [трансфер](../../../concepts/transfer.md).
80+
81+
Пример запроса для явной загрузки изменений:
82+
83+
```sql
84+
UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
85+
VALUES ('CUSTOMER_1001', 'John Doe', 'Los Angeles', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE');
86+
87+
UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
88+
VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE');
89+
90+
UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
91+
VALUES ('CUSTOMER_1001', 'John Doe', 'San Francisco', Unwrap(CAST('2025-08-22T19:00:00Z' as Timestamp)), 'UPDATE');
92+
93+
UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
94+
VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T21:00:00Z' as Timestamp)), 'DELETE');
95+
```
96+
97+
98+
## Запрос для размещения изменений в формате SCD2
99+
100+
Чтобы преобразовать данные из таблицы изменений в формат SCD2 и загрузить их в финальную таблицу, используется специальный запрос. Этот запрос нужно запускать регулярно — с такой периодичностью, с какой вы хотите обновлять данные в финальной таблице. Для автоматического запуска можно воспользоваться [интеграцию](../../../integrations/orchestration/airflow.md) {{ ydb-short-name }} с Apache Airflow™:
101+
102+
```sql
103+
-- Шаг 1: Читаем все новые события из таблицы `dimension_scd_changes`.
104+
-- Это именованное выражение ($changes) является исходным набором данных для всей последующей обработки в рамках этого запуска.
105+
$changes = (
106+
SELECT
107+
id,
108+
attribute1,
109+
attribute2,
110+
change_time,
111+
String::AsciiToUpper(operation) AS op
112+
FROM dimension_scd_changes
113+
);
114+
115+
-- Шаг 2: Фильтруем события, оставляя только те, которых еще нет в целевой таблице.
116+
-- Цель этого шага - обеспечить идемпотентность на уровне чтения, чтобы не обрабатывать
117+
-- уже загруженные данные в случае сбоя и перезапуска скрипта.
118+
$unprocessed_data = (
119+
SELECT
120+
chg.id AS id,
121+
chg.attribute1 AS attribute1,
122+
chg.attribute2 AS attribute2,
123+
chg.change_time AS change_time,
124+
chg.op AS op
125+
FROM $changes AS chg
126+
LEFT JOIN dimension_scd2_final AS scd
127+
ON chg.id = scd.id AND chg.change_time = scd.valid_from -- Ищем записи по каждой сущности (id) и времени изменения
128+
WHERE scd.id IS NULL -- для исключения строк, которые уже были перенесены в таблицу dimension_scd2_final ранее
129+
);
130+
131+
-- Шаг 3: Находим в целевой таблице активные записи (`is_current=1`), для которых пришли обновления.
132+
-- Формируем для них "закрывающие" версии, устанавливая `valid_to` равным времени
133+
-- самого первого изменения из новой пачки ($unprocessed_data).
134+
$close_open_intervals = (
135+
SELECT
136+
target.id AS id,
137+
target.attribute1 as attribute1,
138+
target.attribute2 as attribute2,
139+
target.valid_from as valid_from,
140+
0ut AS is_current, -- Закрываемая запись больше не является текущей
141+
unprocessed_data.change_time AS valid_to,
142+
target.is_deleted as is_deleted
143+
FROM dimension_scd2_final AS target
144+
INNER JOIN (
145+
SELECT
146+
id,
147+
MIN(change_time) AS change_time
148+
FROM $unprocessed_data
149+
GROUP BY id
150+
) AS unprocessed_data
151+
ON target.id = unprocessed_data.id
152+
WHERE target.is_current = 1ut
153+
);
154+
155+
-- Шаг 4: Преобразуем поток необработанных событий в версионные записи (строки для вставки).
156+
-- Здесь вычисляются все необходимые атрибуты для новых версий: `valid_to`, `is_current`, `is_deleted`.
157+
$updated_data = (
158+
SELECT
159+
t.id AS id,
160+
t.attribute1 AS attribute1,
161+
t.attribute2 AS attribute2,
162+
t.is_deleted AS is_deleted,
163+
-- Логика флага `is_current`: он устанавливается в 1 только для последней
164+
-- записи в цепочке (`next_change_time IS NULL`), и только если это не
165+
-- операция удаления (`is_deleted == 0`).
166+
IF(t.next_change_time IS NOT NULL OR t.is_deleted == 1ut, 0ut, 1ut) AS is_current,
167+
t.change_time AS valid_from,
168+
t.next_change_time AS valid_to
169+
FROM (
170+
-- Подзапрос вычисляет для каждой строки флаг удаления (`is_deleted`)
171+
-- и временную метку следующего события (`next_change_time`) с помощью оконной функции LEAD.
172+
SELECT
173+
unprocessed_data.id AS id,
174+
unprocessed_data.attribute1 AS attribute1,
175+
unprocessed_data.attribute2 AS attribute2,
176+
unprocessed_data.op AS op,
177+
unprocessed_data.change_time AS change_time,
178+
IF(unprocessed_data.op = "DELETE", 1ut, 0ut) AS is_deleted,
179+
LEAD(unprocessed_data.change_time) OVER (PARTITION BY id ORDER BY unprocessed_data.change_time) AS next_change_time
180+
FROM $unprocessed_data AS unprocessed_data
181+
) AS t
182+
);
183+
184+
-- Шаг 5: Атомарно применяем все рассчитанные изменения к целевой таблице.
185+
-- UPSERT обновит существующие записи (из $close_open_intervals) и вставит новые (из $updated_data).
186+
UPSERT INTO dimension_scd2_final (id, attribute1, attribute2, is_current, is_deleted, valid_from, valid_to)
187+
SELECT
188+
id,
189+
attribute1,
190+
attribute2,
191+
is_current,
192+
is_deleted,
193+
valid_from,
194+
valid_to
195+
FROM $close_open_intervals
196+
UNION ALL
197+
SELECT
198+
id,
199+
attribute1,
200+
attribute2,
201+
is_current,
202+
is_deleted,
203+
valid_from,
204+
valid_to
205+
FROM $updated_data;
206+
207+
-- Шаг 6: Очищает стейджинг-таблицу от обработанных записей.
208+
DELETE FROM dimension_scd_changes ON
209+
SELECT id, change_time FROM $changes;
210+
```
211+
212+
## Демонстрация работы
213+
214+
В примере ниже рассматривается сущность **Customer**:
215+
216+
- бизнес-ключ — поле `id`,
217+
- атрибуты — `attribute1` (полное имя) и `attribute2` (город).
218+
219+
В момент времени `2025-08-22 17:00` создаются два клиента (John в Los Angeles - с id `CUSTOMER_1001` и Judy в New York с id `CUSTOMER_1002`), в момент времени `2025-08-22 19:00` клиент `CUSTOMER_1001` меняет город на San Francisco `UPDATE`, а в момент `2025-08-22 21:00` клиент `CUSTOMER_1002` удаляется `DELETE`.
220+
221+
| id | attribute1 | attribute2 | change\_time | operation |
222+
| -------------- | ---------- | ------------- | ---------------- | --------- |
223+
| CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | CREATE |
224+
| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | CREATE |
225+
| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | UPDATE |
226+
| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 21:00 | DELETE |
227+
228+
Процесс SCD2 преобразует такие события в интервальные версии записей с полями `valid_from` и `valid_to`. Например, у `CUSTOMER_1001` получится две последовательные версии: сначала с городом LA, затем с городом SF (актуальная запись, у которой `valid_to = NULL`). У `CUSTOMER_1002` будет одна устаревшая версия и последняя запись с флагами `is_deleted=1` и `is_current=0`, которая показывает, что пользователь удалён.
229+
230+
Ниже показаны исходные события и соответствующие им версии в финальной таблице.
231+
232+
```mermaid
233+
gantt
234+
title История изменения данных
235+
dateFormat YYYY-MM-DD HH:mm
236+
axisFormat %H:%M
237+
todayMarker off
238+
239+
section CUSTOMER_1001 — John Doe
240+
Los Angeles :done, t0, 2025-08-22 17:00, 2025-08-22 19:00
241+
San Francisco (is_current) :active, t1, 2025-08-22 19:00, 2025-08-23 00:00
242+
243+
section CUSTOMER_1002 — Judy Doe
244+
New York :done, t0b, 2025-08-22 17:00, 2025-08-22 21:00
245+
246+
```
247+
248+
| id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted |
249+
| -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- |
250+
| CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | 2025-08-22 19:00 | 0 | 0 |
251+
| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 |
252+
| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | 2025-08-22 21:00 | 0 | 0 |
253+
| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 21:00 | NULL | 0 | 1 |
254+
255+
256+
## Получение данных из SCD2-таблицы
257+
258+
### Получение актуальных данных
259+
260+
```sql
261+
SELECT
262+
id,
263+
attribute1,
264+
attribute2,
265+
valid_from,
266+
valid_to
267+
FROM dimension_scd2_final
268+
WHERE is_current = 1ut;
269+
```
270+
271+
Результат:
272+
273+
| id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted |
274+
| -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- |
275+
| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 |
276+
277+
278+
### Получение данных на определённый момент времени
279+
280+
```sql
281+
$as_of = Timestamp("2025-08-22T19:11:30.000000Z");
282+
283+
SELECT
284+
id,
285+
attribute1,
286+
attribute2,
287+
valid_from,
288+
valid_to
289+
FROM dimension_scd2_final
290+
WHERE valid_from <= $as_of
291+
AND (valid_to IS NULL OR valid_to > $as_of) -- Получаем записи, которые действовали в $as_of момент времени
292+
AND is_deleted = 0ut -- Только записи, которые не удалены
293+
```
294+
295+
Результат:
296+
297+
| id | attribute1 | attribute2 | valid\_from | valid\_to |
298+
| -------------- | ---------- | ------------- | ---------------- | ---------------- |
299+
| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL |
300+
| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | 2025-08-22 21:00 |
301+
302+
303+
### Получение истории изменений для конкретной записи
304+
305+
```sql
306+
SELECT
307+
id,
308+
attribute1,
309+
attribute2,
310+
valid_from,
311+
valid_to,
312+
is_current,
313+
is_deleted
314+
FROM dimension_scd2_final
315+
WHERE id = 'CUSTOMER_1001'
316+
ORDER BY valid_from;
317+
```
318+
319+
Результат:
320+
321+
| id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted |
322+
| -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- |
323+
| CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | 2025-08-22 19:00 | 0 | 0 |
324+
| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 |

ydb/docs/ru/core/analyst/practical-guides/scd/toc_p.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ items:
44
- name: SCD1 с использованием TRANSFER
55
href: scd1-transfer.md
66
- name: SCD2 с использованием TRANSFER
7-
href: scd2-transfer.md
7+
href: scd2-transfer.md
8+
- name: SCD2 с использованием подмерживания изменений
9+
href: scd2-merge.md

0 commit comments

Comments
 (0)