diff --git a/DEVELOPER.md b/DEVELOPER.md index 201816c73..340925923 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -55,7 +55,24 @@ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build ```bash python3 -c "import confluent_kafka; print('Setup successful!')" - ``` +``` +#### Local Setup with UV + +Alternative setup instructions tested with python 3.11 + +```bash +# Modify pyproject.toml to require python version >=3.11 +# This fixes the cel-python dependency conflict +uv venv --python 3.11 +source .venv/bin/activate + +uv sync --extra dev --extra tests +uv pip install trivup setuptools +pytest tests/ + +# When making changes, change project.version in pyproject.toml before re-running: +uv sync --extra dev --extra tests +``` diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 0e3d4a307..8b3f71885 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -420,6 +420,37 @@ static PyObject *Producer_flush (Handle *self, PyObject *args, return cfl_PyInt_FromInt(qlen); } + +static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs) { + rd_kafka_resp_err_t err; + CallState cs; + + if (!self->rk) + Py_RETURN_TRUE; + + CallState_begin(self, &cs); + + /* Flush any pending messages (wait indefinitely to ensure delivery) */ + err = rd_kafka_flush(self->rk, -1); + + /* Destroy the producer (even if flush had issues) */ + rd_kafka_destroy(self->rk); + self->rk = NULL; + + if (!CallState_end(self, &cs)) + return NULL; + + /* If flush failed, warn but don't suppress original exception */ + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + PyErr_WarnFormat(PyExc_RuntimeWarning, 1, + "Producer flush failed during close: %s", + rd_kafka_err2str(err)); + } + + Py_RETURN_TRUE; +} + + /** * @brief Validate arguments and parse all messages in the batch * @param self Producer handle @@ -906,27 +937,7 @@ static PyObject *Producer_exit (Handle *self, PyObject *args) { &exc_type,
&exc_value, &exc_traceback)) return NULL; - /* Cleanup: flush pending messages and destroy producer */ - if (self->rk) { - CallState_begin(self, &cs); - - /* Flush any pending messages (wait indefinitely to ensure delivery) */ - err = rd_kafka_flush(self->rk, -1); - - /* Destroy the producer (even if flush had issues) */ - rd_kafka_destroy(self->rk); - self->rk = NULL; - - if (!CallState_end(self, &cs)) - return NULL; - - /* If flush failed, warn but don't suppress original exception */ - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - PyErr_WarnFormat(PyExc_RuntimeWarning, 1, - "Producer flush failed during context exit: %s", - rd_kafka_err2str(err)); - } - } + Producer_close(self, (PyObject *)NULL, (PyObject *)NULL); /* Return None to propagate any exceptions from the with block */ Py_RETURN_NONE; @@ -983,7 +994,15 @@ static PyMethodDef Producer_methods[] = { " :rtype: int\n" "\n" }, - + { "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS, + ".. py:function:: close()\n" + "\n" + " Request to close the producer on demand.\n" + "\n" + " :rtype: bool\n" + " :returns: True if producer close requested successfully, False otherwise\n" + "\n" + }, { "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS, ".. py:function:: flush([timeout])\n" "\n" diff --git a/tests/test_Producer.py b/tests/test_Producer.py index f94e4903f..0d7f1d843 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -55,6 +55,8 @@ def on_delivery(err, msg): except KafkaException as e: assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT) + assert p.close(), "Failed to validate that producer was closed." 
+ def test_produce_timestamp(): """ Test produce() with timestamp arg """ @@ -239,6 +241,8 @@ def test_transaction_api(): assert ex.value.args[0].fatal() is False assert ex.value.args[0].txn_requires_abort() is False + assert p.close(), "The producer was not closed" + def test_purge(): """ @@ -274,6 +278,8 @@ def on_delivery(err, msg): p.flush(0.002) assert cb_detector["on_delivery_called"] + assert p.close(), "The producer was not closed" + def test_producer_bool_value(): """ @@ -283,6 +289,7 @@ def test_producer_bool_value(): p = Producer({}) assert bool(p) + assert p.close(), "The producer was not fully closed" def test_produce_batch_basic_types_and_data(): @@ -1395,3 +1402,24 @@ def __init__(self, config): # Test __len__() - should return 0 for closed producer (safe, no crash) assert len(producer) == 0 + + +def test_producer_close(): + """ + Ensures the producer close can be requested on demand + """ + conf = { + 'debug': 'all', + 'socket.timeout.ms': 10, + 'error_cb': error_cb, + 'message.timeout.ms': 10 + } + producer = Producer(conf) + cb_detector = {"on_delivery_called": False} + + def on_delivery(err, msg): + cb_detector["on_delivery_called"] = True + + producer.produce('mytopic', value='somedata', key='a key', callback=on_delivery) + assert producer.close(), "The producer could not be closed on demand" + assert cb_detector["on_delivery_called"], "The delivery callback should have been called by flushing during close"