1212//
1313//===----------------------------------------------------------------------===//
1414
15+ import NIOCore
1516import Testing
1617
1718@testable import AWSLambdaRuntime
@@ -165,7 +166,7 @@ struct PoolTests {
165166 let pool = LambdaHTTPServer . Pool < String > ( )
166167
167168 // Create two tasks that will both wait for elements to be available
168- await #expect( throws: LambdaHTTPServer . Pool < Swift . String > . PoolError. self) {
169+ let error = await #expect( throws: LambdaHTTPServer . Pool < String > . PoolError. self) {
169170 try await withThrowingTaskGroup ( of: Void . self) { group in
170171
171172 // one of the two task will throw a PoolError
@@ -184,6 +185,329 @@ struct PoolTests {
184185 try await group. waitForAll ( )
185186 }
186187 }
188+
189+ // Verify it's the correct error cause
190+ if case . nextCalledTwice = error? . cause {
191+ // This is the expected error
192+ } else {
193+ Issue . record ( " Expected nextCalledTwice error, got: \( String ( describing: error? . cause) ) " )
194+ }
195+ }
196+
197+ // MARK: - Invariant Tests for RequestId-specific functionality
198+
199+ @Test
200+ @available ( LambdaSwift 2 . 0 , * )
201+ func testRequestIdSpecificNext( ) async throws {
202+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
203+
204+ // Push responses with different requestIds
205+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req1 " , body: ByteBuffer ( string: " data1 " ) ) )
206+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req2 " , body: ByteBuffer ( string: " data2 " ) ) )
207+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req1 " , body: ByteBuffer ( string: " data3 " ) ) )
208+
209+ // Get specific responses
210+ let response1 = try await pool. next ( for: " req1 " )
211+ #expect( response1. requestId == " req1 " )
212+ #expect( String ( buffer: response1. body!) == " data1 " )
213+
214+ let response2 = try await pool. next ( for: " req2 " )
215+ #expect( response2. requestId == " req2 " )
216+ #expect( String ( buffer: response2. body!) == " data2 " )
217+
218+ let response3 = try await pool. next ( for: " req1 " )
219+ #expect( response3. requestId == " req1 " )
220+ #expect( String ( buffer: response3. body!) == " data3 " )
221+ }
222+
223+ @Test
224+ @available ( LambdaSwift 2 . 0 , * )
225+ func testStreamingResponsesWithSameRequestId( ) async throws {
226+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
227+ let requestId = " streaming-req "
228+
229+ let chunks = try await withThrowingTaskGroup ( of: [ String ] . self) { group in
230+ // Start consumer task
231+ group. addTask {
232+ var chunks : [ String ] = [ ]
233+ var isComplete = false
234+
235+ while !isComplete {
236+ let response = try await pool. next ( for: requestId)
237+ if let body = response. body {
238+ chunks. append ( String ( buffer: body) )
239+ }
240+ if response. final {
241+ isComplete = true
242+ }
243+ }
244+ return chunks
245+ }
246+
247+ // Start producer task
248+ group. addTask {
249+ // Give consumer time to start waiting
250+ try await Task . sleep ( nanoseconds: 10_000_000 ) // 0.01 seconds
251+
252+ // Push multiple chunks for the same requestId
253+ pool. push (
254+ LambdaHTTPServer . LocalServerResponse (
255+ id: requestId,
256+ body: ByteBuffer ( string: " chunk1 " ) ,
257+ final: false
258+ )
259+ )
260+ pool. push (
261+ LambdaHTTPServer . LocalServerResponse (
262+ id: requestId,
263+ body: ByteBuffer ( string: " chunk2 " ) ,
264+ final: false
265+ )
266+ )
267+ pool. push (
268+ LambdaHTTPServer . LocalServerResponse ( id: requestId, body: ByteBuffer ( string: " chunk3 " ) , final: true )
269+ )
270+
271+ return [ ] // Producer doesn't return chunks
272+ }
273+
274+ // Wait for consumer to complete and return its result
275+ for try await result in group {
276+ if !result. isEmpty {
277+ group. cancelAll ( )
278+ return result
279+ }
280+ }
281+ return [ ]
282+ }
283+
284+ #expect( chunks == [ " chunk1 " , " chunk2 " , " chunk3 " ] )
285+ }
286+
287+ @Test
288+ @available ( LambdaSwift 2 . 0 , * )
289+ func testMixedWaitingModesError( ) async throws {
290+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
291+
292+ let error = await #expect( throws: LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > . PoolError. self) {
293+ try await withThrowingTaskGroup ( of: Void . self) { group in
294+ // Start a FIFO consumer
295+ group. addTask {
296+ for try await _ in pool {
297+ // This should block waiting for any item
298+ }
299+ }
300+
301+ // Start a requestId-specific consumer after a delay
302+ group. addTask {
303+ // Give FIFO task time to start waiting
304+ try await Task . sleep ( nanoseconds: 10_000_000 ) // 0.01 seconds
305+
306+ // Try to use requestId-specific next - should fail with mixedWaitingModes
307+ _ = try await pool. next ( for: " req1 " )
308+ }
309+
310+ // Wait for the first task to complete (which should be the error)
311+ try await group. next ( )
312+ group. cancelAll ( )
313+ }
314+ }
315+
316+ // Verify it's the correct error cause
317+ if case . mixedWaitingModes = error? . cause {
318+ // This is the expected error
319+ } else {
320+ Issue . record ( " Expected mixedWaitingModes error, got: \( String ( describing: error? . cause) ) " )
321+ }
322+ }
323+
324+ @Test
325+ @available ( LambdaSwift 2 . 0 , * )
326+ func testMixedWaitingModesErrorReverse( ) async throws {
327+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
328+
329+ let error = await #expect( throws: LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > . PoolError. self) {
330+ try await withThrowingTaskGroup ( of: Void . self) { group in
331+ // Start a requestId-specific consumer
332+ group. addTask {
333+ _ = try await pool. next ( for: " req1 " )
334+ }
335+
336+ // Start a FIFO consumer after a delay
337+ group. addTask {
338+ // Give specific task time to start waiting
339+ try await Task . sleep ( nanoseconds: 10_000_000 ) // 0.01 seconds
340+
341+ // Try to use FIFO next - should fail with mixedWaitingModes
342+ for try await _ in pool {
343+ break
344+ }
345+ }
346+
347+ // Wait for the first task to complete (which should be the error)
348+ try await group. next ( )
349+ group. cancelAll ( )
350+ }
351+ }
352+
353+ // Verify it's the correct error cause
354+ if case . mixedWaitingModes = error? . cause {
355+ // This is the expected error
356+ } else {
357+ Issue . record ( " Expected mixedWaitingModes error, got: \( String ( describing: error? . cause) ) " )
358+ }
359+ }
360+
361+ @Test
362+ @available ( LambdaSwift 2 . 0 , * )
363+ func testDuplicateRequestIdWaitError( ) async throws {
364+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
365+
366+ let error = await #expect( throws: LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > . PoolError. self) {
367+ try await withThrowingTaskGroup ( of: Void . self) { group in
368+ // Start first consumer waiting for specific requestId
369+ group. addTask {
370+ _ = try await pool. next ( for: " req1 " )
371+ }
372+
373+ // Start second consumer for same requestId after a delay
374+ group. addTask {
375+ // Give first task time to start waiting
376+ try await Task . sleep ( nanoseconds: 10_000_000 ) // 0.01 seconds
377+
378+ // Try to wait for the same requestId - should fail
379+ _ = try await pool. next ( for: " req1 " )
380+ }
381+
382+ // Wait for the first task to complete (which should be the error)
383+ try await group. next ( )
384+ group. cancelAll ( )
385+ }
386+ }
387+
388+ // Verify it's the correct error cause and requestId
389+ if case let . duplicateRequestIdWait( requestId) = error? . cause {
390+ #expect( requestId == " req1 " )
391+ } else {
392+ Issue . record ( " Expected duplicateRequestIdWait error, got: \( String ( describing: error? . cause) ) " )
393+ }
394+ }
395+
396+ @Test
397+ @available ( LambdaSwift 2 . 0 , * )
398+ func testConcurrentRequestIdConsumers( ) async throws {
399+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
400+
401+ let results = try await withThrowingTaskGroup ( of: ( String, String) . self) { group in
402+ // Start multiple consumers for different requestIds
403+ group. addTask {
404+ let response = try await pool. next ( for: " req1 " )
405+ return ( " req1 " , String ( buffer: response. body!) )
406+ }
407+
408+ group. addTask {
409+ let response = try await pool. next ( for: " req2 " )
410+ return ( " req2 " , String ( buffer: response. body!) )
411+ }
412+
413+ group. addTask {
414+ let response = try await pool. next ( for: " req3 " )
415+ return ( " req3 " , String ( buffer: response. body!) )
416+ }
417+
418+ // Start producer task
419+ group. addTask {
420+ // Give tasks time to start waiting
421+ try await Task . sleep ( nanoseconds: 10_000_000 ) // 0.01 seconds
422+
423+ // Push responses in different order
424+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req3 " , body: ByteBuffer ( string: " data3 " ) ) )
425+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req1 " , body: ByteBuffer ( string: " data1 " ) ) )
426+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req2 " , body: ByteBuffer ( string: " data2 " ) ) )
427+
428+ return ( " producer " , " " ) // Producer doesn't return meaningful data
429+ }
430+
431+ // Collect results from consumers
432+ var consumerResults : [ String : String ] = [ : ]
433+ for try await (requestId, data) in group {
434+ if requestId != " producer " {
435+ consumerResults [ requestId] = data
436+ }
437+ if consumerResults. count == 3 {
438+ group. cancelAll ( )
439+ break
440+ }
441+ }
442+ return consumerResults
443+ }
444+
445+ // Verify each consumer gets the correct response
446+ #expect( results [ " req1 " ] == " data1 " )
447+ #expect( results [ " req2 " ] == " data2 " )
448+ #expect( results [ " req3 " ] == " data3 " )
449+ }
450+
451+ @Test
452+ @available ( LambdaSwift 2 . 0 , * )
453+ func testCancellationCleansUpAllContinuations( ) async throws {
454+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
455+
456+ // Test that cancellation properly cleans up all continuations
457+ do {
458+ try await withThrowingTaskGroup ( of: Void . self) { group in
459+ // Start multiple consumers for different requestIds
460+ group. addTask {
461+ _ = try await pool. next ( for: " req1 " )
462+ }
463+
464+ group. addTask {
465+ _ = try await pool. next ( for: " req2 " )
466+ }
467+
468+ group. addTask {
469+ _ = try await pool. next ( for: " req3 " )
470+ }
471+
472+ // Give tasks time to start waiting then cancel all
473+ try await Task . sleep ( nanoseconds: 10_000_000 ) // 0.01 seconds
474+ group. cancelAll ( )
475+
476+ try await group. waitForAll ( )
477+ }
478+ } catch is CancellationError {
479+ // Expected - tasks should be cancelled
480+ }
481+
482+ // Pool should be back to clean state - verify by pushing and consuming normally
483+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " new-req " , body: ByteBuffer ( string: " new-data " ) ) )
484+ let response = try await pool. next ( for: " new-req " )
485+ #expect( String ( buffer: response. body!) == " new-data " )
486+ }
487+
488+ @Test
489+ @available ( LambdaSwift 2 . 0 , * )
490+ func testBufferOrderingWithRequestIds( ) async throws {
491+ let pool = LambdaHTTPServer . Pool < LambdaHTTPServer . LocalServerResponse > ( )
492+
493+ // Push multiple responses for the same requestId
494+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req1 " , body: ByteBuffer ( string: " first " ) ) )
495+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req2 " , body: ByteBuffer ( string: " other " ) ) )
496+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req1 " , body: ByteBuffer ( string: " second " ) ) )
497+ pool. push ( LambdaHTTPServer . LocalServerResponse ( id: " req1 " , body: ByteBuffer ( string: " third " ) ) )
498+
499+ // Consume in order - should get FIFO order for the same requestId
500+ let first = try await pool. next ( for: " req1 " )
501+ #expect( String ( buffer: first. body!) == " first " )
502+
503+ let second = try await pool. next ( for: " req1 " )
504+ #expect( String ( buffer: second. body!) == " second " )
505+
506+ let other = try await pool. next ( for: " req2 " )
507+ #expect( String ( buffer: other. body!) == " other " )
508+
509+ let third = try await pool. next ( for: " req1 " )
510+ #expect( String ( buffer: third. body!) == " third " )
187511 }
188512
189513}
0 commit comments