Skip to content

Commit 89b23a2

Browse files
authored
Fix Scanner thread-safety in active transaction managers (#3094)
1 parent 9cf0540 commit 89b23a2

File tree

2 files changed

+76
-2
lines changed

2 files changed

+76
-2
lines changed

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
2222
import com.scalar.db.util.ActiveExpiringMap;
2323
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24+
import java.util.Iterator;
2425
import java.util.List;
2526
import java.util.Optional;
2627
import java.util.concurrent.atomic.AtomicReference;
2728
import java.util.function.BiConsumer;
29+
import javax.annotation.Nonnull;
2830
import javax.annotation.concurrent.ThreadSafe;
2931
import org.slf4j.Logger;
3032
import org.slf4j.LoggerFactory;
@@ -103,6 +105,11 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce
103105
CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId));
104106
}
105107

108+
/**
109+
* The methods of this class are synchronized to be thread-safe because the rollback() method may
110+
* be called from the expiration handler in a different thread while other methods are being
111+
* executed.
112+
*/
106113
@VisibleForTesting
107114
class ActiveTransaction extends DecoratedDistributedTransaction {
108115

@@ -124,7 +131,37 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {
124131

125132
@Override
126133
public synchronized Scanner getScanner(Scan scan) throws CrudException {
127-
return super.getScanner(scan);
134+
Scanner scanner = super.getScanner(scan);
135+
return new Scanner() {
136+
@Override
137+
public Optional<Result> one() throws CrudException {
138+
synchronized (ActiveTransaction.this) {
139+
return scanner.one();
140+
}
141+
}
142+
143+
@Override
144+
public List<Result> all() throws CrudException {
145+
synchronized (ActiveTransaction.this) {
146+
return scanner.all();
147+
}
148+
}
149+
150+
@Override
151+
public void close() throws CrudException {
152+
synchronized (ActiveTransaction.this) {
153+
scanner.close();
154+
}
155+
}
156+
157+
@Nonnull
158+
@Override
159+
public Iterator<Result> iterator() {
160+
synchronized (ActiveTransaction.this) {
161+
return scanner.iterator();
162+
}
163+
}
164+
};
128165
}
129166

130167
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import com.scalar.db.exception.transaction.ValidationException;
2424
import com.scalar.db.util.ActiveExpiringMap;
2525
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
26+
import java.util.Iterator;
2627
import java.util.List;
2728
import java.util.Optional;
2829
import java.util.concurrent.atomic.AtomicReference;
2930
import java.util.function.BiConsumer;
31+
import javax.annotation.Nonnull;
3032
import javax.annotation.concurrent.ThreadSafe;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
@@ -109,6 +111,11 @@ public TwoPhaseCommitTransaction resume(String txId) throws TransactionNotFoundE
109111
CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId));
110112
}
111113

114+
/**
115+
* The methods of this class are synchronized to be thread-safe because the rollback() method may
116+
* be called from the expiration handler in a different thread while other methods are being
117+
* executed.
118+
*/
112119
@VisibleForTesting
113120
class ActiveTransaction extends DecoratedTwoPhaseCommitTransaction {
114121

@@ -130,7 +137,37 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {
130137

131138
@Override
132139
public synchronized Scanner getScanner(Scan scan) throws CrudException {
133-
return super.getScanner(scan);
140+
Scanner scanner = super.getScanner(scan);
141+
return new Scanner() {
142+
@Override
143+
public Optional<Result> one() throws CrudException {
144+
synchronized (ActiveTransaction.this) {
145+
return scanner.one();
146+
}
147+
}
148+
149+
@Override
150+
public List<Result> all() throws CrudException {
151+
synchronized (ActiveTransaction.this) {
152+
return scanner.all();
153+
}
154+
}
155+
156+
@Override
157+
public void close() throws CrudException {
158+
synchronized (ActiveTransaction.this) {
159+
scanner.close();
160+
}
161+
}
162+
163+
@Nonnull
164+
@Override
165+
public Iterator<Result> iterator() {
166+
synchronized (ActiveTransaction.this) {
167+
return scanner.iterator();
168+
}
169+
}
170+
};
134171
}
135172

136173
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */

0 commit comments

Comments
 (0)