Skip to content

Commit 68ae525

Browse files
author
Alexander Damian
committed
Added member functions for static consumers
1 parent 7d097df commit 68ae525

File tree

2 files changed

+51
-14
lines changed

2 files changed

+51
-14
lines changed

include/cppkafka/utils/poll_strategy_base.h

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,29 @@ class CPPKAFKA_API PollStrategyBase : public PollInterface {
8484
*/
8585
Consumer& get_consumer() final;
8686

87+
/**
88+
* \brief Creates partitions queues associated with the supplied partitions.
89+
*
90+
* This method contains a default implementation. It adds all the new queues belonging
91+
* to the provided partition list and calls reset_state().
92+
* To be used with static consumers.
93+
*
94+
* \param partitions Assigned topic partitions.
95+
*/
96+
virtual void assign(TopicPartitionList& partitions);
97+
98+
/**
99+
* \brief Removes partitions queues associated with the supplied partitions.
100+
*
101+
* This method contains a default implementation. It removes all the queues
102+
* belonging to the provided partition list and calls reset_state().
103+
* To be used with static consumers.
104+
*
105+
* \param partitions Revoked topic partitions. If the partition list is empty
106+
* all partitions will be revoked.
107+
*/
108+
virtual void revoke(const TopicPartitionList& partitions = {});
109+
87110
protected:
88111
/**
89112
* \brief Get the queues from all assigned partitions
@@ -111,8 +134,8 @@ class CPPKAFKA_API PollStrategyBase : public PollInterface {
111134
/**
112135
* \brief Function to be called when a new partition assignment takes place
113136
*
114-
* This method contains a default implementation. It adds all the new queues belonging
115-
* to the provided partition list and calls reset_state().
137+
* This method contains a default implementation. It calls assign()
138+
* and invokes the user assignment callback.
116139
*
117140
* \param partitions Assigned topic partitions
118141
*/
@@ -121,8 +144,8 @@ class CPPKAFKA_API PollStrategyBase : public PollInterface {
121144
/**
122145
* \brief Function to be called when an old partition assignment gets revoked
123146
*
124-
* This method contains a default implementation. It removes all the queues
125-
* belonging to the provided partition list and calls reset_state().
147+
* This method contains a default implementation. It calls revoke()
148+
* and invokes the user revocation callback.
126149
*
127150
* \param partitions Revoked topic partitions
128151
*/

src/utils/poll_strategy_base.cpp

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,29 +89,43 @@ void PollStrategyBase::reset_state() {
8989

9090
}
9191

92-
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
92+
void PollStrategyBase::assign(TopicPartitionList& partitions) {
9393
// populate partition queues
9494
for (const auto& partition : partitions) {
9595
// get the queue associated with this partition
9696
partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()});
9797
}
9898
reset_state();
99+
}
100+
101+
void PollStrategyBase::revoke(const TopicPartitionList& partitions) {
102+
if (partitions.empty()) {
103+
//revoke everything
104+
partition_queues_.clear();
105+
}
106+
else {
107+
for (const auto &partition : partitions) {
108+
// get the queue associated with this partition
109+
auto toppar_it = partition_queues_.find(partition);
110+
if (toppar_it != partition_queues_.end()) {
111+
// remove this queue from the list
112+
partition_queues_.erase(toppar_it);
113+
}
114+
}
115+
}
116+
reset_state();
117+
}
118+
119+
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
120+
assign(partitions);
99121
// call original consumer callback if any
100122
if (assignment_callback_) {
101123
assignment_callback_(partitions);
102124
}
103125
}
104126

105127
void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) {
106-
for (const auto& partition : partitions) {
107-
// get the queue associated with this partition
108-
auto toppar_it = partition_queues_.find(partition);
109-
if (toppar_it != partition_queues_.end()) {
110-
// remove this queue from the list
111-
partition_queues_.erase(toppar_it);
112-
}
113-
}
114-
reset_state();
128+
revoke(partitions);
115129
// call original consumer callback if any
116130
if (revocation_callback_) {
117131
revocation_callback_(partitions);

0 commit comments

Comments
 (0)