|
25 | 25 | from typing_extensions import ParamSpec, Self, TypeAlias |
26 | 26 |
|
27 | 27 | from taskiq.abc.middleware import TaskiqMiddleware |
| 28 | +from taskiq.abc.serializer import TaskiqSerializer |
28 | 29 | from taskiq.acks import AckableMessage |
29 | 30 | from taskiq.decor import AsyncTaskiqDecoratedTask |
30 | 31 | from taskiq.events import TaskiqEvents |
31 | | -from taskiq.formatters.json_formatter import JSONFormatter |
| 32 | +from taskiq.formatters.proxy_formatter import ProxyFormatter |
32 | 33 | from taskiq.message import BrokerMessage |
33 | 34 | from taskiq.result_backends.dummy import DummyResultBackend |
| 35 | +from taskiq.serializers.json_serializer import JSONSerializer |
34 | 36 | from taskiq.state import TaskiqState |
35 | 37 | from taskiq.utils import maybe_awaitable, remove_suffix |
36 | 38 | from taskiq.warnings import TaskiqDeprecationWarning |
@@ -97,7 +99,8 @@ def __init__( |
97 | 99 | self.middlewares: "List[TaskiqMiddleware]" = [] |
98 | 100 | self.result_backend = result_backend |
99 | 101 | self.decorator_class = AsyncTaskiqDecoratedTask |
100 | | - self.formatter: "TaskiqFormatter" = JSONFormatter() |
| 102 | + self.serializer: TaskiqSerializer = JSONSerializer() |
| 103 | + self.formatter: "TaskiqFormatter" = ProxyFormatter(self) |
101 | 104 | self.id_generator = task_id_generator |
102 | 105 | self.local_task_registry: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {} |
103 | 106 | # Every event has a list of handlers. |
@@ -479,6 +482,19 @@ def with_event_handlers( |
479 | 482 | self.event_handlers[event].extend(handlers) |
480 | 483 | return self |
481 | 484 |
|
| 485 | + def with_serializer( |
| 486 | + self, |
| 487 | + serializer: TaskiqSerializer, |
| 488 | + ) -> "Self": # pragma: no cover |
| 489 | + """ |
| 490 | + Set a new serializer and return an updated broker. |
| 491 | +
|
| 492 | + :param serializer: new serializer. |
| 493 | + :return: self |
| 494 | + """ |
| 495 | + self.serializer = serializer |
| 496 | + return self |
| 497 | + |
482 | 498 | def _register_task( |
483 | 499 | self, |
484 | 500 | task_name: str, |
|
0 commit comments