Skip to content

Commit f5f85cb

Browse files
committed
Add basic implementation of amqp_params
1 parent 717e80b commit f5f85cb

File tree

8 files changed

+18
-3
lines changed

8 files changed

+18
-3
lines changed

.devcontainer/devcontainer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"github.vscode-github-actions"
2626
],
2727
"settings": {
28+
"python.defaultInterpreterPath": "/workspaces/${localWorkspaceFolderBasename}/.venv/bin/python",
2829
"yaml.schemas": {
2930
"https://asyncapi.com/schema-store/3.0.0-without-$id.json": [
3031
"file:///workspaces/asyncapi-python/examples/*.yaml"

src/asyncapi_python/amqp/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
from .operation import Operation
2020
from .utils import union_model
2121
from .error import Rejection, RejectedError
22+
from .params import AmqpParams
2223

2324
__all__ = [
2425
"channel_pool",
26+
"AmqpParams",
2527
"AmqpPool",
2628
"BaseApplication",
2729
"Router",

src/asyncapi_python/amqp/base_application.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from .endpoint import EndpointParams
2424
from .connection import channel_pool
2525
from .utils import encode_message, decode_message
26+
from .params import AmqpParams
2627
from typing import Generic, Optional, TypeVar
2728

2829

@@ -61,6 +62,7 @@ def __init__(
6162
amqp_uri: str,
6263
producer_factory: type[P],
6364
consumer_factory: type[C],
65+
amqp_params: AmqpParams,
6466
):
6567
self.__params = EndpointParams(
6668
pool=channel_pool(amqp_uri),
@@ -69,6 +71,7 @@ def __init__(
6971
register_correlation_id=self.__register_correlation_id,
7072
stop_application=self.stop,
7173
app_id=str(uuid4()),
74+
amqp_params=amqp_params,
7275
)
7376
self.__reply_futures: dict[
7477
str,

src/asyncapi_python/amqp/connection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515

1616
from functools import cache
17+
from typing import TypeAlias
1718
from aio_pika.robust_connection import (
1819
AbstractRobustConnection,
1920
AbstractRobustChannel,

src/asyncapi_python/amqp/endpoint/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from ..error import Rejection, RejectedError
3232
from ..connection import AmqpPool
3333
from ..operation import Operation
34+
from ..params import AmqpParams
3435
from aio_pika.abc import (
3536
AbstractRobustChannel,
3637
AbstractRobustQueue,
@@ -64,6 +65,7 @@ class EndpointParams:
6465
]
6566
app_id: str
6667
stop_application: Callable[[], Awaitable[None]]
68+
amqp_params: AmqpParams
6769

6870
@property
6971
def reply_queue_name(self) -> str:

src/asyncapi_python/amqp/endpoint/receiver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ async def start(self) -> None:
4545
print("start", self._op)
4646
if self._fn:
4747
async with self._params.pool.acquire() as ch:
48+
if self._params.amqp_params.prefetch_count:
49+
await ch.set_qos(prefetch_count=self._params.amqp_params.prefetch_count)
4850
q = self._queue = await self._declare(ch)
4951
self._consumer_tag = await q.consume(self._consumer)
5052
return

src/asyncapi_python/amqp/params.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from typing import TypedDict
2+
3+
class AmqpParams(TypedDict, total=False):
4+
prefetch_count: str

src/asyncapi_python_codegen/generators/amqp/templates/application.py.j2

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
limitations under the License. #}
1414
from .consumer import _Router_0 as Consumer
1515
from .producer import _Router_0 as Producer
16-
from asyncapi_python.amqp.base_application import BaseApplication
16+
from asyncapi_python.amqp import BaseApplication, AmqpParams
1717

1818

1919
class Application(BaseApplication[Producer, Consumer]):
20-
def __init__(self, amqp_uri: str):
21-
super().__init__(amqp_uri, Producer, Consumer)
20+
def __init__(self, amqp_uri: str, amqp_params: AmqpParams = {}):
21+
super().__init__(amqp_uri, Producer, Consumer, amqp_params)

0 commit comments

Comments
 (0)