|
11 | 11 | import oracle.kubernetes.operator.work.Fiber.CompletionCallback; |
12 | 12 | import oracle.kubernetes.operator.work.Fiber.ExitCallback; |
13 | 13 |
|
| 14 | +/** |
| 15 | + * Allows at most one running Fiber per key value. However, rather than queue later arriving Fibers this class cancels |
| 16 | + * the earlier arriving Fibers. For the operator, this makes sense as domain presence Fibers that come later will always complete |
| 17 | + * or correct work that may have been in-flight. |
| 18 | + */ |
14 | 19 | public class FiberGate { |
15 | 20 | private final Engine engine; |
16 | 21 | private final ConcurrentMap<String, Fiber> gateMap = new ConcurrentHashMap<String, Fiber>(); |
17 | 22 |
|
| 23 | + /** |
| 24 | + * Constructor taking Engine for running Fibers |
| 25 | + * @param engine Engine |
| 26 | + */ |
18 | 27 | public FiberGate(Engine engine) { |
19 | 28 | this.engine = engine; |
20 | 29 | } |
21 | 30 |
|
| 31 | + /** |
| 32 | + * Starts Fiber that cancels any earlier running Fibers with the same key. Fiber map is not updated if no Fiber |
| 33 | + * is started. |
| 34 | + * @param key Key |
| 35 | + * @param strategy Step for Fiber to begin with |
| 36 | + * @param packet Packet |
| 37 | + * @param callback Completion callback |
| 38 | + * @return started Fiber |
| 39 | + */ |
22 | 40 | public Fiber startFiber(String key, Step strategy, Packet packet, CompletionCallback callback) { |
23 | | - return replaceAndStartFiber(key, null, strategy, packet, callback); |
| 41 | + return startFiberIfLastFiberMatches(key, null, strategy, packet, callback); |
24 | 42 | } |
25 | 43 |
|
26 | | - public Fiber replaceAndStartFiber(String key, Fiber old, Step strategy, Packet packet, CompletionCallback callback) { |
| 44 | + /** |
| 45 | + * Starts Fiber only if the last started Fiber matches the given old Fiber. |
| 46 | + * @param key Key |
| 47 | + * @param old Expected last Fiber |
| 48 | + * @param strategy Step for Fiber to begin with |
| 49 | + * @param packet Packet |
| 50 | + * @param callback Completion callback |
| 51 | + * @return started Fiber, or null, if no Fiber started |
| 52 | + */ |
| 53 | + public synchronized Fiber startFiberIfLastFiberMatches(String key, Fiber old, Step strategy, Packet packet, CompletionCallback callback) { |
27 | 54 | Fiber f = engine.createFiber(); |
28 | 55 | WaitForOldFiberStep wfofs; |
29 | | - synchronized (this) { |
30 | | - if (old != null) { |
31 | | - if (!gateMap.replace(key, old, f)) { |
32 | | - return null; |
33 | | - } |
34 | | - } else { |
35 | | - old = gateMap.put(key, f); |
| 56 | + if (old != null) { |
| 57 | + if (!gateMap.replace(key, old, f)) { |
| 58 | + return null; |
36 | 59 | } |
37 | | - wfofs = new WaitForOldFiberStep(old, strategy); |
38 | | - f.getComponents().put(ProcessingConstants.FIBER_COMPONENT_NAME, Component.createFor(wfofs)); |
| 60 | + } else { |
| 61 | + old = gateMap.put(key, f); |
39 | 62 | } |
| 63 | + wfofs = new WaitForOldFiberStep(old, strategy); |
| 64 | + f.getComponents().put(ProcessingConstants.FIBER_COMPONENT_NAME, Component.createFor(wfofs)); |
40 | 65 | f.start(wfofs, packet, new CompletionCallback() { |
41 | 66 | @Override |
42 | 67 | public void onCompletion(Packet packet) { |
@@ -87,5 +112,4 @@ public void onExit() { |
87 | 112 | }); |
88 | 113 | } |
89 | 114 | } |
90 | | - |
91 | 115 | } |
0 commit comments