@@ -1033,6 +1033,222 @@ final class AsyncChannelBootstrapTests: XCTestCase {
10331033 }
10341034 }
10351035
1036+ func testPipeBootstrap_callsChannelInitializer( ) async throws {
1037+ let eventLoopGroup = self . group!
1038+ let ( pipe1ReadFD, pipe1WriteFD, pipe2ReadFD, pipe2WriteFD) = self . makePipeFileDescriptors ( )
1039+ let channel : NIOAsyncChannel < ByteBuffer , ByteBuffer >
1040+ let toChannel : NIOAsyncChannel < Never , ByteBuffer >
1041+ let fromChannel : NIOAsyncChannel < ByteBuffer , Never >
1042+ let didCallChannelInitializer = NIOLockedValueBox ( 0 )
1043+
1044+ do {
1045+ channel = try await NIOPipeBootstrap ( group: eventLoopGroup)
1046+ . channelInitializer { channel in
1047+ didCallChannelInitializer. withLockedValue { $0 += 1 }
1048+ return channel. eventLoop. makeSucceededVoidFuture ( )
1049+ }
1050+ . takingOwnershipOfDescriptors (
1051+ input: pipe1ReadFD,
1052+ output: pipe2WriteFD
1053+ ) { channel in
1054+ channel. eventLoop. makeCompletedFuture {
1055+ try NIOAsyncChannel ( wrappingChannelSynchronously: channel)
1056+ }
1057+ }
1058+ } catch {
1059+ for fileDescriptor in [ pipe1ReadFD, pipe1WriteFD, pipe2ReadFD, pipe2WriteFD] {
1060+ try SystemCalls . close ( descriptor: fileDescriptor)
1061+ }
1062+ throw error
1063+ }
1064+
1065+ do {
1066+ toChannel = try await NIOPipeBootstrap ( group: eventLoopGroup)
1067+ . channelInitializer { channel in
1068+ didCallChannelInitializer. withLockedValue { $0 += 1 }
1069+ return channel. eventLoop. makeSucceededVoidFuture ( )
1070+ }
1071+ . takingOwnershipOfDescriptor (
1072+ output: pipe1WriteFD
1073+ ) { channel in
1074+ channel. eventLoop. makeCompletedFuture {
1075+ try NIOAsyncChannel ( wrappingChannelSynchronously: channel)
1076+ }
1077+ }
1078+ } catch {
1079+ for fileDescriptor in [ pipe1WriteFD, pipe2ReadFD] {
1080+ try SystemCalls . close ( descriptor: fileDescriptor)
1081+ }
1082+ throw error
1083+ }
1084+
1085+ do {
1086+ fromChannel = try await NIOPipeBootstrap ( group: eventLoopGroup)
1087+ . channelInitializer { channel in
1088+ didCallChannelInitializer. withLockedValue { $0 += 1 }
1089+ return channel. eventLoop. makeSucceededVoidFuture ( )
1090+ }
1091+ . takingOwnershipOfDescriptor (
1092+ input: pipe2ReadFD
1093+ ) { channel in
1094+ channel. eventLoop. makeCompletedFuture {
1095+ try NIOAsyncChannel ( wrappingChannelSynchronously: channel)
1096+ }
1097+ }
1098+ } catch {
1099+ for fileDescriptor in [ pipe2ReadFD] {
1100+ try SystemCalls . close ( descriptor: fileDescriptor)
1101+ }
1102+ throw error
1103+ }
1104+
1105+ try await channel. executeThenClose { channelInbound, channelOutbound in
1106+ try await fromChannel. executeThenClose { fromChannelInbound, _ in
1107+ try await toChannel. executeThenClose { _, toChannelOutbound in
1108+ var inboundIterator = channelInbound. makeAsyncIterator ( )
1109+ var fromChannelInboundIterator = fromChannelInbound. makeAsyncIterator ( )
1110+
1111+ try await toChannelOutbound. write ( . init( string: " Request " ) )
1112+ try await XCTAsyncAssertEqual ( try await inboundIterator. next ( ) , ByteBuffer ( string: " Request " ) )
1113+
1114+ let response = ByteBuffer ( string: " Response " )
1115+ try await channelOutbound. write ( response)
1116+ try await XCTAsyncAssertEqual ( try await fromChannelInboundIterator. next ( ) , response)
1117+ }
1118+ }
1119+ }
1120+
1121+ XCTAssertEqual ( didCallChannelInitializer. withLockedValue { $0 } , 3 )
1122+ }
1123+
1124+ func testPipeBootstrap_whenInputNil_callsChannelInitializer( ) async throws {
1125+ let eventLoopGroup = self . group!
1126+ let ( pipe1ReadFD, pipe1WriteFD) = self . makePipeFileDescriptors ( )
1127+ let channel : NIOAsyncChannel < ByteBuffer , ByteBuffer >
1128+ let fromChannel : NIOAsyncChannel < ByteBuffer , Never >
1129+ let didCallChannelInitializer = NIOLockedValueBox ( 0 )
1130+
1131+ do {
1132+ channel = try await NIOPipeBootstrap ( group: eventLoopGroup)
1133+ . channelInitializer { channel in
1134+ didCallChannelInitializer. withLockedValue { $0 += 1 }
1135+ return channel. eventLoop. makeSucceededVoidFuture ( )
1136+ }
1137+ . takingOwnershipOfDescriptor (
1138+ output: pipe1WriteFD
1139+ ) { channel in
1140+ channel. eventLoop. makeCompletedFuture {
1141+ try NIOAsyncChannel ( wrappingChannelSynchronously: channel)
1142+ }
1143+ }
1144+ } catch {
1145+ for fileDescriptor in [ pipe1ReadFD, pipe1WriteFD] {
1146+ try SystemCalls . close ( descriptor: fileDescriptor)
1147+ }
1148+ throw error
1149+ }
1150+
1151+ do {
1152+ fromChannel = try await NIOPipeBootstrap ( group: eventLoopGroup)
1153+ . channelInitializer { channel in
1154+ didCallChannelInitializer. withLockedValue { $0 += 1 }
1155+ return channel. eventLoop. makeSucceededVoidFuture ( )
1156+ }
1157+ . takingOwnershipOfDescriptor (
1158+ input: pipe1ReadFD
1159+ ) { channel in
1160+ channel. eventLoop. makeCompletedFuture {
1161+ try NIOAsyncChannel ( wrappingChannelSynchronously: channel)
1162+ }
1163+ }
1164+ } catch {
1165+ for fileDescriptor in [ pipe1WriteFD] {
1166+ try SystemCalls . close ( descriptor: fileDescriptor)
1167+ }
1168+ throw error
1169+ }
1170+
1171+ try await channel. executeThenClose { channelInbound, channelOutbound in
1172+ try await fromChannel. executeThenClose { fromChannelInbound, _ in
1173+ var inboundIterator = channelInbound. makeAsyncIterator ( )
1174+ var fromChannelInboundIterator = fromChannelInbound. makeAsyncIterator ( )
1175+
1176+ try await XCTAsyncAssertEqual ( try await inboundIterator. next ( ) , nil )
1177+
1178+ let response = ByteBuffer ( string: " Response " )
1179+ try await channelOutbound. write ( response)
1180+ try await XCTAsyncAssertEqual ( try await fromChannelInboundIterator. next ( ) , response)
1181+ }
1182+ }
1183+
1184+ XCTAssertEqual ( didCallChannelInitializer. withLockedValue { $0 } , 2 )
1185+ }
1186+
1187+ func testPipeBootstrap_whenOutputNil_callsChannelInitializer( ) async throws {
1188+ let eventLoopGroup = self . group!
1189+ let ( pipe1ReadFD, pipe1WriteFD) = self . makePipeFileDescriptors ( )
1190+ let channel : NIOAsyncChannel < ByteBuffer , ByteBuffer >
1191+ let toChannel : NIOAsyncChannel < Never , ByteBuffer >
1192+ let didCallChannelInitializer = NIOLockedValueBox ( 0 )
1193+
1194+ do {
1195+ channel = try await NIOPipeBootstrap ( group: eventLoopGroup)
1196+ . channelInitializer { channel in
1197+ didCallChannelInitializer. withLockedValue { $0 += 1 }
1198+ return channel. eventLoop. makeSucceededVoidFuture ( )
1199+ }
1200+ . takingOwnershipOfDescriptor (
1201+ input: pipe1ReadFD
1202+ ) { channel in
1203+ channel. eventLoop. makeCompletedFuture {
1204+ try NIOAsyncChannel ( wrappingChannelSynchronously: channel)
1205+ }
1206+ }
1207+ } catch {
1208+ for fileDescriptor in [ pipe1ReadFD, pipe1WriteFD] {
1209+ try SystemCalls . close ( descriptor: fileDescriptor)
1210+ }
1211+
1212+ throw error
1213+ }
1214+
1215+ do {
1216+ toChannel = try await NIOPipeBootstrap ( group: eventLoopGroup)
1217+ . channelInitializer { channel in
1218+ didCallChannelInitializer. withLockedValue { $0 += 1 }
1219+ return channel. eventLoop. makeSucceededVoidFuture ( )
1220+ }
1221+ . takingOwnershipOfDescriptor (
1222+ output: pipe1WriteFD
1223+ ) { channel in
1224+ channel. eventLoop. makeCompletedFuture {
1225+ try NIOAsyncChannel ( wrappingChannelSynchronously: channel)
1226+ }
1227+ }
1228+ } catch {
1229+ for fileDescriptor in [ pipe1WriteFD] {
1230+ try SystemCalls . close ( descriptor: fileDescriptor)
1231+ }
1232+ throw error
1233+ }
1234+
1235+ try await channel. executeThenClose { channelInbound, channelOutbound in
1236+ try await toChannel. executeThenClose { _, toChannelOutbound in
1237+ var inboundIterator = channelInbound. makeAsyncIterator ( )
1238+
1239+ try await toChannelOutbound. write ( . init( string: " Request " ) )
1240+ try await XCTAsyncAssertEqual ( try await inboundIterator. next ( ) , ByteBuffer ( string: " Request " ) )
1241+
1242+ let response = ByteBuffer ( string: " Response " )
1243+ await XCTAsyncAssertThrowsError ( try await channelOutbound. write ( response) ) { error in
1244+ XCTAssertEqual ( error as? NIOAsyncWriterError , . alreadyFinished( ) )
1245+ }
1246+ }
1247+ }
1248+
1249+ XCTAssertEqual ( didCallChannelInitializer. withLockedValue { $0 } , 2 )
1250+ }
1251+
10361252 // MARK: RawSocket bootstrap
10371253
10381254 func testRawSocketBootstrap( ) async throws {
0 commit comments