@@ -131,10 +131,16 @@ void Producer::do_produce(const MessageBuilder& builder,
131131 RD_KAFKA_V_MSGFLAGS (policy),
132132 RD_KAFKA_V_TIMESTAMP (builder.timestamp ().count ()),
133133 RD_KAFKA_V_KEY ((void *)key.get_data (), key.get_size ()),
134- RD_KAFKA_V_HEADERS (headers.release_handle ()), // pass ownership to rdkafka
134+ RD_KAFKA_V_HEADERS (headers.get_handle ()), // pass ownership to rdkafka
135135 RD_KAFKA_V_VALUE ((void *)payload.get_data (), payload.get_size ()),
136136 RD_KAFKA_V_OPAQUE (builder.user_data ()),
137137 RD_KAFKA_V_END);
138+ // We only want to release the handle on the headers data
139+ // if the rd_kafka_producev function returned no error, otherwise
140+ // the function doesn't take ownership of the headers data.
141+ if (result == RD_KAFKA_RESP_ERR_NO_ERROR) {
142+ headers.release_handle ();
143+ }
138144 check_error (result);
139145}
140146
@@ -150,10 +156,16 @@ void Producer::do_produce(const Message& message,
150156 RD_KAFKA_V_MSGFLAGS (policy),
151157 RD_KAFKA_V_TIMESTAMP (duration),
152158 RD_KAFKA_V_KEY ((void *)key.get_data (), key.get_size ()),
153- RD_KAFKA_V_HEADERS (headers.release_handle ()), // pass ownership to rdkafka
159+ RD_KAFKA_V_HEADERS (headers.get_handle ()), // pass ownership to rdkafka
154160 RD_KAFKA_V_VALUE ((void *)payload.get_data (), payload.get_size ()),
155161 RD_KAFKA_V_OPAQUE (message.get_user_data ()),
156162 RD_KAFKA_V_END);
163+ // We only want to release the handle on the headers data
164+ // if the rd_kafka_producev function returned no error, otherwise
165+ // the function doesn't take ownership of the headers data.
166+ if (result == RD_KAFKA_RESP_ERR_NO_ERROR) {
167+ headers.release_handle ();
168+ }
157169 check_error (result);
158170}
159171
0 commit comments