From 5718f0a7e2abff9202ad72e363103e2c115c1327 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 28 Dec 2023 21:50:24 +0800 Subject: [PATCH 1/4] [asyncio] Support creating producer and sending messages Master issue: https://github.com/apache/pulsar-client-python/issues/55 ### Modifications Introduce a `pulsar.asyncio` module that includes the asynchronous APIs to work with Python asyncio module. Example: ```python async def main(): client = Client('pulsar://localhost:6650') try: producer = await client.create_producer('topic') msg_id = await producer.send('msg'.encode()) await producer.close() except PulsarException as e: error = e.error() await client.close() ``` The creation of `Client` reuses the same keyword arguments from `pulsar.Client` but the options for creating producer and sending messages are not added yet. --- pulsar/__init__.py | 6 ++ pulsar/asyncio.py | 172 ++++++++++++++++++++++++++++++++++++++++ src/client.cc | 14 ++++ src/producer.cc | 6 ++ tests/asyncio_test.py | 86 ++++++++++++++++++++ tests/run-unit-tests.sh | 1 + 6 files changed, 285 insertions(+) create mode 100644 pulsar/asyncio.py create mode 100644 tests/asyncio_test.py diff --git a/pulsar/__init__.py b/pulsar/__init__.py index a44a0db..29bb034 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -105,6 +105,12 @@ def serialize(self): """ return self._msg_id.serialize() + def __str__(self) -> str: + """ + Returns the string representation of the message id. + """ + return str(self._msg_id) + @staticmethod def deserialize(message_id_bytes): """ diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py new file mode 100644 index 0000000..562ea43 --- /dev/null +++ b/pulsar/asyncio.py @@ -0,0 +1,172 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +The Pulsar Python client APIs that work with the asyncio module. +""" + +import asyncio +import functools +from typing import Any + +import _pulsar +import pulsar + +class PulsarException(BaseException): + """ + The exception that wraps the Pulsar error code + """ + + def __init__(self, result: pulsar.Result) -> None: + """ + Create the Pulsar exception. + + Parameters + ---------- + result: pulsar.Result + The error code of the underlying Pulsar APIs. + """ + self._result = result + + def error(self) -> pulsar.Result: + """ + Returns the Pulsar error code. + """ + return self._result + + def __str__(self): + """ + Convert the exception to string. + """ + return f'{self._result.value} {self._result.name}' + +class Producer: + """ + The Pulsar message producer, used to publish messages on a topic. + """ + + def __init__(self, producer: _pulsar.Producer) -> None: + """ + Create the producer. + Users should not call this constructor directly. Instead, create the + producer via `Client.create_producer`. + + Parameters + ---------- + producer: _pulsar.Producer + The underlying Producer object from the C extension. + """ + self._producer: _pulsar.Producer = producer + + async def send(self, content: bytes) -> pulsar.MessageId: + """ + Send a message asynchronously. 
+ + Parameters + ---------- + content: bytes + The message payload + + Returns + ------- + pulsar.MessageId + The message id that represents the persisted position of the message. + + Raises + ------ + PulsarException + """ + builder = _pulsar.MessageBuilder() + builder.content(content) + future = asyncio.get_running_loop().create_future() + self._producer.send_async(builder.build(), functools.partial(_set_future, future)) + msg_id = await future + return pulsar.MessageId( + msg_id.partition(), + msg_id.ledger_id(), + msg_id.entry_id(), + msg_id.batch_index(), + ) + + async def close(self) -> None: + """ + Close the producer. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._producer.close_async(functools.partial(_set_future, future, value=None)) + await future + +class Client: + """ + The asynchronous version of `pulsar.Client`. + """ + + def __init__(self, service_url, **kwargs) -> None: + """ + See `pulsar.Client.__init__` + """ + self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client + + async def create_producer(self, topic: str) -> Producer: + """ + Create a new producer on a given topic + + Parameters + ---------- + topic: str + The topic name + + Returns + ------- + Producer + The producer created + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + conf = _pulsar.ProducerConfiguration() + # TODO: add more configs + self._client.create_producer_async(topic, conf, functools.partial(_set_future, future)) + return Producer(await future) + + async def close(self) -> None: + """ + Close the client and all the associated producers and consumers + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._client.close_async(functools.partial(_set_future, future, value=None)) + await future + +def _set_future(future: asyncio.Future[Any], result: _pulsar.Result, value: Any): + def complete(): + if 
result == _pulsar.Result.Ok: + future.set_result(value) + else: + future.set_exception(PulsarException(result)) + future.get_loop().call_soon_threadsafe(complete) diff --git a/src/client.cc b/src/client.cc index 626ff9f..b25c63a 100644 --- a/src/client.cc +++ b/src/client.cc @@ -18,6 +18,7 @@ */ #include "utils.h" +#include #include #include @@ -28,6 +29,12 @@ Producer Client_createProducer(Client& client, const std::string& topic, const P [&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); }); } +void Client_createProducerAsync(Client& client, const std::string& topic, ProducerConfiguration conf, + CreateProducerCallback callback) { + py::gil_scoped_release release; + client.createProducerAsync(topic, conf, callback); +} + Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf) { return waitForAsyncValue( @@ -68,10 +75,16 @@ void Client_close(Client& client) { waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); }); } +void Client_closeAsync(Client& client, ResultCallback callback) { + py::gil_scoped_release release; + client.closeAsync(callback); +} + void export_client(py::module_& m) { py::class_>(m, "Client") .def(py::init()) .def("create_producer", &Client_createProducer) + .def("create_producer_async", &Client_createProducerAsync) .def("subscribe", &Client_subscribe) .def("subscribe_topics", &Client_subscribe_topics) .def("subscribe_pattern", &Client_subscribe_pattern) @@ -79,5 +92,6 @@ void export_client(py::module_& m) { .def("get_topic_partitions", &Client_getTopicPartitions) .def("get_schema_info", &Client_getSchemaInfo) .def("close", &Client_close) + .def("close_async", &Client_closeAsync) .def("shutdown", &Client::shutdown); } diff --git a/src/producer.cc b/src/producer.cc index 7027185..9b38016 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -46,6 +46,11 @@ void Producer_close(Producer& producer) { 
waitForAsyncResult([&](ResultCallback callback) { producer.closeAsync(callback); }); } +void Producer_closeAsync(Producer& producer, ResultCallback callback) { + py::gil_scoped_release release; + producer.closeAsync(callback); +} + void export_producer(py::module_& m) { using namespace py; @@ -76,5 +81,6 @@ void export_producer(py::module_& m) { "Flush all the messages buffered in the client and wait until all messages have been\n" "successfully persisted\n") .def("close", &Producer_close) + .def("close_async", &Producer_closeAsync) .def("is_connected", &Producer::isConnected); } diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py new file mode 100644 index 0000000..fb02c99 --- /dev/null +++ b/tests/asyncio_test.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +import asyncio +import pulsar +from pulsar.asyncio import ( + Client, + PulsarException, +) +from unittest import ( + main, + IsolatedAsyncioTestCase, +) + +service_url = 'pulsar://localhost:6650' + +class AsyncioTest(IsolatedAsyncioTestCase): + + async def asyncSetUp(self) -> None: + self._client = Client(service_url) + + async def asyncTearDown(self) -> None: + await self._client.close() + + async def test_batch_send(self): + producer = await self._client.create_producer('awaitio-test-batch-send') + async with asyncio.TaskGroup() as tg: + tasks = [] + for i in range(5): + tasks.append(tg.create_task(producer.send(f'msg-{i}'.encode()))) + msg_ids = await asyncio.gather(*tasks) + self.assertEqual(len(msg_ids), 5) + ledger_id = msg_ids[0].ledger_id() + entry_id = msg_ids[0].entry_id() + # These messages should be in the same entry + for i in range(5): + msg_id = msg_ids[i] + print(f'{i} was sent to {msg_id}') + self.assertIsInstance(msg_id, pulsar.MessageId) + self.assertEqual(msg_ids[i].ledger_id(), ledger_id) + self.assertEqual(msg_ids[i].entry_id(), entry_id) + self.assertEqual(msg_ids[i].batch_index(), i) + + async def test_create_producer_failure(self): + try: + await self._client.create_producer('tenant/ns/awaitio-test-send-failure') + self.fail() + except PulsarException as e: + self.assertEqual(e.error(), pulsar.Result.TopicNotFound) + + async def test_send_failure(self): + producer = await self._client.create_producer('awaitio-test-send-failure') + try: + await producer.send(('x' * 1024 * 1024 * 10).encode()) + self.fail() + except PulsarException as e: + self.assertEqual(e.error(), pulsar.Result.MessageTooBig) + + async def test_close_producer(self): + producer = await self._client.create_producer('awaitio-test-close-producer') + await producer.close() + try: + await producer.close() + self.fail() + except PulsarException as e: + self.assertEqual(e.error(), pulsar.Result.AlreadyClosed) + +if __name__ == '__main__': + main() diff --git 
a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh index 5168f94..ea0b450 100755 --- a/tests/run-unit-tests.sh +++ b/tests/run-unit-tests.sh @@ -26,3 +26,4 @@ cd $ROOT_DIR/tests python3 custom_logger_test.py python3 interrupted_test.py python3 pulsar_test.py +python3 asyncio_test.py From feaa63187e2c8ff74e09896063659c3fd65c5ae3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 29 Dec 2023 17:23:52 +0800 Subject: [PATCH 2/4] Fix tests error --- tests/asyncio_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index fb02c99..4fa3035 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -63,7 +63,7 @@ async def test_create_producer_failure(self): await self._client.create_producer('tenant/ns/awaitio-test-send-failure') self.fail() except PulsarException as e: - self.assertEqual(e.error(), pulsar.Result.TopicNotFound) + self.assertEqual(e.error(), pulsar.Result.AuthorizationError) async def test_send_failure(self): producer = await self._client.create_producer('awaitio-test-send-failure') From 42a0e2c7045540266bf77acef3d7e837b0bd54f1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 29 Dec 2023 17:47:53 +0800 Subject: [PATCH 3/4] Fix incompatibility with Python 3.8 --- pulsar/asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 562ea43..445d477 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -163,7 +163,7 @@ async def close(self) -> None: self._client.close_async(functools.partial(_set_future, future, value=None)) await future -def _set_future(future: asyncio.Future[Any], result: _pulsar.Result, value: Any): +def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): def complete(): if result == _pulsar.Result.Ok: future.set_result(value) From 2cbae0c8080c87fc58cc83c3e1aff42440512471 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 29 Dec 2023 18:04:46 +0800 Subject: [PATCH 4/4] Fix 
TaskGroup not found in Python 3.8 --- tests/asyncio_test.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 4fa3035..5478b60 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -41,22 +41,21 @@ async def asyncTearDown(self) -> None: async def test_batch_send(self): producer = await self._client.create_producer('awaitio-test-batch-send') - async with asyncio.TaskGroup() as tg: - tasks = [] - for i in range(5): - tasks.append(tg.create_task(producer.send(f'msg-{i}'.encode()))) - msg_ids = await asyncio.gather(*tasks) - self.assertEqual(len(msg_ids), 5) - ledger_id = msg_ids[0].ledger_id() - entry_id = msg_ids[0].entry_id() - # These messages should be in the same entry - for i in range(5): - msg_id = msg_ids[i] - print(f'{i} was sent to {msg_id}') - self.assertIsInstance(msg_id, pulsar.MessageId) - self.assertEqual(msg_ids[i].ledger_id(), ledger_id) - self.assertEqual(msg_ids[i].entry_id(), entry_id) - self.assertEqual(msg_ids[i].batch_index(), i) + tasks = [] + for i in range(5): + tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode()))) + msg_ids = await asyncio.gather(*tasks) + self.assertEqual(len(msg_ids), 5) + ledger_id = msg_ids[0].ledger_id() + entry_id = msg_ids[0].entry_id() + # These messages should be in the same entry + for i in range(5): + msg_id = msg_ids[i] + print(f'{i} was sent to {msg_id}') + self.assertIsInstance(msg_id, pulsar.MessageId) + self.assertEqual(msg_ids[i].ledger_id(), ledger_id) + self.assertEqual(msg_ids[i].entry_id(), entry_id) + self.assertEqual(msg_ids[i].batch_index(), i) async def test_create_producer_failure(self): try: