-
Notifications
You must be signed in to change notification settings - Fork 720
Winsock Fixes #3433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Winsock Fixes #3433
Changes from 2 commits
43c5d7d
f59cfef
10bb556
05caf55
d2a1ed1
17f30b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,8 @@ extension Selector: _SelectorBackendProtocol { | |
|
|
||
| func initialiseState0() throws { | ||
| self.pollFDs.reserveCapacity(16) | ||
| self.deregisteredFDs.reserveCapacity(16) | ||
| self.lifecycleState = .open | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lifecycle never became open yet |
||
| } | ||
|
|
||
| func deinitAssertions0() { | ||
|
|
@@ -131,6 +133,19 @@ extension Selector: _SelectorBackendProtocol { | |
|
|
||
| try body((SelectorEvent(io: selectorEvent, registration: registration))) | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line often calls |
||
| } | ||
|
|
||
| // now clean up any deregistered fds | ||
| // In reverse order so we don't have to copy elements out of the array | ||
| // If we do in in normal order, we'll have to shift all elements after the removed one | ||
| for i in self.deregisteredFDs.indices.reversed() { | ||
| if self.deregisteredFDs[i] { | ||
| // remove this one | ||
| let fd = self.pollFDs[i].fd | ||
| self.pollFDs.remove(at: i) | ||
| self.deregisteredFDs.remove(at: i) | ||
| self.registrations.removeValue(forKey: Int(fd)) | ||
| } | ||
| } | ||
|
Comment on lines
+137
to
+148
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do the |
||
| } else if result == 0 { | ||
| // nothing has happened | ||
| } else if result == WinSDK.SOCKET_ERROR { | ||
|
|
@@ -149,6 +164,7 @@ extension Selector: _SelectorBackendProtocol { | |
| // that will allow O(1) access here. | ||
| let poll = pollfd(fd: UInt64(fileDescriptor), events: interested.wsaPollEvent, revents: 0) | ||
| self.pollFDs.append(poll) | ||
| self.deregisteredFDs.append(false) | ||
| } | ||
|
|
||
| func reregister0( | ||
|
|
@@ -158,7 +174,9 @@ extension Selector: _SelectorBackendProtocol { | |
| newInterested: SelectorEventSet, | ||
| registrationID: SelectorRegistrationID | ||
| ) throws { | ||
| fatalError("TODO: Unimplemented") | ||
| if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) { | ||
| self.pollFDs[index].events = newInterested.wsaPollEvent | ||
| } | ||
| } | ||
|
|
||
| func deregister0( | ||
|
|
@@ -167,13 +185,15 @@ extension Selector: _SelectorBackendProtocol { | |
| oldInterested: SelectorEventSet, | ||
| registrationID: SelectorRegistrationID | ||
| ) throws { | ||
| fatalError("TODO: Unimplemented") | ||
| if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) { | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deregistering is just removing the FD. However, we can't just remove it here as we're iterating over the same |
||
| self.deregisteredFDs[index] = true | ||
| } | ||
| } | ||
|
|
||
| func wakeup0() throws { | ||
| // will be called from a different thread | ||
| let result = try self.myThread.withHandleUnderLock { handle in | ||
| QueueUserAPC(wakeupTarget, handle, 0) | ||
| let result = try self.myThread.withHandleUnderLock { threadHandle in | ||
| return QueueUserAPC(wakeupTarget, threadHandle.handle, 0) | ||
| } | ||
| if result == 0 { | ||
| let errorCode = GetLastError() | ||
|
|
@@ -185,6 +205,7 @@ extension Selector: _SelectorBackendProtocol { | |
|
|
||
| func close0() throws { | ||
| self.pollFDs.removeAll() | ||
| self.deregisteredFDs.removeAll() | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ import let WinSDK.ENFILE | |
| import let WinSDK.ENOBUFS | ||
| import let WinSDK.ENOMEM | ||
| import let WinSDK.INADDR_ANY | ||
| import let WinSDK.WSAEWOULDBLOCK | ||
|
|
||
| import struct WinSDK.ip_mreq | ||
| import struct WinSDK.ipv6_mreq | ||
|
|
@@ -383,15 +384,19 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket>, @unchecked Sen | |
| } | ||
| guard let err = err as? IOError else { return true } | ||
|
|
||
| switch err.errnoCode { | ||
| case ECONNABORTED, | ||
| EMFILE, | ||
| ENFILE, | ||
| ENOBUFS, | ||
| ENOMEM: | ||
| switch err.error { | ||
| case .errno(ECONNABORTED), | ||
| .errno(EMFILE), | ||
| .errno(ENFILE), | ||
| .errno(ENOBUFS), | ||
| .errno(ENOMEM): | ||
| // These are errors we may be able to recover from. The user may just want to stop accepting connections for example | ||
| // or provide some other means of back-pressure. This could be achieved by a custom ChannelDuplexHandler. | ||
| return false | ||
| #if os(Windows) | ||
| case .winsock(WSAEWOULDBLOCK): | ||
| return false | ||
| #endif | ||
|
||
| default: | ||
| return true | ||
| } | ||
|
|
@@ -854,24 +859,36 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable { | |
| #endif | ||
| } | ||
|
|
||
| private func shouldCloseOnErrnoCode(_ errnoCode: CInt) -> Bool { | ||
| switch errnoCode { | ||
| private func shouldCloseOnErrnoCode(_ errno: CInt) -> Bool { | ||
|
||
| switch errno { | ||
| case ECONNREFUSED, ENOMEM: | ||
| // These are errors we may be able to recover from. | ||
| return false | ||
| default: | ||
| return true | ||
| } | ||
| } | ||
|
|
||
| private func shouldCloseOnError(_ error: IOError.Error) -> Bool { | ||
| switch error { | ||
| // ECONNREFUSED can happen on linux if the previous sendto(...) failed. | ||
| // See also: | ||
| // - https://bugzilla.redhat.com/show_bug.cgi?id=1375 | ||
| // - https://lists.gt.net/linux/kernel/39575 | ||
| case ECONNREFUSED, | ||
| ENOMEM: | ||
| // These are errors we may be able to recover from. | ||
| case .errno(let code): | ||
| return self.shouldCloseOnErrnoCode(code) | ||
| #if os(Windows) | ||
| case .winsock(EWOULDBLOCK): | ||
Joannis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return false | ||
| #endif | ||
| default: | ||
| return true | ||
| } | ||
| } | ||
|
|
||
| override func shouldCloseOnReadError(_ err: Error) -> Bool { | ||
| guard let err = err as? IOError else { return true } | ||
| return self.shouldCloseOnErrnoCode(err.errnoCode) | ||
| return self.shouldCloseOnError(err.error) | ||
| } | ||
|
|
||
| override func error() -> ErrorResult { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,13 +18,15 @@ import WinSDK | |
|
|
||
| typealias ThreadOpsSystem = ThreadOpsWindows | ||
| enum ThreadOpsWindows: ThreadOps { | ||
| typealias ThreadHandle = HANDLE | ||
| struct ThreadHandle: @unchecked Sendable { | ||
| let handle: HANDLE | ||
| } | ||
| typealias ThreadSpecificKey = DWORD | ||
| typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer?) -> Void | ||
|
|
||
| static func threadName(_ thread: ThreadOpsSystem.ThreadHandle) -> String? { | ||
| var pszBuffer: PWSTR? | ||
| GetThreadDescription(thread, &pszBuffer) | ||
| GetThreadDescription(thread.handle, &pszBuffer) | ||
| guard let buffer = pszBuffer else { return nil } | ||
| let string: String = String(decodingCString: buffer, as: UTF16.self) | ||
| LocalFree(buffer) | ||
|
|
@@ -41,11 +43,27 @@ enum ThreadOpsWindows: ThreadOps { | |
| let routine: @convention(c) (UnsafeMutableRawPointer?) -> CUnsignedInt = { | ||
| let boxed = Unmanaged<NIOThread.ThreadBox>.fromOpaque($0!).takeRetainedValue() | ||
| let (body, name) = (boxed.value.body, boxed.value.name) | ||
| let hThread: ThreadOpsSystem.ThreadHandle = GetCurrentThread() | ||
|
|
||
| // Get a real thread handle instead of pseudo-handle | ||
| var realHandle: HANDLE? = nil | ||
| let success = DuplicateHandle( | ||
| GetCurrentProcess(), // Source process | ||
| GetCurrentThread(), // Source handle (pseudo-handle) | ||
| GetCurrentProcess(), // Target process | ||
| &realHandle, // Target handle (real handle) | ||
| 0, // Desired access (0 = same as source) | ||
| false, // Inherit handle | ||
| DWORD(DUPLICATE_SAME_ACCESS) // Options | ||
| ) | ||
|
Comment on lines
+48
to
+57
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Windows gives pseudo handles by default, so they're always correct for the current thread. However, when spawning work on another thread, this handle results in getting the incorrect ThreadID, causing |
||
|
|
||
| guard success, let realHandle else { | ||
| fatalError("DuplicateHandle failed: \(GetLastError())") | ||
| } | ||
| let hThread = ThreadOpsSystem.ThreadHandle(handle: realHandle) | ||
|
|
||
| if let name = name { | ||
| _ = name.withCString(encodedAs: UTF16.self) { | ||
| SetThreadDescription(hThread, $0) | ||
| SetThreadDescription(hThread.handle, $0) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -58,15 +76,28 @@ enum ThreadOpsWindows: ThreadOps { | |
| } | ||
|
|
||
| static func isCurrentThread(_ thread: ThreadOpsSystem.ThreadHandle) -> Bool { | ||
| CompareObjectHandles(thread, GetCurrentThread()) | ||
| CompareObjectHandles(thread.handle, GetCurrentThread()) | ||
| } | ||
|
|
||
| static var currentThread: ThreadOpsSystem.ThreadHandle { | ||
| GetCurrentThread() | ||
| var realHandle: HANDLE? = nil | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise here |
||
| let success = DuplicateHandle( | ||
| GetCurrentProcess(), | ||
| GetCurrentThread(), | ||
| GetCurrentProcess(), | ||
| &realHandle, | ||
| 0, | ||
| false, | ||
| DWORD(DUPLICATE_SAME_ACCESS) | ||
| ) | ||
| guard success, let realHandle else { | ||
| fatalError("DuplicateHandle failed: \(GetLastError())") | ||
| } | ||
| return ThreadHandle(handle: realHandle) | ||
| } | ||
|
|
||
| static func joinThread(_ thread: ThreadOpsSystem.ThreadHandle) { | ||
| let dwResult: DWORD = WaitForSingleObject(thread, INFINITE) | ||
| let dwResult: DWORD = WaitForSingleObject(thread.handle, INFINITE) | ||
| assert(dwResult == WAIT_OBJECT_0, "WaitForSingleObject: \(GetLastError())") | ||
| } | ||
|
|
||
|
|
@@ -88,7 +119,7 @@ enum ThreadOpsWindows: ThreadOps { | |
| } | ||
|
|
||
| static func compareThreads(_ lhs: ThreadOpsSystem.ThreadHandle, _ rhs: ThreadOpsSystem.ThreadHandle) -> Bool { | ||
| CompareObjectHandles(lhs, rhs) | ||
| CompareObjectHandles(lhs.handle, rhs.handle) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this case reached?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I've found is that when WSAPoll gives the server socket a
.readevent when a client is inbound:readable()->readable0()->ServerSocketChannel.readFromSocket()socket.accept(..)finds a socketSelectableEventLooprepeats this again again to get another read (see maxMessagesPerRead)socket.accept(..)runs intoWinSDK.INVALID_SOCKETWSAGetLastError().winsock(WSAEWOULDBLOCK)In hindsight, I just noticed that
accept()returns an optional so I'll leverage that instead.