Skip to content

Commit 5741df9

Browse files
authored
Enable per-message GRO on Linux (#3439)
Motivation:

NIO has existing GRO support. This support is applied at Channel scope, which is less than ideal, as it forces the datagram sizes to be accessed using socket options. In real applications this is less desirable than being able to access them on a per-superbuffer basis. Linux already supports this via recvmsg/recvmmsg control data; we just need to wire it up in NIO. We can re-use the existing metadata fields on AddressedEnvelope.

Modifications:

- Add a new ChannelOption to extract metadata.
- Reject setting this option on non-Linux devices.
- On Linux, use this to load the control data.
- Add new tests that validate the behaviour is correct.

Result:

Enable per-message GRO.
1 parent a7da98c commit 5741df9

File tree

4 files changed

+173
-3
lines changed

4 files changed

+173
-3
lines changed

Sources/NIOCore/ChannelOption.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,19 @@ extension ChannelOptions {
217217
public init() {}
218218
}
219219

220+
/// ``DatagramReceiveSegmentSize`` enables per-message GRO (Generic Receive Offload) segment size reporting.
221+
/// When enabled, the kernel will provide the original segment size via control messages for aggregated datagrams,
222+
/// which will be reported in `AddressedEnvelope.Metadata.segmentSize`.
223+
///
224+
/// This option requires ``DatagramReceiveOffload`` to be enabled first and is only supported on Linux.
225+
/// Support can be checked using ``System/supportsUDPReceiveOffload``.
226+
///
227+
/// - Note: This provides the receive-side complement to per-message GSO (``AddressedEnvelope/Metadata/segmentSize``).
228+
public struct DatagramReceiveSegmentSize: ChannelOption, Sendable {
229+
public typealias Value = Bool
230+
public init() {}
231+
}
232+
220233
/// When set to true IP level ECN information will be reported through `AddressedEnvelope.Metadata`
221234
public struct ExplicitCongestionNotificationsOption: ChannelOption, Sendable {
222235
public typealias Value = Bool
@@ -360,6 +373,9 @@ public struct ChannelOptions: Sendable {
360373
/// - seealso: `DatagramReceiveOffload`
361374
public static let datagramReceiveOffload = Types.DatagramReceiveOffload()
362375

376+
/// - seealso: `DatagramReceiveSegmentSize`
377+
public static let datagramReceiveSegmentSize = Types.DatagramReceiveSegmentSize()
378+
363379
/// - seealso: `ExplicitCongestionNotificationsOption`
364380
public static let explicitCongestionNotification = Types.ExplicitCongestionNotificationsOption()
365381

@@ -451,6 +467,11 @@ extension ChannelOption where Self == ChannelOptions.Types.DatagramReceiveOffloa
451467
public static var datagramReceiveOffload: Self { .init() }
452468
}
453469

470+
/// - seealso: `DatagramReceiveSegmentSize`.
471+
extension ChannelOption where Self == ChannelOptions.Types.DatagramReceiveSegmentSize {
472+
public static var datagramReceiveSegmentSize: Self { .init() }
473+
}
474+
454475
/// - seealso: `ExplicitCongestionNotificationsOption`.
455476
extension ChannelOption where Self == ChannelOptions.Types.ExplicitCongestionNotificationsOption {
456477
public static var explicitCongestionNotification: Self { .init() }

Sources/NIOPosix/ControlMessage.swift

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ struct UnsafeReceivedControlBytes {
194194
struct ControlMessageParser {
195195
var ecnValue: NIOExplicitCongestionNotificationState = .transportNotCapable // Default
196196
var packetInfo: NIOPacketInfo? = nil
197+
var segmentSize: Int? = nil
197198

198199
init(parsing controlMessagesReceived: UnsafeControlMessageCollection) {
199200
for controlMessage in controlMessagesReceived {
@@ -222,6 +223,8 @@ struct ControlMessageParser {
222223
self.receiveIPv4Message(controlMessage)
223224
} else if controlMessage.level == _IPPROTO_IPV6 {
224225
self.receiveIPv6Message(controlMessage)
226+
} else if controlMessage.level == Posix.SOL_UDP {
227+
self.receiveUDPMessage(controlMessage)
225228
}
226229
}
227230

@@ -271,6 +274,17 @@ struct ControlMessageParser {
271274
}
272275
}
273276
}
277+
278+
private mutating func receiveUDPMessage(_ controlMessage: UnsafeControlMessage) {
279+
#if os(Linux)
280+
if controlMessage.type == .init(NIOBSDSocket.Option.udp_gro.rawValue) {
281+
if let data = controlMessage.data {
282+
let readValue = ControlMessageParser._readCInt(data: data)
283+
self.segmentSize = Int(readValue)
284+
}
285+
}
286+
#endif
287+
}
274288
}
275289

276290
extension NIOExplicitCongestionNotificationState {
@@ -413,6 +427,10 @@ extension AddressedEnvelope.Metadata {
413427
/// It's assumed the caller has checked that congestion information is required before calling.
414428
internal init(from controlMessagesReceived: UnsafeControlMessageCollection) {
415429
let controlMessageReceiver = ControlMessageParser(parsing: controlMessagesReceived)
416-
self.init(ecnState: controlMessageReceiver.ecnValue, packetInfo: controlMessageReceiver.packetInfo)
430+
self.init(
431+
ecnState: controlMessageReceiver.ecnValue,
432+
packetInfo: controlMessageReceiver.packetInfo,
433+
segmentSize: controlMessageReceiver.segmentSize
434+
)
417435
}
418436
}

Sources/NIOPosix/SocketChannel.swift

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket>, @unchecked Sen
467467
final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
468468
private var reportExplicitCongestionNotifications = false
469469
private var receivePacketInfo = false
470+
private var receiveSegmentSize = false
470471

471472
// Guard against re-entrance of flushNow() method.
472473
private let pendingWrites: PendingDatagramWritesManager
@@ -625,6 +626,12 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
625626
}
626627
let enable = value as! ChannelOptions.Types.DatagramReceiveOffload.Value
627628
try self.socket.setUDPReceiveOffload(enable)
629+
case _ as ChannelOptions.Types.DatagramReceiveSegmentSize:
630+
guard System.supportsUDPReceiveOffload else {
631+
throw ChannelError._operationUnsupported
632+
}
633+
let enable = value as! ChannelOptions.Types.DatagramReceiveSegmentSize.Value
634+
self.receiveSegmentSize = enable
628635
default:
629636
try super.setOption0(option, value: value)
630637
}
@@ -690,6 +697,11 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
690697
throw ChannelError._operationUnsupported
691698
}
692699
return try self.socket.getUDPReceiveOffload() as! Option.Value
700+
case _ as ChannelOptions.Types.DatagramReceiveSegmentSize:
701+
guard System.supportsUDPReceiveOffload else {
702+
throw ChannelError._operationUnsupported
703+
}
704+
return self.receiveSegmentSize as! Option.Value
693705
case _ as ChannelOptions.Types.BufferedWritableBytesOption:
694706
return Int(self.pendingWrites.bufferedBytes) as! Option.Value
695707
default:
@@ -736,7 +748,7 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
736748
override func readFromSocket() throws -> ReadResult {
737749
if self.vectorReadManager != nil {
738750
return try self.vectorReadFromSocket()
739-
} else if self.reportExplicitCongestionNotifications || self.receivePacketInfo {
751+
} else if self.reportExplicitCongestionNotifications || self.receivePacketInfo || self.receiveSegmentSize {
740752
let pooledMsgBuffer = self.selectableEventLoop.msgBufferPool.get()
741753
defer { self.selectableEventLoop.msgBufferPool.put(pooledMsgBuffer) }
742754
return try pooledMsgBuffer.withUnsafePointers { _, _, controlMessageStorage in
@@ -781,7 +793,7 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
781793
readPending = false
782794

783795
let metadata: AddressedEnvelope<ByteBuffer>.Metadata?
784-
if self.reportExplicitCongestionNotifications || self.receivePacketInfo,
796+
if self.reportExplicitCongestionNotifications || self.receivePacketInfo || self.receiveSegmentSize,
785797
let controlMessagesReceived = controlBytes.receivedControlMessages
786798
{
787799
metadata = .init(from: controlMessagesReceived)
@@ -826,6 +838,7 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
826838
socket: self.socket,
827839
buffer: &buffer,
828840
parseControlMessages: self.reportExplicitCongestionNotifications || self.receivePacketInfo
841+
|| self.receiveSegmentSize
829842
)
830843
}
831844

Tests/NIOPosixTests/DatagramChannelTests.swift

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2041,6 +2041,124 @@ class DatagramChannelTests: XCTestCase {
20412041
}
20422042
#endif
20432043
}
2044+
2045+
// MARK: - Per-Message GRO Tests
2046+
2047+
func testReceiveLargeBufferWithPerMessageGRO(
2048+
segments: Int,
2049+
segmentSize: Int,
2050+
writes: Int,
2051+
vectorReads: Int? = nil
2052+
) throws {
2053+
try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")
2054+
try XCTSkipUnless(System.supportsUDPReceiveOffload, "UDP_GRO is not supported on this platform")
2055+
try XCTSkipUnless(try self.hasGoodGROSupport())
2056+
2057+
/// Set GSO on the first channel.
2058+
XCTAssertNoThrow(
2059+
try self.firstChannel.setOption(.datagramSegmentSize, value: CInt(segmentSize)).wait()
2060+
)
2061+
/// Set GRO on the second channel.
2062+
XCTAssertNoThrow(try self.secondChannel.setOption(.datagramReceiveOffload, value: true).wait())
2063+
/// Enable per-message GRO segment size reporting on the second channel.
2064+
XCTAssertNoThrow(try self.secondChannel.setOption(.datagramReceiveSegmentSize, value: true).wait())
2065+
/// The third channel has neither set.
2066+
2067+
// Enable vector reads on the second channel, if requested.
2068+
if let vectorReads = vectorReads {
2069+
XCTAssertNoThrow(
2070+
try self.secondChannel.setOption(.datagramVectorReadMessageCount, value: vectorReads)
2071+
.wait()
2072+
)
2073+
}
2074+
2075+
/// Increase the size of the read buffer for the second and third channels.
2076+
let fixed = FixedSizeRecvByteBufferAllocator(capacity: 1 << 16)
2077+
XCTAssertNoThrow(try self.secondChannel.setOption(.recvAllocator, value: fixed).wait())
2078+
XCTAssertNoThrow(try self.thirdChannel.setOption(.recvAllocator, value: fixed).wait())
2079+
2080+
// Write large datagrams on the first channel. They should be split and then accumulated on the receive side.
2081+
// Form a large buffer to write from the first channel.
2082+
let buffer = self.firstChannel.allocator.buffer(repeating: 1, count: segmentSize * segments)
2083+
2084+
// Write to the channel with per-message GRO enabled.
2085+
do {
2086+
let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer)
2087+
let promises = (0..<writes).map { _ in self.firstChannel.write(writeData) }
2088+
self.firstChannel.flush()
2089+
XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(promises, on: self.firstChannel.eventLoop).wait())
2090+
2091+
// GRO is well supported; we expect `writes` datagrams.
2092+
let datagrams = try self.secondChannel.waitForDatagrams(count: writes)
2093+
for datagram in datagrams {
2094+
XCTAssertEqual(datagram.data.readableBytes, segments * segmentSize)
2095+
// Verify that the metadata contains the segment size
2096+
XCTAssertNotNil(datagram.metadata, "Expected metadata to be present")
2097+
XCTAssertEqual(
2098+
datagram.metadata?.segmentSize,
2099+
segmentSize,
2100+
"Expected segment size to be \(segmentSize)"
2101+
)
2102+
}
2103+
}
2104+
2105+
// Write to the channel without GRO.
2106+
do {
2107+
let writeData = AddressedEnvelope(remoteAddress: self.thirdChannel.localAddress!, data: buffer)
2108+
let promises = (0..<writes).map { _ in self.firstChannel.write(writeData) }
2109+
self.firstChannel.flush()
2110+
XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(promises, on: self.firstChannel.eventLoop).wait())
2111+
2112+
// GRO is not enabled, so we expect `writes * segments` datagrams without segment size metadata.
2113+
let datagrams = try self.thirdChannel.waitForDatagrams(count: writes * segments)
2114+
for datagram in datagrams {
2115+
XCTAssertEqual(datagram.data.readableBytes, segmentSize)
2116+
// Without per-message GRO enabled, there should be no segment size in metadata
2117+
XCTAssertNil(datagram.metadata?.segmentSize)
2118+
}
2119+
}
2120+
}
2121+
2122+
func testChannelCanReceiveLargeBufferWithPerMessageGROUsingScalarReads() throws {
2123+
try self.testReceiveLargeBufferWithPerMessageGRO(segments: 10, segmentSize: 1000, writes: 1)
2124+
}
2125+
2126+
func testChannelCanReceiveLargeBufferWithPerMessageGROUsingVectorReads() throws {
2127+
try self.testReceiveLargeBufferWithPerMessageGRO(segments: 10, segmentSize: 1000, writes: 1, vectorReads: 4)
2128+
}
2129+
2130+
func testChannelCanReceiveMultipleLargeBuffersWithPerMessageGROUsingScalarReads() throws {
2131+
try self.testReceiveLargeBufferWithPerMessageGRO(segments: 10, segmentSize: 1000, writes: 4)
2132+
}
2133+
2134+
func testChannelCanReceiveMultipleLargeBuffersWithPerMessageGROUsingVectorReads() throws {
2135+
try self.testReceiveLargeBufferWithPerMessageGRO(segments: 10, segmentSize: 1000, writes: 4, vectorReads: 4)
2136+
}
2137+
2138+
func testGetPerMessageGROOption() throws {
2139+
let getOption = self.firstChannel.getOption(.datagramReceiveSegmentSize)
2140+
if System.supportsUDPReceiveOffload {
2141+
XCTAssertEqual(try getOption.wait(), false) // not-set
2142+
2143+
// Now set and check.
2144+
XCTAssertNoThrow(try self.firstChannel.setOption(.datagramReceiveSegmentSize, value: true).wait())
2145+
XCTAssertTrue(try self.firstChannel.getOption(.datagramReceiveSegmentSize).wait())
2146+
} else {
2147+
XCTAssertThrowsError(try getOption.wait()) { error in
2148+
XCTAssertEqual(error as? ChannelError, .operationUnsupported)
2149+
}
2150+
}
2151+
}
2152+
2153+
func testPerMessageGROThrowsOnNonLinux() throws {
2154+
#if !os(Linux)
2155+
// On non-Linux platforms, setting datagramReceiveSegmentSize should throw an error
2156+
XCTAssertThrowsError(try self.firstChannel.setOption(.datagramReceiveSegmentSize, value: true).wait()) {
2157+
error in
2158+
XCTAssertEqual(error as? ChannelError, .operationUnsupported)
2159+
}
2160+
#endif
2161+
}
20442162
}
20452163

20462164
extension System {

0 commit comments

Comments
 (0)