Skip to content

Commit a7da98c

Browse files
authored
Enable per-message GSO on Linux (#3436)
Motivation: NIO has existing GSO support. This GSO support is applied at Channel scope, which makes it somewhat less than ideal, as it forces the datagram sizes to be known statically ahead of time. In real applications this is less common than being able to do this on a per-superbuffer context. Linux supports this already by using sendmsg/sendmmsg control data. We just need to wire this up in NIO. An extension to this is that we can also do per-message GRO in Linux. I'll tackle this in a later patch to keep the diffs small. Modifications: - Adding a new field on AddressedEnvelope Metadata. - Reject setting this field on non-Linux devices. - On Linux, use this to set the control data. - Added new tests that validate the behaviour is correct. Result: Enable per-message GSO.
1 parent 88dc990 commit a7da98c

File tree

6 files changed

+240
-0
lines changed

6 files changed

+240
-0
lines changed

Sources/NIOCore/AddressedEnvelope.swift

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,37 @@ public struct AddressedEnvelope<DataType> {
3939
/// Details of any congestion state.
4040
public var ecnState: NIOExplicitCongestionNotificationState
4141
public var packetInfo: NIOPacketInfo?
42+
/// The UDP segment size for Generic Segmentation Offload (GSO).
43+
///
44+
/// When set, this enables per-message GSO, allowing the kernel to split this datagram
45+
/// into multiple segments of the specified size. The maximum segment size is platform-dependent
46+
/// and can be queried via `System.udpMaxSegments`.
47+
///
48+
/// On non-Linux platforms, writes with a non-nil `segmentSize` will fail with
49+
/// `ChannelError.operationUnsupported`. The error will be propagated to the write promise
50+
/// (if attached).
51+
public var segmentSize: Int?
4252

4353
public init(ecnState: NIOExplicitCongestionNotificationState) {
4454
self.ecnState = ecnState
4555
self.packetInfo = nil
56+
self.segmentSize = nil
4657
}
4758

4859
public init(ecnState: NIOExplicitCongestionNotificationState, packetInfo: NIOPacketInfo?) {
4960
self.ecnState = ecnState
5061
self.packetInfo = packetInfo
62+
self.segmentSize = nil
63+
}
64+
65+
public init(
66+
ecnState: NIOExplicitCongestionNotificationState,
67+
packetInfo: NIOPacketInfo?,
68+
segmentSize: Int?
69+
) {
70+
self.ecnState = ecnState
71+
self.packetInfo = packetInfo
72+
self.segmentSize = segmentSize
5173
}
5274
}
5375
}

Sources/NIOPosix/ControlMessage.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,25 @@ extension UnsafeOutboundControlBytes {
388388
break
389389
}
390390
}
391+
392+
/// Appends a UDP segment size control message for Generic Segmentation Offload (GSO).
393+
///
394+
/// - Parameter metadata: Metadata from the addressed envelope which may contain a segment size.
395+
/// - Warning: This will `fatalError` if called with a segmentSize on non-Linux platforms.
396+
/// Callers should validate platform support before enqueueing writes with segmentSize.
397+
internal mutating func appendUDPSegmentSize(metadata: AddressedEnvelope<ByteBuffer>.Metadata?) {
398+
guard let metadata = metadata, let segmentSize = metadata.segmentSize else { return }
399+
400+
#if os(Linux)
401+
self.appendGenericControlMessage(
402+
level: Posix.SOL_UDP,
403+
type: .init(NIOBSDSocket.Option.udp_segment.rawValue),
404+
payload: UInt16(segmentSize)
405+
)
406+
#else
407+
fatalError("UDP segment size is only supported on Linux")
408+
#endif
409+
}
391410
}
392411

393412
extension AddressedEnvelope.Metadata {

Sources/NIOPosix/PendingDatagramWritesManager.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ private func doPendingDatagramWriteVectorOperation(
148148

149149
var controlBytes = UnsafeOutboundControlBytes(controlBytes: controlMessageStorage[c])
150150
controlBytes.appendExplicitCongestionState(metadata: p.metadata, protocolFamily: protocolFamily)
151+
controlBytes.appendUDPSegmentSize(metadata: p.metadata)
151152
let controlMessageBytePointer = controlBytes.validControlBytes
152153

153154
var msg = msghdr()

Sources/NIOPosix/SocketChannel.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,14 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
929929
/// Buffer a write in preparation for a flush.
930930
private func bufferPendingAddressedWrite(envelope: AddressedEnvelope<ByteBuffer>, promise: EventLoopPromise<Void>?)
931931
{
932+
// Check if segment size is set on a non-Linux platform
933+
#if !os(Linux)
934+
if envelope.metadata?.segmentSize != nil {
935+
promise?.fail(ChannelError.operationUnsupported)
936+
return
937+
}
938+
#endif
939+
932940
// If the socket is connected, check the remote provided matches the connected address.
933941
if let connectedRemoteAddress = self.remoteAddress {
934942
guard envelope.remoteAddress == connectedRemoteAddress else {
@@ -977,6 +985,7 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
977985
metadata: metadata,
978986
protocolFamily: self.localAddress?.protocol
979987
)
988+
controlBytes.appendUDPSegmentSize(metadata: metadata)
980989
return try self.socket.sendmsg(
981990
pointer: ptr,
982991
destinationPtr: destinationPtr,

Sources/NIOPosix/System.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,14 @@ internal enum Posix: Sendable {
542542
static let IPV6_PKTINFO: CInt = CInt(WinSDK.IPV6_PKTINFO)
543543
#endif
544544

545+
#if canImport(Darwin)
546+
static let SOL_UDP: CInt = CInt(IPPROTO_UDP)
547+
#elseif os(Linux) || os(FreeBSD) || os(Android)
548+
static let SOL_UDP: CInt = CInt(IPPROTO_UDP)
549+
#elseif os(Windows)
550+
static let SOL_UDP: CInt = CInt(IPPROTO_UDP)
551+
#endif
552+
545553
#if !os(Windows)
546554
@inline(never)
547555
public static func shutdown(descriptor: CInt, how: Shutdown) throws {

Tests/NIOPosixTests/DatagramChannelTests.swift

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1860,6 +1860,187 @@ class DatagramChannelTests: XCTestCase {
18601860

18611861
return hasGoodGROSupport
18621862
}
1863+
1864+
// MARK: - Per-Message GSO Tests
1865+
1866+
func testLargeScalarWriteWithPerMessageGSO() throws {
1867+
try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")
1868+
1869+
// We're going to send one large buffer with per-message GSO metadata.
1870+
// The kernel should split it into multiple segments.
1871+
let segmentSize = 1000
1872+
let segments = 10
1873+
1874+
// Form a handful of segments
1875+
let buffers = (0..<segments).map { i in
1876+
ByteBuffer(repeating: UInt8(i), count: segmentSize)
1877+
}
1878+
1879+
// Coalesce the segments into a single buffer.
1880+
var buffer = self.firstChannel.allocator.buffer(capacity: segments * segmentSize)
1881+
for segment in buffers {
1882+
buffer.writeImmutableBuffer(segment)
1883+
}
1884+
1885+
// Write the single large buffer with per-message GSO metadata.
1886+
let writeData = AddressedEnvelope(
1887+
remoteAddress: self.secondChannel.localAddress!,
1888+
data: buffer,
1889+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize)
1890+
)
1891+
XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait())
1892+
1893+
// The receiver will receive separate segments.
1894+
let receivedBuffers = try self.secondChannel.waitForDatagrams(count: segments)
1895+
let receivedBytes = receivedBuffers.map { $0.data.readableBytes }.reduce(0, +)
1896+
XCTAssertEqual(segmentSize * segments, receivedBytes)
1897+
1898+
var unusedIndexes = Set(buffers.indices)
1899+
for envelope in receivedBuffers {
1900+
if let index = buffers.firstIndex(of: envelope.data) {
1901+
XCTAssertNotNil(unusedIndexes.remove(index))
1902+
} else {
1903+
XCTFail("No matching buffer")
1904+
}
1905+
}
1906+
}
1907+
1908+
func testLargeVectorWriteWithPerMessageGSO() throws {
1909+
try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")
1910+
1911+
// Similar to the test above, but with multiple writes using different segment sizes.
1912+
let segmentSize1 = 1000
1913+
let segments1 = 10
1914+
let segmentSize2 = 500
1915+
let segments2 = 5
1916+
1917+
// Form segments for first write
1918+
let buffers1 = (0..<segments1).map { i in
1919+
ByteBuffer(repeating: UInt8(i), count: segmentSize1)
1920+
}
1921+
var buffer1 = self.firstChannel.allocator.buffer(capacity: segments1 * segmentSize1)
1922+
for segment in buffers1 {
1923+
buffer1.writeImmutableBuffer(segment)
1924+
}
1925+
1926+
// Form segments for second write
1927+
let buffers2 = (0..<segments2).map { i in
1928+
ByteBuffer(repeating: UInt8(100 + i), count: segmentSize2)
1929+
}
1930+
var buffer2 = self.firstChannel.allocator.buffer(capacity: segments2 * segmentSize2)
1931+
for segment in buffers2 {
1932+
buffer2.writeImmutableBuffer(segment)
1933+
}
1934+
1935+
// Write both buffers with different segment sizes.
1936+
let writeData1 = AddressedEnvelope(
1937+
remoteAddress: self.secondChannel.localAddress!,
1938+
data: buffer1,
1939+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize1)
1940+
)
1941+
let writeData2 = AddressedEnvelope(
1942+
remoteAddress: self.secondChannel.localAddress!,
1943+
data: buffer2,
1944+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize2)
1945+
)
1946+
let write1 = self.firstChannel.write(writeData1)
1947+
let write2 = self.firstChannel.write(writeData2)
1948+
self.firstChannel.flush()
1949+
XCTAssertNoThrow(try write1.wait())
1950+
XCTAssertNoThrow(try write2.wait())
1951+
1952+
// The receiver will receive separate segments from both writes.
1953+
let receivedBuffers = try self.secondChannel.waitForDatagrams(count: segments1 + segments2)
1954+
XCTAssertEqual(receivedBuffers.count, segments1 + segments2)
1955+
}
1956+
1957+
func testMixedGSOAndNonGSO() throws {
1958+
try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")
1959+
1960+
// Send some messages with GSO and some without.
1961+
let segmentSize = 1000
1962+
let segments = 5
1963+
1964+
// GSO message
1965+
var gsoBuffer = self.firstChannel.allocator.buffer(capacity: segments * segmentSize)
1966+
for _ in 0..<segments {
1967+
gsoBuffer.writeRepeatingByte(1, count: segmentSize)
1968+
}
1969+
let gsoEnvelope = AddressedEnvelope(
1970+
remoteAddress: self.secondChannel.localAddress!,
1971+
data: gsoBuffer,
1972+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize)
1973+
)
1974+
1975+
// Non-GSO message
1976+
var normalBuffer = self.firstChannel.allocator.buffer(capacity: 100)
1977+
normalBuffer.writeRepeatingByte(2, count: 100)
1978+
let normalEnvelope = AddressedEnvelope(
1979+
remoteAddress: self.secondChannel.localAddress!,
1980+
data: normalBuffer
1981+
)
1982+
1983+
// Send both
1984+
let write1 = self.firstChannel.write(gsoEnvelope)
1985+
let write2 = self.firstChannel.write(normalEnvelope)
1986+
self.firstChannel.flush()
1987+
XCTAssertNoThrow(try write1.wait())
1988+
XCTAssertNoThrow(try write2.wait())
1989+
1990+
// Should receive 5 segments + 1 normal message
1991+
let received = try self.secondChannel.waitForDatagrams(count: segments + 1)
1992+
XCTAssertEqual(received.count, segments + 1)
1993+
}
1994+
1995+
func testPerMessageGSOOverridesChannelLevel() throws {
1996+
try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")
1997+
1998+
// Set channel-level GSO to one value
1999+
XCTAssertNoThrow(try self.firstChannel.setOption(.datagramSegmentSize, value: CInt(500)).wait())
2000+
2001+
// But use per-message GSO with a different value
2002+
let segmentSize = 1000
2003+
let segments = 5
2004+
2005+
var buffer = self.firstChannel.allocator.buffer(capacity: segments * segmentSize)
2006+
for i in 0..<segments {
2007+
buffer.writeRepeatingByte(UInt8(i), count: segmentSize)
2008+
}
2009+
2010+
let envelope = AddressedEnvelope(
2011+
remoteAddress: self.secondChannel.localAddress!,
2012+
data: buffer,
2013+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize)
2014+
)
2015+
2016+
XCTAssertNoThrow(try self.firstChannel.writeAndFlush(envelope).wait())
2017+
2018+
// Should receive segments of size 1000, not 500
2019+
let received = try self.secondChannel.waitForDatagrams(count: segments)
2020+
for datagram in received {
2021+
XCTAssertEqual(datagram.data.readableBytes, segmentSize)
2022+
}
2023+
}
2024+
2025+
func testPerMessageGSOThrowsOnNonLinux() throws {
2026+
#if os(Linux)
2027+
try XCTSkipIf(true, "This test only runs on non-Linux platforms")
2028+
#else
2029+
// On non-Linux platforms, setting segmentSize in metadata should throw an error
2030+
var buffer = self.firstChannel.allocator.buffer(capacity: 1000)
2031+
buffer.writeRepeatingByte(1, count: 1000)
2032+
2033+
let envelope = AddressedEnvelope(
2034+
remoteAddress: self.secondChannel.localAddress!,
2035+
data: buffer,
2036+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: 500)
2037+
)
2038+
2039+
XCTAssertThrowsError(try self.firstChannel.writeAndFlush(envelope).wait()) { error in
2040+
XCTAssertEqual(error as? ChannelError, .operationUnsupported)
2041+
}
2042+
#endif
2043+
}
18632044
}
18642045

18652046
extension System {

0 commit comments

Comments
 (0)