3737import java .util .concurrent .ThreadFactory ;
3838import java .util .concurrent .ThreadLocalRandom ;
3939import java .util .concurrent .TimeUnit ;
40- import java .util .concurrent .atomic .AtomicBoolean ;
40+ import java .util .concurrent .atomic .AtomicLong ;
4141
4242import org .neo4j .driver .v1 .AccessMode ;
4343import org .neo4j .driver .v1 .AuthToken ;
@@ -89,7 +89,7 @@ public void setUp() throws Exception
8989 {
9090 URI clusterUri = clusterRule .getClusterUri ();
9191 AuthToken authToken = clusterRule .getAuthToken ();
92- Config config = Config .build ().withLogging ( DEV_NULL_LOGGING ).toConfig ();
92+ Config config = Config .build ().withLogging ( DEV_NULL_LOGGING ).withMaxIdleSessions ( THREAD_COUNT ). toConfig ();
9393 driver = GraphDatabase .driver ( clusterUri , authToken , config );
9494
9595 ThreadFactory threadFactory = new DaemonThreadFactory ( getClass ().getSimpleName () + "-worker-" );
@@ -109,11 +109,11 @@ public void tearDown() throws Exception
109109 @ Test
110110 public void basicStressTest () throws Throwable
111111 {
112- AtomicBoolean stop = new AtomicBoolean ();
113- List <Future <?>> resultFutures = launchWorkerThreads ( stop );
112+ Context context = new Context ();
113+ List <Future <?>> resultFutures = launchWorkerThreads ( context );
114114
115115 long openFileDescriptors = sleepAndGetOpenFileDescriptorCount ();
116- stop . set ( true );
116+ context . stop ( );
117117
118118 Throwable firstError = null ;
119119 for ( Future <?> future : resultFutures )
@@ -134,16 +134,17 @@ public void basicStressTest() throws Throwable
134134 }
135135
136136 assertNoFileDescriptorLeak ( openFileDescriptors );
137+ assertExpectedNumberOfNodesCreated ( context .getCreatedNodesCount () );
137138 }
138139
139- private List <Future <?>> launchWorkerThreads ( AtomicBoolean stop )
140+ private List <Future <?>> launchWorkerThreads ( Context context )
140141 {
141142 List <Command > commands = createCommands ();
142143 List <Future <?>> futures = new ArrayList <>();
143144
144145 for ( int i = 0 ; i < THREAD_COUNT ; i ++ )
145146 {
146- Future <Void > future = launchWorkerThread ( executor , commands , stop );
147+ Future <Void > future = launchWorkerThread ( executor , commands , context );
147148 futures .add ( future );
148149 }
149150
@@ -155,9 +156,11 @@ private List<Command> createCommands()
155156 List <Command > commands = new ArrayList <>();
156157
157158 commands .add ( new ReadQuery ( driver ) );
158- commands .add ( new ReadQueryInTx ( driver ) );
159+ commands .add ( new ReadQueryInTx ( driver , false ) );
160+ commands .add ( new ReadQueryInTx ( driver , true ) );
159161 commands .add ( new WriteQuery ( driver ) );
160- commands .add ( new WriteQueryInTx ( driver ) );
162+ commands .add ( new WriteQueryInTx ( driver , false ) );
163+ commands .add ( new WriteQueryInTx ( driver , true ) );
161164 commands .add ( new WriteQueryUsingReadSession ( driver ) );
162165 commands .add ( new WriteQueryUsingReadSessionInTx ( driver ) );
163166 commands .add ( new FailedAuth ( clusterRule .getClusterUri () ) );
@@ -166,18 +169,20 @@ private List<Command> createCommands()
166169 }
167170
168171 private static Future <Void > launchWorkerThread ( final ExecutorService executor , final List <Command > commands ,
169- final AtomicBoolean stop )
172+ final Context context )
170173 {
171174 return executor .submit ( new Callable <Void >()
172175 {
176+ final ThreadLocalRandom random = ThreadLocalRandom .current ();
177+
173178 @ Override
174179 public Void call () throws Exception
175180 {
176- while ( !stop . get () )
181+ while ( !context . isStopped () )
177182 {
178- int randomCommandIdx = ThreadLocalRandom . current () .nextInt ( commands .size () );
183+ int randomCommandIdx = random .nextInt ( commands .size () );
179184 Command command = commands .get ( randomCommandIdx );
180- command .execute ();
185+ command .execute ( context );
181186 }
182187 return null ;
183188 }
@@ -202,6 +207,18 @@ private void assertNoFileDescriptorLeak( long previousOpenFileDescriptors )
202207 currentOpenFileDescriptorCount , lessThanOrEqualTo ( maxOpenFileDescriptors ) );
203208 }
204209
210+ private void assertExpectedNumberOfNodesCreated ( long expectedCount )
211+ {
212+ try ( Session session = driver .session () )
213+ {
214+ List <Record > records = session .run ( "MATCH (n) RETURN count(n) AS nodesCount" ).list ();
215+ assertEquals ( 1 , records .size () );
216+ Record record = records .get ( 0 );
217+ long actualCount = record .get ( "nodesCount" ).asLong ();
218+ assertEquals ( "Unexpected number of nodes in the database" , expectedCount , actualCount );
219+ }
220+ }
221+
205222 private static long getOpenFileDescriptorCount ()
206223 {
207224 try
@@ -227,9 +244,46 @@ private static Throwable withSuppressed( Throwable firstError, Throwable newErro
227244 return firstError ;
228245 }
229246
247+ private static class Context
248+ {
249+ volatile boolean stopped ;
250+ volatile String bookmark ;
251+ final AtomicLong createdNodesCount = new AtomicLong ();
252+
253+ boolean isStopped ()
254+ {
255+ return stopped ;
256+ }
257+
258+ void stop ()
259+ {
260+ this .stopped = true ;
261+ }
262+
263+ String getBookmark ()
264+ {
265+ return bookmark ;
266+ }
267+
268+ void setBookmark ( String bookmark )
269+ {
270+ this .bookmark = bookmark ;
271+ }
272+
273+ void nodeCreated ()
274+ {
275+ createdNodesCount .incrementAndGet ();
276+ }
277+
278+ long getCreatedNodesCount ()
279+ {
280+ return createdNodesCount .get ();
281+ }
282+ }
283+
230284 private interface Command
231285 {
232- void execute ();
286+ void execute ( Context context );
233287 }
234288
235289 private static abstract class BaseQuery implements Command
@@ -240,6 +294,19 @@ private static abstract class BaseQuery implements Command
240294 {
241295 this .driver = driver ;
242296 }
297+
298+ Transaction beginTx ( Session session , Context context , boolean useBookmark )
299+ {
300+ if ( useBookmark )
301+ {
302+ String bookmark = context .getBookmark ();
303+ if ( bookmark != null )
304+ {
305+ return session .beginTransaction ( bookmark );
306+ }
307+ }
308+ return session .beginTransaction ();
309+ }
243310 }
244311
245312 private static class ReadQuery extends BaseQuery
@@ -250,7 +317,7 @@ private static class ReadQuery extends BaseQuery
250317 }
251318
252319 @ Override
253- public void execute ()
320+ public void execute ( Context context )
254321 {
255322 try ( Session session = driver .session ( AccessMode .READ ) )
256323 {
@@ -268,16 +335,19 @@ public void execute()
268335
269336 private static class ReadQueryInTx extends BaseQuery
270337 {
271- ReadQueryInTx ( Driver driver )
338+ final boolean useBookmark ;
339+
340+ ReadQueryInTx ( Driver driver , boolean useBookmark )
272341 {
273342 super ( driver );
343+ this .useBookmark = useBookmark ;
274344 }
275345
276346 @ Override
277- public void execute ()
347+ public void execute ( Context context )
278348 {
279349 try ( Session session = driver .session ( AccessMode .READ );
280- Transaction tx = session . beginTransaction ( ) )
350+ Transaction tx = beginTx ( session , context , useBookmark ) )
281351 {
282352 StatementResult result = tx .run ( "MATCH (n) RETURN n LIMIT 1" );
283353 List <Record > records = result .list ();
@@ -287,6 +357,7 @@ public void execute()
287357 Node node = record .get ( 0 ).asNode ();
288358 assertNotNull ( node );
289359 }
360+ tx .success ();
290361 }
291362 }
292363 }
@@ -299,34 +370,44 @@ private static class WriteQuery extends BaseQuery
299370 }
300371
301372 @ Override
302- public void execute ()
373+ public void execute ( Context context )
303374 {
304375 StatementResult result ;
305376 try ( Session session = driver .session ( AccessMode .WRITE ) )
306377 {
307378 result = session .run ( "CREATE ()" );
308379 }
309380 assertEquals ( 1 , result .summary ().counters ().nodesCreated () );
381+ context .nodeCreated ();
310382 }
311383 }
312384
313385 private static class WriteQueryInTx extends BaseQuery
314386 {
315- WriteQueryInTx ( Driver driver )
387+ final boolean useBookmark ;
388+
389+ WriteQueryInTx ( Driver driver , boolean useBookmark )
316390 {
317391 super ( driver );
392+ this .useBookmark = useBookmark ;
318393 }
319394
320395 @ Override
321- public void execute ()
396+ public void execute ( Context context )
322397 {
323398 StatementResult result ;
324- try ( Session session = driver .session ( AccessMode .WRITE );
325- Transaction tx = session .beginTransaction () )
399+ try ( Session session = driver .session ( AccessMode .WRITE ) )
326400 {
327- result = tx .run ( "CREATE ()" );
401+ try ( Transaction tx = beginTx ( session , context , useBookmark ) )
402+ {
403+ result = tx .run ( "CREATE ()" );
404+ tx .success ();
405+ }
406+
407+ context .setBookmark ( session .lastBookmark () );
328408 }
329409 assertEquals ( 1 , result .summary ().counters ().nodesCreated () );
410+ context .nodeCreated ();
330411 }
331412 }
332413
@@ -338,7 +419,7 @@ private static class WriteQueryUsingReadSession extends BaseQuery
338419 }
339420
340421 @ Override
341- public void execute ()
422+ public void execute ( Context context )
342423 {
343424 StatementResult result = null ;
344425 try
@@ -366,7 +447,7 @@ private static class WriteQueryUsingReadSessionInTx extends BaseQuery
366447 }
367448
368449 @ Override
369- public void execute ()
450+ public void execute ( Context context )
370451 {
371452 StatementResult result = null ;
372453 try
@@ -375,6 +456,7 @@ public void execute()
375456 Transaction tx = session .beginTransaction () )
376457 {
377458 result = tx .run ( "CREATE ()" );
459+ tx .success ();
378460 }
379461 fail ( "Exception expected" );
380462 }
@@ -397,7 +479,7 @@ private static class FailedAuth implements Command
397479 }
398480
399481 @ Override
400- public void execute ()
482+ public void execute ( Context context )
401483 {
402484 Logger logger = mock ( Logger .class );
403485 Logging logging = mock ( Logging .class );
0 commit comments