-
Notifications
You must be signed in to change notification settings - Fork 57
[monitoring] Impl metrics #631
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…bles with AtomicLong to make atomic operations
…alculate exp. moving. avg.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements metrics collection and JMX monitoring capabilities for the ClickHouse Kafka connector sink tasks. The changes enable tracking of key performance indicators such as record processing rates, batch statistics, event lag times, and insert durations.
Key changes:
- Introduces JMX MBeans for exposing sink task and topic-level statistics
- Adds tracking of successful/failed records and batches, receive lag, and insert time metrics
- Updates test infrastructure to register/unregister MBeans properly in test lifecycle
Reviewed Changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| TopicStatisticsMBean.java | Defines MBean interface for topic-level metrics (success/failure counts, lag, insert time) |
| TopicStatistics.java | Implements topic statistics tracking using atomic counters and exponential moving averages |
| SinkTaskStatisticsMBean.java | Adds getInsertedRecords method to sink task MBean interface |
| SinkTaskStatistics.java | Enhances statistics collection with thread-safe atomics, topic-level tracking, and MBean registration |
| Processing.java | Integrates statistics recording for insert operations including event receive lag calculation |
| ClickHouseWriter.java | Records insert time metrics across all insert method variants |
| ProxySinkTask.java | Refactors to use final fields and handles MBean lifecycle management |
| ClickHouseSinkTask.java | Exposes taskId accessor for test verification |
| ProcessingTest.java | Updates tests to properly manage SinkTaskStatistics MBean lifecycle |
| ClickHouseWriterTest.java | Updates constructor calls to pass SinkTaskStatistics instances |
| ClickHouseSinkTaskTest.java | Adds comprehensive tests for metrics validation and mean lag time calculation |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatistics.java
Outdated
Show resolved
Hide resolved
src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatistics.java
Outdated
Show resolved
Hide resolved
src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatistics.java
Outdated
Show resolved
Hide resolved
src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java
Outdated
Show resolved
Hide resolved
| final Record firstRecord = records.get(0); | ||
| long eventReceiveLag = 0; | ||
| if (firstRecord.getSinkRecord().timestamp() != null) { | ||
| eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know where the timestamp comes from? Record metadata, or when created, just want to validate since we can have clock skew in some siuations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will find out.
As for time skew - this would be a problem. But it can be normalized I think if skew stays in the range.
src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatistics.java
Show resolved
Hide resolved
src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatistics.java
Show resolved
Hide resolved
| count = values.length; | ||
| } | ||
|
|
||
| for (int i = 0; i < count; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We keep a sum
Every time we add a new value, we add it to the sum and subtract the oldest from the array.
This way, we can avoid having to sum up every time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good idea but need to determine what is the oldest in the array.
I will work on it while improving statistics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just keep a pointer also to the oldest one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually I will just subtract value of the element where I'm going to write a new value - it will be the oldest value in circular array.
| private final long[] values; | ||
| private final AtomicInteger head; | ||
| private final AtomicLong sum; | ||
| private final int n; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is n used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
holds values.length - 1 value. just not to do it all the time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are setting it once and never using it?
| this.sum = new AtomicLong(); | ||
| } | ||
|
|
||
| public void add(long value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain the logic here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getting index where to write a new value
subtract existing value - because will override it
add new value
write value to the array.
It is what I think you are suggesting. The oldest element is that we are going to replace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is - just let's explain the algoritim in source code (for future referance)
| this.sum = new AtomicLong(); | ||
| } | ||
|
|
||
| public void add(long value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is - just let's explain the algoritim in source code (for future referance)
| private final long[] values; | ||
| private final AtomicInteger head; | ||
| private final AtomicLong sum; | ||
| private final int n; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are setting it once and never using it?
Summary
Sink task and topic metrics are accessible via JMX. Bean name format is
com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask<taskId>,version=<version>.For example,
com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask0,version=v1.3.5.Topic metrics bean name format is
com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask<taskId>,version=<version>,topic=<topic>.For example
com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask0,version=v1.3.5,topic=schemaless_simple_batch_test_1762822854777.Checklist
Delete items not relevant to your PR: