3232 deprecated ,
3333 PreviewWarning ,
3434)
35+ from ..._util import ContextBool
3536from ..._work import Query
3637from ...api import (
3738 Bookmarks ,
@@ -100,6 +101,10 @@ class AsyncSession(AsyncWorkspace):
100101 # The state this session is in.
101102 _state_failed = False
102103
104+ _config : SessionConfig
105+ _bookmark_manager : t .Optional [Bookmarks ]
106+ _pipelined_begin : ContextBool
107+
103108 def __init__ (self , pool , session_config ):
104109 assert isinstance (session_config , SessionConfig )
105110 if session_config .auth is not None :
@@ -115,6 +120,7 @@ def __init__(self, pool, session_config):
115120 self ._config = session_config
116121 self ._initialize_bookmarks (session_config .bookmarks )
117122 self ._bookmark_manager = session_config .bookmark_manager
123+ self ._pipelined_begin = ContextBool ()
118124
119125 async def __aenter__ (self ) -> AsyncSession :
120126 return self
@@ -421,6 +427,7 @@ async def _open_transaction(
421427 bookmarks , access_mode , metadata , timeout ,
422428 self ._config .notifications_min_severity ,
423429 self ._config .notifications_disabled_categories ,
430+ pipelined = self ._pipelined_begin
424431 )
425432
426433 async def begin_transaction (
@@ -480,9 +487,15 @@ async def begin_transaction(
480487
481488 return t .cast (AsyncTransaction , self ._transaction )
482489
490+
483491 async def _run_transaction (
484- self , access_mode , transaction_function , * args , ** kwargs
485- ):
492+ self ,
493+ access_mode : str ,
494+ transaction_function : t .Callable [
495+ te .Concatenate [AsyncManagedTransaction , _P ], t .Awaitable [_R ]
496+ ],
497+ * args : _P .args , ** kwargs : _P .kwargs
498+ ) -> _R :
486499 self ._check_state ()
487500 if not callable (transaction_function ):
488501 raise TypeError ("Unit of work is not callable" )
@@ -498,7 +511,7 @@ async def _run_transaction(
498511
499512 errors = []
500513
501- t0 = - 1 # Timer
514+ t0 : float = - 1 # Timer
502515
503516 while True :
504517 try :
@@ -507,6 +520,7 @@ async def _run_transaction(
507520 access_mode = access_mode , metadata = metadata ,
508521 timeout = timeout
509522 )
523+ assert isinstance (self ._transaction , AsyncManagedTransaction )
510524 tx = self ._transaction
511525 try :
512526 result = await transaction_function (tx , * args , ** kwargs )
0 commit comments