Skip to content

Commit d925885

Browse files
committed
#80 create transaction manager
1 parent 66e0e2b commit d925885

File tree

6 files changed

+332
-0
lines changed

6 files changed

+332
-0
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.arangodb.springframework.repository.query;
2+
3+
import org.springframework.core.NamedInheritableThreadLocal;
4+
5+
import java.util.Collection;
6+
import java.util.function.Function;
7+
8+
/**
9+
* Bridge to postpone late transaction start to be able to inject collections from query side.
10+
*/
11+
public class QueryTransactionBridge {
12+
private static final Function<Collection<String>, String> NO_TRANSACTION = any -> null;
13+
private static final ThreadLocal<Function<Collection<String>, String>> CURRENT_TRANSACTION_BEGIN = new NamedInheritableThreadLocal<>("ArangoTransactionBegin");
14+
15+
public QueryTransactionBridge() {
16+
CURRENT_TRANSACTION_BEGIN.set(NO_TRANSACTION);
17+
}
18+
19+
public void setCurrentTransactionBegin(Function<Collection<String>, String> begin) {
20+
CURRENT_TRANSACTION_BEGIN.set(begin);
21+
}
22+
23+
public void clearCurrentTransactionBegin() {
24+
CURRENT_TRANSACTION_BEGIN.set(NO_TRANSACTION);
25+
}
26+
27+
public String beginCurrentTransaction(Collection<String> collections) {
28+
return CURRENT_TRANSACTION_BEGIN.get().apply(collections);
29+
}
30+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.arangodb.springframework.transaction;
2+
3+
import com.arangodb.ArangoDatabase;
4+
import com.arangodb.entity.StreamTransactionEntity;
5+
import com.arangodb.entity.StreamTransactionStatus;
6+
import com.arangodb.model.StreamTransactionOptions;
7+
import org.springframework.transaction.IllegalTransactionStateException;
8+
import org.springframework.transaction.TransactionDefinition;
9+
import org.springframework.transaction.interceptor.TransactionAttribute;
10+
import org.springframework.transaction.support.SmartTransactionObject;
11+
12+
import java.util.Collection;
13+
import java.util.HashSet;
14+
import java.util.Set;
15+
16+
class ArangoTransaction implements SmartTransactionObject {
17+
18+
private final ArangoDatabase database;
19+
private TransactionDefinition definition;
20+
private StreamTransactionEntity transaction;
21+
22+
ArangoTransaction(ArangoDatabase database) {
23+
this.database = database;
24+
}
25+
26+
boolean exists() {
27+
return transaction != null;
28+
}
29+
30+
void configure(TransactionDefinition definition) {
31+
this.definition = definition;
32+
}
33+
34+
String begin(Collection<String> collections) {
35+
if (transaction != null) {
36+
throw new IllegalTransactionStateException("Stream transaction already started");
37+
}
38+
Set<String> allCollections = new HashSet<>(collections);
39+
if (definition instanceof TransactionAttribute) {
40+
allCollections.addAll(((TransactionAttribute) definition).getLabels());
41+
}
42+
StreamTransactionOptions options = new StreamTransactionOptions().allowImplicit(true)
43+
.writeCollections(allCollections.toArray(new String[0]))
44+
.lockTimeout(definition.getTimeout() == -1 ? 0 : definition.getTimeout());
45+
transaction = database.beginStreamTransaction(options);
46+
return transaction.getId();
47+
}
48+
49+
void commit() {
50+
database.commitStreamTransaction(transaction.getId());
51+
}
52+
53+
void rollback() {
54+
database.abortStreamTransaction(transaction.getId());
55+
}
56+
57+
@Override
58+
public boolean isRollbackOnly() {
59+
return transaction != null && transaction.getStatus() == StreamTransactionStatus.aborted;
60+
}
61+
62+
@Override
63+
public void flush() {
64+
// nothing to do
65+
}
66+
67+
@Override
68+
public String toString() {
69+
return transaction == null ? "(not begun)" : transaction.getId();
70+
}
71+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.arangodb.springframework.transaction;
2+
3+
import com.arangodb.springframework.core.ArangoOperations;
4+
import com.arangodb.springframework.repository.query.QueryTransactionBridge;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.transaction.PlatformTransactionManager;
7+
import org.springframework.transaction.annotation.TransactionManagementConfigurer;
8+
9+
public class ArangoTransactionManagementConfigurer implements TransactionManagementConfigurer {
10+
11+
private final ArangoOperations operations;
12+
private final QueryTransactionBridge bridge;
13+
14+
public ArangoTransactionManagementConfigurer(ArangoOperations operations, QueryTransactionBridge bridge) {
15+
this.operations = operations;
16+
this.bridge = bridge;
17+
}
18+
19+
@Override
20+
@Bean
21+
public PlatformTransactionManager annotationDrivenTransactionManager() {
22+
return new ArangoTransactionManager(operations, bridge);
23+
}
24+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.arangodb.springframework.transaction;
2+
3+
import com.arangodb.ArangoDatabase;
4+
import com.arangodb.model.StreamTransactionOptions;
5+
import com.arangodb.springframework.core.ArangoOperations;
6+
import com.arangodb.springframework.repository.query.QueryTransactionBridge;
7+
import org.springframework.transaction.InvalidIsolationLevelException;
8+
import org.springframework.transaction.TransactionDefinition;
9+
import org.springframework.transaction.TransactionException;
10+
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
11+
import org.springframework.transaction.support.DefaultTransactionStatus;
12+
13+
import java.util.Collection;
14+
import java.util.function.Function;
15+
16+
/**
17+
* Transaction manager using ArangoDB stream transactions on the
18+
* {@linkplain ArangoOperations#getDatabaseName()} current database} of the template.
19+
* Isolation level {@linkplain TransactionDefinition#ISOLATION_SERIALIZABLE serializable} is not supported.
20+
*
21+
* @see ArangoDatabase#beginStreamTransaction(StreamTransactionOptions)
22+
*/
23+
public class ArangoTransactionManager extends AbstractPlatformTransactionManager {
24+
25+
private final ArangoOperations operations;
26+
private final QueryTransactionBridge bridge;
27+
28+
public ArangoTransactionManager(ArangoOperations operations, QueryTransactionBridge bridge) {
29+
this.operations = operations;
30+
this.bridge = bridge;
31+
}
32+
33+
@Override
34+
protected Object doGetTransaction() throws TransactionException {
35+
return new ArangoTransaction(operations.driver().db(operations.getDatabaseName()));
36+
}
37+
38+
@Override
39+
protected void doBegin(Object transaction, TransactionDefinition definition) throws InvalidIsolationLevelException {
40+
int isolationLevel = definition.getIsolationLevel();
41+
if (isolationLevel != -1 && (isolationLevel & TransactionDefinition.ISOLATION_SERIALIZABLE) != 0) {
42+
throw new InvalidIsolationLevelException("ArangoDB does not support isolation level serializable");
43+
}
44+
ArangoTransaction tx = (ArangoTransaction) transaction;
45+
tx.configure(definition);
46+
Function<Collection<String>, String> begin = tx::begin;
47+
bridge.setCurrentTransactionBegin(begin.andThen(id -> {
48+
if (logger.isDebugEnabled()) {
49+
logger.debug("Began stream transaction " + id);
50+
}
51+
return id;
52+
}));
53+
}
54+
55+
@Override
56+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
57+
ArangoTransaction tx = (ArangoTransaction) status.getTransaction();
58+
if (logger.isDebugEnabled()) {
59+
logger.debug("Commit stream transaction " + tx);
60+
}
61+
tx.commit();
62+
bridge.clearCurrentTransactionBegin();
63+
}
64+
65+
@Override
66+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
67+
ArangoTransaction tx = (ArangoTransaction) status.getTransaction();
68+
if (logger.isDebugEnabled()) {
69+
logger.debug("Rollback stream transaction " + tx);
70+
}
71+
tx.rollback();
72+
bridge.clearCurrentTransactionBegin();
73+
}
74+
75+
@Override
76+
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
77+
return transaction instanceof ArangoTransaction
78+
&& ((ArangoTransaction) transaction).exists();
79+
}
80+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.arangodb.springframework.repository.query;
2+
3+
import org.hamcrest.Matchers;
4+
import org.junit.After;
5+
import org.junit.Test;
6+
7+
import java.util.Collections;
8+
9+
import static org.hamcrest.MatcherAssert.assertThat;
10+
11+
public class QueryTransactionBridgeTest {
12+
13+
private QueryTransactionBridge underTest = new QueryTransactionBridge();
14+
15+
@Test
16+
public void beginCurrentTransactionInitiallyReturnsNull() {
17+
assertThat(underTest.beginCurrentTransaction(Collections.singleton("test")), Matchers.nullValue());
18+
}
19+
20+
@Test
21+
public void setCurrentTransactionBeginIsAppliedOnBeginCurrentTransaction() {
22+
underTest.setCurrentTransactionBegin(collections -> collections.iterator().next());
23+
assertThat(underTest.beginCurrentTransaction(Collections.singleton("test")), Matchers.is("test"));
24+
}
25+
26+
@After
27+
public void cleanup() {
28+
underTest.clearCurrentTransactionBegin();
29+
}
30+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.arangodb.springframework.transaction;
2+
3+
import com.arangodb.ArangoDB;
4+
import com.arangodb.ArangoDatabase;
5+
import com.arangodb.DbName;
6+
import com.arangodb.entity.StreamTransactionEntity;
7+
import com.arangodb.model.StreamTransactionOptions;
8+
import com.arangodb.springframework.core.ArangoOperations;
9+
import com.arangodb.springframework.repository.query.QueryTransactionBridge;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
import org.junit.runner.RunWith;
13+
import org.mockito.ArgumentCaptor;
14+
import org.mockito.Captor;
15+
import org.mockito.InjectMocks;
16+
import org.mockito.Mock;
17+
import org.mockito.junit.MockitoJUnitRunner;
18+
import org.springframework.transaction.InvalidIsolationLevelException;
19+
import org.springframework.transaction.TransactionDefinition;
20+
import org.springframework.transaction.TransactionStatus;
21+
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
22+
23+
import java.util.Arrays;
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
import java.util.function.Function;
27+
28+
import static org.hamcrest.MatcherAssert.assertThat;
29+
import static org.hamcrest.Matchers.is;
30+
import static org.mockito.ArgumentMatchers.any;
31+
import static org.mockito.Mockito.*;
32+
33+
@RunWith(MockitoJUnitRunner.class)
34+
public class ArangoTransactionManagerTest {
35+
36+
private static final DbName DATABASE_NAME = DbName.of("test");
37+
38+
@Mock
39+
private ArangoOperations operations;
40+
@Mock
41+
private QueryTransactionBridge bridge;
42+
@InjectMocks
43+
private ArangoTransactionManager underTest;
44+
@Mock
45+
private ArangoDB driver;
46+
@Mock
47+
private ArangoDatabase database;
48+
@Mock
49+
private StreamTransactionEntity streamTransaction;
50+
@Captor
51+
private ArgumentCaptor<Function<Collection<String>, String>> beginPassed;
52+
@Captor
53+
private ArgumentCaptor<StreamTransactionOptions> optionsPassed;
54+
55+
@Before
56+
public void setupMocks() {
57+
when(operations.getDatabaseName())
58+
.thenReturn(DATABASE_NAME);
59+
when(operations.driver())
60+
.thenReturn(driver);
61+
when(driver.db(any(DbName.class)))
62+
.thenReturn(database);
63+
}
64+
65+
@Test
66+
public void getTransactionReturnsNewTransactionWithoutStreamTransaction() {
67+
TransactionStatus transaction = underTest.getTransaction(new DefaultTransactionAttribute());
68+
assertThat(transaction.isNewTransaction(), is(true));
69+
verify(driver).db(DATABASE_NAME);
70+
verify(bridge).setCurrentTransactionBegin(any());
71+
verifyNoInteractions(database);
72+
}
73+
74+
@Test
75+
public void getTransactionReturnsTransactionCreatesStreamTransactionOnBridgeBeginCall() {
76+
DefaultTransactionAttribute definition = new DefaultTransactionAttribute();
77+
definition.setLabels(Collections.singleton("baz"));
78+
definition.setTimeout(20);
79+
TransactionStatus transaction = underTest.getTransaction(definition);
80+
when(streamTransaction.getId())
81+
.thenReturn("123");
82+
when(database.beginStreamTransaction(any()))
83+
.thenReturn(streamTransaction);
84+
verify(bridge).setCurrentTransactionBegin(beginPassed.capture());
85+
beginPassed.getValue().apply(Arrays.asList("foo", "bar"));
86+
verify(database).beginStreamTransaction(optionsPassed.capture());
87+
assertThat(optionsPassed.getValue().getAllowImplicit(), is(true));
88+
assertThat(optionsPassed.getValue().getLockTimeout(), is(20));
89+
}
90+
91+
@Test(expected = InvalidIsolationLevelException.class)
92+
public void getTransactionThrowsInvalidIsolationLevelExceptionForIsolationSerializable() {
93+
DefaultTransactionAttribute definition = new DefaultTransactionAttribute();
94+
definition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
95+
underTest.getTransaction(definition);
96+
}
97+
}

0 commit comments

Comments
 (0)