Skip to content

Commit 4b61547

Browse files
[asyncio] Support creating producer and sending messages (#189)
Master issue: #55 ### Modifications Introduce a `pulsar.asyncio` module that includes the asynchronous APIs to work with Python asyncio module. Example: ```python async def main(): client = Client('pulsar://localhost:6650') try: producer = await client.create_producer('topic') msg_id = await producer.send('msg'.encode()) await producer.close() except PulsarException as e: error = e.error() await client.close() ``` The creation of `Client` reuses the same keyword arguments from `pulsar.Client` but the options for creating producer and sending messages are not added yet.
1 parent 9ee23b8 commit 4b61547

File tree

6 files changed

+284
-0
lines changed

6 files changed

+284
-0
lines changed

pulsar/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ def serialize(self):
105105
"""
106106
return self._msg_id.serialize()
107107

108+
def __str__(self) -> str:
109+
"""
110+
Returns the string representation of the message id.
111+
"""
112+
return str(self._msg_id)
113+
108114
@staticmethod
109115
def deserialize(message_id_bytes):
110116
"""

pulsar/asyncio.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
"""
21+
The Pulsar Python client APIs that work with the asyncio module.
22+
"""
23+
24+
import asyncio
25+
import functools
26+
from typing import Any
27+
28+
import _pulsar
29+
import pulsar
30+
31+
class PulsarException(BaseException):
32+
"""
33+
The exception that wraps the Pulsar error code
34+
"""
35+
36+
def __init__(self, result: pulsar.Result) -> None:
37+
"""
38+
Create the Pulsar exception.
39+
40+
Parameters
41+
----------
42+
result: pulsar.Result
43+
The error code of the underlying Pulsar APIs.
44+
"""
45+
self._result = result
46+
47+
def error(self) -> pulsar.Result:
48+
"""
49+
Returns the Pulsar error code.
50+
"""
51+
return self._result
52+
53+
def __str__(self):
54+
"""
55+
Convert the exception to string.
56+
"""
57+
return f'{self._result.value} {self._result.name}'
58+
59+
class Producer:
60+
"""
61+
The Pulsar message producer, used to publish messages on a topic.
62+
"""
63+
64+
def __init__(self, producer: _pulsar.Producer) -> None:
65+
"""
66+
Create the producer.
67+
Users should not call this constructor directly. Instead, create the
68+
producer via `Client.create_producer`.
69+
70+
Parameters
71+
----------
72+
producer: _pulsar.Producer
73+
The underlying Producer object from the C extension.
74+
"""
75+
self._producer: _pulsar.Producer = producer
76+
77+
async def send(self, content: bytes) -> pulsar.MessageId:
78+
"""
79+
Send a message asynchronously.
80+
81+
parameters
82+
----------
83+
content: bytes
84+
The message payload
85+
86+
Returns
87+
-------
88+
pulsar.MessageId
89+
The message id that represents the persisted position of the message.
90+
91+
Raises
92+
------
93+
PulsarException
94+
"""
95+
builder = _pulsar.MessageBuilder()
96+
builder.content(content)
97+
future = asyncio.get_running_loop().create_future()
98+
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
99+
msg_id = await future
100+
return pulsar.MessageId(
101+
msg_id.partition(),
102+
msg_id.ledger_id(),
103+
msg_id.entry_id(),
104+
msg_id.batch_index(),
105+
)
106+
107+
async def close(self) -> None:
108+
"""
109+
Close the producer.
110+
111+
Raises
112+
------
113+
PulsarException
114+
"""
115+
future = asyncio.get_running_loop().create_future()
116+
self._producer.close_async(functools.partial(_set_future, future, value=None))
117+
await future
118+
119+
class Client:
120+
"""
121+
The asynchronous version of `pulsar.Client`.
122+
"""
123+
124+
def __init__(self, service_url, **kwargs) -> None:
125+
"""
126+
See `pulsar.Client.__init__`
127+
"""
128+
self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client
129+
130+
async def create_producer(self, topic: str) -> Producer:
131+
"""
132+
Create a new producer on a given topic
133+
134+
Parameters
135+
----------
136+
topic: str
137+
The topic name
138+
139+
Returns
140+
-------
141+
Producer
142+
The producer created
143+
144+
Raises
145+
------
146+
PulsarException
147+
"""
148+
future = asyncio.get_running_loop().create_future()
149+
conf = _pulsar.ProducerConfiguration()
150+
# TODO: add more configs
151+
self._client.create_producer_async(topic, conf, functools.partial(_set_future, future))
152+
return Producer(await future)
153+
154+
async def close(self) -> None:
155+
"""
156+
Close the client and all the associated producers and consumers
157+
158+
Raises
159+
------
160+
PulsarException
161+
"""
162+
future = asyncio.get_running_loop().create_future()
163+
self._client.close_async(functools.partial(_set_future, future, value=None))
164+
await future
165+
166+
def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
167+
def complete():
168+
if result == _pulsar.Result.Ok:
169+
future.set_result(value)
170+
else:
171+
future.set_exception(PulsarException(result))
172+
future.get_loop().call_soon_threadsafe(complete)

src/client.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
#include "utils.h"
2020

21+
#include <pybind11/functional.h>
2122
#include <pybind11/pybind11.h>
2223
#include <pybind11/stl.h>
2324

@@ -28,6 +29,12 @@ Producer Client_createProducer(Client& client, const std::string& topic, const P
2829
[&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
2930
}
3031

32+
void Client_createProducerAsync(Client& client, const std::string& topic, ProducerConfiguration conf,
33+
CreateProducerCallback callback) {
34+
py::gil_scoped_release release;
35+
client.createProducerAsync(topic, conf, callback);
36+
}
37+
3138
Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
3239
const ConsumerConfiguration& conf) {
3340
return waitForAsyncValue<Consumer>(
@@ -68,16 +75,23 @@ void Client_close(Client& client) {
6875
waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); });
6976
}
7077

78+
void Client_closeAsync(Client& client, ResultCallback callback) {
79+
py::gil_scoped_release release;
80+
client.closeAsync(callback);
81+
}
82+
7183
void export_client(py::module_& m) {
7284
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
7385
.def(py::init<const std::string&, const ClientConfiguration&>())
7486
.def("create_producer", &Client_createProducer)
87+
.def("create_producer_async", &Client_createProducerAsync)
7588
.def("subscribe", &Client_subscribe)
7689
.def("subscribe_topics", &Client_subscribe_topics)
7790
.def("subscribe_pattern", &Client_subscribe_pattern)
7891
.def("create_reader", &Client_createReader)
7992
.def("get_topic_partitions", &Client_getTopicPartitions)
8093
.def("get_schema_info", &Client_getSchemaInfo)
8194
.def("close", &Client_close)
95+
.def("close_async", &Client_closeAsync)
8296
.def("shutdown", &Client::shutdown);
8397
}

src/producer.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ void Producer_close(Producer& producer) {
4646
waitForAsyncResult([&](ResultCallback callback) { producer.closeAsync(callback); });
4747
}
4848

49+
void Producer_closeAsync(Producer& producer, ResultCallback callback) {
50+
py::gil_scoped_release release;
51+
producer.closeAsync(callback);
52+
}
53+
4954
void export_producer(py::module_& m) {
5055
using namespace py;
5156

@@ -76,5 +81,6 @@ void export_producer(py::module_& m) {
7681
"Flush all the messages buffered in the client and wait until all messages have been\n"
7782
"successfully persisted\n")
7883
.def("close", &Producer_close)
84+
.def("close_async", &Producer_closeAsync)
7985
.def("is_connected", &Producer::isConnected);
8086
}

tests/asyncio_test.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
import asyncio
22+
import pulsar
23+
from pulsar.asyncio import (
24+
Client,
25+
PulsarException,
26+
)
27+
from unittest import (
28+
main,
29+
IsolatedAsyncioTestCase,
30+
)
31+
32+
service_url = 'pulsar://localhost:6650'
33+
34+
class AsyncioTest(IsolatedAsyncioTestCase):
35+
36+
async def asyncSetUp(self) -> None:
37+
self._client = Client(service_url)
38+
39+
async def asyncTearDown(self) -> None:
40+
await self._client.close()
41+
42+
async def test_batch_send(self):
43+
producer = await self._client.create_producer('awaitio-test-batch-send')
44+
tasks = []
45+
for i in range(5):
46+
tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
47+
msg_ids = await asyncio.gather(*tasks)
48+
self.assertEqual(len(msg_ids), 5)
49+
ledger_id = msg_ids[0].ledger_id()
50+
entry_id = msg_ids[0].entry_id()
51+
# These messages should be in the same entry
52+
for i in range(5):
53+
msg_id = msg_ids[i]
54+
print(f'{i} was sent to {msg_id}')
55+
self.assertIsInstance(msg_id, pulsar.MessageId)
56+
self.assertEqual(msg_ids[i].ledger_id(), ledger_id)
57+
self.assertEqual(msg_ids[i].entry_id(), entry_id)
58+
self.assertEqual(msg_ids[i].batch_index(), i)
59+
60+
async def test_create_producer_failure(self):
61+
try:
62+
await self._client.create_producer('tenant/ns/awaitio-test-send-failure')
63+
self.fail()
64+
except PulsarException as e:
65+
self.assertEqual(e.error(), pulsar.Result.AuthorizationError)
66+
67+
async def test_send_failure(self):
68+
producer = await self._client.create_producer('awaitio-test-send-failure')
69+
try:
70+
await producer.send(('x' * 1024 * 1024 * 10).encode())
71+
self.fail()
72+
except PulsarException as e:
73+
self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
74+
75+
async def test_close_producer(self):
76+
producer = await self._client.create_producer('awaitio-test-close-producer')
77+
await producer.close()
78+
try:
79+
await producer.close()
80+
self.fail()
81+
except PulsarException as e:
82+
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
83+
84+
if __name__ == '__main__':
85+
main()

tests/run-unit-tests.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ cd $ROOT_DIR/tests
2626
python3 custom_logger_test.py
2727
python3 interrupted_test.py
2828
python3 pulsar_test.py
29+
python3 asyncio_test.py

0 commit comments

Comments
 (0)