From e60704f6b619e0d55416062e6a13177f5975db8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Mon, 8 Sep 2025 23:47:40 +0200 Subject: [PATCH 01/13] draft for shared tee buffer --- asyncstdlib/itertools.py | 106 +++++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 49 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index febd591..250de21 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -8,7 +8,6 @@ Union, Callable, Optional, - Deque, Generic, Iterable, Iterator, @@ -17,7 +16,7 @@ overload, AsyncGenerator, ) -from collections import deque +from typing_extensions import TypeAlias from ._typing import ACloseable, R, T, AnyIterable, ADD from ._utility import public_module @@ -32,6 +31,7 @@ enumerate as aenumerate, iter as aiter, ) +from itertools import count as _count S = TypeVar("S") T_co = TypeVar("T_co", covariant=True) @@ -346,45 +346,52 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: return None -async def tee_peer( - iterator: AsyncIterator[T], - # the buffer specific to this peer - buffer: Deque[T], - # the buffers of all peers, including our own - peers: List[Deque[T]], - lock: AsyncContextManager[Any], -) -> AsyncGenerator[T, None]: - """An individual iterator of a :py:func:`~.tee`""" - try: - while True: - if not buffer: - async with lock: - # Another peer produced an item while we were waiting for the lock. - # Proceed with the next loop iteration to yield the item. - if buffer: - continue - try: - item = await iterator.__anext__() - except StopAsyncIteration: - break - else: - # Append to all buffers, including our own. We'll fetch our - # item from the buffer again, instead of yielding it directly. - # This ensures the proper item ordering if any of our peers - # are fetching items concurrently. They may have buffered their - # item already. 
- for peer_buffer in peers: - peer_buffer.append(item) - yield buffer.popleft() - finally: - # this peer is done – remove its buffer - for idx, peer_buffer in enumerate(peers): # pragma: no branch - if peer_buffer is buffer: - peers.pop(idx) - break - # if we are the last peer, try and close the iterator - if not peers and isinstance(iterator, ACloseable): - await iterator.aclose() +_get_tee_index = _count().__next__ + + +Node: TypeAlias = "list[T | Node[T]]" + + +class TeePeer(Generic[T]): + def __init__( + self, + iterator: AsyncIterator[T], + buffer: "Node[T]", + lock: AsyncContextManager[Any], + tee_peers: "set[int]", + ) -> None: + self.iterator = iterator + self.lock = lock + self.buffer: Node[T] = buffer + self.tee_peers = tee_peers + self.tee_idx = _get_tee_index() + self.tee_peers.add(self.tee_idx) + + def __aiter__(self): + return self + + async def __anext__(self) -> T: + # the buffer is a singly-linked list as [value, [value, [...]]] | [] + next_node = self.buffer + value: T + # for any most advanced TeePeer, the node is just [] + # fetch the next value so we can mutate the node to [value, [...]] + if not next_node: + async with self.lock: + # Check if another peer produced an item while we were waiting for the lock + if not next_node: + next_node[:] = await self.iterator.__anext__(), [] + # for any other TeePeer, the node is already some [value, [...]] + value, self.buffer = next_node # type: ignore + return value + + async def aclose(self) -> None: + self.tee_peers.discard(self.tee_idx) + if not self.tee_peers and isinstance(self.iterator, ACloseable): + await self.iterator.aclose() + + def __del__(self) -> None: + self.tee_peers.discard(self.tee_idx) @public_module(__name__, "tee") @@ -426,7 +433,7 @@ async def derivative(sensor_data): and access is automatically synchronised. 
""" - __slots__ = ("_iterator", "_buffers", "_children") + __slots__ = ("_iterator", "_buffer", "_children") def __init__( self, @@ -436,15 +443,16 @@ def __init__( lock: Optional[AsyncContextManager[Any]] = None, ): self._iterator = aiter(iterable) - self._buffers: List[Deque[T]] = [deque() for _ in range(n)] + self._buffer: Node[T] = [] + peers: set[int] = set() self._children = tuple( - tee_peer( - iterator=self._iterator, - buffer=buffer, - peers=self._buffers, - lock=lock if lock is not None else NoLock(), + TeePeer( + self._iterator, + self._buffer, + lock if lock is not None else NoLock(), + peers, ) - for buffer in self._buffers + for _ in range(n) ) def __len__(self) -> int: From aa140d0f3a715b2171bd6256a91de27f1acdebf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Tue, 9 Sep 2025 15:40:39 +0200 Subject: [PATCH 02/13] do not expose Tee Node type --- asyncstdlib/itertools.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 250de21..82836d1 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -349,20 +349,20 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: _get_tee_index = _count().__next__ -Node: TypeAlias = "list[T | Node[T]]" +_TeeNode: TypeAlias = "list[T | _TeeNode[T]]" class TeePeer(Generic[T]): def __init__( self, iterator: AsyncIterator[T], - buffer: "Node[T]", + buffer: "_TeeNode[T]", lock: AsyncContextManager[Any], tee_peers: "set[int]", ) -> None: self.iterator = iterator self.lock = lock - self.buffer: Node[T] = buffer + self.buffer: _TeeNode[T] = buffer self.tee_peers = tee_peers self.tee_idx = _get_tee_index() self.tee_peers.add(self.tee_idx) @@ -443,7 +443,7 @@ def __init__( lock: Optional[AsyncContextManager[Any]] = None, ): self._iterator = aiter(iterable) - self._buffer: Node[T] = [] + self._buffer: _TeeNode[T] = [] peers: set[int] = set() self._children = tuple( TeePeer( From 
55044ec0141c387d5da2bd4c75d98feb932e64d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Tue, 9 Sep 2025 15:46:31 +0200 Subject: [PATCH 03/13] tee parent does not need to own peer resources --- asyncstdlib/itertools.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 82836d1..bd1c3b1 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -433,7 +433,7 @@ async def derivative(sensor_data): and access is automatically synchronised. """ - __slots__ = ("_iterator", "_buffer", "_children") + __slots__ = ("_children",) def __init__( self, @@ -442,13 +442,13 @@ def __init__( *, lock: Optional[AsyncContextManager[Any]] = None, ): - self._iterator = aiter(iterable) - self._buffer: _TeeNode[T] = [] + iterator = aiter(iterable) + buffer: _TeeNode[T] = [] peers: set[int] = set() self._children = tuple( TeePeer( - self._iterator, - self._buffer, + iterator, + buffer, lock if lock is not None else NoLock(), peers, ) From 55af50c9d1f35bae47936e9b985aecc0719e6237 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Tue, 9 Sep 2025 21:00:10 +0200 Subject: [PATCH 04/13] avoid name clash --- asyncstdlib/itertools.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index bd1c3b1..589be95 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -31,7 +31,7 @@ enumerate as aenumerate, iter as aiter, ) -from itertools import count as _count +from itertools import count as _counter S = TypeVar("S") T_co = TypeVar("T_co", covariant=True) @@ -346,7 +346,7 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: return None -_get_tee_index = _count().__next__ +_get_tee_index = _counter().__next__ _TeeNode: TypeAlias = "list[T | _TeeNode[T]]" From ce5d591a47a389a04b8fa27c0e33b59065705ad9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: 
Tue, 9 Sep 2025 21:05:25 +0200 Subject: [PATCH 05/13] do not expose internal data --- asyncstdlib/itertools.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 589be95..42ce4df 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -360,38 +360,38 @@ def __init__( lock: AsyncContextManager[Any], tee_peers: "set[int]", ) -> None: - self.iterator = iterator - self.lock = lock - self.buffer: _TeeNode[T] = buffer - self.tee_peers = tee_peers - self.tee_idx = _get_tee_index() - self.tee_peers.add(self.tee_idx) + self._iterator = iterator + self._lock = lock + self._buffer: _TeeNode[T] = buffer + self._tee_peers = tee_peers + self._tee_idx = _get_tee_index() + self._tee_peers.add(self._tee_idx) def __aiter__(self): return self async def __anext__(self) -> T: # the buffer is a singly-linked list as [value, [value, [...]]] | [] - next_node = self.buffer + next_node = self._buffer value: T # for any most advanced TeePeer, the node is just [] # fetch the next value so we can mutate the node to [value, [...]] if not next_node: - async with self.lock: + async with self._lock: # Check if another peer produced an item while we were waiting for the lock if not next_node: - next_node[:] = await self.iterator.__anext__(), [] + next_node[:] = await self._iterator.__anext__(), [] # for any other TeePeer, the node is already some [value, [...]] - value, self.buffer = next_node # type: ignore + value, self._buffer = next_node # type: ignore return value async def aclose(self) -> None: - self.tee_peers.discard(self.tee_idx) - if not self.tee_peers and isinstance(self.iterator, ACloseable): - await self.iterator.aclose() + self._tee_peers.discard(self._tee_idx) + if not self._tee_peers and isinstance(self._iterator, ACloseable): + await self._iterator.aclose() def __del__(self) -> None: - self.tee_peers.discard(self.tee_idx) + self._tee_peers.discard(self._tee_idx) 
@public_module(__name__, "tee") From b7306004b95943ac64aeccc411dfc35b876ba131 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Tue, 9 Sep 2025 22:19:15 +0200 Subject: [PATCH 06/13] concurrent tee iterators are safe and ordered --- asyncstdlib/itertools.py | 16 +++++++++++++++- unittests/test_itertools.py | 38 ++++++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 42ce4df..418593c 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -380,11 +380,25 @@ async def __anext__(self) -> T: async with self._lock: # Check if another peer produced an item while we were waiting for the lock if not next_node: - next_node[:] = await self._iterator.__anext__(), [] + await self._extend_buffer(next_node) + if not next_node: + raise StopAsyncIteration() # for any other TeePeer, the node is already some [value, [...]] value, self._buffer = next_node # type: ignore return value + async def _extend_buffer(self, next_node: "_TeeNode[T]") -> None: + """Extend the buffer by fetching a new item from the iterable""" + try: + next_value = await self._iterator.__anext__() + except StopAsyncIteration: + return + # another peer may have filled the buffer while we waited + # seek the last node that needs to be filled + while next_node: + _, next_node = next_node # type: ignore + next_node[:] = next_value, [] + async def aclose(self) -> None: self._tee_peers.discard(self._tee_idx) if not self._tee_peers and isinstance(self._iterator, ACloseable): diff --git a/unittests/test_itertools.py b/unittests/test_itertools.py index 82e4e7a..4e3b913 100644 --- a/unittests/test_itertools.py +++ b/unittests/test_itertools.py @@ -1,3 +1,4 @@ +from typing import AsyncIterator import itertools import sys import platform @@ -341,7 +342,7 @@ async def test_tee(): @sync async def test_tee_concurrent_locked(): - """Test that properly uses a lock for synchronisation""" + """Test 
that tee properly uses a lock for synchronisation""" items = [1, 2, 3, -5, 12, 78, -1, 111] async def iter_values(): @@ -393,6 +394,41 @@ async def test_peer(peer_tee): await test_peer(this) +@pytest.mark.parametrize("size", [2, 3, 5, 9, 12]) +@sync +async def test_tee_concurrent_ordering(size: int): + """Test that tee respects concurrent ordering for all peers""" + + class ConcurrentInvertedIterable: + """Helper that concurrently iterates with earlier items taking longer""" + + def __init__(self, count: int) -> None: + self.count = count + self._counter = itertools.count() + + def __aiter__(self): + return self + + async def __anext__(self): + value = next(self._counter) + if value >= self.count: + raise StopAsyncIteration() + await Switch(self.count - value) + return value + + async def test_peer(peer_tee: AsyncIterator[int]): + # consume items from the tee with a delay so that slower items can arrive + seen_items: list[int] = [] + async for item in peer_tee: + seen_items.append(item) + await Switch() + assert seen_items == expected_items + + expected_items = list(range(size)[::-1]) + peers = a.tee(ConcurrentInvertedIterable(size), n=size) + await Schedule(*map(test_peer, peers)) + + @sync async def test_pairwise(): assert await a.list(a.pairwise(range(5))) == [(0, 1), (1, 2), (2, 3), (3, 4)] From a04917365eafba8af544f78e2b9d828fa8eb86f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Tue, 9 Sep 2025 22:28:05 +0200 Subject: [PATCH 07/13] directly handle stopiteration --- asyncstdlib/itertools.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 418593c..5228343 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -381,8 +381,6 @@ async def __anext__(self) -> T: # Check if another peer produced an item while we were waiting for the lock if not next_node: await self._extend_buffer(next_node) - if not next_node: - raise StopAsyncIteration() 
# for any other TeePeer, the node is already some [value, [...]] value, self._buffer = next_node # type: ignore return value @@ -390,14 +388,17 @@ async def __anext__(self) -> T: async def _extend_buffer(self, next_node: "_TeeNode[T]") -> None: """Extend the buffer by fetching a new item from the iterable""" try: + # another peer may fill the buffer while we wait here next_value = await self._iterator.__anext__() except StopAsyncIteration: - return - # another peer may have filled the buffer while we waited - # seek the last node that needs to be filled - while next_node: - _, next_node = next_node # type: ignore - next_node[:] = next_value, [] + # no one else managed to fetch a value either + if not next_node: + raise + else: + # seek the last node that needs to be filled + while next_node: + _, next_node = next_node # type: ignore + next_node[:] = next_value, [] async def aclose(self) -> None: self._tee_peers.discard(self._tee_idx) From 28797c12171dfd179d10a101a96631e34e31eefb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Wed, 10 Sep 2025 18:42:20 +0200 Subject: [PATCH 08/13] clarify why we do this --- asyncstdlib/itertools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 5228343..d50b78a 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -395,7 +395,7 @@ async def _extend_buffer(self, next_node: "_TeeNode[T]") -> None: if not next_node: raise else: - # seek the last node that needs to be filled + # skip nodes that were filled in the meantime while next_node: _, next_node = next_node # type: ignore next_node[:] = next_value, [] From 0c7e0ddb6ce61be104623624c00c04521d2b73a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Sat, 13 Sep 2025 23:06:59 +0200 Subject: [PATCH 09/13] tees share their buffer --- asyncstdlib/itertools.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/asyncstdlib/itertools.py 
b/asyncstdlib/itertools.py index d50b78a..3624df2 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -457,9 +457,16 @@ def __init__( *, lock: Optional[AsyncContextManager[Any]] = None, ): - iterator = aiter(iterable) - buffer: _TeeNode[T] = [] - peers: set[int] = set() + buffer: _TeeNode[T] + peers: set[int] + if not isinstance(iterable, TeePeer): + iterator = aiter(iterable) + buffer = [] + peers = set() + else: + iterator = iterable._iterator # pyright: ignore[reportPrivateUsage] + buffer = iterable._buffer # pyright: ignore[reportPrivateUsage] + peers = iterable._tee_peers # pyright: ignore[reportPrivateUsage] self._children = tuple( TeePeer( iterator, From 3306fad2cecc6ec6d348c6d6fc9aede6e8d752e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Sat, 13 Sep 2025 23:10:11 +0200 Subject: [PATCH 10/13] restructure docs for clarity --- asyncstdlib/itertools.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 3624df2..970fac8 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -429,16 +429,12 @@ async def derivative(sensor_data): await a.anext(previous) # advance one iterator return a.map(operator.sub, previous, current) - Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom type instead - of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated and unpacked - to get the child iterators. In addition, its :py:meth:`~.tee.aclose` method - immediately closes all children, and it can be used in an ``async with`` context - for the same effect. + ``tee`` must internally buffer each item until the last iterator has yielded it; + if the most and least advanced iterator differ by most data, + using a :py:class:`list` is more efficient (but not lazy). If ``iterable`` is an iterator and read elsewhere, ``tee`` will *not* - provide these items.
Also, ``tee`` must internally buffer each item until the - last iterator has yielded it; if the most and least advanced iterator differ - by most data, using a :py:class:`list` is more efficient (but not lazy). + provide these items. If the underlying iterable is concurrency safe (``anext`` may be awaited concurrently) the resulting iterators are concurrency safe as well. Otherwise, @@ -446,6 +442,12 @@ async def derivative(sensor_data): To enforce sequential use of ``anext``, provide a ``lock`` - e.g. an :py:class:`asyncio.Lock` instance in an :py:mod:`asyncio` application - and access is automatically synchronised. + + Unlike :py:func:`itertools.tee`, :py:func:`~.tee` returns a custom type instead + of a :py:class:`tuple`. Like a tuple, it can be indexed, iterated and unpacked + to get the child iterators. In addition, its :py:meth:`~.tee.aclose` method + immediately closes all children, and it can be used in an ``async with`` context + for the same effect. """ __slots__ = ("_children",) From e34d926f60da49b86c2eaca1e31b01b522daeffa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Sat, 13 Sep 2025 23:17:24 +0200 Subject: [PATCH 11/13] document buffer sharing --- asyncstdlib/itertools.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 970fac8..2363984 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -411,7 +411,7 @@ def __del__(self) -> None: @public_module(__name__, "tee") class Tee(Generic[T]): - """ + r""" Create ``n`` separate asynchronous iterators over ``iterable`` This splits a single ``iterable`` into multiple iterators, each providing @@ -433,8 +433,9 @@ async def derivative(sensor_data): if the most and least advanced iterator differ by most data, using a :py:class:`list` is more efficient (but not lazy). - If ``iterable`` is an iterator and read elsewhere, ``tee`` will *not* - provide these items. 
+ If ``iterable`` is an iterator and read elsewhere, ``tee`` will generally *not* + provide these items. However, a ``tee`` of a ``tee`` shares its buffer with parent, + sibling and child ``tee``\ s so that each sees the same items. If the underlying iterable is concurrency safe (``anext`` may be awaited concurrently) the resulting iterators are concurrency safe as well. Otherwise, From 2d35a57b47a17605189947c6cac1c907428c1025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Sat, 13 Sep 2025 23:20:57 +0200 Subject: [PATCH 12/13] rework buffering advice since list isn't comparable for async --- asyncstdlib/itertools.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/asyncstdlib/itertools.py b/asyncstdlib/itertools.py index 2363984..e47b4de 100644 --- a/asyncstdlib/itertools.py +++ b/asyncstdlib/itertools.py @@ -418,7 +418,7 @@ class Tee(Generic[T]): the same items in the same order. All child iterators may advance separately but share the same items from ``iterable`` -- when the most advanced iterator retrieves an item, - it is buffered until the least advanced iterator has yielded it as well. + it is buffered until all other iterators have yielded it as well. A ``tee`` works lazily and can handle an infinite ``iterable``, provided that all iterators advance. @@ -429,10 +429,6 @@ async def derivative(sensor_data): await a.anext(previous) # advance one iterator return a.map(operator.sub, previous, current) - ``tee`` must internally buffer each item until the last iterator has yielded it; - if the most and least advanced iterator differ by most data, - using a :py:class:`list` is more efficient (but not lazy). - If ``iterable`` is an iterator and read elsewhere, ``tee`` will generally *not* provide these items. However, a ``tee`` of a ``tee`` shares its buffer with parent, sibling and child ``tee``\ s so that each sees the same items. 
From f44c81f0f5d95c8b30b17215ac677354f55b26b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20K=C3=BChn?= Date: Sat, 13 Sep 2025 23:28:43 +0200 Subject: [PATCH 13/13] add version notice --- docs/source/api/itertools.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/source/api/itertools.rst b/docs/source/api/itertools.rst index c61b750..ccfc50d 100644 --- a/docs/source/api/itertools.rst +++ b/docs/source/api/itertools.rst @@ -85,6 +85,10 @@ Iterator splitting The ``lock`` keyword parameter. + .. versionchanged:: 3.13.2 + + ``tee``\ s share their buffer with parents, siblings and children. + .. autofunction:: pairwise(iterable: (async) iter T) :async-for: :(T, T)