Skip to content

Commit f078745

Browse files
dimitrislifmbenhassine
authored andcommitted
Fix thread usage in SynchronizedItemStreamReaderTests
along with its builder test class SynchronizedItemStreamReaderBuilderTests Resolves #837
1 parent 0bfe31e commit f078745

File tree

3 files changed

+108
-227
lines changed

3 files changed

+108
-227
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2018-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
21+
import org.mockito.Mock;
22+
import org.mockito.junit.jupiter.MockitoExtension;
23+
24+
import org.springframework.batch.item.ExecutionContext;
25+
import org.springframework.batch.item.ItemStreamReader;
26+
27+
import static org.mockito.Mockito.verify;
28+
29+
/**
30+
* Common parent class for {@link SynchronizedItemStreamReaderTests} and
31+
* {@link org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilderTests}
32+
*
33+
* @author Dimitrios Liapis
34+
* @author Mahmoud Ben Hassine
35+
*
36+
*/
37+
@ExtendWith(MockitoExtension.class)
38+
public abstract class AbstractSynchronizedItemStreamReaderTests {
39+
40+
@Mock
41+
protected ItemStreamReader<Object> delegate;
42+
43+
private SynchronizedItemStreamReader<Object> synchronizedItemStreamReader;
44+
45+
private final ExecutionContext testExecutionContext = new ExecutionContext();
46+
47+
abstract protected SynchronizedItemStreamReader<Object> createNewSynchronizedItemStreamReader();
48+
49+
@BeforeEach
50+
void init() {
51+
this.synchronizedItemStreamReader = createNewSynchronizedItemStreamReader();
52+
}
53+
54+
@Test
55+
void testDelegateReadIsCalled() throws Exception {
56+
this.synchronizedItemStreamReader.read();
57+
verify(this.delegate).read();
58+
}
59+
60+
@Test
61+
void testDelegateOpenIsCalled() {
62+
this.synchronizedItemStreamReader.open(this.testExecutionContext);
63+
verify(this.delegate).open(this.testExecutionContext);
64+
}
65+
66+
@Test
67+
void testDelegateUpdateIsCalled() {
68+
this.synchronizedItemStreamReader.update(this.testExecutionContext);
69+
verify(this.delegate).update(this.testExecutionContext);
70+
}
71+
72+
@Test
73+
void testDelegateCloseIsClosed() {
74+
this.synchronizedItemStreamReader.close();
75+
verify(this.delegate).close();
76+
}
77+
78+
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/SynchronizedItemStreamReaderTests.java

Lines changed: 15 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -15,131 +15,33 @@
1515
*/
1616
package org.springframework.batch.item.support;
1717

18-
import static org.junit.jupiter.api.Assertions.assertEquals;
19-
import static org.junit.jupiter.api.Assertions.assertFalse;
20-
import static org.junit.jupiter.api.Assertions.assertTrue;
21-
22-
import java.util.HashSet;
23-
import java.util.Set;
24-
2518
import org.junit.jupiter.api.Test;
2619

27-
import org.springframework.batch.item.ExecutionContext;
28-
import org.springframework.batch.item.ItemStreamReader;
29-
import org.springframework.batch.item.NonTransientResourceException;
30-
import org.springframework.batch.item.ParseException;
31-
import org.springframework.lang.Nullable;
20+
import org.springframework.beans.factory.InitializingBean;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertThrows;
3224

3325
/**
3426
* @author Matthew Ouyang
3527
* @author Mahmoud Ben Hassine
28+
* @author Dimitrios Liapis
3629
*
3730
*/
38-
class SynchronizedItemStreamReaderTests {
39-
40-
/**
41-
* A simple class used to test the SynchronizedItemStreamReader. It simply returns the
42-
* number of times the read method has been called, manages some state variables and
43-
* updates an ExecutionContext.
44-
*
45-
* @author Matthew Ouyang
46-
*
47-
*/
48-
private class TestItemReader extends AbstractItemStreamItemReader<Integer> implements ItemStreamReader<Integer> {
49-
50-
private int cursor = 0;
51-
52-
private boolean isClosed = false;
53-
54-
public static final String HAS_BEEN_OPENED = "hasBeenOpened";
55-
56-
public static final String UPDATE_COUNT_KEY = "updateCount";
57-
58-
@Nullable
59-
public Integer read() throws Exception, ParseException, NonTransientResourceException {
60-
cursor = cursor + 1;
61-
return cursor;
62-
}
63-
64-
public void close() {
65-
this.isClosed = true;
66-
}
67-
68-
public void open(ExecutionContext executionContext) {
69-
this.isClosed = false;
70-
executionContext.put(HAS_BEEN_OPENED, true);
71-
executionContext.remove(UPDATE_COUNT_KEY);
72-
}
73-
74-
public void update(ExecutionContext executionContext) {
75-
76-
if (!executionContext.containsKey(UPDATE_COUNT_KEY)) {
77-
executionContext.putInt(UPDATE_COUNT_KEY, 0);
78-
}
79-
80-
executionContext.putInt(UPDATE_COUNT_KEY, executionContext.getInt(UPDATE_COUNT_KEY) + 1);
81-
}
82-
83-
public boolean isClosed() {
84-
return this.isClosed;
85-
}
31+
public class SynchronizedItemStreamReaderTests extends AbstractSynchronizedItemStreamReaderTests {
8632

33+
@Override
34+
protected SynchronizedItemStreamReader<Object> createNewSynchronizedItemStreamReader() {
35+
SynchronizedItemStreamReader<Object> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
36+
synchronizedItemStreamReader.setDelegate(delegate);
37+
return synchronizedItemStreamReader;
8738
}
8839

8940
@Test
90-
void testMultipleThreads() throws Exception {
91-
92-
// Initialized an ExecutionContext and a SynchronizedItemStreamReader to test.
93-
final ExecutionContext executionContext = new ExecutionContext();
94-
95-
final TestItemReader testItemReader = new TestItemReader();
96-
final SynchronizedItemStreamReader<Integer> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
97-
synchronizedItemStreamReader.setDelegate(testItemReader);
98-
99-
// Open the ItemReader and make sure it's initialized properly.
100-
synchronizedItemStreamReader.open(executionContext);
101-
assertEquals(true, executionContext.get(TestItemReader.HAS_BEEN_OPENED));
102-
assertFalse(testItemReader.isClosed());
103-
104-
/*
105-
* Set up SIZE threads that read from the reader and updates the execution
106-
* context.
107-
*/
108-
final Set<Integer> ecSet = new HashSet<>();
109-
final int SIZE = 20;
110-
Thread[] threads = new Thread[SIZE];
111-
for (int i = 0; i < SIZE; i++) {
112-
threads[i] = new Thread() {
113-
public void run() {
114-
try {
115-
ecSet.add(synchronizedItemStreamReader.read());
116-
synchronizedItemStreamReader.update(executionContext);
117-
}
118-
catch (Exception ignore) {
119-
}
120-
}
121-
};
122-
}
123-
124-
// Start the threads and block until all threads are done.
125-
for (Thread thread : threads) {
126-
thread.run();
127-
}
128-
for (Thread thread : threads) {
129-
thread.join();
130-
}
131-
testItemReader.close();
132-
133-
/*
134-
* Ensure cleanup happens as expected: status variable is set correctly and
135-
* ExecutionContext variable is set properly. Lastly, the Set<Integer> should have
136-
* 1 to 20 which may not always be the case if the read is not synchronized.
137-
*/
138-
for (int i = 1; i <= SIZE; i++) {
139-
assertTrue(ecSet.contains(i));
140-
}
141-
assertTrue(testItemReader.isClosed());
142-
assertEquals(SIZE, executionContext.getInt(TestItemReader.UPDATE_COUNT_KEY));
41+
void testDelegateIsNotNullWhenPropertiesSet() {
42+
final Exception expectedException = assertThrows(IllegalStateException.class,
43+
() -> ((InitializingBean) new SynchronizedItemStreamReader<>()).afterPropertiesSet());
44+
assertEquals("A delegate item reader is required", expectedException.getMessage());
14345
}
14446

14547
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/SynchronizedItemStreamReaderBuilderTests.java

Lines changed: 15 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -16,135 +16,36 @@
1616

1717
package org.springframework.batch.item.support.builder;
1818

19-
import java.util.HashSet;
20-
import java.util.Set;
21-
2219
import org.junit.jupiter.api.Test;
2320

24-
import org.springframework.batch.item.ExecutionContext;
25-
import org.springframework.batch.item.ItemStreamReader;
26-
import org.springframework.batch.item.NonTransientResourceException;
27-
import org.springframework.batch.item.ParseException;
28-
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
21+
import org.springframework.batch.item.support.AbstractSynchronizedItemStreamReaderTests;
2922
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
30-
import org.springframework.lang.Nullable;
3123

3224
import static org.junit.jupiter.api.Assertions.assertEquals;
33-
import static org.junit.jupiter.api.Assertions.assertFalse;
34-
import static org.junit.jupiter.api.Assertions.assertTrue;
25+
import static org.junit.jupiter.api.Assertions.assertThrows;
3526

3627
/**
3728
* @author Glenn Renfro
3829
* @author Mahmoud Ben Hassine
30+
* @author Dimitrios Liapis
3931
*/
40-
class SynchronizedItemStreamReaderBuilderTests {
41-
42-
@Test
43-
void testMultipleThreads() throws Exception {
44-
45-
// Initialized an ExecutionContext and a SynchronizedItemStreamReader to test.
46-
final ExecutionContext executionContext = new ExecutionContext();
47-
48-
final SynchronizedItemStreamReaderBuilderTests.TestItemReader testItemReader = new SynchronizedItemStreamReaderBuilderTests.TestItemReader();
49-
final SynchronizedItemStreamReader<Integer> synchronizedItemStreamReader = new SynchronizedItemStreamReaderBuilder<Integer>()
50-
.delegate(testItemReader)
51-
.build();
32+
public class SynchronizedItemStreamReaderBuilderTests extends AbstractSynchronizedItemStreamReaderTests {
5233

53-
// Open the ItemReader and make sure it's initialized properly.
54-
synchronizedItemStreamReader.open(executionContext);
55-
assertEquals(true,
56-
executionContext.get(SynchronizedItemStreamReaderBuilderTests.TestItemReader.HAS_BEEN_OPENED));
57-
assertFalse(testItemReader.isClosed());
58-
59-
/*
60-
* Set up SIZE threads that read from the reader and updates the execution
61-
* context.
62-
*/
63-
final Set<Integer> ecSet = new HashSet<>();
64-
final int SIZE = 20;
65-
Thread[] threads = new Thread[SIZE];
66-
for (int i = 0; i < SIZE; i++) {
67-
threads[i] = new Thread() {
68-
public void run() {
69-
try {
70-
ecSet.add(synchronizedItemStreamReader.read());
71-
synchronizedItemStreamReader.update(executionContext);
72-
}
73-
catch (Exception ignore) {
74-
}
75-
}
76-
};
77-
}
78-
79-
// Start the threads and block until all threads are done.
80-
for (Thread thread : threads) {
81-
thread.run();
82-
}
83-
for (Thread thread : threads) {
84-
thread.join();
85-
}
86-
testItemReader.close();
87-
88-
/*
89-
* Ensure cleanup happens as expected: status variable is set correctly and
90-
* ExecutionContext variable is set properly. Lastly, the Set<Integer> should have
91-
* 1 to 20 which may not always be the case if the read is not synchronized.
92-
*/
93-
for (int i = 1; i <= SIZE; i++) {
94-
assertTrue(ecSet.contains(i));
95-
}
96-
assertTrue(testItemReader.isClosed());
97-
assertEquals(SIZE,
98-
executionContext.getInt(SynchronizedItemStreamReaderBuilderTests.TestItemReader.UPDATE_COUNT_KEY));
34+
@Override
35+
protected SynchronizedItemStreamReader<Object> createNewSynchronizedItemStreamReader() {
36+
return new SynchronizedItemStreamReaderBuilder<>().delegate(delegate).build();
9937
}
10038

101-
/**
102-
* A simple class used to test the SynchronizedItemStreamReader. It simply returns the
103-
* number of times the read method has been called, manages some state variables and
104-
* updates an ExecutionContext.
105-
*
106-
* @author Matthew Ouyang
107-
*
108-
*/
109-
private class TestItemReader extends AbstractItemStreamItemReader<Integer> implements ItemStreamReader<Integer> {
110-
111-
private int cursor = 0;
112-
113-
private boolean isClosed = false;
114-
115-
public static final String HAS_BEEN_OPENED = "hasBeenOpened";
116-
117-
public static final String UPDATE_COUNT_KEY = "updateCount";
118-
119-
@Nullable
120-
public Integer read() throws Exception, ParseException, NonTransientResourceException {
121-
cursor = cursor + 1;
122-
return cursor;
123-
}
124-
125-
public void close() {
126-
this.isClosed = true;
127-
}
128-
129-
public void open(ExecutionContext executionContext) {
130-
this.isClosed = false;
131-
executionContext.put(HAS_BEEN_OPENED, true);
132-
executionContext.remove(UPDATE_COUNT_KEY);
133-
}
134-
135-
public void update(ExecutionContext executionContext) {
136-
137-
if (!executionContext.containsKey(UPDATE_COUNT_KEY)) {
138-
executionContext.putInt(UPDATE_COUNT_KEY, 0);
139-
}
140-
141-
executionContext.putInt(UPDATE_COUNT_KEY, executionContext.getInt(UPDATE_COUNT_KEY) + 1);
142-
}
39+
@Test
40+
void testBuilderDelegateIsNotNull() {
41+
// given
42+
final SynchronizedItemStreamReaderBuilder<Object> builder = new SynchronizedItemStreamReaderBuilder<>();
14343

144-
public boolean isClosed() {
145-
return this.isClosed;
146-
}
44+
// when
45+
final Exception expectedException = assertThrows(IllegalArgumentException.class, builder::build);
14746

47+
// then
48+
assertEquals("A delegate is required", expectedException.getMessage());
14849
}
14950

15051
}

0 commit comments

Comments
 (0)