@chernser
Contributor

@chernser chernser commented Nov 8, 2025

Summary

  • Adds sink task metrics
  • Adds topic metrics

Sink task and topic metrics are accessible via JMX. The sink task bean name format is com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask<taskId>,version=<version>.
For example, com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask0,version=v1.3.5.

The topic metrics bean name format is com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask<taskId>,version=<version>,topic=<topic>.
For example, com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask0,version=v1.3.5,topic=schemaless_simple_batch_test_1762822854777.
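
As a rough illustration of the naming scheme above, the following sketch builds both bean names with the standard javax.management.ObjectName class. The class SinkMetricsBeanNames and its methods are hypothetical helpers written for this example and are not part of the connector. Only the name format itself comes from this PR.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of the connector: builds JMX names in the format described above.
final class SinkMetricsBeanNames {
    private static final String PREFIX = "com.clickhouse:type=ClickHouseKafkaConnector";

    private SinkMetricsBeanNames() {
    }

    // Name of the per-task bean, e.g. ...,name=SinkTask0,version=v1.3.5
    static ObjectName taskBean(int taskId, String version) {
        return parse(PREFIX + ",name=SinkTask" + taskId + ",version=" + version);
    }

    // Name of the per-topic bean: the task name plus a topic=<topic> key.
    static ObjectName topicBean(int taskId, String version, String topic) {
        return parse(PREFIX + ",name=SinkTask" + taskId + ",version=" + version + ",topic=" + topic);
    }

    // Pattern matching every bean with this domain and type, usable with MBeanServer.queryNames.
    static ObjectName allConnectorBeans() {
        return parse(PREFIX + ",*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            // Wrap the checked exception so callers do not have to declare it.
            throw new IllegalArgumentException("Invalid JMX object name: " + name, e);
        }
    }
}
```

The pattern returned by allConnectorBeans() can be passed to ManagementFactory.getPlatformMBeanServer().queryNames(pattern, null) inside the Connect worker JVM, or to a remote MBeanServerConnection, to list whichever task and topic beans are currently registered.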

Checklist

Delete items not relevant to your PR:

  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided to include in CHANGELOG
  • For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials

@chernser chernser requested review from Copilot and mzitnik November 10, 2025 14:36
Contributor

Copilot AI left a comment

Pull Request Overview

This PR implements metrics collection and JMX monitoring capabilities for the ClickHouse Kafka connector sink tasks. The changes enable tracking of key performance indicators such as record processing rates, batch statistics, event lag times, and insert durations.

Key changes:

  • Introduces JMX MBeans for exposing sink task and topic-level statistics
  • Adds tracking of successful/failed records and batches, receive lag, and insert time metrics
  • Updates test infrastructure to register/unregister MBeans properly in test lifecycle

Reviewed Changes

Copilot reviewed 11 out of 12 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| TopicStatisticsMBean.java | Defines MBean interface for topic-level metrics (success/failure counts, lag, insert time) |
| TopicStatistics.java | Implements topic statistics tracking using atomic counters and exponential moving averages |
| SinkTaskStatisticsMBean.java | Adds getInsertedRecords method to sink task MBean interface |
| SinkTaskStatistics.java | Enhances statistics collection with thread-safe atomics, topic-level tracking, and MBean registration |
| Processing.java | Integrates statistics recording for insert operations including event receive lag calculation |
| ClickHouseWriter.java | Records insert time metrics across all insert method variants |
| ProxySinkTask.java | Refactors to use final fields and handles MBean lifecycle management |
| ClickHouseSinkTask.java | Exposes taskId accessor for test verification |
| ProcessingTest.java | Updates tests to properly manage SinkTaskStatistics MBean lifecycle |
| ClickHouseWriterTest.java | Updates constructor calls to pass SinkTaskStatistics instances |
| ClickHouseSinkTaskTest.java | Adds comprehensive tests for metrics validation and mean lag time calculation |

@chernser chernser marked this pull request as ready for review November 11, 2025 00:53
final Record firstRecord = records.get(0);
long eventReceiveLag = 0;
if (firstRecord.getSinkRecord().timestamp() != null) {
    eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp();
Collaborator

Do we know where the timestamp comes from: the record metadata, or the time the record was created? I just want to validate, since we can have clock skew in some situations.

Contributor Author

I will find out.
As for clock skew, it would be a problem, but I think it can be normalized as long as the skew stays within a range.
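
For context on the question above: in Kafka Connect, the value returned by SinkRecord.timestamp() is the Kafka record timestamp, which is either set by the producer (CreateTime) or by the broker (LogAppendTime), depending on the topic's message.timestamp.type setting. Either way it is taken from a different clock than the one the sink task reads. The sketch below shows one possible way to guard the lag calculation against a record timestamp that is ahead of the local clock. The class ReceiveLag and the clamping to zero are assumptions made for illustration and are not what this PR implements.

```java
// Hypothetical helper, not part of the connector: computes receive lag defensively.
final class ReceiveLag {
    private ReceiveLag() {
    }

    /**
     * Returns the lag in milliseconds between the record timestamp and "now".
     * A missing timestamp yields 0, matching the default in the snippet under review.
     * A negative difference (record timestamp ahead of the local clock) is clamped to 0;
     * the clamping is an assumption of this sketch.
     */
    static long receiveLagMillis(Long recordTimestampMillis, long nowMillis) {
        if (recordTimestampMillis == null) {
            return 0L;
        }
        long lag = nowMillis - recordTimestampMillis;
        return Math.max(lag, 0L);
    }
}
```

A caller would pass System.currentTimeMillis() as nowMillis. Taking the current time as a parameter keeps the calculation easy to test with fixed values.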

count = values.length;
}

for (int i = 0; i < count; i++) {
Collaborator

We can keep a running sum.
Every time we add a new value, we add it to the sum and subtract the oldest value in the array.
This way, we avoid having to sum the whole array every time.

Contributor Author

This is a good idea, but I need to determine which element is the oldest in the array.
I will work on it while improving the statistics.

Collaborator

@mzitnik mzitnik Nov 12, 2025

Just keep a pointer to the oldest one as well.

Contributor Author

Actually, I will just subtract the value of the element where I am going to write the new value; it will be the oldest value in the circular array.

@chernser chernser requested a review from mzitnik November 12, 2025 21:19
private final long[] values;
private final AtomicInteger head;
private final AtomicLong sum;
private final int n;
Collaborator

Where is n used?

Contributor Author

It holds values.length - 1, just so we do not have to compute it every time.

Collaborator

We are setting it once and never using it?

this.sum = new AtomicLong();
}

public void add(long value) {
Collaborator

Can you explain the logic here?

Contributor Author

1. Get the index where the new value will be written.
2. Subtract the existing value at that index from the sum, because it will be overwritten.
3. Add the new value to the sum.
4. Write the new value to the array.

This is what I think you are suggesting. The oldest element is the one we are going to replace.
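
The steps above can be sketched as a small circular-buffer average. This is a simplified illustration and not the code merged in this PR: the class name SlidingWindowMean is hypothetical, and it uses synchronized methods with plain fields instead of the AtomicInteger and AtomicLong fields shown in the diff, so that the subtract, add, and write steps are applied as one unit.

```java
// Hypothetical sketch, not the merged implementation: mean over the last N values
// using a circular array and a running sum.
final class SlidingWindowMean {
    private final long[] values; // circular buffer; unused slots hold 0
    private int head;            // index of the slot that will be overwritten next
    private int count;           // number of slots filled so far, at most values.length
    private long sum;            // running sum of the values currently in the buffer

    SlidingWindowMean(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.values = new long[capacity];
    }

    synchronized void add(long value) {
        // The slot at head holds the oldest value (or 0 while the buffer is still filling).
        sum -= values[head];
        sum += value;
        values[head] = value;
        head = (head + 1) % values.length;
        if (count < values.length) {
            count++;
        }
    }

    synchronized double mean() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}
```

With a capacity of 3, adding 1, 2, and 3 gives a mean of 2.0. Adding 10 then overwrites the 1, so the buffer holds 10, 2, and 3 and the mean becomes 5.0, without re-summing the array.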

Collaborator

Yes, it is. Let's just explain the algorithm in the source code (for future reference).

@chernser chernser merged commit c994103 into main Nov 13, 2025
8 checks passed
@chernser chernser deleted the impl_metrics branch November 13, 2025 17:29
3 participants