Skip to content

Commit b75248e

Browse files
committed
Add FetchMetrics class; move topic_fetch_metrics inside aggregator
1 parent 8be2568 commit b75248e

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

kafka/consumer/fetcher.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,6 @@ def _parse_fetched_data(self, completed_fetch):
843843
record_too_large_partitions,
844844
self.config['max_partition_fetch_bytes']),
845845
record_too_large_partitions)
846-
self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count)
847846

848847
elif error_type in (Errors.NotLeaderForPartitionError,
849848
Errors.ReplicaNotAvailableError,
@@ -1128,6 +1127,14 @@ def to_forget(self):
11281127
return list(partition_data.items())
11291128

11301129

1130+
class FetchMetrics(object):
1131+
__slots__ = ('total_bytes', 'total_records')
1132+
1133+
def __init__(self):
1134+
self.total_bytes = 0
1135+
self.total_records = 0
1136+
1137+
11311138
class FetchResponseMetricAggregator(object):
11321139
"""
11331140
Since we parse the message data for each partition from each fetch
@@ -1138,8 +1145,8 @@ class FetchResponseMetricAggregator(object):
11381145
def __init__(self, sensors, partitions):
11391146
self.sensors = sensors
11401147
self.unrecorded_partitions = partitions
1141-
self.total_bytes = 0
1142-
self.total_records = 0
1148+
self.fetch_metrics = FetchMetrics()
1149+
self.topic_fetch_metrics = collections.defaultdict(FetchMetrics)
11431150

11441151
def record(self, partition, num_bytes, num_records):
11451152
"""
@@ -1148,13 +1155,17 @@ def record(self, partition, num_bytes, num_records):
11481155
have reported, we write the metric.
11491156
"""
11501157
self.unrecorded_partitions.remove(partition)
1151-
self.total_bytes += num_bytes
1152-
self.total_records += num_records
1158+
self.fetch_metrics.total_bytes += num_bytes
1159+
self.fetch_metrics.total_records += num_records
1160+
self.topic_fetch_metrics[partition.topic].total_bytes += num_bytes
1161+
self.topic_fetch_metrics[partition.topic].total_records += num_records
11531162

11541163
# once all expected partitions from the fetch have reported in, record the metrics
11551164
if not self.unrecorded_partitions:
1156-
self.sensors.bytes_fetched.record(self.total_bytes)
1157-
self.sensors.records_fetched.record(self.total_records)
1165+
self.sensors.bytes_fetched.record(self.fetch_metrics.total_bytes)
1166+
self.sensors.records_fetched.record(self.fetch_metrics.total_records)
1167+
for topic, metrics in six.iteritems(self.topic_fetch_metrics):
1168+
self.sensors.record_topic_fetch_metrics(topic, metrics.total_bytes, metrics.total_records)
11581169

11591170

11601171
class FetchManagerMetrics(object):

0 commit comments

Comments
 (0)