Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/NIOCore/IO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public struct IOError: Swift.Error {
.reason(self.failureDescription)
}

private enum Error {
package enum Error {
#if os(Windows)
case windows(DWORD)
case winsock(CInt)
#endif
case errno(CInt)
}

private let error: Error
package let error: Error

/// The `errno` that was set for the operation.
public var errnoCode: CInt {
Expand Down
4 changes: 4 additions & 0 deletions Sources/NIOCore/SystemCallHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ internal func syscall<T: FixedWidthInteger>(
case (EWOULDBLOCK, true):
return .wouldBlock(0)
#endif
#if os(Windows)
case (WSAEWOULDBLOCK, true):
return .wouldBlock(0)
#endif
default:
preconditionIsNotUnacceptableErrno(err: err, where: function)
throw IOError(errnoCode: err, reason: function)
Expand Down
13 changes: 12 additions & 1 deletion Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import Atomics
import NIOConcurrencyHelpers
import NIOCore
#if os(Windows)
import WinSDK
#endif

private struct SocketChannelLifecycleManager {
// MARK: Types
Expand Down Expand Up @@ -1215,7 +1218,15 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
/// - err: The `Error` which was thrown by `readFromSocket`.
/// - Returns: `true` if the `Channel` should be closed, `false` otherwise.
func shouldCloseOnReadError(_ err: Error) -> Bool {
true
#if os(Windows)
if
let err = err as? IOError,
case .winsock(WSAEWOULDBLOCK) = err.error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this case reached?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I've found is that when WSAPoll gives the server socket a .read event when a client is inbound:

  • Server accepts the client through readable() -> readable0() -> ServerSocketChannel.readFromSocket()
  • The socket.accept(..) finds a socket
  • SelectableEventLoop repeats this again again to get another read (see maxMessagesPerRead)
  • This time, socket.accept(..) runs into WinSDK.INVALID_SOCKET
  • Gets the error using WSAGetLastError()
  • The error ends up reaching .winsock(WSAEWOULDBLOCK)

In hindsight, I just noticed that accept() returns an optional so I'll leverage that instead.

{
return false
}
#endif
return true
}

/// Handles an error reported by the selector.
Expand Down
4 changes: 3 additions & 1 deletion Sources/NIOPosix/SelectorGeneric.swift
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ internal class Selector<R: Registration> {
@usableFromInline
typealias EventType = WinSDK.pollfd
@usableFromInline
var pollFDs = [WinSDK.pollfd]()
var pollFDs = [pollfd]()
@usableFromInline
var deregisteredFDs = [Bool]()
#else
#error("Unsupported platform, no suitable selector backend (we need kqueue or epoll support)")
#endif
Expand Down
29 changes: 25 additions & 4 deletions Sources/NIOPosix/SelectorWSAPoll.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ extension Selector: _SelectorBackendProtocol {

func initialiseState0() throws {
self.pollFDs.reserveCapacity(16)
self.deregisteredFDs.reserveCapacity(16)
self.lifecycleState = .open
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lifecycle never became open yet

}

func deinitAssertions0() {
Expand Down Expand Up @@ -131,6 +133,19 @@ extension Selector: _SelectorBackendProtocol {

try body((SelectorEvent(io: selectorEvent, registration: registration)))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line often calls deregister0 indirectly, so we effectively can't mutate pollFDs in deregister0

}

// now clean up any deregistered fds
// In reverse order so we don't have to copy elements out of the array
// If we do in in normal order, we'll have to shift all elements after the removed one
for i in self.deregisteredFDs.indices.reversed() {
if self.deregisteredFDs[i] {
// remove this one
let fd = self.pollFDs[i].fd
self.pollFDs.remove(at: i)
self.deregisteredFDs.remove(at: i)
self.registrations.removeValue(forKey: Int(fd))
}
}
Comment on lines +137 to +148
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the deregister0 work of cleaning up after the polling is done

} else if result == 0 {
// nothing has happened
} else if result == WinSDK.SOCKET_ERROR {
Expand All @@ -149,6 +164,7 @@ extension Selector: _SelectorBackendProtocol {
// that will allow O(1) access here.
let poll = pollfd(fd: UInt64(fileDescriptor), events: interested.wsaPollEvent, revents: 0)
self.pollFDs.append(poll)
self.deregisteredFDs.append(false)
}

func reregister0(
Expand All @@ -158,7 +174,9 @@ extension Selector: _SelectorBackendProtocol {
newInterested: SelectorEventSet,
registrationID: SelectorRegistrationID
) throws {
fatalError("TODO: Unimplemented")
if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) {
self.pollFDs[index].events = newInterested.wsaPollEvent
}
}

func deregister0(
Expand All @@ -167,13 +185,15 @@ extension Selector: _SelectorBackendProtocol {
oldInterested: SelectorEventSet,
registrationID: SelectorRegistrationID
) throws {
fatalError("TODO: Unimplemented")
if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deregistering is just removing the FD. However, we can't just remove it here as we're iterating over the same pollFDs at the same time. deregister0 is called down the stack of try body((SelectorEvent(io: selectorEvent, registration: registration))). So the available indices of pollFDs changes causing a crash

self.deregisteredFDs[index] = true
}
}

func wakeup0() throws {
// will be called from a different thread
let result = try self.myThread.withHandleUnderLock { handle in
QueueUserAPC(wakeupTarget, handle, 0)
let result = try self.myThread.withHandleUnderLock { threadHandle in
return QueueUserAPC(wakeupTarget, threadHandle.handle, 0)
}
if result == 0 {
let errorCode = GetLastError()
Expand All @@ -185,6 +205,7 @@ extension Selector: _SelectorBackendProtocol {

func close0() throws {
self.pollFDs.removeAll()
self.deregisteredFDs.removeAll()
}
}

Expand Down
41 changes: 29 additions & 12 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import let WinSDK.ENFILE
import let WinSDK.ENOBUFS
import let WinSDK.ENOMEM
import let WinSDK.INADDR_ANY
import let WinSDK.WSAEWOULDBLOCK

import struct WinSDK.ip_mreq
import struct WinSDK.ipv6_mreq
Expand Down Expand Up @@ -383,15 +384,19 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket>, @unchecked Sen
}
guard let err = err as? IOError else { return true }

switch err.errnoCode {
case ECONNABORTED,
EMFILE,
ENFILE,
ENOBUFS,
ENOMEM:
switch err.error {
case .errno(ECONNABORTED),
.errno(EMFILE),
.errno(ENFILE),
.errno(ENOBUFS),
.errno(ENOMEM):
// These are errors we may be able to recover from. The user may just want to stop accepting connections for example
// or provide some other means of back-pressure. This could be achieved by a custom ChannelDuplexHandler.
return false
#if os(Windows)
case .winsock(WSAEWOULDBLOCK):
return false
#endif
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accept returns WSAEWOULDBLOCK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't the current logic for syscall(blocking: true) cover this?

default:
return true
}
Expand Down Expand Up @@ -854,24 +859,36 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
#endif
}

private func shouldCloseOnErrnoCode(_ errnoCode: CInt) -> Bool {
switch errnoCode {
private func shouldCloseOnErrnoCode(_ errno: CInt) -> Bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What motivated the changes to this function?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was messing with that code but the function itself shouldn't have changed, I'll revert it.

switch errno {
case ECONNREFUSED, ENOMEM:
// These are errors we may be able to recover from.
return false
default:
return true
}
}

private func shouldCloseOnError(_ error: IOError.Error) -> Bool {
switch error {
// ECONNREFUSED can happen on linux if the previous sendto(...) failed.
// See also:
// - https://bugzilla.redhat.com/show_bug.cgi?id=1375
// - https://lists.gt.net/linux/kernel/39575
case ECONNREFUSED,
ENOMEM:
// These are errors we may be able to recover from.
case .errno(let code):
return self.shouldCloseOnErrnoCode(code)
#if os(Windows)
case .winsock(EWOULDBLOCK):
return false
#endif
default:
return true
}
}

override func shouldCloseOnReadError(_ err: Error) -> Bool {
guard let err = err as? IOError else { return true }
return self.shouldCloseOnErrnoCode(err.errnoCode)
return self.shouldCloseOnError(err.error)
}

override func error() -> ErrorResult {
Expand Down
8 changes: 8 additions & 0 deletions Sources/NIOPosix/System.swift
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ internal func syscall<T: FixedWidthInteger>(
continue
case (EWOULDBLOCK, true):
return .wouldBlock(0)
#if os(Windows)
case (WSAEWOULDBLOCK, true):
return .wouldBlock(0)
#endif
default:
preconditionIsNotUnacceptableErrno(err: err, where: function)
throw IOError(errnoCode: err, reason: function)
Expand Down Expand Up @@ -430,6 +434,10 @@ internal func syscallForbiddingEINVAL<T: FixedWidthInteger>(
continue
case EWOULDBLOCK:
return .wouldBlock(0)
#if os(Windows)
case WSAEWOULDBLOCK:
return .wouldBlock(0)
#endif
default:
preconditionIsNotUnacceptableErrnoForbiddingEINVAL(err: err, where: function)
throw IOError(errnoCode: err, reason: function)
Expand Down
6 changes: 1 addition & 5 deletions Sources/NIOPosix/Thread.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,14 @@ final class NIOThread: Sendable {

static var currentThreadName: String? {
#if os(Windows)
ThreadOpsSystem.threadName(.init(GetCurrentThread()))
ThreadOpsSystem.threadName(.init(handle: GetCurrentThread()))
#else
ThreadOpsSystem.threadName(.init(handle: pthread_self()))
#endif
}

static var currentThreadID: UInt {
#if os(Windows)
UInt(bitPattern: .init(bitPattern: ThreadOpsSystem.currentThread))
#else
UInt(bitPattern: .init(bitPattern: ThreadOpsSystem.currentThread.handle))
#endif
}

@discardableResult
Expand Down
47 changes: 39 additions & 8 deletions Sources/NIOPosix/ThreadWindows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import WinSDK

typealias ThreadOpsSystem = ThreadOpsWindows
enum ThreadOpsWindows: ThreadOps {
typealias ThreadHandle = HANDLE
struct ThreadHandle: @unchecked Sendable {
let handle: HANDLE
}
typealias ThreadSpecificKey = DWORD
typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer?) -> Void

static func threadName(_ thread: ThreadOpsSystem.ThreadHandle) -> String? {
var pszBuffer: PWSTR?
GetThreadDescription(thread, &pszBuffer)
GetThreadDescription(thread.handle, &pszBuffer)
guard let buffer = pszBuffer else { return nil }
let string: String = String(decodingCString: buffer, as: UTF16.self)
LocalFree(buffer)
Expand All @@ -41,11 +43,27 @@ enum ThreadOpsWindows: ThreadOps {
let routine: @convention(c) (UnsafeMutableRawPointer?) -> CUnsignedInt = {
let boxed = Unmanaged<NIOThread.ThreadBox>.fromOpaque($0!).takeRetainedValue()
let (body, name) = (boxed.value.body, boxed.value.name)
let hThread: ThreadOpsSystem.ThreadHandle = GetCurrentThread()

// Get a real thread handle instead of pseudo-handle
var realHandle: HANDLE? = nil
let success = DuplicateHandle(
GetCurrentProcess(), // Source process
GetCurrentThread(), // Source handle (pseudo-handle)
GetCurrentProcess(), // Target process
&realHandle, // Target handle (real handle)
0, // Desired access (0 = same as source)
false, // Inherit handle
DWORD(DUPLICATE_SAME_ACCESS) // Options
)
Comment on lines +48 to +57
Copy link
Author

@Joannis Joannis Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Windows gives pseudo handles by default, so they're always correct for the current thread. However, when spawning work on another thread, this handle results in getting the incorrect ThreadID, causing QueueUserAPC to notify the wrong (or non-existing) thread. This in turn prevents SleepEx from waking up the eventloop


guard success, let realHandle else {
fatalError("DuplicateHandle failed: \(GetLastError())")
}
let hThread = ThreadOpsSystem.ThreadHandle(handle: realHandle)

if let name = name {
_ = name.withCString(encodedAs: UTF16.self) {
SetThreadDescription(hThread, $0)
SetThreadDescription(hThread.handle, $0)
}
}

Expand All @@ -58,15 +76,28 @@ enum ThreadOpsWindows: ThreadOps {
}

static func isCurrentThread(_ thread: ThreadOpsSystem.ThreadHandle) -> Bool {
CompareObjectHandles(thread, GetCurrentThread())
CompareObjectHandles(thread.handle, GetCurrentThread())
}

static var currentThread: ThreadOpsSystem.ThreadHandle {
GetCurrentThread()
var realHandle: HANDLE? = nil
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here

let success = DuplicateHandle(
GetCurrentProcess(),
GetCurrentThread(),
GetCurrentProcess(),
&realHandle,
0,
false,
DWORD(DUPLICATE_SAME_ACCESS)
)
guard success, let realHandle else {
fatalError("DuplicateHandle failed: \(GetLastError())")
}
return ThreadHandle(handle: realHandle)
}

static func joinThread(_ thread: ThreadOpsSystem.ThreadHandle) {
let dwResult: DWORD = WaitForSingleObject(thread, INFINITE)
let dwResult: DWORD = WaitForSingleObject(thread.handle, INFINITE)
assert(dwResult == WAIT_OBJECT_0, "WaitForSingleObject: \(GetLastError())")
}

Expand All @@ -88,7 +119,7 @@ enum ThreadOpsWindows: ThreadOps {
}

static func compareThreads(_ lhs: ThreadOpsSystem.ThreadHandle, _ rhs: ThreadOpsSystem.ThreadHandle) -> Bool {
CompareObjectHandles(lhs, rhs)
CompareObjectHandles(lhs.handle, rhs.handle)
}
}

Expand Down