Skip to content

Commit 91ac543

Browse files
authored
Merge pull request mfontanini#307 from BrieucNx/fix-memory-leak
[Memory Leak] Only release header handle if producev returned no errors
2 parents 55cbeb8 + 0f3ae6e commit 91ac543

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

src/producer.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,16 @@ void Producer::do_produce(const MessageBuilder& builder,
131131
RD_KAFKA_V_MSGFLAGS(policy),
132132
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
133133
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
134-
RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka
134+
RD_KAFKA_V_HEADERS(headers.get_handle()), //pass ownership to rdkafka
135135
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
136136
RD_KAFKA_V_OPAQUE(builder.user_data()),
137137
RD_KAFKA_V_END);
138+
// We only want to release the handle on the headers data
139+
// if the rd_kafka_producev function returned no error, otherwise
140+
// the function doesn't take ownership of the headers data.
141+
if(result == RD_KAFKA_RESP_ERR_NO_ERROR) {
142+
headers.release_handle();
143+
}
138144
check_error(result);
139145
}
140146

@@ -150,10 +156,16 @@ void Producer::do_produce(const Message& message,
150156
RD_KAFKA_V_MSGFLAGS(policy),
151157
RD_KAFKA_V_TIMESTAMP(duration),
152158
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
153-
RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka
159+
RD_KAFKA_V_HEADERS(headers.get_handle()), //pass ownership to rdkafka
154160
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
155161
RD_KAFKA_V_OPAQUE(message.get_user_data()),
156162
RD_KAFKA_V_END);
163+
// We only want to release the handle on the headers data
164+
// if the rd_kafka_producev function returned no error, otherwise
165+
// the function doesn't take ownership of the headers data.
166+
if(result == RD_KAFKA_RESP_ERR_NO_ERROR) {
167+
headers.release_handle();
168+
}
157169
check_error(result);
158170
}
159171

0 commit comments

Comments
 (0)