Skip to content

Commit 62c7c3d

Browse files
authored
Merge pull request #4561 from amolsr/main
Doc Update: Clickhouse kafka connect performance tuning
2 parents 5835d96 + e7de6a3 commit 62c7c3d

File tree

2 files changed

+310
-14
lines changed

2 files changed

+310
-14
lines changed

docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md

Lines changed: 307 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -329,35 +329,330 @@ ClickHouse Kafka Connect reports the following metrics:
329329
- Batch size is inherited from the Kafka Consumer properties.
330330
- When using KeeperMap for exactly-once and the offset is changed or re-wound, you need to delete the content from KeeperMap for that specific topic. (See troubleshooting guide below for more details)
331331

332-
### Tuning performance {#tuning-performance}
332+
### Performance tuning and throughput optimization {#tuning-performance}
333333

334-
If you've ever though to yourself "I would like to adjust the batch size for the sink connector", then this is the section for you.
334+
This section covers performance tuning strategies for the ClickHouse Kafka Connect Sink. Performance tuning is essential when dealing with high-throughput use cases or when you need to optimize resource utilization and minimize lag.
335335

336-
##### Connect fetch vs connector poll {#connect-fetch-vs-connector-poll}
336+
#### When is performance tuning needed? {#when-is-performance-tuning-needed}
337337

338-
Kafka Connect (the framework our sink connector is built on) will fetch messages from kafka topics in the background (independent of the connector).
338+
Performance tuning is typically required in the following scenarios:
339339

340-
You can control this process using `fetch.min.bytes` and `fetch.max.bytes` - while `fetch.min.bytes` sets the minimum amount required before the framework will pass values to the connector (up to a time limit set by `fetch.max.wait.ms`), `fetch.max.bytes` sets the upper size limit. If you wanted to pass larger batches to the connector, an option could be to increase the minimum fetch or maximum wait to build bigger data bundles.
340+
- **High-throughput workloads**: When processing millions of events per second from Kafka topics
341+
- **Consumer lag**: When your connector can't keep up with the rate of data production, causing increasing lag
342+
- **Resource constraints**: When you need to optimize CPU, memory, or network usage
343+
- **Multiple topics**: When consuming from multiple high-volume topics simultaneously
344+
- **Small message sizes**: When dealing with many small messages that would benefit from server-side batching
341345

342-
This fetched data is then consumed by the connector client polling for messages, where the amount for each poll is controlled by `max.poll.records` - please note that fetch is independent of poll, though!
346+
Performance tuning is **NOT typically needed** when:
343347

344-
When tuning these settings, users should aim so their fetch size produces multiple batches of `max.poll.records` (and keep in mind, the settings `fetch.min.bytes` and `fetch.max.bytes` represent compressed data) - that way, each connector task is inserting as large a batch as possible.
348+
- You're processing low to moderate volumes (< 10,000 messages/second)
349+
- Consumer lag is stable and acceptable for your use case
350+
- Default connector settings already meet your throughput requirements
351+
- Your ClickHouse cluster can easily handle the incoming load
345352

346-
ClickHouse is optimized for larger batches, even at a slight delay, rather than frequent but smaller batches - the larger the batch, the better.
353+
#### Understanding the data flow {#understanding-the-data-flow}
354+
355+
Before tuning, it's important to understand how data flows through the connector:
356+
357+
1. **Kafka Connect Framework** fetches messages from Kafka topics in the background
358+
2. **Connector polls** for messages from the framework's internal buffer
359+
3. **Connector batches** messages based on poll size
360+
4. **ClickHouse receives** the batched insert via HTTP/S
361+
5. **ClickHouse processes** the insert (synchronously or asynchronously)
362+
363+
Performance can be optimized at each of these stages.
364+
365+
#### Kafka Connect batch size tuning {#connect-fetch-vs-connector-poll}
366+
367+
The first level of optimization is controlling how much data the connector receives per batch from Kafka.
368+
369+
##### Fetch settings {#fetch-settings}
370+
371+
Kafka Connect (the framework) fetches messages from Kafka topics in the background, independent of the connector:
372+
373+
- **`fetch.min.bytes`**: Minimum amount of data before the framework passes values to the connector (default: 1 byte)
374+
- **`fetch.max.bytes`**: Maximum amount of data to fetch in a single request (default: 52428800 / 50 MB)
375+
- **`fetch.max.wait.ms`**: Maximum time to wait before returning data if `fetch.min.bytes` is not met (default: 500 ms)
376+
377+
##### Poll settings {#poll-settings}
378+
379+
The connector polls for messages from the framework's buffer:
380+
381+
- **`max.poll.records`**: Maximum number of records returned in a single poll (default: 500)
382+
- **`max.partition.fetch.bytes`**: Maximum amount of data per partition (default: 1048576 / 1 MB)
383+
384+
##### Recommended settings for high throughput {#recommended-batch-settings}
385+
386+
For optimal performance with ClickHouse, aim for larger batches:
347387

348388
```properties
389+
# Increase the number of records per poll
349390
consumer.max.poll.records=5000
391+
392+
# Increase the partition fetch size (5 MB)
350393
consumer.max.partition.fetch.bytes=5242880
394+
395+
# Optional: Increase minimum fetch size to wait for more data (1 MB)
396+
consumer.fetch.min.bytes=1048576
397+
398+
# Optional: Reduce wait time if latency is critical
399+
consumer.fetch.max.wait.ms=300
400+
```
401+
402+
**Important**: Kafka Connect fetch settings represent compressed data, while ClickHouse receives uncompressed data. Balance these settings based on your compression ratio.
403+
404+
**Trade-offs**:
405+
- **Larger batches** = Better ClickHouse ingestion performance, fewer parts, lower overhead
406+
- **Larger batches** = Higher memory usage, potential increased end-to-end latency
407+
- **Too large batches** = Risk of timeouts, OutOfMemory errors, or exceeding `max.poll.interval.ms`
408+
409+
More details: [Confluent documentation](https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration) | [Kafka documentation](https://kafka.apache.org/documentation/#consumerconfigs)
410+
411+
#### Asynchronous inserts {#asynchronous-inserts}
412+
413+
Asynchronous inserts are a powerful feature when the connector sends relatively small batches or when you want to further optimize ingestion by shifting batching responsibility to ClickHouse.
414+
415+
##### When to use async inserts {#when-to-use-async-inserts}
416+
417+
Consider enabling async inserts when:
418+
419+
- **Many small batches**: Your connector sends frequent small batches (< 1000 rows per batch)
420+
- **High concurrency**: Multiple connector tasks are writing to the same table
421+
- **Distributed deployment**: Running many connector instances across different hosts
422+
- **Part creation overhead**: You're experiencing "too many parts" errors
423+
- **Mixed workload**: Combining real-time ingestion with query workloads
424+
425+
Do **NOT** use async inserts when:
426+
427+
- You're already sending large batches (> 10,000 rows per batch) with controlled frequency
428+
- You require immediate data visibility (queries must see data instantly)
429+
- Exactly-once semantics with `wait_for_async_insert=0` conflicts with your requirements
430+
- Your use case can benefit from client-side batching improvements instead
431+
432+
##### How async inserts work {#how-async-inserts-work}
433+
434+
With asynchronous inserts enabled, ClickHouse:
435+
436+
1. Receives the insert query from the connector
437+
2. Writes data to an in-memory buffer (instead of immediately to disk)
438+
3. Returns success to the connector (if `wait_for_async_insert=0`)
439+
4. Flushes the buffer to disk when one of these conditions is met:
440+
- Buffer reaches `async_insert_max_data_size` (default: 10 MB)
441+
- `async_insert_busy_timeout_ms` milliseconds elapsed since first insert (default: 1000 ms)
442+
- Maximum number of queries accumulated (`async_insert_max_query_number`, default: 100)
443+
444+
This significantly reduces the number of parts created and improves overall throughput.
445+
446+
##### Enabling async inserts {#enabling-async-inserts}
447+
448+
Add async insert settings to the `clickhouseSettings` configuration parameter:
449+
450+
```json
451+
{
452+
"name": "clickhouse-connect",
453+
"config": {
454+
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
455+
...
456+
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
457+
}
458+
}
459+
```
460+
461+
**Key settings**:
462+
463+
- **`async_insert=1`**: Enable asynchronous inserts
464+
- **`wait_for_async_insert=1`** (recommended): Connector waits for data to be flushed to ClickHouse storage before acknowledging. Provides delivery guarantees.
465+
- **`wait_for_async_insert=0`**: Connector acknowledges immediately after buffering. Better performance but data may be lost on server crash before flush.
466+
467+
##### Tuning async insert behavior {#tuning-async-inserts}
468+
469+
You can fine-tune the async insert flush behavior:
470+
471+
```json
472+
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=10485760,async_insert_busy_timeout_ms=1000"
473+
```
474+
475+
Common tuning parameters:
476+
477+
- **`async_insert_max_data_size`** (default: 10485760 / 10 MB): Maximum buffer size before flush
478+
- **`async_insert_busy_timeout_ms`** (default: 1000): Maximum time (ms) before flush
479+
- **`async_insert_stale_timeout_ms`** (default: 0): Time (ms) since last insert before flush
480+
- **`async_insert_max_query_number`** (default: 100): Maximum queries before flush
481+
482+
**Trade-offs**:
483+
484+
- **Benefits**: Fewer parts, better merge performance, lower CPU overhead, improved throughput under high concurrency
485+
- **Considerations**: Data not immediately queryable, slightly increased end-to-end latency
486+
- **Risks**: Data loss on server crash if `wait_for_async_insert=0`, potential memory pressure with large buffers
487+
488+
##### Async inserts with exactly-once semantics {#async-inserts-with-exactly-once}
489+
490+
When using `exactlyOnce=true` with async inserts:
491+
492+
```json
493+
{
494+
"config": {
495+
"exactlyOnce": "true",
496+
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
497+
}
498+
}
499+
```
500+
501+
**Important**: Always use `wait_for_async_insert=1` with exactly-once to ensure offset commits happen only after data is persisted.
502+
503+
For more information about async inserts, see the [ClickHouse async inserts documentation](/best-practices/selecting-an-insert-strategy#asynchronous-inserts).
504+
505+
#### Connector parallelism {#connector-parallelism}
506+
507+
Increase parallelism to improve throughput:
508+
509+
##### Tasks per connector {#tasks-per-connector}
510+
511+
```json
512+
"tasks.max": "4"
351513
```
352514

353-
More details can be found in the [Confluent documentation](https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration)
354-
or in the [Kafka documentation](https://kafka.apache.org/documentation/#consumerconfigs).
515+
Each task processes a subset of topic partitions. More tasks = more parallelism, but:
516+
517+
- Maximum effective tasks = number of topic partitions
518+
- Each task maintains its own connection to ClickHouse
519+
- More tasks = higher overhead and potential resource contention
520+
521+
**Recommendation**: Start with `tasks.max` equal to the number of topic partitions, then adjust based on CPU and throughput metrics.
522+
523+
##### Ignoring partitions when batching {#ignoring-partitions}
524+
525+
By default, the connector batches messages per partition. For higher throughput, you can batch across partitions:
526+
527+
```json
528+
"ignorePartitionsWhenBatching": "true"
529+
```
530+
531+
** Warning**: Only use when `exactlyOnce=false`. This setting can improve throughput by creating larger batches but loses per-partition ordering guarantees.
355532

356533
#### Multiple high throughput topics {#multiple-high-throughput-topics}
357534

358-
If your connector is configured to subscribe to multiple topics, you're using `topic2TableMap` to map topics to tables, and you're experiencing a bottleneck at insertion resulting in consumer lag, consider creating one connector per topic instead. The main reason why this happens is that currently batches are inserted into every table [serially](https://github.com/ClickHouse/clickhouse-kafka-connect/blob/578ac07e8be1a920aaa3b26e49183595c3edd04b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java#L95-L100).
535+
If your connector is configured to subscribe to multiple topics, you're using `topic2TableMap` to map topics to tables, and you're experiencing a bottleneck at insertion resulting in consumer lag, consider creating one connector per topic instead.
536+
537+
The main reason why this happens is that currently batches are inserted into every table [serially](https://github.com/ClickHouse/clickhouse-kafka-connect/blob/578ac07e8be1a920aaa3b26e49183595c3edd04b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java#L95-L100).
538+
539+
**Recommendation**: For multiple high-volume topics, deploy one connector instance per topic to maximize parallel insert throughput.
540+
541+
#### ClickHouse table engine considerations {#table-engine-considerations}
542+
543+
Choose the appropriate ClickHouse table engine for your use case:
544+
545+
- **`MergeTree`**: Best for most use cases, balances query and insert performance
546+
- **`ReplicatedMergeTree`**: Required for high availability, adds replication overhead
547+
- **`*MergeTree` with proper `ORDER BY`**: Optimize for your query patterns
548+
549+
**Settings to consider**:
550+
551+
```sql
552+
CREATE TABLE my_table (...)
553+
ENGINE = MergeTree()
554+
ORDER BY (timestamp, id)
555+
SETTINGS
556+
-- Increase max insert threads for parallel part writing
557+
max_insert_threads = 4,
558+
-- Allow inserts with quorum for reliability (ReplicatedMergeTree)
559+
insert_quorum = 2
560+
```
561+
562+
For connector-level insert settings:
563+
564+
```json
565+
"clickhouseSettings": "insert_quorum=2,insert_quorum_timeout=60000"
566+
```
567+
568+
#### Connection pooling and timeouts {#connection-pooling}
569+
570+
The connector maintains HTTP connections to ClickHouse. Adjust timeouts for high-latency networks:
571+
572+
```json
573+
"clickhouseSettings": "socket_timeout=300000,connection_timeout=30000"
574+
```
575+
576+
- **`socket_timeout`** (default: 30000 ms): Maximum time for read operations
577+
- **`connection_timeout`** (default: 10000 ms): Maximum time to establish connection
578+
579+
Increase these values if you experience timeout errors with large batches.
580+
581+
#### Monitoring and troubleshooting performance {#monitoring-performance}
582+
583+
Monitor these key metrics:
584+
585+
1. **Consumer lag**: Use Kafka monitoring tools to track lag per partition
586+
2. **Connector metrics**: Monitor `receivedRecords`, `recordProcessingTime`, `taskProcessingTime` via JMX (see [Monitoring](#monitoring))
587+
3. **ClickHouse metrics**:
588+
- `system.asynchronous_inserts`: Monitor async insert buffer usage
589+
- `system.parts`: Monitor part count to detect merge issues
590+
- `system.merges`: Monitor active merges
591+
- `system.events`: Track `InsertedRows`, `InsertedBytes`, `FailedInsertQuery`
592+
593+
**Common performance issues**:
594+
595+
| Symptom | Possible Cause | Solution |
596+
|---------|----------------|----------|
597+
| High consumer lag | Batches too small | Increase `max.poll.records`, enable async inserts |
598+
| "Too many parts" errors | Small frequent inserts | Enable async inserts, increase batch size |
599+
| Timeout errors | Large batch size, slow network | Reduce batch size, increase `socket_timeout`, check network |
600+
| High CPU usage | Too many small parts | Enable async inserts, increase merge settings |
601+
| OutOfMemory errors | Batch size too large | Reduce `max.poll.records`, `max.partition.fetch.bytes` |
602+
| Uneven task load | Uneven partition distribution | Rebalance partitions or adjust `tasks.max` |
603+
604+
#### Best practices summary {#performance-best-practices}
605+
606+
1. **Start with defaults**, then measure and tune based on actual performance
607+
2. **Prefer larger batches**: Aim for 10,000-100,000 rows per insert when possible
608+
3. **Use async inserts** when sending many small batches or under high concurrency
609+
4. **Always use `wait_for_async_insert=1`** with exactly-once semantics
610+
5. **Scale horizontally**: Increase `tasks.max` up to the number of partitions
611+
6. **One connector per high-volume topic** for maximum throughput
612+
7. **Monitor continuously**: Track consumer lag, part count, and merge activity
613+
8. **Test thoroughly**: Always test configuration changes under realistic load before production deployment
614+
615+
#### Example: High-throughput configuration {#example-high-throughput}
616+
617+
Here's a complete example optimized for high throughput:
618+
619+
```json
620+
{
621+
"name": "clickhouse-high-throughput",
622+
"config": {
623+
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
624+
"tasks.max": "8",
625+
626+
"topics": "high_volume_topic",
627+
"hostname": "my-clickhouse-host.cloud",
628+
"port": "8443",
629+
"database": "default",
630+
"username": "default",
631+
"password": "<PASSWORD>",
632+
"ssl": "true",
633+
634+
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
635+
"value.converter.schemas.enable": "false",
636+
637+
"exactlyOnce": "false",
638+
"ignorePartitionsWhenBatching": "true",
639+
640+
"consumer.max.poll.records": "10000",
641+
"consumer.max.partition.fetch.bytes": "5242880",
642+
"consumer.fetch.min.bytes": "1048576",
643+
"consumer.fetch.max.wait.ms": "500",
644+
645+
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=16777216,async_insert_busy_timeout_ms=1000,socket_timeout=300000"
646+
}
647+
}
648+
```
359649

360-
Creating one connector per topic is a workaround that ensures that you get the fastest possible insert rate.
650+
**This configuration**:
651+
- Processes up to 10,000 records per poll
652+
- Batches across partitions for larger inserts
653+
- Uses async inserts with 16 MB buffer
654+
- Runs 8 parallel tasks (match your partition count)
655+
- Optimized for throughput over strict ordering
361656

362657
### Troubleshooting {#troubleshooting}
363658

scripts/aspell-ignore/en/aspell-dict.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
personal_ws-1.1 en 3746
1+
personal_ws-1.1 en 3749
22
AArch
33
ACLs
44
AICPA
@@ -963,6 +963,7 @@ OperationalError
963963
OrDefault
964964
OrNull
965965
OrZero
966+
OutOfMemory
966967
OvercommitTracker
967968
OverflowMode
968969
OverflowModeGroupBy
@@ -2331,9 +2332,9 @@ haversine
23312332
hdbc
23322333
hdfs
23332334
hdfsCluster
2335+
heatmap
23342336
heredoc
23352337
heredocs
2336-
heatmap
23372338
hilbertDecode
23382339
hilbertEncode
23392340
hiveHash

0 commit comments

Comments
 (0)