|
16 | 16 | use Exception; |
17 | 17 | use Laudis\Neo4j\Common\GeneratorHelper; |
18 | 18 | use Laudis\Neo4j\Common\Neo4jLogger; |
19 | | -use Laudis\Neo4j\Common\TransactionHelper; |
20 | 19 | use Laudis\Neo4j\Contracts\ConnectionPoolInterface; |
| 20 | +use Laudis\Neo4j\Contracts\CypherSequence; |
21 | 21 | use Laudis\Neo4j\Contracts\SessionInterface; |
22 | 22 | use Laudis\Neo4j\Contracts\TransactionInterface; |
23 | 23 | use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; |
|
39 | 39 | */ |
40 | 40 | final class Session implements SessionInterface |
41 | 41 | { |
| 42 | + private const ROLLBACK_CLASSIFICATIONS = ['ClientError', 'TransientError', 'DatabaseError']; |
| 43 | + |
42 | 44 | /** @var list<BoltConnection> */ |
43 | 45 | private array $usedConnections = []; |
44 | 46 | /** @psalm-readonly */ |
@@ -101,23 +103,58 @@ public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration |
101 | 103 | $this->getLogger()?->log(LogLevel::INFO, 'Beginning write transaction', ['config' => $config]); |
102 | 104 | $config = $this->mergeTsxConfig($config); |
103 | 105 |
|
104 | | - return TransactionHelper::retry( |
105 | | - fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE())), |
106 | | - $tsxHandler |
107 | | - ); |
| 106 | + return $this->retry($tsxHandler, false, $config); |
108 | 107 | } |
109 | 108 |
|
110 | 109 | public function readTransaction(callable $tsxHandler, ?TransactionConfiguration $config = null) |
111 | 110 | { |
112 | 111 | $this->getLogger()?->log(LogLevel::INFO, 'Beginning read transaction', ['config' => $config]); |
113 | 112 | $config = $this->mergeTsxConfig($config); |
114 | 113 |
|
115 | | - return TransactionHelper::retry( |
116 | | - fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ())), |
117 | | - $tsxHandler |
118 | | - ); |
| 114 | + return $this->retry($tsxHandler, false, $config); |
| 115 | + } |
| 116 | + /** |
| 117 | + * @template U |
| 118 | + * |
| 119 | + * @param callable(TransactionInterface):U $tsxHandler |
| 120 | + * |
| 121 | + * @return U |
| 122 | + */ |
| 123 | + private function retry(callable $tsxHandler, bool $read, TransactionConfiguration $config) |
| 124 | + { |
| 125 | + while (true) { |
| 126 | + $transaction = null; |
| 127 | + try { |
| 128 | + if ($read) { |
| 129 | + $transaction = $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ())); |
| 130 | + } else { |
| 131 | + $transaction = $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE())); |
| 132 | + } |
| 133 | + $tbr = $tsxHandler($transaction); |
| 134 | + self::triggerLazyResult($tbr); |
| 135 | + $transaction->commit(); |
| 136 | + |
| 137 | + return $tbr; |
| 138 | + } catch (Neo4jException $e) { |
| 139 | + if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) { |
| 140 | + $transaction->rollback(); |
| 141 | + } |
| 142 | + |
| 143 | + if ($e->getTitle() === 'NotALeader') { |
| 144 | + // By closing the pool, we force the connection to be re-acquired and the routing table to be refetched |
| 145 | + $this->pool->close(); |
| 146 | + } elseif ($e->getClassification() !== 'TransientError') { |
| 147 | + throw $e; |
| 148 | + } |
| 149 | + } |
| 150 | + } |
| 151 | + } |
| 152 | + private static function triggerLazyResult(mixed $tbr): void |
| 153 | + { |
| 154 | + if ($tbr instanceof CypherSequence) { |
| 155 | + $tbr->preload(); |
| 156 | + } |
119 | 157 | } |
120 | | - |
121 | 158 | public function transaction(callable $tsxHandler, ?TransactionConfiguration $config = null) |
122 | 159 | { |
123 | 160 | return $this->writeTransaction($tsxHandler, $config); |
|
0 commit comments