Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ def serialize(self):
"""
return self._msg_id.serialize()

def __str__(self) -> str:
"""
Returns the string representation of the message id.
"""
return str(self._msg_id)

@staticmethod
def deserialize(message_id_bytes):
"""
Expand Down
172 changes: 172 additions & 0 deletions pulsar/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

"""
The Pulsar Python client APIs that work with the asyncio module.
"""

import asyncio
import functools
from typing import Any

import _pulsar
import pulsar

class PulsarException(BaseException):
"""
The exception that wraps the Pulsar error code
"""

def __init__(self, result: pulsar.Result) -> None:
"""
Create the Pulsar exception.

Parameters
----------
result: pulsar.Result
The error code of the underlying Pulsar APIs.
"""
self._result = result

def error(self) -> pulsar.Result:
"""
Returns the Pulsar error code.
"""
return self._result

def __str__(self):
"""
Convert the exception to string.
"""
return f'{self._result.value} {self._result.name}'

class Producer:
"""
The Pulsar message producer, used to publish messages on a topic.
"""

def __init__(self, producer: _pulsar.Producer) -> None:
"""
Create the producer.
Users should not call this constructor directly. Instead, create the
producer via `Client.create_producer`.

Parameters
----------
producer: _pulsar.Producer
The underlying Producer object from the C extension.
"""
self._producer: _pulsar.Producer = producer

async def send(self, content: bytes) -> pulsar.MessageId:
"""
Send a message asynchronously.

parameters
----------
content: bytes
The message payload

Returns
-------
pulsar.MessageId
The message id that represents the persisted position of the message.

Raises
------
PulsarException
"""
builder = _pulsar.MessageBuilder()
builder.content(content)
future = asyncio.get_running_loop().create_future()
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
msg_id = await future
return pulsar.MessageId(
msg_id.partition(),
msg_id.ledger_id(),
msg_id.entry_id(),
msg_id.batch_index(),
)

async def close(self) -> None:
"""
Close the producer.

Raises
------
PulsarException
"""
future = asyncio.get_running_loop().create_future()
self._producer.close_async(functools.partial(_set_future, future, value=None))
await future

class Client:
"""
The asynchronous version of `pulsar.Client`.
"""

def __init__(self, service_url, **kwargs) -> None:
"""
See `pulsar.Client.__init__`
"""
self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client

async def create_producer(self, topic: str) -> Producer:
"""
Create a new producer on a given topic

Parameters
----------
topic: str
The topic name

Returns
-------
Producer
The producer created

Raises
------
PulsarException
"""
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
# TODO: add more configs
self._client.create_producer_async(topic, conf, functools.partial(_set_future, future))
return Producer(await future)

async def close(self) -> None:
"""
Close the client and all the associated producers and consumers

Raises
------
PulsarException
"""
future = asyncio.get_running_loop().create_future()
self._client.close_async(functools.partial(_set_future, future, value=None))
await future

def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
def complete():
if result == _pulsar.Result.Ok:
future.set_result(value)
else:
future.set_exception(PulsarException(result))
future.get_loop().call_soon_threadsafe(complete)
14 changes: 14 additions & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include "utils.h"

#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

Expand All @@ -28,6 +29,12 @@ Producer Client_createProducer(Client& client, const std::string& topic, const P
[&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
}

void Client_createProducerAsync(Client& client, const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback) {
py::gil_scoped_release release;
client.createProducerAsync(topic, conf, callback);
}

Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>(
Expand Down Expand Up @@ -68,16 +75,23 @@ void Client_close(Client& client) {
waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); });
}

void Client_closeAsync(Client& client, ResultCallback callback) {
py::gil_scoped_release release;
client.closeAsync(callback);
}

void export_client(py::module_& m) {
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
.def(py::init<const std::string&, const ClientConfiguration&>())
.def("create_producer", &Client_createProducer)
.def("create_producer_async", &Client_createProducerAsync)
.def("subscribe", &Client_subscribe)
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_pattern", &Client_subscribe_pattern)
.def("create_reader", &Client_createReader)
.def("get_topic_partitions", &Client_getTopicPartitions)
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
.def("close_async", &Client_closeAsync)
.def("shutdown", &Client::shutdown);
}
6 changes: 6 additions & 0 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ void Producer_close(Producer& producer) {
waitForAsyncResult([&](ResultCallback callback) { producer.closeAsync(callback); });
}

void Producer_closeAsync(Producer& producer, ResultCallback callback) {
py::gil_scoped_release release;
producer.closeAsync(callback);
}

void export_producer(py::module_& m) {
using namespace py;

Expand Down Expand Up @@ -76,5 +81,6 @@ void export_producer(py::module_& m) {
"Flush all the messages buffered in the client and wait until all messages have been\n"
"successfully persisted\n")
.def("close", &Producer_close)
.def("close_async", &Producer_closeAsync)
.def("is_connected", &Producer::isConnected);
}
85 changes: 85 additions & 0 deletions tests/asyncio_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import asyncio
import pulsar
from pulsar.asyncio import (
Client,
PulsarException,
)
from unittest import (
main,
IsolatedAsyncioTestCase,
)

service_url = 'pulsar://localhost:6650'

class AsyncioTest(IsolatedAsyncioTestCase):

async def asyncSetUp(self) -> None:
self._client = Client(service_url)

async def asyncTearDown(self) -> None:
await self._client.close()

async def test_batch_send(self):
producer = await self._client.create_producer('awaitio-test-batch-send')
tasks = []
for i in range(5):
tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
msg_ids = await asyncio.gather(*tasks)
self.assertEqual(len(msg_ids), 5)
ledger_id = msg_ids[0].ledger_id()
entry_id = msg_ids[0].entry_id()
# These messages should be in the same entry
for i in range(5):
msg_id = msg_ids[i]
print(f'{i} was sent to {msg_id}')
self.assertIsInstance(msg_id, pulsar.MessageId)
self.assertEqual(msg_ids[i].ledger_id(), ledger_id)
self.assertEqual(msg_ids[i].entry_id(), entry_id)
self.assertEqual(msg_ids[i].batch_index(), i)

async def test_create_producer_failure(self):
try:
await self._client.create_producer('tenant/ns/awaitio-test-send-failure')
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.AuthorizationError)

async def test_send_failure(self):
producer = await self._client.create_producer('awaitio-test-send-failure')
try:
await producer.send(('x' * 1024 * 1024 * 10).encode())
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.MessageTooBig)

async def test_close_producer(self):
producer = await self._client.create_producer('awaitio-test-close-producer')
await producer.close()
try:
await producer.close()
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)

if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions tests/run-unit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ cd $ROOT_DIR/tests
python3 custom_logger_test.py
python3 interrupted_test.py
python3 pulsar_test.py
python3 asyncio_test.py