Skip to content

Commit f2539d2

Browse files
ruanwenjunpan3793
andcommitted
[KYUUBI #7248] JDBC engine should cancel the statement when receive cancel operation
### Why are the changes needed? close #7248 - Cancel the jdbc statement when receive a kyuubi cancel operation ### How was this patch tested? Test by new ut case ### Was this patch authored or co-authored using generative AI tooling? No Closes #7249 from ruanwenjun/dev_wenjun_fix7248. Closes #7248 fa847ec [Cheng Pan] Apply suggestion from @pan3793 734e6c4 [ruanwenjun] polish code 76112f3 [ruanwenjun] change starrocks image to 3.3.13 f2f9eed [ruanwenjun] add status assertion in new ut case 33e0f0d [ruanwenjun] add assert in ut c22feb5 [ruanwenjun] move todo to ExecuteStatement a132f36 [ruanwenjun] improve ut f97ceae [ruanwenjun] [KYUUBI #7248] Ensure jdbc engine statements are canceled when receive cancel operation Lead-authored-by: ruanwenjun <wenjun@apache.org> Co-authored-by: Cheng Pan <pan3793@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 109815b commit f2539d2

File tree

7 files changed

+108
-13
lines changed

7 files changed

+108
-13
lines changed

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,19 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
8181
def getTRowSetGenerator(): JdbcTRowSetGenerator
8282

8383
def getSchemaHelper(): SchemaHelper
84+
85+
def cancelStatement(jdbcStatement: Statement): Unit = {
86+
if (jdbcStatement != null) {
87+
jdbcStatement.cancel()
88+
jdbcStatement.close()
89+
}
90+
}
91+
92+
def closeStatement(jdbcStatement: Statement): Unit = {
93+
if (jdbcStatement != null) {
94+
jdbcStatement.close()
95+
}
96+
}
8497
}
8598

8699
object JdbcDialects extends Logging {

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,31 @@ class ExecuteStatement(
120120
super.validateFetchOrientation(order)
121121
}
122122

123+
override def cancel(): Unit = withLockRequired {
124+
if (!isTerminalState(state)) {
125+
setState(OperationState.CANCELED)
126+
// TODO: If `shouldRunAsync` is true, the statement is initialized lazily.
127+
// When a SQL is submitted and immediately canceled, `jdbcStatement` may still be null,
128+
// which can lead to the cancellation not taking effect.
129+
if (jdbcStatement != null) {
130+
dialect.cancelStatement(jdbcStatement)
131+
jdbcStatement = null
132+
} else {
133+
warn(s"Ignore cancel operation $statementId due to jdbcStatement is null.")
134+
}
135+
}
136+
}
137+
123138
override def cleanup(targetState: OperationState): Unit = withLockRequired {
124139
try {
125140
super.cleanup(targetState)
126141
} finally {
127-
if (jdbcStatement != null && !jdbcStatement.isClosed) {
128-
jdbcStatement.close()
142+
if (jdbcStatement != null) {
143+
if (targetState == OperationState.CANCELED) {
144+
dialect.cancelStatement(jdbcStatement)
145+
} else {
146+
dialect.closeStatement(jdbcStatement)
147+
}
129148
jdbcStatement = null
130149
}
131150
}

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
6262
resp
6363
}
6464

65-
override def cancel(): Unit = {
66-
cleanup(OperationState.CANCELED)
67-
}
68-
6965
override def close(): Unit = {
7066
cleanup(OperationState.CLOSED)
7167
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
*/
1717
package org.apache.kyuubi.engine.jdbc.mysql
1818

19+
import org.scalatest.concurrent.TimeLimits.failAfter
20+
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
21+
1922
import org.apache.kyuubi.config.KyuubiConf
2023
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
2124
import org.apache.kyuubi.operation.HiveJDBCTestHelper
2225
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
26+
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE, RUNNING_STATE}
2327

2428
class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelper {
2529

@@ -75,4 +79,25 @@ class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelp
7579
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
7680
}
7781
}
82+
83+
test("MySQL - JDBC ExecuteStatement cancel operation should kill SQL statement") {
84+
failAfter(20.seconds) {
85+
withSessionHandle { (client, handle) =>
86+
val executeReq = new TExecuteStatementReq()
87+
executeReq.setSessionHandle(handle)
88+
// The SQL will sleep 120s
89+
executeReq.setStatement("SELECT sleep(120)")
90+
executeReq.setRunAsync(true)
91+
val executeResp = client.ExecuteStatement(executeReq)
92+
assert(executeResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
93+
94+
val operationHandle = executeResp.getOperationHandle
95+
waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE))
96+
97+
val cancelResp = client.CancelOperation(new TCancelOperationReq(operationHandle))
98+
assert(cancelResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
99+
waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE))
100+
}
101+
}
102+
}
78103
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@
1616
*/
1717
package org.apache.kyuubi.engine.jdbc.starrocks
1818

19+
import scala.concurrent.duration.DurationInt
20+
21+
import org.scalatest.concurrent.TimeLimits.failAfter
22+
1923
import org.apache.kyuubi.config.KyuubiConf
2024
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
2125
import org.apache.kyuubi.operation.HiveJDBCTestHelper
2226
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
27+
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE, RUNNING_STATE}
2328

2429
class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with HiveJDBCTestHelper {
2530

@@ -75,4 +80,25 @@ class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with Hiv
7580
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
7681
}
7782
}
83+
84+
test("StarRocks - JDBC ExecuteStatement cancel operation should kill SQL statement") {
85+
failAfter(20.seconds) {
86+
withSessionHandle { (client, handle) =>
87+
val executeReq = new TExecuteStatementReq()
88+
executeReq.setSessionHandle(handle)
89+
// The SQL will sleep 120s
90+
executeReq.setStatement("SELECT sleep(120)")
91+
executeReq.setRunAsync(true)
92+
val executeResp = client.ExecuteStatement(executeReq)
93+
assert(executeResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
94+
95+
val operationHandle = executeResp.getOperationHandle
96+
waitForOperationStatusIn(client, operationHandle, Set(RUNNING_STATE))
97+
98+
val cancelResp = client.CancelOperation(new TCancelOperationReq(operationHandle))
99+
assert(cancelResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
100+
waitForOperationStatusIn(client, operationHandle, Set(CANCELED_STATE))
101+
}
102+
}
103+
}
78104
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/WithStarRocksContainer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.kyuubi.engine.jdbc.WithJdbcServerContainer
2626

2727
trait WithStarRocksContainer extends WithJdbcServerContainer {
2828

29-
private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.1.6"
29+
private val starrocksDockerImage = "starrocks/allin1-ubuntu:3.3.13"
3030

3131
private val STARROCKS_FE_MYSQL_PORT = 9030
3232
private val STARROCKS_FE_HTTP_PORT = 8030
@@ -47,7 +47,7 @@ trait WithStarRocksContainer extends WithJdbcServerContainer {
4747
.withStrategy(Wait.forListeningPorts(ports: _*))
4848
.withStrategy(forLogMessage(".*broker service already added into FE service.*", 1))
4949
.withStrategy(
50-
forLogMessage(".*Enjoy the journal to StarRocks blazing-fast lake-house engine.*", 1)))
50+
forLogMessage(".*Enjoy the journey to StarRocks blazing-fast lake-house engine.*", 1)))
5151

5252
protected def feJdbcUrl: String = withContainers { container =>
5353
val queryServerHost: String = container.host

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,27 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
124124
}
125125

126126
def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
127-
val req = new TGetOperationStatusReq(op)
128-
var state = client.GetOperationStatus(req).getOperationState
129-
eventually(timeout(90.seconds), interval(100.milliseconds)) {
130-
state = client.GetOperationStatus(req).getOperationState
131-
assert(!Set(INITIALIZED_STATE, PENDING_STATE, RUNNING_STATE).contains(state))
127+
waitForOperationStatusIn(
128+
client,
129+
op,
130+
Set(
131+
FINISHED_STATE,
132+
CANCELED_STATE,
133+
CLOSED_STATE,
134+
ERROR_STATE,
135+
UKNOWN_STATE,
136+
TIMEDOUT_STATE),
137+
timeoutMs = 90000)
138+
}
139+
140+
def waitForOperationStatusIn(
141+
client: Iface,
142+
op: TOperationHandle,
143+
status: Set[TOperationState],
144+
timeoutMs: Int = 5000): Unit = {
145+
eventually(timeout(timeoutMs.milliseconds), interval(100.milliseconds)) {
146+
val state = client.GetOperationStatus(new TGetOperationStatusReq(op)).getOperationState
147+
assert(status.contains(state))
132148
}
133149
}
134150

0 commit comments

Comments
 (0)