---
title: 'Working with JSON in ClickHouse'
sidebar_label: 'Working with JSON'
slug: /integrations/clickpipes/mongodb/quickstart
description: 'Common patterns for working with JSON data replicated from MongoDB to ClickHouse via ClickPipes'
---

# Working with JSON in ClickHouse

This guide provides common patterns for working with JSON data replicated from MongoDB to ClickHouse via ClickPipes.

Suppose we created a collection `t1` in MongoDB to track customer orders:

```javascript
db.t1.insertOne({
  "order_id": "ORD-001234",
  "customer_id": 98765,
  "status": "completed",
  "total_amount": 299.97,
  "order_date": new Date(),
  "shipping": {
    "method": "express",
    "city": "Seattle",
    "cost": 19.99
  },
  "items": [
    {
      "category": "electronics",
      "price": 149.99
    },
    {
      "category": "accessories",
      "price": 24.99
    }
  ]
})
```

The MongoDB CDC Connector replicates MongoDB documents to ClickHouse using the native JSON data type. The replicated table `t1` in ClickHouse will contain the following row:

```shell
Row 1:
──────
_id:                "68a4df4b9fe6c73b541703b0"
_full_document:     {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at:  2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version:    0
```

## Table schema {#table-schema}

The replicated tables use this standard schema:

```shell
┌─name───────────────┬─type──────────┐
│ _id                │ String        │
│ _full_document     │ JSON          │
│ _peerdb_synced_at  │ DateTime64(9) │
│ _peerdb_version    │ Int64         │
│ _peerdb_is_deleted │ Int8          │
└────────────────────┴───────────────┘
```

- `_id`: Primary key from MongoDB
- `_full_document`: MongoDB document replicated as JSON data type
- `_peerdb_synced_at`: Records when the row was last synced
- `_peerdb_version`: Tracks the version of the row; incremented when the row is updated or deleted
- `_peerdb_is_deleted`: Marks whether the row is deleted
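
As a small illustration of the metadata columns, the following query (a sketch against the schema above) reports how fresh the replicated data is:

```sql
-- Most recent sync time across all rows
SELECT max(_peerdb_synced_at) AS last_synced_at FROM t1;
```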

### ReplacingMergeTree table engine {#replacingmergetree-table-engine}

ClickPipes maps MongoDB collections into ClickHouse using the `ReplacingMergeTree` table engine family. With this engine, updates are modeled as inserts with a newer version (`_peerdb_version`) of the document for a given primary key (`_id`), enabling efficient handling of updates, replaces, and deletes as versioned inserts.
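
To see this model in action, you can inspect the version metadata directly. This query is a sketch based on the schema above; the exact rows you see depend on what has been replicated and merged so far:

```sql
-- Each update or delete of a MongoDB document arrives as a new row with a
-- higher _peerdb_version; background merges keep only the latest version.
SELECT _id, _peerdb_version, _peerdb_is_deleted
FROM t1
ORDER BY _id, _peerdb_version;
```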

`ReplacingMergeTree` clears out duplicates asynchronously in the background, so duplicates may still be present at query time. To guarantee deduplicated results, use the [`FINAL` modifier](https://clickhouse.com/docs/sql-reference/statements/select/from#final-modifier). For example:

```sql
SELECT * FROM t1 FINAL;
```

### Handling deletes {#handling-deletes}

Deletes from MongoDB are propagated as new rows marked as deleted using the `_peerdb_is_deleted` column. You typically want to filter these out in your queries:

```sql
SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;
```

You can also create a row-level policy to automatically filter out deleted rows instead of specifying the filter in each query:

```sql
CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;
```

## Querying JSON data {#querying-json-data}

You can directly query JSON fields using dot syntax:

```sql title="Query"
SELECT
    _full_document.order_id,
    _full_document.shipping.method
FROM t1;
```

```shell title="Result"
┌─_full_document.order_id─┬─_full_document.shipping.method─┐
│ ORD-001234              │ express                        │
└─────────────────────────┴────────────────────────────────┘
```

### Dynamic type {#dynamic-type}

In ClickHouse, each field in the JSON column has the `Dynamic` type. The `Dynamic` type allows ClickHouse to store values of any type without knowing the type in advance. You can verify this with the `toTypeName` function:

```sql title="Query"
SELECT toTypeName(_full_document.customer_id) AS type FROM t1;
```

```shell title="Result"
┌─type────┐
│ Dynamic │
└─────────┘
```

To examine the underlying data type(s) of a field, use the `dynamicType` function. Note that the same field name can have different data types in different rows:

```sql title="Query"
SELECT dynamicType(_full_document.customer_id) AS type FROM t1;
```

```shell title="Result"
┌─type──┐
│ Int64 │
└───────┘
```
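
If you suspect a field carries mixed types, one way to survey them across the whole table is to group by `dynamicType` (a sketch, using the table above):

```sql
-- Count rows per underlying type of customer_id
SELECT
    dynamicType(_full_document.customer_id) AS type,
    count() AS row_count
FROM t1
GROUP BY type;
```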

[Regular functions](https://clickhouse.com/docs/sql-reference/functions/regular-functions) work for the `Dynamic` type just like they do for regular columns:

**Example 1: Date parsing**

```sql title="Query"
SELECT parseDateTimeBestEffortOrNull(_full_document.order_date) AS order_date FROM t1;
```

```shell title="Result"
┌─order_date──────────┐
│ 2025-08-19 20:32:11 │
└─────────────────────┘
```

**Example 2: Conditional logic**

```sql title="Query"
SELECT multiIf(
    _full_document.total_amount < 100, 'less_than_100',
    _full_document.total_amount < 1000, 'less_than_1000',
    '1000+') AS spendings
FROM t1;
```

```shell title="Result"
┌─spendings──────┐
│ less_than_1000 │
└────────────────┘
```

**Example 3: Array operations**

```sql title="Query"
SELECT length(_full_document.items) AS item_count FROM t1;
```

```shell title="Result"
┌─item_count─┐
│          2 │
└────────────┘
```

### Field casting {#field-casting}

[Aggregate functions](https://clickhouse.com/docs/sql-reference/aggregate-functions/combinators) in ClickHouse don't work on the `Dynamic` type directly. For example, if you attempt to use the `sum` function on a `Dynamic` value, you get the following error:

```sql
SELECT sum(_full_document.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)
```

To use aggregate functions, cast the field to the appropriate type with the `CAST` function or `::` syntax:

```sql title="Query"
SELECT sum(_full_document.shipping.cost::Float32) AS shipping_cost FROM t1;
```

```shell title="Result"
┌─shipping_cost─┐
│         19.99 │
└───────────────┘
```
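
The same cast can also be written with the `CAST` function:

```sql
SELECT sum(CAST(_full_document.shipping.cost, 'Float32')) AS shipping_cost FROM t1;
```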

:::note
Casting from dynamic type to the underlying data type (determined by `dynamicType`) is very performant, as ClickHouse already stores the value in its underlying type internally.
:::

## Flattening JSON {#flattening-json}

### Normal view {#normal-view}

You can create normal views on top of the JSON table to encapsulate the flattening, casting, and transformation logic, letting you query the data much like a relational table. Normal views are lightweight, as they store only the query itself, not the underlying data. For example:

```sql
CREATE VIEW v1 AS
SELECT
    CAST(_full_document._id, 'String') AS object_id,
    CAST(_full_document.order_id, 'String') AS order_id,
    CAST(_full_document.customer_id, 'Int64') AS customer_id,
    CAST(_full_document.status, 'String') AS status,
    CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DateTime64(3)') AS order_date,
    CAST(_full_document.shipping.method, 'String') AS shipping_method,
    CAST(_full_document.shipping.city, 'String') AS shipping_city,
    CAST(_full_document.shipping.cost, 'Decimal64(2)') AS shipping_cost,
    _full_document.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
```

This view will have the following schema:

```shell
┌─name────────────┬─type───────────┐
│ object_id       │ String         │
│ order_id        │ String         │
│ customer_id     │ Int64          │
│ status          │ String         │
│ total_amount    │ Decimal(18, 2) │
│ order_date      │ DateTime64(3)  │
│ shipping_method │ String         │
│ shipping_city   │ String         │
│ shipping_cost   │ Decimal(18, 2) │
│ items           │ Dynamic        │
└─────────────────┴────────────────┘
```

You can now query the view just as you would a flattened table:

```sql
SELECT
    customer_id,
    sum(total_amount)
FROM v1
WHERE shipping_city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
```
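
Note that `items` is still `Dynamic` in the view. As a sketch of unnesting it, assuming its underlying type is `Array(JSON)` (which is what the sample document above produces), you can combine a cast with `ARRAY JOIN`:

```sql
-- One output row per order item
SELECT
    order_id,
    item.category AS category,
    item.price AS price
FROM v1
ARRAY JOIN CAST(items, 'Array(JSON)') AS item;
```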

### Refreshable materialized view {#refreshable-materialized-view}

You can also create a [refreshable materialized view](https://clickhouse.com/docs/materialized-view/refreshable-materialized-view), which lets you schedule query execution to deduplicate rows and store the results in a flattened destination table. With each scheduled refresh, the destination table is replaced with the latest query results.

The key advantage of this method is that the query using the `FINAL` keyword runs only once per refresh, so subsequent queries on the destination table don't need `FINAL`.

The drawback is that the data in the destination table is only as up-to-date as the most recent refresh. For many use cases, refresh intervals ranging from several minutes to a few hours provide a good balance between data freshness and query performance.

```sql
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_method` String,
    `shipping_city` String,
    `shipping_cost` Decimal(18, 2),
    `items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW mv1 REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT
    CAST(_full_document._id, 'String') AS _id,
    CAST(_full_document.order_id, 'String') AS order_id,
    CAST(_full_document.customer_id, 'Int64') AS customer_id,
    CAST(_full_document.status, 'String') AS status,
    CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DateTime64(3)') AS order_date,
    CAST(_full_document.shipping.method, 'String') AS shipping_method,
    CAST(_full_document.shipping.city, 'String') AS shipping_city,
    CAST(_full_document.shipping.cost, 'Decimal64(2)') AS shipping_cost,
    _full_document.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
```

You can now query the table `flattened_t1` directly without the `FINAL` modifier:

```sql
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1
WHERE shipping_city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
```
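
If you need the flattened table updated ahead of its schedule, you can also trigger a refresh manually:

```sql
SYSTEM REFRESH VIEW mv1;
```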
