Skip to content

Commit 9aecc61

Browse files
committed
Fix race condition in concurrent Session access Integeration Tests.
The race condition involved the absence/presence of the Session ID in GemFire/Geode client/server Integration Tests when using client PROXY or CACHING_PROXY Regions to persist Session state. When the remote Region operation (e.g. put(..), when storing the new Session in the cache Region) occurred, the current Thread could then be blocked on an IO operation, which would then free up another MultithreadedTC test framework thread to continue, but the Session creating Thread may not have set theh Session ID required by the other threads in a timely manner.
1 parent 04f0de2 commit 9aecc61

File tree

4 files changed

+65
-70
lines changed

4 files changed

+65
-70
lines changed

spring-session-data-geode/src/integration-test/java/org/springframework/session/data/gemfire/AbstractConcurrentSessionOperationsIntegrationTests.java

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.springframework.session.data.gemfire.AbstractGemFireOperationsSessionRepository.GemFireSession;
2424

2525
import java.time.Instant;
26+
import java.util.Objects;
2627
import java.util.Optional;
2728
import java.util.concurrent.atomic.AtomicReference;
2829

@@ -64,6 +65,8 @@ protected static class AbstractConcurrentSessionOperationsTestCase extends Multi
6465

6566
private final AbstractConcurrentSessionOperationsIntegrationTests testInstance;
6667

68+
private final AtomicReference<String> sessionId = new AtomicReference<>(null);
69+
6770
private final GemFireOperationsSessionRepository sessionRepository;
6871

6972
protected AbstractConcurrentSessionOperationsTestCase(
@@ -83,44 +86,51 @@ protected AbstractConcurrentSessionOperationsTestCase(
8386
GemFireOperationsSessionRepository.class.getName(), ObjectUtils.nullSafeClassName(sessionRepository)));
8487
}
8588

86-
@NonNull @SuppressWarnings("unused")
87-
protected AbstractConcurrentSessionOperationsIntegrationTests getTestInstance() {
89+
@SuppressWarnings("unused")
90+
protected @NonNull AbstractConcurrentSessionOperationsIntegrationTests getTestInstance() {
8891
return this.testInstance;
8992
}
9093

91-
@NonNull
92-
protected GemFireOperationsSessionRepository getSessionRepository() {
94+
protected @NonNull GemFireOperationsSessionRepository getSessionRepository() {
9395
return this.sessionRepository;
9496
}
9597

96-
@Nullable
97-
protected Session findById(String id) {
98+
protected @NonNull String getSessionId() {
99+
return this.sessionId.get();
100+
}
101+
102+
protected void setSessionId(@Nullable String sessionId) {
103+
this.sessionId.set(sessionId);
104+
}
105+
106+
protected @Nullable Session findById(@NonNull String id) {
98107
return getSessionRepository().findById(id);
99108
}
100109

101-
@NonNull
102-
protected Session newSession() {
110+
protected @NonNull Session newSession() {
103111
return getSessionRepository().createSession();
104112
}
105113

106-
@Nullable
107-
protected <T extends Session> T save(@Nullable T session) {
114+
protected @Nullable <T extends Session> T save(@Nullable T session) {
108115
getSessionRepository().save(session);
109116
return session;
110117
}
118+
119+
protected void waitOnAvailableSessionId() {
120+
AbstractConcurrentSessionOperationsIntegrationTests.waitOn(() -> Objects.nonNull(this.sessionId.get()));
121+
}
111122
}
112123

113124
@SuppressWarnings("unused")
114125
public static class ConcurrentSessionOperationsTestCase extends AbstractConcurrentSessionOperationsTestCase {
115126

116127
private final AtomicReference<Instant> lastAccessedTime = new AtomicReference<>(null);
117-
private final AtomicReference<String> sessionId = new AtomicReference<>(null);
118128

119129
public ConcurrentSessionOperationsTestCase(AbstractConcurrentSessionOperationsIntegrationTests testInstance) {
120130
super(testInstance);
121131
}
122132

123-
// Creator Thread
133+
// Session Creator Thread
124134
@SuppressWarnings("rawtypes")
125135
public void thread1() {
126136

@@ -142,7 +152,7 @@ public void thread1() {
142152

143153
save(session);
144154

145-
this.sessionId.set(session.getId());
155+
setSessionId(session.getId());
146156

147157
waitForTick(4);
148158
assertTick(4);
@@ -153,18 +163,19 @@ public void thread1() {
153163
save(session);
154164
}
155165

156-
// Modifier (Attribute) Thread
166+
// Session Attribute Modifier Thread
157167
public void thread2() {
158168

159169
Thread.currentThread().setName("User Session Two");
160170

161171
waitForTick(1);
162172
assertTick(1);
173+
waitOnAvailableSessionId();
163174

164-
Session session = findById(this.sessionId.get());
175+
Session session = findById(getSessionId());
165176

166177
assertThat(session).isNotNull();
167-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
178+
assertThat(session.getId()).isEqualTo(getSessionId());
168179
assertThat(session.isExpired()).isFalse();
169180
assertThat(session.getAttributeNames()).containsOnly("attributeOne", "attributeTwo");
170181
assertThat(session.<String>getAttribute("attributeOne")).isEqualTo("testOne");
@@ -181,18 +192,18 @@ public void thread2() {
181192
save(session);
182193
}
183194

184-
// Modifier (Timestamp) Thread
195+
// Session Timestamp Modifier Thread
185196
public void thread3() {
186197

187198
Thread.currentThread().setName("User Session Three");
188199

189200
waitForTick(1);
190201
assertTick(1);
191202

192-
Session session = findById(this.sessionId.get());
203+
Session session = findById(getSessionId());
193204

194205
assertThat(session).isNotNull();
195-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
206+
assertThat(session.getId()).isEqualTo(getSessionId());
196207
assertThat(session.isExpired()).isFalse();
197208
assertThat(session.getAttributeNames()).containsOnly("attributeOne", "attributeTwo");
198209
assertThat(session.<String>getAttribute("attributeOne")).isEqualTo("testOne");
@@ -211,10 +222,10 @@ public void finish() {
211222

212223
super.finish();
213224

214-
Session session = findById(this.sessionId.get());
225+
Session session = findById(getSessionId());
215226

216227
assertThat(session).isNotNull();
217-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
228+
assertThat(session.getId()).isEqualTo(getSessionId());
218229
assertThat(session.getAttributeNames()).containsOnly("attributeOne", "attributeTwo", "attributeThree");
219230
assertThat(session.<String>getAttribute("attributeOne")).isEqualTo("testOne");
220231
assertThat(session.<String>getAttribute("attributeTwo")).isEqualTo("testTwo");

spring-session-data-geode/src/integration-test/java/org/springframework/session/data/gemfire/ConcurrentSessionOperationsUsingClientCachingProxyRegionIntegrationTests.java

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.time.Instant;
3131
import java.util.Arrays;
3232
import java.util.Objects;
33-
import java.util.concurrent.atomic.AtomicReference;
3433
import java.util.function.Predicate;
3534

3635
import org.junit.Before;
@@ -54,6 +53,7 @@
5453
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5554
import org.springframework.data.gemfire.config.annotation.CacheServerApplication;
5655
import org.springframework.data.gemfire.config.annotation.ClientCacheApplication;
56+
import org.springframework.lang.NonNull;
5757
import org.springframework.session.Session;
5858
import org.springframework.session.data.gemfire.config.annotation.web.http.EnableGemFireHttpSession;
5959
import org.springframework.session.data.gemfire.config.annotation.web.http.GemFireHttpSessionConfiguration;
@@ -90,8 +90,6 @@
9090
public class ConcurrentSessionOperationsUsingClientCachingProxyRegionIntegrationTests
9191
extends AbstractConcurrentSessionOperationsIntegrationTests {
9292

93-
private static final String GEMFIRE_LOG_LEVEL = "error";
94-
9593
@Before
9694
public void setup() {
9795

@@ -128,10 +126,8 @@ public void regionPutWithNonDirtySessionResultsInInefficientIncorrectBehavior()
128126
// Tests that 2 Threads share the same Session object reference and therefore see's each other's changes.
129127
public static class ConcurrentCachedSessionOperationsTestCase extends AbstractConcurrentSessionOperationsTestCase {
130128

131-
private final AtomicReference<String> sessionId = new AtomicReference<>(null);
132-
133129
public ConcurrentCachedSessionOperationsTestCase(
134-
ConcurrentSessionOperationsUsingClientCachingProxyRegionIntegrationTests testInstance) {
130+
@NonNull ConcurrentSessionOperationsUsingClientCachingProxyRegionIntegrationTests testInstance) {
135131

136132
super(testInstance);
137133
}
@@ -153,7 +149,7 @@ public void initialize() {
153149

154150
save(session);
155151

156-
this.sessionId.set(session.getId());
152+
setSessionId(session.getId());
157153
}
158154

159155
public void thread1() {
@@ -164,10 +160,10 @@ public void thread1() {
164160

165161
Instant beforeLastAccessedTime = Instant.now();
166162

167-
Session session = findById(this.sessionId.get());
163+
Session session = findById(getSessionId());
168164

169165
assertThat(session).isNotNull();
170-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
166+
assertThat(session.getId()).isEqualTo(getSessionId());
171167
assertThat(session.getLastAccessedTime()).isAfterOrEqualTo(beforeLastAccessedTime);
172168
assertThat(session.getLastAccessedTime()).isBeforeOrEqualTo(Instant.now());
173169
assertThat(session.isExpired()).isFalse();
@@ -192,10 +188,10 @@ public void thread2() {
192188

193189
Instant beforeLastAccessedTime = Instant.now();
194190

195-
Session session = findById(this.sessionId.get());
191+
Session session = findById(getSessionId());
196192

197193
assertThat(session).isNotNull();
198-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
194+
assertThat(session.getId()).isEqualTo(getSessionId());
199195
assertThat(session.getLastAccessedTime()).isAfterOrEqualTo(beforeLastAccessedTime);
200196
assertThat(session.getLastAccessedTime()).isBeforeOrEqualTo(Instant.now());
201197
assertThat(session.isExpired()).isFalse();
@@ -217,8 +213,6 @@ public static class RegionPutWithNonDirtySessionTestCase extends AbstractConcurr
217213
private static final String DATA_SERIALIZER_NOT_FOUND_EXCEPTION_MESSAGE =
218214
"No DataSerializer was found capable of de/serializing Sessions";
219215

220-
private final AtomicReference<String> sessionId = new AtomicReference<>(null);
221-
222216
private final DataSerializer sessionSerializer;
223217

224218
private final Region<Object, Session> sessions;
@@ -232,6 +226,14 @@ public RegionPutWithNonDirtySessionTestCase(
232226
this.sessionSerializer = reregisterDataSerializer(resolveDataSerializer());
233227
}
234228

229+
private DataSerializer reregisterDataSerializer(DataSerializer dataSerializer) {
230+
231+
InternalDataSerializer.unregister(dataSerializer.getId());
232+
InternalDataSerializer._register(dataSerializer, false);
233+
234+
return dataSerializer;
235+
}
236+
235237
private DataSerializer resolveDataSerializer() {
236238

237239
return Arrays.stream(nullSafeArray(InternalDataSerializer.getSerializers(), DataSerializer.class))
@@ -258,14 +260,6 @@ private Predicate<? super DataSerializer> sessionSerializerFilter() {
258260
};
259261
}
260262

261-
private DataSerializer reregisterDataSerializer(DataSerializer dataSerializer) {
262-
263-
InternalDataSerializer.unregister(dataSerializer.getId());
264-
InternalDataSerializer._register(dataSerializer, false);
265-
266-
return dataSerializer;
267-
}
268-
269263
private Session get(String id) {
270264
return this.sessions.get(id);
271265
}
@@ -310,7 +304,7 @@ public void thread1() {
310304

311305
getSessionRepository().commit(loadedSession);
312306

313-
this.sessionId.set(session.getId());
307+
setSessionId(session.getId());
314308
}
315309

316310
public void thread2() {
@@ -319,11 +313,12 @@ public void thread2() {
319313

320314
waitForTick(1);
321315
assertTick(1);
316+
waitOnAvailableSessionId();
322317

323-
Session session = get(this.sessionId.get());
318+
Session session = get(getSessionId());
324319

325320
assertThat(session).isInstanceOf(GemFireSession.class);
326-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
321+
assertThat(session.getId()).isEqualTo(getSessionId());
327322
assertThat(session.isExpired()).isFalse();
328323
assertThat(session.getAttributeNames()).containsOnly("attributeOne", "attributeTwo");
329324
assertThat(session.<String>getAttribute("attributeOne")).isEqualTo("testOne");
@@ -336,10 +331,10 @@ public void thread2() {
336331
@Override
337332
public void finish() {
338333

339-
Session session = get(this.sessionId.get());
334+
Session session = get(getSessionId());
340335

341336
assertThat(session).isNotNull();
342-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
337+
assertThat(session.getId()).isEqualTo(getSessionId());
343338
assertThat(session.isExpired()).isFalse();
344339
assertThat(session.getAttributeNames()).containsOnly("attributeOne", "attributeTwo");
345340
assertThat(session.<String>getAttribute("attributeOne")).isEqualTo("testOne");
@@ -352,6 +347,7 @@ public void finish() {
352347

353348
verify(this.sessionSerializer, times(2))
354349
.toData(isA(GemFireSession.class), isA(DataOutput.class));
350+
355351
}
356352
catch (ClassNotFoundException | IOException ignore) { }
357353
}
@@ -364,7 +360,7 @@ public static void startGemFireServer() throws IOException {
364360

365361
// Tests fail when 'copyOnRead' is set to 'true'!
366362
//@ClientCacheApplication(copyOnRead = true, logLevel = GEMFIRE_LOG_LEVEL, subscriptionEnabled = true)
367-
@ClientCacheApplication(logLevel = GEMFIRE_LOG_LEVEL, subscriptionEnabled = true)
363+
@ClientCacheApplication(subscriptionEnabled = true)
368364
@EnableGemFireHttpSession(
369365
clientRegionShortcut = ClientRegionShortcut.CACHING_PROXY,
370366
poolName = "DEFAULT",
@@ -373,10 +369,7 @@ public static void startGemFireServer() throws IOException {
373369
)
374370
static class GemFireClientConfiguration { }
375371

376-
@CacheServerApplication(
377-
name = "ConcurrentSessionOperationsUsingClientCachingProxyRegionIntegrationTests",
378-
logLevel = GEMFIRE_LOG_LEVEL
379-
)
372+
@CacheServerApplication(name = "ConcurrentSessionOperationsUsingClientCachingProxyRegionIntegrationTests")
380373
@EnableGemFireHttpSession(
381374
regionName = "Sessions",
382375
sessionSerializerBeanName = GemFireHttpSessionConfiguration.SESSION_DATA_SERIALIZER_BEAN_NAME

spring-session-data-geode/src/integration-test/java/org/springframework/session/data/gemfire/ConcurrentSessionOperationsUsingClientLocalRegionIntegrationTests.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919

20-
import java.util.concurrent.atomic.AtomicReference;
21-
2220
import org.junit.Test;
2321
import org.junit.runner.RunWith;
2422

@@ -29,6 +27,7 @@
2927
import org.apache.geode.cache.client.ClientRegionShortcut;
3028

3129
import org.springframework.data.gemfire.config.annotation.ClientCacheApplication;
30+
import org.springframework.lang.NonNull;
3231
import org.springframework.session.Session;
3332
import org.springframework.session.data.gemfire.config.annotation.web.http.EnableGemFireHttpSession;
3433
import org.springframework.session.data.gemfire.config.annotation.web.http.GemFireHttpSessionConfiguration;
@@ -56,8 +55,6 @@
5655
public class ConcurrentSessionOperationsUsingClientLocalRegionIntegrationTests
5756
extends AbstractConcurrentSessionOperationsIntegrationTests {
5857

59-
private static final String GEMFIRE_LOG_LEVEL = "error";
60-
6158
@Test
6259
public void concurrentLocalSessionAccessIsCorrect() throws Throwable {
6360
TestFramework.runOnce(new ConcurrentLocalSessionAccessTestCase(this));
@@ -66,10 +63,8 @@ public void concurrentLocalSessionAccessIsCorrect() throws Throwable {
6663
@SuppressWarnings("unused")
6764
public static class ConcurrentLocalSessionAccessTestCase extends AbstractConcurrentSessionOperationsTestCase {
6865

69-
private final AtomicReference<String> sessionId = new AtomicReference<>(null);
70-
7166
public ConcurrentLocalSessionAccessTestCase(
72-
ConcurrentSessionOperationsUsingClientLocalRegionIntegrationTests testInstance) {
67+
@NonNull ConcurrentSessionOperationsUsingClientLocalRegionIntegrationTests testInstance) {
7368

7469
super(testInstance);
7570
}
@@ -89,7 +84,7 @@ public void thread1() {
8984

9085
save(session);
9186

92-
this.sessionId.set(session.getId());
87+
setSessionId(session.getId());
9388

9489
waitForTick(2);
9590
assertTick(2);
@@ -105,11 +100,12 @@ public void thread2() {
105100

106101
waitForTick(1);
107102
assertTick(1);
103+
waitOnAvailableSessionId();
108104

109-
Session session = findById(this.sessionId.get());
105+
Session session = findById(getSessionId());
110106

111107
assertThat(session).isNotNull();
112-
assertThat(session.getId()).isEqualTo(this.sessionId.get());
108+
assertThat(session.getId()).isEqualTo(getSessionId());
113109
assertThat(session.isExpired()).isFalse();
114110
assertThat(session.getAttributeNames()).isEmpty();
115111

@@ -122,7 +118,7 @@ public void thread2() {
122118
}
123119
}
124120

125-
@ClientCacheApplication(logLevel = GEMFIRE_LOG_LEVEL)
121+
@ClientCacheApplication
126122
@EnableGemFireHttpSession(
127123
clientRegionShortcut = ClientRegionShortcut.LOCAL,
128124
poolName = "DEFAULT",

0 commit comments

Comments
 (0)