11package com .arangodb .springframework .transaction ;
22
33import com .arangodb .ArangoDatabase ;
4- import com .arangodb .DbName ;
54import com .arangodb .entity .StreamTransactionEntity ;
65import com .arangodb .entity .StreamTransactionStatus ;
76import com .arangodb .model .StreamTransactionOptions ;
1211import org .springframework .transaction .TransactionDefinition ;
1312import org .springframework .transaction .interceptor .TransactionAttribute ;
1413import org .springframework .transaction .support .SmartTransactionObject ;
14+ import org .springframework .transaction .support .TransactionSynchronizationUtils ;
1515
1616import java .util .Collection ;
17+ import java .util .Collections ;
1718import java .util .HashSet ;
1819import java .util .Set ;
1920
21+ /**
22+ * Transaction object created by {@link ArangoTransactionManager}.
23+ */
2024class ArangoTransactionObject implements SmartTransactionObject {
2125
2226 private static final Log logger = LogFactory .getLog (ArangoTransactionObject .class );
2327
2428 private final ArangoDatabase database ;
25- private final Set < String > writeCollections = new HashSet <>() ;
29+ private final ArangoTransactionResource resource ;
2630 private int timeout ;
27- private StreamTransactionEntity streamTransaction ;
31+ private StreamTransactionEntity transaction ;
2832
2933 ArangoTransactionObject (ArangoDatabase database , int defaultTimeout , @ Nullable ArangoTransactionResource resource ) {
3034 this .database = database ;
35+ this .resource = resource == null ? new ArangoTransactionResource (null , Collections .emptySet (), false ) : resource ;
3136 this .timeout = defaultTimeout ;
32- if (resource != null ) {
33- writeCollections .addAll (resource .getCollectionNames ());
34- if (resource .getStreamTransactionId () != null ) {
35- streamTransaction = database .getStreamTransaction (resource .getStreamTransactionId ());
36- }
37- }
3837 }
3938
40- ArangoTransactionResource createResource () {
41- return new ArangoTransactionResource ( streamTransaction == null ? null : streamTransaction . getId (), writeCollections ) ;
39+ ArangoTransactionResource getResource () {
40+ return resource ;
4241 }
4342
4443 boolean exists () {
45- return streamTransaction != null ;
44+ return resource . getStreamTransactionId () != null ;
4645 }
4746
4847 void configure (TransactionDefinition definition ) {
@@ -56,55 +55,70 @@ void configure(TransactionDefinition definition) {
5655
5756 ArangoTransactionResource getOrBegin (Collection <String > collections ) {
5857 addCollections (collections );
59- if (streamTransaction != null ) {
60- return createResource ();
58+ if (resource . getStreamTransactionId () != null ) {
59+ return getResource ();
6160 }
6261 StreamTransactionOptions options = new StreamTransactionOptions ()
6362 .allowImplicit (true )
64- .writeCollections (writeCollections .toArray (new String [0 ]))
63+ .writeCollections (resource . getCollectionNames () .toArray (new String [0 ]))
6564 .lockTimeout (Math .max (timeout , 0 ));
66- streamTransaction = database .beginStreamTransaction (options );
65+ transaction = database .beginStreamTransaction (options );
66+ resource .setStreamTransactionId (transaction .getId ());
6767 if (logger .isDebugEnabled ()) {
68- logger .debug ("Began stream transaction " + streamTransaction . getId () + " writing collections " + writeCollections );
68+ logger .debug ("Began stream transaction " + resource . getStreamTransactionId () + " writing collections " + resource . getCollectionNames () );
6969 }
70- return createResource ();
70+ return getResource ();
7171 }
7272
7373 void commit () {
74- if (streamTransaction != null && streamTransaction . getStatus () == StreamTransactionStatus .running ) {
75- database .commitStreamTransaction (streamTransaction . getId ());
74+ if (isStatus ( StreamTransactionStatus .running ) ) {
75+ database .commitStreamTransaction (resource . getStreamTransactionId ());
7676 }
7777 }
7878
7979 void rollback () {
80- if (streamTransaction != null && streamTransaction . getStatus () == StreamTransactionStatus .running ) {
81- database .abortStreamTransaction (streamTransaction . getId ());
80+ if (isStatus ( StreamTransactionStatus .running ) ) {
81+ database .abortStreamTransaction (resource . getStreamTransactionId ());
8282 }
83+ setRollbackOnly ();
8384 }
8485
8586 @ Override
8687 public boolean isRollbackOnly () {
87- return streamTransaction != null && streamTransaction .getStatus () == StreamTransactionStatus .aborted ;
88+ return resource .isRollbackOnly () || isStatus (StreamTransactionStatus .aborted );
89+ }
90+
91+ public void setRollbackOnly () {
92+ resource .setRollbackOnly (true );
8893 }
8994
9095 @ Override
9196 public void flush () {
92- // nothing to do
97+ TransactionSynchronizationUtils . triggerFlush ();
9398 }
9499
95100 @ Override
96101 public String toString () {
97- return streamTransaction == null ? "(not begun)" : streamTransaction . getId ();
102+ return resource . getStreamTransactionId () == null ? "(not begun)" : resource . getStreamTransactionId ();
98103 }
99104
100105 private void addCollections (Collection <String > collections ) {
101- if (streamTransaction != null ) {
102- if (!writeCollections .containsAll (collections )) {
106+ if (resource . getStreamTransactionId () != null ) {
107+ if (!resource . getCollectionNames () .containsAll (collections )) {
103108 Set <String > additional = new HashSet <>(collections );
104- additional .removeAll (writeCollections );
105- throw new IllegalTransactionStateException ("Stream transaction already started on collections " + writeCollections + ", no additional collections allowed: " + additional );
109+ additional .removeAll (resource . getCollectionNames () );
110+ throw new IllegalTransactionStateException ("Stream transaction already started on collections " + resource . getCollectionNames () + ", no additional collections allowed: " + additional );
106111 }
107112 }
108- writeCollections .addAll (collections );
113+ HashSet <String > all = new HashSet <>(resource .getCollectionNames ());
114+ all .addAll (collections );
115+ resource .setCollectionNames (all );
116+ }
117+
118+ private boolean isStatus (StreamTransactionStatus status ) {
119+ if (transaction == null && resource .getStreamTransactionId () != null ) {
120+ transaction = database .getStreamTransaction (resource .getStreamTransactionId ());
121+ }
122+ return transaction != null && transaction .getStatus () == status ;
109123 }
110124}
0 commit comments