@@ -10,7 +10,7 @@ final class PSQLRowStreamTests: XCTestCase {
1010 let logger = Logger ( label: " PSQLRowStreamTests " )
1111 let eventLoop = EmbeddedEventLoop ( )
1212
13- func testEmptyStream ( ) {
13+ func testEmptyStreamAndDrainDoesNotThrowErrorAfterConsumption ( ) {
1414 let stream = PSQLRowStream (
1515 source: . noRows( . success( . tag( " INSERT 0 1 " ) ) ) ,
1616 eventLoop: self . eventLoop,
@@ -20,10 +20,9 @@ final class PSQLRowStreamTests: XCTestCase {
2020 XCTAssertEqual ( try stream. all ( ) . wait ( ) , [ ] )
2121 XCTAssertEqual ( stream. commandTag, " INSERT 0 1 " )
2222
23- // Test 'drain' works in this case
2423 XCTAssertNoThrow ( try stream. drain ( ) . wait ( ) )
2524 }
26-
25+
2726 func testFailedStream( ) {
2827 let stream = PSQLRowStream (
2928 source: . noRows( . failure( PSQLError . serverClosedConnection ( underlying: nil ) ) ) ,
@@ -84,37 +83,37 @@ final class PSQLRowStreamTests: XCTestCase {
8483 )
8584 XCTAssertEqual ( dataSource. hitDemand, 0 )
8685 XCTAssertEqual ( dataSource. hitCancel, 0 )
87-
86+
8887 stream. receive ( [
8988 [ ByteBuffer ( string: " 0 " ) ] ,
9089 [ ByteBuffer ( string: " 1 " ) ]
9190 ] )
92-
91+
9392 XCTAssertEqual ( dataSource. hitDemand, 0 , " Before we have a consumer demand is not signaled " )
94-
93+
9594 // attach consumer
9695 let future = stream. all ( )
9796 XCTAssertEqual ( dataSource. hitDemand, 1 )
98-
97+
9998 stream. receive ( [
10099 [ ByteBuffer ( string: " 2 " ) ] ,
101100 [ ByteBuffer ( string: " 3 " ) ]
102101 ] )
103102 XCTAssertEqual ( dataSource. hitDemand, 2 )
104-
103+
105104 stream. receive ( [
106105 [ ByteBuffer ( string: " 4 " ) ] ,
107106 [ ByteBuffer ( string: " 5 " ) ]
108107 ] )
109108 XCTAssertEqual ( dataSource. hitDemand, 3 )
110-
109+
111110 stream. receive ( completion: . success( " SELECT 2 " ) )
112-
111+
113112 var rows : [ PostgresRow ] ?
114113 XCTAssertNoThrow ( rows = try future. wait ( ) )
115114 XCTAssertEqual ( rows? . count, 6 )
116115 }
117-
116+
118117 func testOnRowAfterStreamHasFinished( ) {
119118 let dataSource = CountingDataSource ( )
120119 let stream = PSQLRowStream (
@@ -240,6 +239,84 @@ final class PSQLRowStreamTests: XCTestCase {
240239 XCTAssertEqual ( stream. commandTag, " SELECT 6 " )
241240 }
242241
242+ func testEmptyStreamDrainsSuccessfully( ) {
243+ let stream = PSQLRowStream (
244+ source: . noRows( . success( . tag( " INSERT 0 1 " ) ) ) ,
245+ eventLoop: self . eventLoop,
246+ logger: self . logger
247+ )
248+
249+ XCTAssertNoThrow ( try stream. drain ( ) . wait ( ) )
250+ XCTAssertEqual ( stream. commandTag, " INSERT 0 1 " )
251+ }
252+
253+ func testDrainAfterStreamHasFinished( ) {
254+ let dataSource = CountingDataSource ( )
255+ let stream = PSQLRowStream (
256+ source: . stream(
257+ [ self . makeColumnDescription ( name: " foo " , dataType: . text, format: . binary) ] ,
258+ dataSource
259+ ) ,
260+ eventLoop: self . eventLoop,
261+ logger: self . logger
262+ )
263+ XCTAssertEqual ( dataSource. hitDemand, 0 )
264+ XCTAssertEqual ( dataSource. hitCancel, 0 )
265+
266+ stream. receive ( [
267+ [ ByteBuffer ( string: " 0 " ) ] ,
268+ [ ByteBuffer ( string: " 1 " ) ]
269+ ] )
270+
271+ XCTAssertEqual ( dataSource. hitDemand, 0 , " Before we have a consumer demand is not signaled " )
272+ stream. receive ( completion: . success( " SELECT 2 " ) )
273+
274+ // attach consumer
275+ XCTAssertNoThrow ( try stream. drain ( ) . wait ( ) )
276+ XCTAssertEqual ( dataSource. hitDemand, 0 ) // TODO: Is this right?
277+ }
278+
279+ func testDrainBeforeStreamHasFinished( ) {
280+ let dataSource = CountingDataSource ( )
281+ let stream = PSQLRowStream (
282+ source: . stream(
283+ [ self . makeColumnDescription ( name: " foo " , dataType: . text, format: . binary) ] ,
284+ dataSource
285+ ) ,
286+ eventLoop: self . eventLoop,
287+ logger: self . logger
288+ )
289+ XCTAssertEqual ( dataSource. hitDemand, 0 )
290+ XCTAssertEqual ( dataSource. hitCancel, 0 )
291+
292+ stream. receive ( [
293+ [ ByteBuffer ( string: " 0 " ) ] ,
294+ [ ByteBuffer ( string: " 1 " ) ]
295+ ] )
296+
297+ XCTAssertEqual ( dataSource. hitDemand, 0 , " Before we have a consumer demand is not signaled " )
298+
299+ // attach consumer
300+ let future = stream. drain ( )
301+ XCTAssertEqual ( dataSource. hitDemand, 1 )
302+
303+ stream. receive ( [
304+ [ ByteBuffer ( string: " 2 " ) ] ,
305+ [ ByteBuffer ( string: " 3 " ) ]
306+ ] )
307+ XCTAssertEqual ( dataSource. hitDemand, 2 )
308+
309+ stream. receive ( [
310+ [ ByteBuffer ( string: " 4 " ) ] ,
311+ [ ByteBuffer ( string: " 5 " ) ]
312+ ] )
313+ XCTAssertEqual ( dataSource. hitDemand, 3 )
314+
315+ stream. receive ( completion: . success( " SELECT 2 " ) )
316+
317+ XCTAssertNoThrow ( try future. wait ( ) )
318+ }
319+
243320 func makeColumnDescription( name: String , dataType: PostgresDataType , format: PostgresFormat ) -> RowDescription . Column {
244321 RowDescription . Column (
245322 name: " test " ,
0 commit comments