Skip to content

Commit 9dcbbd2

Browse files
committed
Handle empty query response
1 parent d18b137 commit 9dcbbd2

File tree

6 files changed

+119
-19
lines changed

6 files changed

+119
-19
lines changed

Sources/PostgresNIO/New/Connection State Machine/ExtendedQueryStateMachine.swift

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ struct ExtendedQueryStateMachine {
1010
case parameterDescriptionReceived(ExtendedQueryContext)
1111
case rowDescriptionReceived(ExtendedQueryContext, [RowDescription.Column])
1212
case noDataMessageReceived(ExtendedQueryContext)
13-
13+
case emptyQueryResponseReceived
14+
1415
/// A state that is used if a noData message was received before. If a row description was received `bufferingRows` is
1516
/// used after receiving a `bindComplete` message
1617
case bindCompleteReceived(ExtendedQueryContext)
@@ -122,7 +123,7 @@ struct ExtendedQueryStateMachine {
122123
return .forwardStreamError(.queryCancelled, read: true)
123124
}
124125

125-
case .commandComplete, .error, .drain:
126+
case .commandComplete, .emptyQueryResponseReceived, .error, .drain:
126127
// the stream has already finished.
127128
return .wait
128129

@@ -229,6 +230,7 @@ struct ExtendedQueryStateMachine {
229230
.messagesSent,
230231
.parseCompleteReceived,
231232
.parameterDescriptionReceived,
233+
.emptyQueryResponseReceived,
232234
.bindCompleteReceived,
233235
.streaming,
234236
.drain,
@@ -268,6 +270,7 @@ struct ExtendedQueryStateMachine {
268270
.parseCompleteReceived,
269271
.parameterDescriptionReceived,
270272
.noDataMessageReceived,
273+
.emptyQueryResponseReceived,
271274
.rowDescriptionReceived,
272275
.bindCompleteReceived,
273276
.commandComplete,
@@ -309,6 +312,7 @@ struct ExtendedQueryStateMachine {
309312
.parseCompleteReceived,
310313
.parameterDescriptionReceived,
311314
.noDataMessageReceived,
315+
.emptyQueryResponseReceived,
312316
.rowDescriptionReceived,
313317
.commandComplete,
314318
.error:
@@ -319,7 +323,23 @@ struct ExtendedQueryStateMachine {
319323
}
320324

321325
mutating func emptyQueryResponseReceived() -> Action {
322-
preconditionFailure("Unimplemented")
326+
guard case .bindCompleteReceived(let queryContext) = self.state else {
327+
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
328+
}
329+
330+
switch queryContext.query {
331+
case .unnamed(_, let eventLoopPromise),
332+
.executeStatement(_, let eventLoopPromise):
333+
return self.avoidingStateMachineCoW { state -> Action in
334+
state = .emptyQueryResponseReceived
335+
let result = QueryResult(value: .emptyResponse, logger: queryContext.logger)
336+
return .succeedQuery(eventLoopPromise, with: result)
337+
}
338+
339+
case .prepareStatement(_, _, _, _):
340+
preconditionFailure("Invalid state: \(self.state)")
341+
}
342+
323343
}
324344

325345
mutating func errorReceived(_ errorMessage: PostgresBackendMessage.ErrorResponse) -> Action {
@@ -336,7 +356,7 @@ struct ExtendedQueryStateMachine {
336356
return self.setAndFireError(error)
337357
case .streaming, .drain:
338358
return self.setAndFireError(error)
339-
case .commandComplete:
359+
case .commandComplete, .emptyQueryResponseReceived:
340360
return self.setAndFireError(.unexpectedBackendMessage(.error(errorMessage)))
341361
case .error:
342362
preconditionFailure("""
@@ -382,6 +402,7 @@ struct ExtendedQueryStateMachine {
382402
.parseCompleteReceived,
383403
.parameterDescriptionReceived,
384404
.noDataMessageReceived,
405+
.emptyQueryResponseReceived,
385406
.rowDescriptionReceived,
386407
.bindCompleteReceived:
387408
preconditionFailure("Requested to consume next row without anything going on.")
@@ -405,6 +426,7 @@ struct ExtendedQueryStateMachine {
405426
.parseCompleteReceived,
406427
.parameterDescriptionReceived,
407428
.noDataMessageReceived,
429+
.emptyQueryResponseReceived,
408430
.rowDescriptionReceived,
409431
.bindCompleteReceived:
410432
return .wait
@@ -449,6 +471,7 @@ struct ExtendedQueryStateMachine {
449471
}
450472
case .initialized,
451473
.commandComplete,
474+
.emptyQueryResponseReceived,
452475
.drain,
453476
.error:
454477
// we already have the complete stream received, now we are waiting for a
@@ -495,7 +518,7 @@ struct ExtendedQueryStateMachine {
495518
return .forwardStreamError(error, read: true)
496519
}
497520

498-
case .commandComplete, .error:
521+
case .commandComplete, .emptyQueryResponseReceived, .error:
499522
preconditionFailure("""
500523
This state must not be reached. If the query `.isComplete`, the
501524
ConnectionStateMachine must not send any further events to the substate machine.
@@ -518,6 +541,9 @@ struct ExtendedQueryStateMachine {
518541
return false
519542
}
520543

544+
case .emptyQueryResponseReceived:
545+
return true
546+
521547
case .initialized, .messagesSent, .parseCompleteReceived, .parameterDescriptionReceived, .bindCompleteReceived, .streaming, .drain:
522548
return false
523549

Sources/PostgresNIO/New/PSQLRowStream.swift

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import Logging
33

44
struct QueryResult {
55
enum Value: Equatable {
6+
case emptyResponse
67
case noRows(String)
78
case rowDescription([RowDescription.Column])
89
}
@@ -19,6 +20,7 @@ final class PSQLRowStream: @unchecked Sendable {
1920
enum Source {
2021
case stream([RowDescription.Column], PSQLRowsDataSource)
2122
case noRows(Result<String, Error>)
23+
case emptyResponse
2224
}
2325

2426
let eventLoop: EventLoop
@@ -27,6 +29,7 @@ final class PSQLRowStream: @unchecked Sendable {
2729
private enum BufferState {
2830
case streaming(buffer: CircularBuffer<DataRow>, dataSource: PSQLRowsDataSource)
2931
case finished(buffer: CircularBuffer<DataRow>, commandTag: String)
32+
case empty
3033
case failure(Error)
3134
}
3235

@@ -35,6 +38,7 @@ final class PSQLRowStream: @unchecked Sendable {
3538
case iteratingRows(onRow: (PostgresRow) throws -> (), EventLoopPromise<Void>, PSQLRowsDataSource)
3639
case waitingForAll([PostgresRow], EventLoopPromise<[PostgresRow]>, PSQLRowsDataSource)
3740
case consumed(Result<String, Error>)
41+
case finished
3842
case asyncSequence(AsyncSequenceSource, PSQLRowsDataSource, onFinish: @Sendable () -> ())
3943
}
4044

@@ -58,6 +62,9 @@ final class PSQLRowStream: @unchecked Sendable {
5862
case .noRows(.failure(let error)):
5963
self.rowDescription = []
6064
bufferState = .failure(error)
65+
case .emptyResponse:
66+
self.rowDescription = []
67+
bufferState = .empty
6168
}
6269

6370
self.downstreamState = .waitingForConsumer(bufferState)
@@ -98,6 +105,11 @@ final class PSQLRowStream: @unchecked Sendable {
98105
self.downstreamState = .asyncSequence(source, dataSource, onFinish: onFinish)
99106
self.executeActionBasedOnYieldResult(yieldResult, source: dataSource)
100107

108+
case .empty:
109+
source.finish()
110+
onFinish()
111+
self.downstreamState = .finished
112+
101113
case .finished(let buffer, let commandTag):
102114
_ = source.yield(contentsOf: buffer)
103115
source.finish()
@@ -127,7 +139,7 @@ final class PSQLRowStream: @unchecked Sendable {
127139
case .waitingForConsumer, .iteratingRows, .waitingForAll:
128140
preconditionFailure("Invalid state: \(self.downstreamState)")
129141

130-
case .consumed:
142+
case .consumed, .finished:
131143
break
132144

133145
case .asyncSequence(_, let dataSource, _):
@@ -152,7 +164,7 @@ final class PSQLRowStream: @unchecked Sendable {
152164
dataSource.cancel(for: self)
153165
onFinish()
154166

155-
case .consumed:
167+
case .consumed, .finished:
156168
return
157169

158170
case .waitingForConsumer, .iteratingRows, .waitingForAll:
@@ -201,6 +213,10 @@ final class PSQLRowStream: @unchecked Sendable {
201213
case .failure(let error):
202214
self.downstreamState = .consumed(.failure(error))
203215
return self.eventLoop.makeFailedFuture(error)
216+
217+
case .empty:
218+
self.downstreamState = .finished
219+
return self.eventLoop.makeSucceededFuture([])
204220
}
205221
}
206222

@@ -247,7 +263,11 @@ final class PSQLRowStream: @unchecked Sendable {
247263
}
248264

249265
return promise.futureResult
250-
266+
267+
case .empty:
268+
self.downstreamState = .finished
269+
return self.eventLoop.makeSucceededVoidFuture()
270+
251271
case .finished(let buffer, let commandTag):
252272
do {
253273
for data in buffer {
@@ -290,9 +310,9 @@ final class PSQLRowStream: @unchecked Sendable {
290310
buffer.append(contentsOf: newRows)
291311
self.downstreamState = .waitingForConsumer(.streaming(buffer: buffer, dataSource: dataSource))
292312

293-
case .waitingForConsumer(.finished), .waitingForConsumer(.failure):
313+
case .waitingForConsumer(.finished), .waitingForConsumer(.failure), .waitingForConsumer(.empty):
294314
preconditionFailure("How can new rows be received, if an end was already signalled?")
295-
315+
296316
case .iteratingRows(let onRow, let promise, let dataSource):
297317
do {
298318
for data in newRows {
@@ -330,6 +350,9 @@ final class PSQLRowStream: @unchecked Sendable {
330350

331351
case .consumed(.failure):
332352
break
353+
354+
case .finished:
355+
preconditionFailure("How can we receive further rows, if we are supposed to be done")
333356
}
334357
}
335358

@@ -367,6 +390,9 @@ final class PSQLRowStream: @unchecked Sendable {
367390

368391
case .consumed:
369392
break
393+
394+
case .finished, .waitingForConsumer(.empty):
395+
preconditionFailure("How can we get an end for empty query response?")
370396
}
371397
}
372398

@@ -375,7 +401,7 @@ final class PSQLRowStream: @unchecked Sendable {
375401
case .waitingForConsumer(.streaming):
376402
self.downstreamState = .waitingForConsumer(.failure(error))
377403

378-
case .waitingForConsumer(.finished), .waitingForConsumer(.failure):
404+
case .waitingForConsumer(.finished), .waitingForConsumer(.failure), .waitingForConsumer(.empty):
379405
preconditionFailure("How can we get another end, if an end was already signalled?")
380406

381407
case .iteratingRows(_, let promise, _):
@@ -393,6 +419,9 @@ final class PSQLRowStream: @unchecked Sendable {
393419

394420
case .consumed:
395421
break
422+
423+
case .finished:
424+
preconditionFailure("How can we get an error for empty query response?")
396425
}
397426
}
398427

@@ -413,10 +442,14 @@ final class PSQLRowStream: @unchecked Sendable {
413442
}
414443

415444
var commandTag: String {
416-
guard case .consumed(.success(let commandTag)) = self.downstreamState else {
417-
preconditionFailure("commandTag may only be called if all rows have been consumed")
445+
switch self.downstreamState {
446+
case .consumed(.success(let commandTag)):
447+
return commandTag
448+
case .finished:
449+
return ""
450+
default:
451+
preconditionFailure("commandTag may only be called if there are no more rows to be consumed")
418452
}
419-
return commandTag
420453
}
421454
}
422455

Sources/PostgresNIO/New/PostgresChannelHandler.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,13 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
556556
eventLoop: context.channel.eventLoop,
557557
logger: result.logger
558558
)
559+
560+
case .emptyResponse:
561+
rows = PSQLRowStream(
562+
source: .emptyResponse,
563+
eventLoop: context.channel.eventLoop,
564+
logger: result.logger
565+
)
559566
}
560567

561568
promise.succeed(rows)

Sources/PostgresNIO/PostgresDatabase+Query.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,7 @@ public struct PostgresQueryMetadata: Sendable {
7373

7474
init?(string: String) {
7575
let parts = string.split(separator: " ")
76-
guard parts.count >= 1 else {
77-
return nil
78-
}
79-
switch parts[0] {
76+
switch parts.first {
8077
case "INSERT":
8178
// INSERT oid rows
8279
guard parts.count == 3 else {

Tests/IntegrationTests/PSQLIntegrationTests.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,25 @@ final class IntegrationTests: XCTestCase {
123123
XCTAssertEqual(foo, "hello")
124124
}
125125

126+
func testQueryNothing() throws {
127+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
128+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
129+
let eventLoop = eventLoopGroup.next()
130+
131+
var conn: PostgresConnection?
132+
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
133+
defer { XCTAssertNoThrow(try conn?.close().wait()) }
134+
135+
var _result: PostgresQueryResult?
136+
XCTAssertNoThrow(_result = try conn?.query("""
137+
-- Some comments
138+
""", logger: .psqlTest).wait())
139+
140+
let result = try XCTUnwrap(_result)
141+
XCTAssertEqual(result.rows, [])
142+
XCTAssertEqual(result.metadata.command, "")
143+
}
144+
126145
func testDecodeIntegers() {
127146
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
128147
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

Tests/PostgresNIOTests/New/Connection State Machine/ExtendedQueryStateMachineTests.swift

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,25 @@ class ExtendedQueryStateMachineTests: XCTestCase {
7777
XCTAssertEqual(state.commandCompletedReceived("SELECT 2"), .forwardStreamComplete([row5, row6], commandTag: "SELECT 2"))
7878
XCTAssertEqual(state.readyForQueryReceived(.idle), .fireEventReadyForQuery)
7979
}
80-
80+
81+
func testExtendedQueryWithNoQuery() {
82+
var state = ConnectionStateMachine.readyForQuery()
83+
84+
let logger = Logger.psqlTest
85+
let promise = EmbeddedEventLoop().makePromise(of: PSQLRowStream.self)
86+
promise.fail(PSQLError.uncleanShutdown) // we don't care about the error at all.
87+
let query: PostgresQuery = "-- some comments"
88+
let queryContext = ExtendedQueryContext(query: query, logger: logger, promise: promise)
89+
90+
XCTAssertEqual(state.enqueue(task: .extendedQuery(queryContext)), .sendParseDescribeBindExecuteSync(query))
91+
XCTAssertEqual(state.parseCompleteReceived(), .wait)
92+
XCTAssertEqual(state.parameterDescriptionReceived(.init(dataTypes: [.int8])), .wait)
93+
XCTAssertEqual(state.noDataReceived(), .wait)
94+
XCTAssertEqual(state.bindCompleteReceived(), .wait)
95+
XCTAssertEqual(state.emptyQueryResponseReceived(), .succeedQuery(promise, with: .init(value: .emptyResponse, logger: logger)))
96+
XCTAssertEqual(state.readyForQueryReceived(.idle), .fireEventReadyForQuery)
97+
}
98+
8199
func testReceiveTotallyUnexpectedMessageInQuery() {
82100
var state = ConnectionStateMachine.readyForQuery()
83101

0 commit comments

Comments
 (0)