1313* limitations under the License.
1414*/
1515
16+ using System ;
17+ using System . Threading ;
18+ using System . Threading . Tasks ;
1619using MongoDB . Bson ;
1720using MongoDB . Driver . Core . Clusters ;
1821using MongoDB . Driver . Core . Misc ;
@@ -27,7 +30,9 @@ namespace MongoDB.Driver.Core.Bindings
2730 public sealed class CoreSession : ICoreSession
2831 {
2932 // private fields
33+ private readonly ICluster _cluster ;
3034 private readonly IClusterClock _clusterClock = new ClusterClock ( ) ;
35+ private CoreTransaction _currentTransaction ;
3136 private bool _disposed ;
3237 private readonly IOperationClock _operationClock = new OperationClock ( ) ;
3338 private readonly CoreSessionOptions _options ;
@@ -37,20 +42,29 @@ public sealed class CoreSession : ICoreSession
3742 /// <summary>
3843 /// Initializes a new instance of the <see cref="CoreSession" /> class.
3944 /// </summary>
45+ /// <param name="cluster">The cluster.</param>
4046 /// <param name="serverSession">The server session.</param>
4147 /// <param name="options">The options.</param>
4248 public CoreSession (
49+ ICluster cluster ,
4350 ICoreServerSession serverSession ,
4451 CoreSessionOptions options )
4552 {
53+ _cluster = Ensure . IsNotNull ( cluster , nameof ( cluster ) ) ;
4654 _serverSession = Ensure . IsNotNull ( serverSession , nameof ( serverSession ) ) ;
4755 _options = Ensure . IsNotNull ( options , nameof ( options ) ) ;
4856 }
4957
5058 // public properties
59+ /// <inheritdoc />
60+ public ICluster Cluster => _cluster ;
61+
5162 /// <inheritdoc />
5263 public BsonDocument ClusterTime => _clusterClock . ClusterTime ;
5364
65+ /// <inheritdoc />
66+ public CoreTransaction CurrentTransaction => _currentTransaction ;
67+
5468 /// <inheritdoc />
5569 public BsonDocument Id => _serverSession . Id ;
5670
@@ -60,6 +74,9 @@ public CoreSession(
6074 /// <inheritdoc />
6175 public bool IsImplicit => _options . IsImplicit ;
6276
77+ /// <inheritdoc />
78+ public bool IsInTransaction => _currentTransaction != null ;
79+
6380 /// <inheritdoc />
6481 public BsonTimestamp OperationTime => _operationClock . OperationTime ;
6582
@@ -70,6 +87,100 @@ public CoreSession(
7087 public ICoreServerSession ServerSession => _serverSession ;
7188
7289 // public methods
90+ /// <inheritdoc />
91+ public void AbortTransaction ( CancellationToken cancellationToken = default ( CancellationToken ) )
92+ {
93+ EnsureIsInTransaction ( nameof ( AbortTransaction ) ) ;
94+
95+ try
96+ {
97+ if ( _currentTransaction . StatementId == 0 )
98+ {
99+ return ;
100+ }
101+
102+ try
103+ {
104+ var firstAttempt = CreateAbortTransactionOperation ( ) ;
105+ ExecuteEndTransactionOnPrimary ( firstAttempt , cancellationToken ) ;
106+ return ;
107+ }
108+ catch ( Exception exception ) when ( ShouldIgnoreAbortTransactionException ( exception ) )
109+ {
110+ return ; // ignore exception and return
111+ }
112+ catch ( Exception exception ) when ( ShouldRetryEndTransactionException ( exception ) )
113+ {
114+ // ignore exception and retry
115+ }
116+ catch
117+ {
118+ return ; // ignore exception and return
119+ }
120+
121+ try
122+ {
123+ var secondAttempt = CreateAbortTransactionOperation ( ) ;
124+ ExecuteEndTransactionOnPrimary ( secondAttempt , cancellationToken ) ;
125+ }
126+ catch
127+ {
128+ return ; // ignore exception and return
129+ }
130+ }
131+ finally
132+ {
133+ _currentTransaction = null ;
134+ }
135+ }
136+
137+ /// <inheritdoc />
138+ public async Task AbortTransactionAsync ( CancellationToken cancellationToken = default ( CancellationToken ) )
139+ {
140+ EnsureIsInTransaction ( nameof ( AbortTransaction ) ) ;
141+
142+ try
143+ {
144+ if ( _currentTransaction . StatementId == 0 )
145+ {
146+ return ;
147+ }
148+
149+ try
150+ {
151+ var firstAttempt = CreateAbortTransactionOperation ( ) ;
152+ await ExecuteEndTransactionOnPrimaryAsync ( firstAttempt , cancellationToken ) . ConfigureAwait ( false ) ;
153+ return ;
154+ }
155+ catch ( Exception exception ) when ( ShouldIgnoreAbortTransactionException ( exception ) )
156+ {
157+ return ; // ignore exception and return
158+ }
159+ catch ( Exception exception ) when ( ShouldRetryEndTransactionException ( exception ) )
160+ {
161+ // ignore exception and retry
162+ }
163+ catch
164+ {
165+ return ; // ignore exception and return
166+ }
167+
168+ try
169+ {
170+ var secondAttempt = CreateAbortTransactionOperation ( ) ;
171+ await ExecuteEndTransactionOnPrimaryAsync ( secondAttempt , cancellationToken ) . ConfigureAwait ( false ) ;
172+ }
173+ catch
174+ {
175+ return ; // ignore exception and return
176+ }
177+ }
178+ finally
179+ {
180+ _currentTransaction = null ;
181+ }
182+ }
183+
73184 /// <inheritdoc />
74185 public void AdvanceClusterTime ( BsonDocument newClusterTime )
75186 {
@@ -88,20 +199,175 @@ public long AdvanceTransactionNumber()
88199 return _serverSession . AdvanceTransactionNumber ( ) ;
89200 }
90201
202+ /// <inheritdoc />
203+ public void CommitTransaction ( CancellationToken cancellationToken = default ( CancellationToken ) )
204+ {
205+ EnsureIsInTransaction ( nameof ( CommitTransaction ) ) ;
206+
207+ try
208+ {
209+ if ( _currentTransaction . StatementId == 0 )
210+ {
211+ return ;
212+ }
213+
214+ try
215+ {
216+ var firstAttempt = CreateCommitTransactionOperation ( ) ;
217+ ExecuteEndTransactionOnPrimary ( firstAttempt , cancellationToken ) ;
218+ return ;
219+ }
220+ catch ( Exception exception ) when ( ShouldRetryEndTransactionException ( exception ) )
221+ {
222+ // ignore exception and retry
223+ }
224+
225+ var secondAttempt = CreateCommitTransactionOperation ( ) ;
226+ ExecuteEndTransactionOnPrimary ( secondAttempt , cancellationToken ) ;
227+ }
228+ finally
229+ {
230+ _currentTransaction = null ;
231+ }
232+ }
233+
234+ /// <inheritdoc />
235+ public async Task CommitTransactionAsync ( CancellationToken cancellationToken = default ( CancellationToken ) )
236+ {
237+ EnsureIsInTransaction ( nameof ( CommitTransaction ) ) ;
238+
239+ try
240+ {
241+ if ( _currentTransaction . StatementId == 0 )
242+ {
243+ return ;
244+ }
245+
246+ try
247+ {
248+ var firstAttempt = CreateCommitTransactionOperation ( ) ;
249+ await ExecuteEndTransactionOnPrimaryAsync ( firstAttempt , cancellationToken ) . ConfigureAwait ( false ) ;
250+ return ;
251+ }
252+ catch ( Exception exception ) when ( ShouldRetryEndTransactionException ( exception ) )
253+ {
254+ // ignore exception and retry
255+ }
256+
257+ var secondAttempt = CreateCommitTransactionOperation ( ) ;
258+ await ExecuteEndTransactionOnPrimaryAsync ( secondAttempt , cancellationToken ) . ConfigureAwait ( false ) ;
259+ }
260+ finally
261+ {
262+ _currentTransaction = null ;
263+ }
264+ }
265+
91266 /// <inheritdoc />
92267 public void Dispose ( )
93268 {
94269 if ( ! _disposed )
95270 {
271+ if ( _currentTransaction != null )
272+ {
273+ try
274+ {
275+ AbortTransaction ( CancellationToken . None ) ;
276+ }
277+ catch
278+ {
279+ // ignore exceptions
280+ }
281+ }
282+
96283 _serverSession . Dispose ( ) ;
97284 _disposed = true ;
98285 }
99286 }
100287
288+ /// <inheritdoc />
289+ public void StartTransaction ( TransactionOptions transactionOptions = null )
290+ {
291+ if ( _currentTransaction != null )
292+ {
293+ throw new InvalidOperationException ( "Transaction already in progress." ) ;
294+ }
295+
296+ var transactionNumber = AdvanceTransactionNumber ( ) ;
297+ var readConcern = transactionOptions ? . ReadConcern ?? _options . DefaultTransactionOptions ? . ReadConcern ?? ReadConcern . Default ;
298+ var writeConcern = transactionOptions ? . WriteConcern ?? _options . DefaultTransactionOptions ? . WriteConcern ?? new WriteConcern ( ) ;
299+ var effectiveTransactionOptions = new TransactionOptions ( readConcern , writeConcern ) ;
300+ var transaction = new CoreTransaction ( transactionNumber , effectiveTransactionOptions ) ;
301+
302+ _currentTransaction = transaction ;
303+ }
304+
101305 /// <inheritdoc />
102306 public void WasUsed ( )
103307 {
104308 _serverSession . WasUsed ( ) ;
105309 }
310+
311+ // private methods
312+ private IReadOperation < BsonDocument > CreateAbortTransactionOperation ( )
313+ {
314+ return new AbortTransactionOperation ( GetTransactionWriteConcern ( ) ) ;
315+ }
316+
317+ private IReadOperation < BsonDocument > CreateCommitTransactionOperation ( )
318+ {
319+ return new CommitTransactionOperation ( GetTransactionWriteConcern ( ) ) ;
320+ }
321+
322+ private void EnsureIsInTransaction ( string methodName )
323+ {
324+ this . AutoStartTransactionIfApplicable ( ) ;
325+ if ( _currentTransaction == null )
326+ {
327+ throw new InvalidOperationException ( "No transaction started." ) ;
328+ }
329+ }
330+
331+ private TResult ExecuteEndTransactionOnPrimary < TResult > ( IReadOperation < TResult > operation , CancellationToken cancellationToken )
332+ {
333+ using ( var sessionHandle = new NonDisposingCoreSessionHandle ( this ) )
334+ using ( var binding = new WritableServerBinding ( _cluster , sessionHandle ) )
335+ {
336+ return operation . Execute ( binding , cancellationToken ) ;
337+ }
338+ }
339+
340+ private async Task < TResult > ExecuteEndTransactionOnPrimaryAsync < TResult > ( IReadOperation < TResult > operation , CancellationToken cancellationToken )
341+ {
342+ using ( var sessionHandle = new NonDisposingCoreSessionHandle ( this ) )
343+ using ( var binding = new WritableServerBinding ( _cluster , sessionHandle ) )
344+ {
345+ return await operation . ExecuteAsync ( binding , cancellationToken ) . ConfigureAwait ( false ) ;
346+ }
347+ }
348+
349+ private WriteConcern GetTransactionWriteConcern ( )
350+ {
351+ return
352+ _currentTransaction . TransactionOptions ? . WriteConcern ??
353+ _options . DefaultTransactionOptions ? . WriteConcern ??
354+ WriteConcern . WMajority ;
355+ }
356+
357+ private bool ShouldIgnoreAbortTransactionException ( Exception exception )
358+ {
359+ var commandException = exception as MongoCommandException ;
360+ if ( commandException != null )
361+ {
362+ return true ;
363+ }
364+
365+ return false ;
366+ }
367+
368+ private bool ShouldRetryEndTransactionException ( Exception exception )
369+ {
370+ return RetryabilityHelper . IsRetryableWriteException ( exception ) ;
371+ }
106372 }
107373}
0 commit comments