1616use Exception ;
1717use Laudis \Neo4j \Common \GeneratorHelper ;
1818use Laudis \Neo4j \Common \Neo4jLogger ;
19- use Laudis \Neo4j \Common \TransactionHelper ;
2019use Laudis \Neo4j \Contracts \ConnectionPoolInterface ;
20+ use Laudis \Neo4j \Contracts \CypherSequence ;
2121use Laudis \Neo4j \Contracts \SessionInterface ;
2222use Laudis \Neo4j \Contracts \TransactionInterface ;
2323use Laudis \Neo4j \Contracts \UnmanagedTransactionInterface ;
@@ -41,6 +41,7 @@ final class Session implements SessionInterface
4141{
4242 /** @psalm-readonly */
4343 private readonly BookmarkHolder $ bookmarkHolder ;
44+ private const ROLLBACK_CLASSIFICATIONS = ['ClientError ' , 'TransientError ' , 'DatabaseError ' ];
4445
4546 /**
4647 * @param ConnectionPool|Neo4jConnectionPool $pool
@@ -100,21 +101,59 @@ public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration
100101 $ this ->getLogger ()?->log(LogLevel::INFO , 'Beginning write transaction ' , ['config ' => $ config ]);
101102 $ config = $ this ->mergeTsxConfig ($ config );
102103
103- return TransactionHelper::retry (
104- fn () => $ this ->startTransaction ($ config , $ this ->config ->withAccessMode (AccessMode::WRITE ())),
105- $ tsxHandler
106- );
104+ return $ this ->retry ($ tsxHandler , false , $ config );
107105 }
108106
109107 public function readTransaction (callable $ tsxHandler , ?TransactionConfiguration $ config = null )
110108 {
111109 $ this ->getLogger ()?->log(LogLevel::INFO , 'Beginning read transaction ' , ['config ' => $ config ]);
112110 $ config = $ this ->mergeTsxConfig ($ config );
113111
114- return TransactionHelper::retry (
115- fn () => $ this ->startTransaction ($ config , $ this ->config ->withAccessMode (AccessMode::READ ())),
116- $ tsxHandler
117- );
112+ return $ this ->retry ($ tsxHandler , true , $ config );
113+ }
114+
115+ /**
116+ * @template U
117+ *
118+ * @param callable(TransactionInterface):U $tsxHandler
119+ *
120+ * @return U
121+ */
122+ private function retry (callable $ tsxHandler , bool $ read , TransactionConfiguration $ config )
123+ {
124+ while (true ) {
125+ $ transaction = null ;
126+ try {
127+ if ($ read ) {
128+ $ transaction = $ this ->startTransaction ($ config , $ this ->config ->withAccessMode (AccessMode::READ ()));
129+ } else {
130+ $ transaction = $ this ->startTransaction ($ config , $ this ->config ->withAccessMode (AccessMode::WRITE ()));
131+ }
132+ $ tbr = $ tsxHandler ($ transaction );
133+ self ::triggerLazyResult ($ tbr );
134+ $ transaction ->commit ();
135+
136+ return $ tbr ;
137+ } catch (Neo4jException $ e ) {
138+ if ($ transaction && !in_array ($ e ->getClassification (), self ::ROLLBACK_CLASSIFICATIONS )) {
139+ $ transaction ->rollback ();
140+ }
141+
142+ if ($ e ->getTitle () === 'NotALeader ' ) {
143+ // By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
144+ $ this ->pool ->close ();
145+ } elseif ($ e ->getClassification () !== 'TransientError ' ) {
146+ throw $ e ;
147+ }
148+ }
149+ }
150+ }
151+
152+ private static function triggerLazyResult (mixed $ tbr ): void
153+ {
154+ if ($ tbr instanceof CypherSequence) {
155+ $ tbr ->preload ();
156+ }
118157 }
119158
120159 public function transaction (callable $ tsxHandler , ?TransactionConfiguration $ config = null )
0 commit comments