Commit a1a08e7

turboFei authored and pan3793 committed
[KYUUBI #7132] Respect kyuubi.session.engine.startup.waitCompletion for wait engine completion
### Why are the changes needed?

We should not fail the batch submission if the submit process is alive and wait-engine-completion is false. Especially for Spark on Kubernetes, the app might fail with `NOT_FOUND` state if the spark-submit process runs longer than the submit timeout. In this PR, if `kyuubi.session.engine.startup.waitCompletion` is false, the current timestamp is used as the submit time when getting the application info, to prevent the app from failing with `NOT_FOUND` state due to submit timeout.

### How was this patch tested?

Passed current GA and manual testing.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7132 from turboFei/batch_submit.

Closes #7132

efb06db [Wang, Fei] refine
7e453c1 [Wang, Fei] refine
7bca1a7 [Wang, Fei] Prevent potential timeout durartion polling the application info
15529ab [Wang, Fei] prevent metadata manager fail
38335f2 [Wang, Fei] refine
9b8a9fd [Wang, Fei] comments
11f607d [Wang, Fei] docs
f2f6ba1 [Wang, Fei] revert
2da0705 [Wang, Fei] wait for if not wait complete
d849634 [Wang, Fei] revert check in loop
b4cf50a [Wang, Fei] comments
8c262b7 [Wang, Fei] refine
ecf379b [Wang, Fei] Revert conf change
60dc167 [Wang, Fei] enlarge
4d0aa54 [Wang, Fei] Save
4aea965 [Wang, Fei] refine
2ad75fc [Wang, Fei] nit
a71b11d [Wang, Fei] Do not fail batch if the process is alive

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
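The mechanism behind the fix can be sketched as a small, self-contained model (this is not the actual Kyuubi code; `shouldFailNotFound` is a hypothetical helper that condenses the decision spread across `BatchJobSubmission` and the application operations):

```scala
// Sketch: why refreshing the submit time prevents a spurious NOT_FOUND failure.
// While the spark-submit process is alive and waitCompletion is false, the
// effective submit time is "now", so the elapsed time stays near zero and
// never exceeds submitTimeout.
object SubmitTimeoutSketch {
  def shouldFailNotFound(
      submitTime: Long,
      now: Long,
      submitTimeout: Long,
      waitCompletion: Boolean,
      processAlive: Boolean): Boolean = {
    val effectiveSubmitTime =
      if (!waitCompletion && processAlive) now else submitTime
    val elapsed = now - effectiveSubmitTime
    elapsed > submitTimeout
  }

  def main(args: Array[String]): Unit = {
    // spark-submit has been running for 5 minutes against a 3-minute timeout:
    val t0 = 0L
    val now = 5 * 60 * 1000L
    val timeout = 3 * 60 * 1000L
    // Waiting for completion: the original submit time is used, so the batch fails.
    assert(shouldFailNotFound(t0, now, timeout, waitCompletion = true, processAlive = true))
    // Not waiting, process alive: the submit time is refreshed, so the batch survives.
    assert(!shouldFailNotFound(t0, now, timeout, waitCompletion = false, processAlive = true))
  }
}
```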
1 parent c0d4980 commit a1a08e7

File tree

8 files changed: +84 −30 lines


docs/deployment/migration-guide.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -21,6 +21,8 @@
 
 * Since Kyuubi 1.11, the configuration `spark.sql.watchdog.forcedMaxOutputRows` provided by Kyuubi Spark extension is removed, consider using `kyuubi.operation.result.max.rows` instead. Note, the latter works without requirement of installing Kyuubi Spark extension.
 
+* Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will respect the `kyuubi.session.engine.startup.waitCompletion` config to determine whether to wait for the engine completion or not. If the engine is running in client mode, Kyuubi will always wait for the engine completion. And for Spark engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and `spark.kubernetes.submission.waitAppCompletion` configs to the engine conf based on the value of `kyuubi.session.engine.startup.waitCompletion`.
+
 ## Upgrading from Kyuubi 1.9 to 1.10
 
 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
```
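As an illustration of the migration note, a cluster-mode Spark engine launched with wait-completion disabled would end up with a conf along these lines (a hypothetical example; only the conf matching the actual resource manager is appended):

```properties
# Set by the user (or in kyuubi-defaults.conf):
kyuubi.session.engine.startup.waitCompletion=false

# Appended by Kyuubi for Spark on YARN:
spark.yarn.submit.waitAppCompletion=false

# Appended by Kyuubi for Spark on Kubernetes:
spark.kubernetes.submission.waitAppCompletion=false
```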

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala

Lines changed: 1 addition & 2 deletions

```diff
@@ -324,8 +324,7 @@ private[kyuubi] class EngineRef(
       engineRef.get
     } finally {
       if (acquiredPermit) startupProcessSemaphore.foreach(_.release())
-      val waitCompletion = conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
-      val destroyProcess = !waitCompletion && builder.isClusterMode()
+      val destroyProcess = !builder.waitEngineCompletion
       if (destroyProcess) {
         info("Destroy the builder process because waitCompletion is false" +
           " and the engine is running in cluster mode.")
```

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

Lines changed: 10 additions & 3 deletions

```diff
@@ -292,20 +292,27 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
       throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
     }
     debug(s"Getting application[${toLabel(tag)}]'s info from Kubernetes cluster")
+    val startTime = System.currentTimeMillis()
     try {
       // need to initialize the kubernetes client if not exists
       getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
       val appInfo = appInfoStore.get(tag) match {
         case (_, info) => info
         case _ =>
           // try to get the application info from kubernetes engine info store
-          metadataManager.flatMap(
-            _.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
+          try {
+            metadataManager.flatMap(
+              _.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
+          } catch {
+            case e: Exception =>
+              error(s"Failed to get application info from metadata manager for ${toLabel(tag)}", e)
+              ApplicationInfo.NOT_FOUND
+          }
       }
       (appInfo.state, submitTime) match {
         // Kyuubi should wait second if pod is not be created
         case (NOT_FOUND, Some(_submitTime)) =>
-          val elapsedTime = System.currentTimeMillis - _submitTime
+          val elapsedTime = startTime - _submitTime
           if (elapsedTime > submitTimeout) {
             error(s"Can't find target driver pod by ${toLabel(tag)}, " +
               s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
```

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -355,6 +355,10 @@ trait ProcBuilder {
   def clusterManager(): Option[String] = None
 
   def appMgrInfo(): ApplicationManagerInfo = ApplicationManagerInfo(None)
+
+  def waitEngineCompletion: Boolean = {
+    !isClusterMode() || conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+  }
 }
 
 object ProcBuilder extends Logging {
```
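The default added to `ProcBuilder` encodes a simple rule: client-mode engines always wait, cluster-mode engines wait only if the config says so. A standalone sketch of that rule (a hypothetical helper, not the trait itself):

```scala
object WaitCompletionRule {
  // Mirrors the shape of ProcBuilder.waitEngineCompletion:
  // client mode always waits; cluster mode respects the conf.
  def waitEngineCompletion(isClusterMode: Boolean, confWaitCompletion: Boolean): Boolean =
    !isClusterMode || confWaitCompletion

  def main(args: Array[String]): Unit = {
    // Client mode: always wait, regardless of the conf.
    assert(waitEngineCompletion(isClusterMode = false, confWaitCompletion = false))
    assert(waitEngineCompletion(isClusterMode = false, confWaitCompletion = true))
    // Cluster mode: the conf decides.
    assert(!waitEngineCompletion(isClusterMode = true, confWaitCompletion = false))
    assert(waitEngineCompletion(isClusterMode = true, confWaitCompletion = true))
  }
}
```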

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -131,12 +131,13 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
       proxyUser: Option[String] = None,
       submitTime: Option[Long] = None): ApplicationInfo = withYarnClient(proxyUser) { yarnClient =>
     debug(s"Getting application info from YARN cluster by tag: $tag")
+    val startTime = System.currentTimeMillis()
     val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
     if (reports.isEmpty) {
       debug(s"Can't find target application from YARN cluster by tag: $tag")
       submitTime match {
         case Some(_submitTime) =>
-          val elapsedTime = System.currentTimeMillis - _submitTime
+          val elapsedTime = startTime - _submitTime
           if (elapsedTime < submitTimeout) {
             info(s"Wait for YARN application[tag: $tag] to be submitted, " +
               s"elapsed time: ${elapsedTime}ms, return ${ApplicationInfo.UNKNOWN} status")
```

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala

Lines changed: 4 additions & 11 deletions

```diff
@@ -57,7 +57,8 @@ class SparkBatchProcessBuilder(
       sparkAppNameConf() ++
       engineLogPathConf() ++
       appendPodNameConf(batchConf) ++
-      prepareK8sFileUploadPath()).map { case (k, v) =>
+      prepareK8sFileUploadPath() ++
+      engineWaitCompletionConf()).map { case (k, v) =>
       buffer ++= confKeyValue(convertConfigKey(k), v)
     }
 
@@ -79,15 +80,7 @@ class SparkBatchProcessBuilder(
 
   override protected def module: String = "kyuubi-spark-batch-submit"
 
-  override def clusterManager(): Option[String] = {
-    batchConf.get(MASTER_KEY).orElse(super.clusterManager())
-  }
-
-  override def kubernetesContext(): Option[String] = {
-    batchConf.get(KUBERNETES_CONTEXT_KEY).orElse(super.kubernetesContext())
-  }
-
-  override def kubernetesNamespace(): Option[String] = {
-    batchConf.get(KUBERNETES_NAMESPACE_KEY).orElse(super.kubernetesNamespace())
+  override private[spark] def getSparkOption(key: String) = {
+    batchConf.get(key).orElse(super.getSparkOption(key))
   }
 }
```

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala

Lines changed: 29 additions & 7 deletions

```diff
@@ -150,7 +150,8 @@ class SparkProcessBuilder(
       engineLogPathConf ++
       extraYarnConf(allConf) ++
       appendPodNameConf(allConf) ++
-      prepareK8sFileUploadPath()).foreach {
+      prepareK8sFileUploadPath() ++
+      engineWaitCompletionConf).foreach {
       case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
     }
 
@@ -325,11 +326,11 @@ class SparkProcessBuilder(
   }
 
   override def clusterManager(): Option[String] = {
-    conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
+    getSparkOption(MASTER_KEY)
   }
 
   def deployMode(): Option[String] = {
-    conf.getOption(DEPLOY_MODE_KEY).orElse(defaultsConf.get(DEPLOY_MODE_KEY))
+    getSparkOption(DEPLOY_MODE_KEY)
   }
 
   override def isClusterMode(): Boolean = {
@@ -346,16 +347,15 @@ class SparkProcessBuilder(
   }
 
   def kubernetesContext(): Option[String] = {
-    conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
+    getSparkOption(KUBERNETES_CONTEXT_KEY)
   }
 
   def kubernetesNamespace(): Option[String] = {
-    conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
+    getSparkOption(KUBERNETES_NAMESPACE_KEY)
   }
 
   def kubernetesFileUploadPath(): Option[String] = {
-    conf.getOption(KUBERNETES_FILE_UPLOAD_PATH)
-      .orElse(defaultsConf.get(KUBERNETES_FILE_UPLOAD_PATH))
+    getSparkOption(KUBERNETES_FILE_UPLOAD_PATH)
   }
 
   override def validateConf(): Unit = Validator.validateConf(conf)
@@ -373,6 +373,25 @@ class SparkProcessBuilder(
   private[spark] def engineLogPathConf(): Map[String, String] = {
     Map(KYUUBI_ENGINE_LOG_PATH_KEY -> engineLog.getAbsolutePath)
   }
+
+  private[spark] def getSparkOption(key: String): Option[String] = {
+    conf.getOption(key).orElse(defaultsConf.get(key))
+  }
+
+  override def waitEngineCompletion: Boolean = {
+    !isClusterMode() || getSparkOption(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
+      .getOrElse(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.defaultValStr)
+      .toBoolean
+  }
+
+  def engineWaitCompletionConf(): Map[String, String] =
+    clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
+      case Some(m) if m.startsWith("yarn") =>
+        Map(YARN_SUBMIT_WAIT_APP_COMPLETION -> waitEngineCompletion.toString)
+      case Some(m) if m.startsWith("k8s") =>
+        Map(KUBERNETES_SUBMISSION_WAIT_APP_COMPLETION -> waitEngineCompletion.toString)
+      case _ => Map.empty
+    }
 }
 
 object SparkProcessBuilder {
@@ -384,7 +403,10 @@ object SparkProcessBuilder {
   final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
   final val KUBERNETES_DRIVER_POD_NAME = "spark.kubernetes.driver.pod.name"
   final val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = "spark.kubernetes.executor.podNamePrefix"
+  final val KUBERNETES_SUBMISSION_WAIT_APP_COMPLETION =
+    "spark.kubernetes.submission.waitAppCompletion"
   final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
+  final val YARN_SUBMIT_WAIT_APP_COMPLETION = "spark.yarn.submit.waitAppCompletion"
   final val INTERNAL_RESOURCE = "spark-internal"
 
   final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
```
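The new `engineWaitCompletionConf` maps the Spark master to the resource-manager-specific wait conf. A self-contained sketch of that mapping (string literals inlined; not the builder class itself):

```scala
import java.util.Locale

object WaitCompletionConfSketch {
  // Mirrors SparkProcessBuilder.engineWaitCompletionConf: pick the
  // waitAppCompletion conf key matching the Spark master, if any.
  def engineWaitCompletionConf(master: Option[String], wait: Boolean): Map[String, String] =
    master.map(_.toLowerCase(Locale.ROOT)) match {
      case Some(m) if m.startsWith("yarn") =>
        Map("spark.yarn.submit.waitAppCompletion" -> wait.toString)
      case Some(m) if m.startsWith("k8s") =>
        Map("spark.kubernetes.submission.waitAppCompletion" -> wait.toString)
      case _ => Map.empty
    }

  def main(args: Array[String]): Unit = {
    assert(engineWaitCompletionConf(Some("yarn"), wait = false) ==
      Map("spark.yarn.submit.waitAppCompletion" -> "false"))
    assert(engineWaitCompletionConf(Some("k8s://https://example:6443"), wait = false) ==
      Map("spark.kubernetes.submission.waitAppCompletion" -> "false"))
    // Local or unset master: nothing is appended.
    assert(engineWaitCompletionConf(Some("local[*]"), wait = false).isEmpty)
    assert(engineWaitCompletionConf(None, wait = false).isEmpty)
  }
}
```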

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala

Lines changed: 32 additions & 6 deletions

```diff
@@ -79,7 +79,26 @@ class BatchJobSubmission(
   def appStartTime: Long = _appStartTime
   def appStarted: Boolean = _appStartTime > 0
 
-  private lazy val _submitTime = if (appStarted) _appStartTime else System.currentTimeMillis
+  private lazy val _submitTime = System.currentTimeMillis
+
+  /**
+   * Batch submission refers to the time interval from the start of the batch operation run
+   * to the application being started. This method returns the time within this interval based
+   * on the following conditions:
+   * 1. If the application has been submitted to resource manager, return the time that application
+   * submitted to resource manager.
+   * 2. If the builder process does not wait for the engine completion and the process has not been
+   * terminated, return the current time to prevent application failed within NOT_FOUND state if the
+   * process duration exceeds the application submit timeout.
+   * 3. Otherwise, return the time that start to run the batch job submission operation.
+   */
+  private def submitTime: Long = if (appStarted) {
+    _appStartTime
+  } else if (!waitEngineCompletion && !startupProcessTerminated) {
+    System.currentTimeMillis()
+  } else {
+    _submitTime
+  }
 
   @VisibleForTesting
   private[kyuubi] val builder: ProcBuilder = {
@@ -100,11 +119,16 @@ class BatchJobSubmission(
       getOperationLog)
   }
 
+  private lazy val waitEngineCompletion = builder.waitEngineCompletion
+
   private lazy val appOperation = applicationManager.getApplicationOperation(builder.appMgrInfo())
 
   def startupProcessAlive: Boolean =
     builder.processLaunched && Option(builder.process).exists(_.isAlive)
 
+  private def startupProcessTerminated: Boolean =
+    builder.processLaunched && Option(builder.process).forall(!_.isAlive)
+
   override def currentApplicationInfo(): Option[ApplicationInfo] = {
     if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
       return _applicationInfo
@@ -114,7 +138,7 @@ class BatchJobSubmission(
       builder.appMgrInfo(),
       batchId,
       Some(session.user),
-      Some(_submitTime))
+      Some(submitTime))
     applicationId(applicationInfo).foreach { _ =>
       if (_appStartTime <= 0) {
         _appStartTime = System.currentTimeMillis()
@@ -299,6 +323,11 @@ class BatchJobSubmission(
 
     if (!process.isAlive) {
       doUpdateApplicationInfoMetadataIfNeeded()
+      val exitValue = process.exitValue()
+      if (exitValue != 0) {
+        throw new KyuubiException(
+          s"Process exit with value $exitValue, application info: ${_applicationInfo}")
+      }
     }
 
     if (applicationFailed(_applicationInfo, appOperation)) {
@@ -323,10 +352,7 @@ class BatchJobSubmission(
       case None =>
     }
   } finally {
-    val waitCompletion = batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
-      .map(_.toBoolean).getOrElse(
-        session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION))
-    val destroyProcess = !waitCompletion && builder.isClusterMode()
+    val destroyProcess = !waitEngineCompletion
     if (destroyProcess) {
       info("Destroy the builder process because waitCompletion is false" +
         " and the engine is running in cluster mode.")
```
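The three-way `submitTime` selection documented in the scaladoc above can be modelled as a pure function (a standalone sketch with the builder state passed in as plain parameters, not the actual operation class):

```scala
object SubmitTimeSketch {
  // Mirrors BatchJobSubmission.submitTime: choose the reference time used to
  // decide whether an app in NOT_FOUND state has exceeded the submit timeout.
  def submitTime(
      appStarted: Boolean,
      appStartTime: Long,
      waitEngineCompletion: Boolean,
      processTerminated: Boolean,
      operationStartTime: Long,
      now: Long): Long =
    if (appStarted) appStartTime
    else if (!waitEngineCompletion && !processTerminated) now
    else operationStartTime

  def main(args: Array[String]): Unit = {
    // 1. App already submitted to the resource manager: use the app start time.
    assert(submitTime(true, 100L, false, false, 0L, 999L) == 100L)
    // 2. Not waiting for completion and the process is still alive: use "now",
    //    so the elapsed time never exceeds the submit timeout.
    assert(submitTime(false, 0L, false, false, 0L, 999L) == 999L)
    // 3. Otherwise: use the operation start time.
    assert(submitTime(false, 0L, true, false, 42L, 999L) == 42L)
  }
}
```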
