Skip to content

Commit 6499ef9

Browse files
author
Alexander Damian
committed
Merge branch 'master' of https://github.com/mfontanini/cppkafka into pc_config
2 parents 24e94fb + b91350d commit 6499ef9

File tree

5 files changed

+49
-15
lines changed

5 files changed

+49
-15
lines changed

include/cppkafka/error.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ namespace cppkafka {
4242
*/
4343
class CPPKAFKA_API Error {
4444
public:
45+
/**
46+
* @brief Constructs an error object with RD_KAFKA_RESP_ERR_NO_ERROR
47+
*/
48+
Error() = default;
4549
/**
4650
* Constructs an error object
4751
*/
@@ -77,7 +81,7 @@ class CPPKAFKA_API Error {
7781
*/
7882
CPPKAFKA_API friend std::ostream& operator<<(std::ostream& output, const Error& rhs);
7983
private:
80-
rd_kafka_resp_err_t error_;
84+
rd_kafka_resp_err_t error_{RD_KAFKA_RESP_ERR_NO_ERROR};
8185
};
8286

8387
} // cppkafka

include/cppkafka/exceptions.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ class CPPKAFKA_API QueueException : public Exception {
134134
Error error_;
135135
};
136136

137+
/**
138+
* Backoff performer has no more retries left for a specific action.
139+
*/
140+
class CPPKAFKA_API ActionTerminatedException : public Exception {
141+
public:
142+
ActionTerminatedException(const std::string& error);
143+
};
144+
137145
} // cppkafka
138146

139147
#endif // CPPKAFKA_EXCEPTIONS_H

include/cppkafka/utils/backoff_committer.h

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,18 @@ class CPPKAFKA_API BackoffCommitter : public BackoffPerformer {
100100
*/
101101
void set_error_callback(ErrorCallback callback);
102102

103+
/**
104+
* \brief Commits the current partition assignment synchronously
105+
*
106+
* This will call Consumer::commit() until either the message is successfully
107+
* committed or the error callback returns false (if any is set).
108+
*/
109+
void commit();
110+
103111
/**
104112
* \brief Commits the given message synchronously
105113
*
106-
* This will call Consumer::commit until either the message is successfully
114+
* This will call Consumer::commit(msg) until either the message is successfully
107115
* committed or the error callback returns false (if any is set).
108116
*
109117
* \param msg The message to be committed
@@ -113,7 +121,7 @@ class CPPKAFKA_API BackoffCommitter : public BackoffPerformer {
113121
/**
114122
* \brief Commits the offsets on the given topic/partitions synchronously
115123
*
116-
* This will call Consumer::commit until either the offsets are successfully
124+
* This will call Consumer::commit(topic_partitions) until either the offsets are successfully
117125
* committed or the error callback returns false (if any is set).
118126
*
119127
* \param topic_partitions The topic/partition list to be committed
@@ -127,25 +135,30 @@ class CPPKAFKA_API BackoffCommitter : public BackoffPerformer {
127135
*/
128136
Consumer& get_consumer();
129137
private:
130-
// Return true to abort and false to continue committing
131-
template <typename T>
132-
bool do_commit(const T& object) {
138+
// If the ReturnType contains 'true', we abort committing. Otherwise we continue.
139+
// The second member of the ReturnType contains the RdKafka error if any.
140+
template <typename...Args>
141+
bool do_commit(Args&&...args) {
133142
try {
134-
consumer_.commit(object);
135-
// If the commit succeeds, we're done
143+
consumer_.commit(std::forward<Args>(args)...);
144+
// If the commit succeeds, we're done.
136145
return true;
137146
}
138147
catch (const HandleException& ex) {
148+
Error error = ex.get_error();
139149
// If there were actually no offsets to commit, return. Retrying won't solve
140-
// anything here
141-
if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) {
142-
return true;
150+
// anything here.
151+
if (error == RD_KAFKA_RESP_ERR__NO_OFFSET) {
152+
return true; //not considered an error.
143153
}
144154
// If there's a callback and it returns false for this message, abort.
145155
// Otherwise keep committing.
146156
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
147-
return callback && !callback(ex.get_error());
157+
if (callback && !callback(error)) {
158+
throw ex; //abort
159+
}
148160
}
161+
return false; //continue
149162
}
150163

151164
Consumer& consumer_;

include/cppkafka/utils/backoff_performer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <functional>
3535
#include <thread>
3636
#include "../consumer.h"
37+
#include "../exceptions.h"
3738

3839
namespace cppkafka {
3940

@@ -123,7 +124,7 @@ class CPPKAFKA_API BackoffPerformer {
123124
auto start = std::chrono::steady_clock::now();
124125
// If the callback returns true, we're done
125126
if (callback()) {
126-
return;
127+
return; //success
127128
}
128129
auto end = std::chrono::steady_clock::now();
129130
auto time_elapsed = end - start;
@@ -134,6 +135,8 @@ class CPPKAFKA_API BackoffPerformer {
134135
// Increase out backoff depending on the policy being used
135136
backoff = increase_backoff(backoff);
136137
}
138+
// No more retries left or we have a terminal error.
139+
throw ActionTerminatedException("Commit failed: no more retries.");
137140
}
138141
private:
139142
TimeUnit increase_backoff(TimeUnit backoff);

src/utils/backoff_committer.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,20 @@ void BackoffCommitter::set_error_callback(ErrorCallback callback) {
4343
callback_ = move(callback);
4444
}
4545

46+
void BackoffCommitter::commit() {
47+
perform([&] {
48+
return do_commit();
49+
});
50+
}
51+
4652
void BackoffCommitter::commit(const Message& msg) {
47-
perform([&] {
53+
perform([&] {
4854
return do_commit(msg);
4955
});
5056
}
5157

5258
void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) {
53-
perform([&] {
59+
perform([&] {
5460
return do_commit(topic_partitions);
5561
});
5662
}

0 commit comments

Comments
 (0)