2424import java .util .concurrent .CopyOnWriteArrayList ;
2525import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2626import java .util .function .BiFunction ;
27- import java .util .stream .Collectors ;
2827
2928import org .springframework .data .redis .connection .Message ;
3029import org .springframework .data .redis .connection .MessageListener ;
3433
3534/**
3635 * Synchronizing {@link MessageListener} and {@link SubscriptionListener} that allows notifying a {@link Runnable}
37- * (through {@link SubscriptionSynchronizion }) upon completing subscriptions to channels or patterns.
36+ * (through {@link SubscriptionSynchronization }) upon completing subscriptions to channels or patterns.
3837 *
3938 * @author Mark Paluch
4039 * @since 3.0
@@ -43,19 +42,19 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
4342
4443 private final MessageListener messageListener ;
4544 private final SubscriptionListener subscriptionListener ;
46- private final List <SubscriptionSynchronizion > synchronizations = new CopyOnWriteArrayList <>();
45+ private final List <SubscriptionSynchronization > synchronizations = new CopyOnWriteArrayList <>();
4746
4847 public SynchronizingMessageListener (MessageListener messageListener , SubscriptionListener subscriptionListener ) {
4948 this .messageListener = messageListener ;
5049 this .subscriptionListener = subscriptionListener ;
5150 }
5251
5352 /**
54- * Register a {@link SubscriptionSynchronizion }.
53+ * Register a {@link SubscriptionSynchronization }.
5554 *
5655 * @param synchronization must not be {@literal null}.
5756 */
58- public void addSynchronization (SubscriptionSynchronizion synchronization ) {
57+ public void addSynchronization (SubscriptionSynchronization synchronization ) {
5958 this .synchronizations .add (synchronization );
6059 }
6160
@@ -68,7 +67,7 @@ public void onMessage(Message message, @Nullable byte[] pattern) {
6867 public void onChannelSubscribed (byte [] channel , long count ) {
6968
7069 subscriptionListener .onChannelSubscribed (channel , count );
71- handleSubscription (channel , SubscriptionSynchronizion ::onChannelSubscribed );
70+ handleSubscription (channel , SubscriptionSynchronization ::onChannelSubscribed );
7271 }
7372
7473 @ Override
@@ -80,7 +79,7 @@ public void onChannelUnsubscribed(byte[] channel, long count) {
8079 public void onPatternSubscribed (byte [] pattern , long count ) {
8180
8281 subscriptionListener .onPatternSubscribed (pattern , count );
83- handleSubscription (pattern , SubscriptionSynchronizion ::onPatternSubscribed );
82+ handleSubscription (pattern , SubscriptionSynchronization ::onPatternSubscribed );
8483 }
8584
8685 @ Override
@@ -89,16 +88,16 @@ public void onPatternUnsubscribed(byte[] pattern, long count) {
8988 }
9089
9190 void handleSubscription (byte [] topic ,
92- BiFunction <SubscriptionSynchronizion , ByteArrayWrapper , Boolean > synchronizerCallback ) {
91+ BiFunction <SubscriptionSynchronization , ByteArrayWrapper , Boolean > synchronizerCallback ) {
9392
9493 if (synchronizations .isEmpty ()) {
9594 return ;
9695 }
9796
9897 ByteArrayWrapper binaryChannel = new ByteArrayWrapper (topic );
99- List <SubscriptionSynchronizion > finalized = new ArrayList <>(synchronizations .size ());
98+ List <SubscriptionSynchronization > finalized = new ArrayList <>(synchronizations .size ());
10099
101- for (SubscriptionSynchronizion synchronizer : synchronizations ) {
100+ for (SubscriptionSynchronization synchronizer : synchronizations ) {
102101
103102 if (synchronizerCallback .apply (synchronizer , binaryChannel )) {
104103 finalized .add (synchronizer );
@@ -111,37 +110,38 @@ void handleSubscription(byte[] topic,
111110 /**
112111 * Synchronization to await subscriptions for channels and patterns.
113112 */
114- static class SubscriptionSynchronizion {
113+ static class SubscriptionSynchronization {
115114
116- private static final AtomicIntegerFieldUpdater <SubscriptionSynchronizion > DONE = AtomicIntegerFieldUpdater
117- .newUpdater (SubscriptionSynchronizion .class , "done" );
115+ private static final AtomicIntegerFieldUpdater <SubscriptionSynchronization > DONE = AtomicIntegerFieldUpdater
116+ .newUpdater (SubscriptionSynchronization .class , "done" );
118117
119118 private static final int NOT_DONE = 0 ;
120119 private static final int DONE_DONE = 0 ;
121120
122121 private volatile int done = NOT_DONE ;
123- private final Set <ByteArrayWrapper > remainingPatterns ;
124- private final Set <ByteArrayWrapper > remainingChannels ;
125122
126123 private final Runnable doneCallback ;
127124
128- public SubscriptionSynchronizion (Collection <byte []> remainingPatterns , Collection <byte []> remainingChannels ,
125+ private final Set <ByteArrayWrapper > remainingPatterns ;
126+ private final Set <ByteArrayWrapper > remainingChannels ;
127+
128+ public SubscriptionSynchronization (Collection <byte []> remainingPatterns , Collection <byte []> remainingChannels ,
129129 Runnable doneCallback ) {
130130
131131 if (remainingPatterns .isEmpty ()) {
132132 this .remainingPatterns = Collections .emptySet ();
133133 } else {
134134 this .remainingPatterns = ConcurrentHashMap .newKeySet (remainingPatterns .size ());
135135 this .remainingPatterns
136- .addAll (remainingPatterns .stream ().map (ByteArrayWrapper ::new ).collect ( Collectors . toList () ));
136+ .addAll (remainingPatterns .stream ().map (ByteArrayWrapper ::new ).toList ());
137137 }
138138
139139 if (remainingChannels .isEmpty ()) {
140140 this .remainingChannels = Collections .emptySet ();
141141 } else {
142142 this .remainingChannels = ConcurrentHashMap .newKeySet (remainingChannels .size ());
143143 this .remainingChannels
144- .addAll (remainingChannels .stream ().map (ByteArrayWrapper ::new ).collect ( Collectors . toList () ));
144+ .addAll (remainingChannels .stream ().map (ByteArrayWrapper ::new ).toList ());
145145 }
146146
147147 this .doneCallback = doneCallback ;
0 commit comments