|
5 | 5 | */ |
6 | 6 | package org.hibernate.reactive.id.impl; |
7 | 7 |
|
| 8 | +import io.vertx.core.Context; |
| 9 | +import io.vertx.core.Vertx; |
| 10 | +import io.vertx.core.net.impl.pool.CombinerExecutor; |
| 11 | +import io.vertx.core.net.impl.pool.Executor; |
| 12 | +import io.vertx.core.net.impl.pool.Task; |
8 | 13 | import org.hibernate.reactive.id.ReactiveIdentifierGenerator; |
9 | 14 | import org.hibernate.reactive.session.ReactiveConnectionSupplier; |
| 15 | +import org.hibernate.reactive.util.impl.CompletionStages; |
10 | 16 |
|
11 | | -import java.util.ArrayList; |
12 | | -import java.util.List; |
| 17 | +import java.util.Objects; |
13 | 18 | import java.util.concurrent.CompletableFuture; |
14 | 19 | import java.util.concurrent.CompletionStage; |
15 | 20 |
|
|
18 | 23 | /** |
19 | 24 | * A {@link ReactiveIdentifierGenerator} which uses the database to allocate |
20 | 25 | * blocks of ids. A block is identified by its "hi" value (the first id in |
21 | | - * the block). While a new block is being allocated, concurrent streams wait |
22 | | - * without blocking. |
| 26 | + * the block). While a new block is being allocated, concurrent streams will |
| 27 | + * defer the operation without blocking. |
23 | 28 | * |
24 | 29 | * @author Gavin King |
| 30 | + * @author Davide D'Alto |
| 31 | + * @author Sanne Grinovero |
| 32 | + * |
25 | 33 | */ |
26 | 34 | public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierGenerator<Long> { |
27 | 35 |
|
28 | 36 | /** |
29 | 37 | * The block size (the number of "lo" values for each "hi" value) |
30 | 38 | */ |
31 | | - |
32 | 39 | protected abstract int getBlockSize(); |
33 | 40 |
|
| 41 | + private final GeneratorState state = new GeneratorState(); |
| 42 | + |
| 43 | + //Access to the critical section is to be performed exclusively |
| 44 | + //via an Action passed to this executor, to ensure exclusive |
| 45 | + //modification access. |
| 46 | + //This replaces the synchronization blocks one would see in a similar |
| 47 | + //service in Hibernate ORM, but using a non-blocking cooperative design. |
| 48 | + private final CombinerExecutor executor = new CombinerExecutor( state ); |
| 49 | + |
34 | 50 | /** |
35 | 51 | * Allocate a new block, by obtaining the next "hi" value from the database |
36 | 52 | */ |
37 | 53 | protected abstract CompletionStage<Long> nextHiValue(ReactiveConnectionSupplier session); |
38 | 54 |
|
39 | | - private int loValue; |
40 | | - private long hiValue; |
41 | | - |
42 | | - private volatile List<Runnable> queue = null; |
| 55 | + //Not strictly necessary to put these fields into a dedicated class, but it help |
| 56 | + //to reason about what the current state is and what the CombinerExecutor is |
| 57 | + //supposed to work on. |
| 58 | + private static class GeneratorState { |
| 59 | + private int loValue; |
| 60 | + private long hiValue; |
| 61 | + } |
43 | 62 |
|
44 | | - protected synchronized long next() { |
45 | | - return loValue > 0 && loValue < getBlockSize() |
46 | | - ? hiValue + loValue++ |
| 63 | + //Critical section: needs to be accessed exclusively via the CombinerExecutor |
| 64 | + //when there's contention; direct invocation is allowed in the fast path. |
| 65 | + private synchronized long next() { |
| 66 | + return state.loValue > 0 && state.loValue < getBlockSize() |
| 67 | + ? state.hiValue + state.loValue++ |
47 | 68 | : -1; //flag value indicating that we need to hit db |
48 | 69 | } |
49 | 70 |
|
50 | | - protected synchronized long next(long hi) { |
51 | | - hiValue = hi; |
52 | | - loValue = 1; |
| 71 | + //Critical section: needs to be accessed exclusively via the CombinerExecutor |
| 72 | + private synchronized long next(long hi) { |
| 73 | + state.hiValue = hi; |
| 74 | + state.loValue = 1; |
53 | 75 | return hi; |
54 | 76 | } |
55 | 77 |
|
56 | 78 | @Override |
57 | | - public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object entity) { |
58 | | - if ( getBlockSize() <= 1 ) { |
59 | | - //special case where we're not using blocking at all |
60 | | - return nextHiValue( session ); |
| 79 | + public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSupplier, Object ignored) { |
| 80 | + Objects.requireNonNull( connectionSupplier ); |
| 81 | + |
| 82 | + //Before submitting a task to the executor, let's try our luck via the fast-path |
| 83 | + //(this does actually hit a synchronization, but it's extremely short) |
| 84 | + final long next = next(); |
| 85 | + if ( next != -1 ) { |
| 86 | + return CompletionStages.completedFuture( next ); |
61 | 87 | } |
62 | 88 |
|
63 | | - long local = next(); |
64 | | - if ( local >= 0 ) { |
65 | | - // We don't need to update or initialize the hi |
66 | | - // value in the table, so just increment the lo |
67 | | - // value and return the next id in the block |
68 | | - return completedFuture( local ); |
| 89 | + //Another special case we need to deal with; this is an unlikely configuration, but |
| 90 | + //if it were to happen we should be better off with direct execution rather than using |
| 91 | + //the co-operative executor: |
| 92 | + if ( getBlockSize() <= 1 ) { |
| 93 | + return nextHiValue( connectionSupplier ) |
| 94 | + .thenApply( i -> next( i ) ); |
69 | 95 | } |
70 | | - else { |
71 | | - synchronized (this) { |
72 | | - CompletableFuture<Long> result = new CompletableFuture<>(); |
73 | | - if ( queue == null ) { |
74 | | - // make a queue for any concurrent streams |
75 | | - queue = new ArrayList<>(); |
76 | | - // go off and fetch the next hi value from db |
77 | | - nextHiValue( session ).thenAccept( id -> { |
78 | | -// Vertx.currentContext().runOnContext(v -> { |
79 | | - List<Runnable> list; |
80 | | - synchronized (this) { |
81 | | - // clone ref to the queue |
82 | | - list = queue; |
83 | | - queue = null; |
84 | | - // use the fetched hi value in this stream |
85 | | - result.complete( next( id ) ); |
86 | | - } |
87 | | - // send waiting streams back to try again |
88 | | - list.forEach( Runnable::run ); |
89 | | -// } ); |
90 | | - } ); |
| 96 | + |
| 97 | + final CompletableFuture<Long> resultForThisEventLoop = new CompletableFuture<>(); |
| 98 | + final CompletableFuture<Long> result = new CompletableFuture<>(); |
| 99 | + executor.submit( new GenerateIdAction( connectionSupplier, result ) ); |
| 100 | + final Context context = Vertx.currentContext(); |
| 101 | + result.whenComplete( (id,t) -> { |
| 102 | + final Context newContext = Vertx.currentContext(); |
| 103 | + //Need to be careful in resuming processing on the same context as the original |
| 104 | + //request, potentially having to switch back if we're no longer executing on the same: |
| 105 | + if ( newContext != context ) { |
| 106 | + if ( t != null ) { |
| 107 | + context.runOnContext( ( v ) -> resultForThisEventLoop.completeExceptionally( t ) ); |
| 108 | + } else { |
| 109 | + context.runOnContext( ( v ) -> resultForThisEventLoop.complete( id ) ); |
91 | 110 | } |
92 | | - else { |
93 | | - // wait for the concurrent fetch to complete |
94 | | - // note that we carefully capture the right session,entity here! |
95 | | - queue.add( () -> generate( session, entity ).thenAccept( result::complete ) ); |
| 111 | + } |
| 112 | + else { |
| 113 | + if ( t != null ) { |
| 114 | + resultForThisEventLoop.completeExceptionally( t ); |
| 115 | + } else { |
| 116 | + resultForThisEventLoop.complete( id ); |
96 | 117 | } |
97 | | - return result; |
| 118 | + } |
| 119 | + }); |
| 120 | + return resultForThisEventLoop; |
| 121 | + } |
| 122 | + |
| 123 | + private final class GenerateIdAction implements Executor.Action<GeneratorState> { |
| 124 | + |
| 125 | + private final ReactiveConnectionSupplier connectionSupplier; |
| 126 | + private final CompletableFuture<Long> result; |
| 127 | + |
| 128 | + public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture<Long> result) { |
| 129 | + this.connectionSupplier = Objects.requireNonNull(connectionSupplier); |
| 130 | + this.result = Objects.requireNonNull(result); |
| 131 | + } |
| 132 | + |
| 133 | + @Override |
| 134 | + public Task execute(GeneratorState state) { |
| 135 | + long local = next(); |
| 136 | + if ( local >= 0 ) { |
| 137 | + // We don't need to update or initialize the hi |
| 138 | + // value in the table, so just increment the lo |
| 139 | + // value and return the next id in the block |
| 140 | + completedFuture( local ) |
| 141 | + .whenComplete( this::acceptAsReturnValue ); |
| 142 | + return null; |
| 143 | + } else { |
| 144 | + nextHiValue( connectionSupplier ) |
| 145 | + .whenComplete( (newlyGeneratedHi, throwable) -> { |
| 146 | + if ( throwable != null ) { |
| 147 | + result.completeExceptionally( throwable ); |
| 148 | + } else { |
| 149 | + //We ignore the state argument as we actually use the field directly |
| 150 | + //for convenience, but they are the same object. |
| 151 | + executor.submit( stateIgnored -> { |
| 152 | + result.complete( next( newlyGeneratedHi ) ); |
| 153 | + return null; |
| 154 | + }); |
| 155 | + } |
| 156 | + } ); |
| 157 | + return null; |
| 158 | + } |
| 159 | + } |
| 160 | + |
| 161 | + private void acceptAsReturnValue(final Long aLong, final Throwable throwable) { |
| 162 | + if ( throwable != null ) { |
| 163 | + result.completeExceptionally( throwable ); |
| 164 | + } |
| 165 | + else { |
| 166 | + result.complete( aLong ); |
98 | 167 | } |
99 | 168 | } |
100 | 169 | } |
| 170 | + |
101 | 171 | } |
0 commit comments