Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,24 @@ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build

```bash
python3 -c "import confluent_kafka; print('Setup successful!')"
```

#### Local Setup with UV

Alternative setup instructions, tested with Python 3.11.

```bash
# Modify pyproject.toml to require python version >=3.11
# This fixes the cel-python dependency conflict
uv venv --python 3.11
source .venv/bin/activate

uv sync --extra dev --extra tests
uv pip install trivup setuptools
pytest tests/

# When making changes, change project.version in pyproject.toml before re-running:
uv sync --extra dev --extra tests
```

<!-- markdownlint-enable MD029 -->

Expand Down
63 changes: 41 additions & 22 deletions src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,37 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
return cfl_PyInt_FromInt(qlen);
}


/**
 * @brief Close the producer on demand: flush all outstanding messages
 *        (blocking indefinitely) and destroy the underlying rd_kafka handle.
 *
 * Idempotent: calling close() on an already-closed producer returns True
 * immediately. Releases the GIL around the blocking flush/destroy via
 * CallState_begin()/CallState_end().
 *
 * @returns Py_True on success, NULL with an exception set if a callback
 *          raised during flush or if a flush-failure warning was escalated
 *          to an exception.
 */
static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs) {
        rd_kafka_resp_err_t err;
        CallState cs;

        /* Already closed (or never created): nothing to do. */
        if (!self->rk)
                Py_RETURN_TRUE;

        CallState_begin(self, &cs);

        /* Flush any pending messages (wait indefinitely to ensure delivery) */
        err = rd_kafka_flush(self->rk, -1);

        /* Destroy the producer (even if flush had issues) */
        rd_kafka_destroy(self->rk);
        self->rk = NULL;

        /* Propagate exceptions raised from callbacks invoked during flush. */
        if (!CallState_end(self, &cs))
                return NULL;

        /* If flush failed, warn but don't suppress original exception.
         * PyErr_WarnFormat() returns -1 when the warning is turned into an
         * exception (e.g. python -W error); in that case we must return NULL
         * rather than True with an exception pending. */
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                if (PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
                                     "Producer flush failed during close: %s",
                                     rd_kafka_err2str(err)) == -1)
                        return NULL;
        }

        Py_RETURN_TRUE;
}


/**
* @brief Validate arguments and parse all messages in the batch
* @param self Producer handle
Expand Down Expand Up @@ -906,27 +937,7 @@ static PyObject *Producer_exit (Handle *self, PyObject *args) {
&exc_type, &exc_value, &exc_traceback))
return NULL;

/* Cleanup: flush pending messages and destroy producer */
if (self->rk) {
CallState_begin(self, &cs);

/* Flush any pending messages (wait indefinitely to ensure delivery) */
err = rd_kafka_flush(self->rk, -1);

/* Destroy the producer (even if flush had issues) */
rd_kafka_destroy(self->rk);
self->rk = NULL;

if (!CallState_end(self, &cs))
return NULL;

/* If flush failed, warn but don't suppress original exception */
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
"Producer flush failed during context exit: %s",
rd_kafka_err2str(err));
}
}
Producer_close(self, (PyObject *)NULL, (PyObject *)NULL);

/* Return None to propagate any exceptions from the with block */
Py_RETURN_NONE;
Expand Down Expand Up @@ -983,7 +994,15 @@ static PyMethodDef Producer_methods[] = {
" :rtype: int\n"
"\n"
},

{ "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
".. py:function:: close()\n"
"\n"
" Request to close the producer on demand.\n"
"\n"
" :rtype: bool\n"
" :returns: True if producer close requested successfully, False otherwise\n"
"\n"
},
{ "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS,
".. py:function:: flush([timeout])\n"
"\n"
Expand Down
27 changes: 26 additions & 1 deletion tests/test_Producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def on_delivery(err, msg):
except KafkaException as e:
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)

assert p.close(), "Failed to validate that producer was closed."


def test_produce_timestamp():
""" Test produce() with timestamp arg """
Expand Down Expand Up @@ -239,6 +241,8 @@ def test_transaction_api():
assert ex.value.args[0].fatal() is False
assert ex.value.args[0].txn_requires_abort() is False

assert p.close(), "The producer was not closed"


def test_purge():
"""
Expand Down Expand Up @@ -274,6 +278,8 @@ def on_delivery(err, msg):
p.flush(0.002)
assert cb_detector["on_delivery_called"]

assert p.close(), "The producer was not closed"


def test_producer_bool_value():
"""
Expand All @@ -283,7 +289,7 @@ def test_producer_bool_value():

p = Producer({})
assert bool(p)

assert p.close(), "The producer was not fully closed"

def test_produce_batch_basic_types_and_data():
"""Test basic data types, None/empty handling, and partition functionality."""
Expand Down Expand Up @@ -1395,3 +1401,22 @@ def __init__(self, config):

# Test __len__() - should return 0 for closed producer (safe, no crash)
assert len(producer) == 0


def test_producer_close():
    """
    Verify that close() can be invoked explicitly on a producer and that it
    flushes outstanding messages, firing their delivery callbacks.
    """
    producer = Producer({
        'debug': 'all',
        'socket.timeout.ms': 10,
        'error_cb': error_cb,
        'message.timeout.ms': 10
    })

    cb_detector = {"on_delivery_called": False}

    def on_delivery(err, msg):
        # Record that the callback fired; err/msg content is irrelevant here.
        cb_detector["on_delivery_called"] = True

    producer.produce('mytopic', value='somedata', key='a key',
                     callback=on_delivery)

    assert producer.close(), "The producer could not be closed on demand"
    assert cb_detector["on_delivery_called"], \
        "The delivery callback should have been called by flushing during close"