
Commit d463a6c

jaceklaskowski authored and zifeif2 committed
[SPARK-54418][SDP] Fix error messages and code formatting
### What changes were proposed in this pull request?
Fixes typos and cleans up code formatting (that could've been automated, but done manually this time).

https://issues.apache.org/jira/browse/SPARK-54418

### Why are the changes needed?
Cleaner code with fewer typos.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Built locally. Waiting for the official build to finish once the PR is created.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52538 from jaceklaskowski/sdp-typo-hunting-formatting.

Authored-by: Jacek Laskowski <jacek@japila.pl>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 58ea081 · commit d463a6c

File tree

27 files changed: +171 -167 lines


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 1 addition & 1 deletion
@@ -5253,7 +5253,7 @@
     "RUN_EMPTY_PIPELINE" : {
       "message" : [
         "Pipelines are expected to have at least one non-temporary dataset defined (tables, persisted views) but no non-temporary datasets were found in your pipeline.",
-        "Please verify that you have included the expected source files, and that your source code includes table definitions (e.g., CREATE MATERIALIZED VIEW in SQL code, @sdp.table in python code)."
+        "Please verify that you have included the expected source files, and that your source code includes table definitions (e.g., CREATE MATERIALIZED VIEW in SQL code, @dp.table in python code)."
       ],
       "sqlState" : "42617"
     },

core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala

Lines changed: 4 additions & 4 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.util
+import java.util.{Arrays => JArrays, List => JList}
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -46,7 +46,7 @@ object SparkPipelines extends Logging {
       pipelinesCliFile: String,
       args: Array[String]): Seq[String] = {
     val (sparkSubmitArgs, pipelinesArgs) = splitArgs(args)
-    (sparkSubmitArgs ++ Seq(pipelinesCliFile) ++ pipelinesArgs)
+    sparkSubmitArgs ++ Seq(pipelinesCliFile) ++ pipelinesArgs
   }
 
   /**
@@ -59,7 +59,7 @@ object SparkPipelines extends Logging {
     var remote = "local"
 
     new SparkSubmitArgumentsParser() {
-      parse(util.Arrays.asList(args: _*))
+      parse(JArrays.asList(args: _*))
 
       override protected def handle(opt: String, value: String): Boolean = {
         if (opt == "--remote") {
@@ -91,7 +91,7 @@ object SparkPipelines extends Logging {
         true
       }
 
-      override protected def handleExtraArgs(extra: util.List[String]): Unit = {
+      override protected def handleExtraArgs(extra: JList[String]): Unit = {
        pipelinesArgs.appendAll(extra.asScala)
      }
 
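Aside: the renamed-import idiom used in this hunk is plain Scala. Below is a minimal, self-contained sketch (not part of the patch; the object name is made up) showing how `java.util` types can be aliased at import time and bridged back to Scala collections with `asScala`, mirroring `JArrays.asList(...)` and `extra.asScala` above.

```scala
import java.util.{Arrays => JArrays, List => JList}

import scala.jdk.CollectionConverters._

// Hypothetical demo object; only the import aliases mirror the patch.
object RenamedImportsSketch {
  def main(args: Array[String]): Unit = {
    // JArrays.asList builds a java.util.List; the alias keeps it visually
    // distinct from Scala's own List.
    val javaList: JList[String] = JArrays.asList("--remote", "local", "run")
    // asScala bridges the Java list back into a Scala collection, as in
    // pipelinesArgs.appendAll(extra.asScala).
    val scalaSeq: Seq[String] = javaList.asScala.toSeq
    println(scalaSeq.mkString(" "))
  }
}
```
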

python/pyspark/pipelines/api.py

Lines changed: 2 additions & 2 deletions
@@ -470,12 +470,12 @@ def create_sink(
     options: Optional[Dict[str, str]] = None,
 ) -> None:
     """
-    Creates a sink that can be targeted by streaming flows, providing a generic destination \
+    Creates a sink that can be targeted by streaming flows, providing a generic destination
     for flows to send data external to the pipeline.
 
     :param name: The name of the sink.
     :param format: The format of the sink, e.g. "parquet".
-    :param options: A dict where the keys are the property names and the values are the \
+    :param options: A dict where the keys are the property names and the values are the
         property values. These properties will be set on the sink.
     """
     if type(name) is not str:

python/pyspark/pipelines/cli.py

Lines changed: 14 additions & 7 deletions
@@ -225,8 +225,8 @@ def register_definitions(
     dataflow_graph_id: str,
 ) -> None:
     """Register the graph element definitions in the pipeline spec with the given registry.
-    - Looks for Python files matching the glob patterns in the spec and imports them.
-    - Looks for SQL files matching the blob patterns in the spec and registers thems.
+    - Import Python files matching the glob patterns in the spec.
+    - Register SQL files matching the glob patterns in the spec.
     """
     path = spec_path.parent
     with change_dir(path):
@@ -356,8 +356,9 @@ def parse_table_list(value: str) -> List[str]:
     return [table.strip() for table in value.split(",") if table.strip()]
 
 
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Pipeline CLI")
+def main() -> None:
+    """The entry point of spark-pipelines CLI."""
+    parser = argparse.ArgumentParser(description="Pipelines CLI")
     subparsers = parser.add_subparsers(dest="command", required=True)
 
     # "run" subcommand
@@ -375,7 +376,9 @@ def parse_table_list(value: str) -> List[str]:
         default=[],
     )
     run_parser.add_argument(
-        "--full-refresh-all", action="store_true", help="Perform a full graph reset and recompute."
+        "--full-refresh-all",
+        action="store_true",
+        help="Perform a full graph reset and recompute.",
     )
     run_parser.add_argument(
         "--refresh",
@@ -395,7 +398,7 @@ def parse_table_list(value: str) -> List[str]:
     # "init" subcommand
     init_parser = subparsers.add_parser(
         "init",
-        help="Generates a simple pipeline project, including a spec file and example definitions.",
+        help="Generate a sample pipeline project, with a spec file and example transformations.",
     )
     init_parser.add_argument(
         "--name",
@@ -424,7 +427,7 @@ def parse_table_list(value: str) -> List[str]:
             full_refresh=args.full_refresh,
             full_refresh_all=args.full_refresh_all,
             refresh=args.refresh,
-            dry=args.command == "dry-run",
+            dry=False,
         )
     else:
         assert args.command == "dry-run"
@@ -437,3 +440,7 @@ def parse_table_list(value: str) -> List[str]:
         )
     elif args.command == "init":
         init(args.name)
+
+
+if __name__ == "__main__":
+    main()

python/pyspark/pipelines/spark_connect_pipeline.py

Lines changed: 3 additions & 2 deletions
@@ -29,7 +29,7 @@ def create_dataflow_graph(
     default_database: Optional[str],
     sql_conf: Optional[Mapping[str, str]],
 ) -> str:
-    """Create a dataflow graph in in the Spark Connect server.
+    """Create a dataflow graph in the Spark Connect server.
 
     :returns: The ID of the created dataflow graph.
     """
@@ -57,7 +57,7 @@ def handle_pipeline_events(iter: Iterator[Dict[str, Any]]) -> None:
             continue
         elif "pipeline_event_result" not in result.keys():
             raise PySparkValueError(
-                "Pipeline logs stream handler received an unexpected result: " f"{result}"
+                f"Pipeline logs stream handler received an unexpected result: {result}"
             )
         else:
             event = result["pipeline_event_result"].event
@@ -76,6 +76,7 @@ def start_run(
 ) -> Iterator[Dict[str, Any]]:
     """Start a run of the dataflow graph in the Spark Connect server.
 
+    :param spark: SparkSession.
     :param dataflow_graph_id: The ID of the dataflow graph to start.
     :param full_refresh: List of datasets to reset and recompute.
     :param full_refresh_all: Perform a full graph reset and recompute.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 3 additions & 3 deletions
@@ -546,7 +546,6 @@ case class CreateTableAsSelect(
  * The base command representation for a statement that can be part of a Declarative Pipeline to
  * define a pipeline dataset (MV or ST).
  */
-
 trait CreatePipelineDataset extends Command {
   // The name of the dataset.
   val name: LogicalPlan
@@ -567,7 +566,8 @@ trait CreatePipelineDataset {
 /**
  * An extension of the base command representation that represents a CTAS style CREATE statement.
  */
-trait CreatePipelineDatasetAsSelect extends BinaryCommand
+trait CreatePipelineDatasetAsSelect
+  extends BinaryCommand
   with CreatePipelineDataset
   with CTEInChildren {
 
@@ -2003,7 +2003,7 @@ case class Call(
  * representation of the matching SQL syntax and cannot be executed. Instead, it is interpreted by
 * the pipelines submodule during a pipeline execution
 *
- * @param name          the name of this flow
+ * @param name the name of this flow
 * @param flowOperation the logical plan of the actual transformation this flow should execute
 * @param comment an optional comment describing this flow
 */

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/DataflowGraphRegistry.scala

Lines changed: 3 additions & 2 deletions
@@ -32,13 +32,14 @@ class DataflowGraphRegistry {
 
   private val dataflowGraphs = new ConcurrentHashMap[String, GraphRegistrationContext]()
 
-  /** Registers a DataflowGraph and generates a unique id to associate with the graph */
+  /**
+   * Registers a GraphRegistrationContext and generates a unique id to associate with the graph
+   */
   def createDataflowGraph(
       defaultCatalog: String,
       defaultDatabase: String,
       defaultSqlConf: Map[String, String]): String = {
     val graphId = java.util.UUID.randomUUID().toString
-    // TODO: propagate pipeline catalog and schema from pipeline spec here.
     dataflowGraphs.put(
       graphId,
       new GraphRegistrationContext(defaultCatalog, defaultDatabase, defaultSqlConf))
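To make the registry pattern above concrete, here is a small standalone sketch, not Spark code, of a ConcurrentHashMap-backed registry that hands out UUID keys; `GraphContext` is a hypothetical stand-in for `GraphRegistrationContext`.

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for GraphRegistrationContext.
final case class GraphContext(defaultCatalog: String, defaultDatabase: String)

class SimpleGraphRegistry {
  private val graphs = new ConcurrentHashMap[String, GraphContext]()

  /** Registers a context under a freshly generated id and returns that id. */
  def create(defaultCatalog: String, defaultDatabase: String): String = {
    val graphId = UUID.randomUUID().toString
    graphs.put(graphId, GraphContext(defaultCatalog, defaultDatabase))
    graphId
  }

  /** Looks up a previously registered context, if any. */
  def get(graphId: String): Option[GraphContext] = Option(graphs.get(graphId))
}
```
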

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala

Lines changed: 21 additions & 23 deletions
@@ -49,8 +49,6 @@ private[connect] object PipelinesHandler extends Logging {
    *   Command to be handled
    * @param responseObserver
    *   The response observer where the response will be sent
-   * @param sparkSession
-   *   The spark session
    * @param transformRelationFunc
    *   Function used to convert a relation to a LogicalPlan. This is used when determining the
    *   LogicalPlan that a flow returns.
@@ -87,9 +85,7 @@
          defineOutput(cmd.getDefineOutput, sessionHolder)
        val identifierBuilder = ResolvedIdentifier.newBuilder()
        resolvedDataset.catalog.foreach(identifierBuilder.setCatalogName)
-        resolvedDataset.database.foreach { ns =>
-          identifierBuilder.addNamespace(ns)
-        }
+        resolvedDataset.database.foreach(identifierBuilder.addNamespace)
        identifierBuilder.setTableName(resolvedDataset.identifier)
        val identifier = identifierBuilder.build()
        PipelineCommandResult
@@ -116,7 +112,7 @@
          .setDefineFlowResult(
            PipelineCommandResult.DefineFlowResult
              .newBuilder()
-              .setResolvedIdentifier(identifierBuilder)
+              .setResolvedIdentifier(identifier)
              .build())
          .build()
      case proto.PipelineCommand.CommandTypeCase.START_RUN =>
@@ -181,15 +177,21 @@
    val defaultCatalog = Option
      .when(cmd.hasDefaultCatalog)(cmd.getDefaultCatalog)
      .getOrElse {
-        logInfo(s"No default catalog was supplied. Falling back to the current catalog.")
-        sessionHolder.session.catalog.currentCatalog()
+        val currentCatalog = sessionHolder.session.catalog.currentCatalog()
+        logInfo(
+          "No default catalog was supplied. " +
+            s"Falling back to the current catalog: $currentCatalog.")
+        currentCatalog
      }
 
    val defaultDatabase = Option
      .when(cmd.hasDefaultDatabase)(cmd.getDefaultDatabase)
      .getOrElse {
-        logInfo(s"No default database was supplied. Falling back to the current database.")
-        sessionHolder.session.catalog.currentDatabase
+        val currentDatabase = sessionHolder.session.catalog.currentDatabase
+        logInfo(
+          "No default database was supplied. " +
+            s"Falling back to the current database: $currentDatabase.")
+        currentDatabase
      }
 
    val defaultSqlConf = cmd.getSqlConfMap.asScala.toMap
@@ -280,18 +282,15 @@
              output.getSourceCodeLocation.getFileName),
            line = Option.when(output.getSourceCodeLocation.hasLineNumber)(
              output.getSourceCodeLocation.getLineNumber),
-            objectType = Option(QueryOriginType.View.toString),
+            objectType = Some(QueryOriginType.View.toString),
            objectName = Option(viewIdentifier.unquotedString),
-            language = Option(Python())),
+            language = Some(Python())),
          properties = Map.empty,
          sqlText = None))
        viewIdentifier
      case proto.OutputType.SINK =>
-        val dataflowGraphId = output.getDataflowGraphId
-        val graphElementRegistry =
-          sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
        val identifier = GraphIdentifierManager
-          .parseTableIdentifier(name = output.getOutputName, spark = sessionHolder.session)
+          .parseTableIdentifier(output.getOutputName, sessionHolder.session)
        val sinkDetails = output.getSinkDetails
        graphElementRegistry.registerSink(
          SinkImpl(
@@ -305,7 +304,7 @@
              output.getSourceCodeLocation.getLineNumber),
            objectType = Option(QueryOriginType.Sink.toString),
            objectName = Option(identifier.unquotedString),
-            language = Option(Python()))))
+            language = Some(Python()))))
        identifier
      case _ =>
        throw new IllegalArgumentException(s"Unknown output type: ${output.getOutputType}")
@@ -342,8 +341,7 @@
    val rawDestinationIdentifier = GraphIdentifierManager
      .parseTableIdentifier(name = flow.getTargetDatasetName, spark = sessionHolder.session)
    val flowWritesToView =
-      graphElementRegistry
-        .getViews()
+      graphElementRegistry.getViews
        .filter(_.isInstanceOf[TemporaryView])
        .exists(_.identifier == rawDestinationIdentifier)
    val flowWritesToSink =
@@ -353,7 +351,7 @@
    // If the flow is created implicitly as part of defining a view or that it writes to a sink,
    // then we do not qualify the flow identifier and the flow destination. This is because
    // views and sinks are not permitted to have multipart
-    val isImplicitFlowForTempView = (isImplicitFlow && flowWritesToView)
+    val isImplicitFlowForTempView = isImplicitFlow && flowWritesToView
    val Seq(flowIdentifier, destinationIdentifier) =
      Seq(rawFlowIdentifier, rawDestinationIdentifier).map { rawIdentifier =>
        if (isImplicitFlowForTempView || flowWritesToSink) {
@@ -370,7 +368,7 @@
 
    val relationFlowDetails = flow.getRelationFlowDetails
    graphElementRegistry.registerFlow(
-      new UnresolvedFlow(
+      UnresolvedFlow(
        identifier = flowIdentifier,
        destinationIdentifier = destinationIdentifier,
        func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
@@ -383,9 +381,9 @@
          flow.getSourceCodeLocation.getFileName),
        line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
          flow.getSourceCodeLocation.getLineNumber),
-        objectType = Option(QueryOriginType.Flow.toString),
+        objectType = Some(QueryOriginType.Flow.toString),
        objectName = Option(flowIdentifier.unquotedString),
-        language = Option(Python()))))
+        language = Some(Python()))))
    flowIdentifier
  }
 
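The recurring `Option(...)` to `Some(...)` swaps above rest on a standard-library distinction worth spelling out. Here is a minimal sketch, not Spark code, of why `Some` suits values that can never be null and why `new` is redundant for case classes; `Flow` is a hypothetical stand-in for the query-origin fields of `UnresolvedFlow`.

```scala
object OptionVsSomeSketch {
  // Hypothetical stand-in with a synthesized companion apply, like UnresolvedFlow.
  case class Flow(identifier: String, objectType: Option[String], language: Option[String])

  def main(args: Array[String]): Unit = {
    // Option(x) is the null-tolerant constructor: it yields None for null input,
    // which is why it stays in place for values that might genuinely be absent.
    val possiblyNull: String = null
    assert(Option(possiblyNull).isEmpty)

    // Some(x) states the value is definitely present, matching literals such as
    // QueryOriginType.Flow.toString or Python() in the patch.
    val flow = Flow("my_flow", objectType = Some("Flow"), language = Some("Python"))

    // Case classes get a companion apply, so `new` is redundant:
    // Flow(...) and new Flow(...) build equal values.
    assert(flow == new Flow("my_flow", Some("Flow"), Some("Python")))
    println(flow)
  }
}
```
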

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 1 addition & 1 deletion
@@ -1385,7 +1385,7 @@ class SparkSqlAstBuilder extends AstBuilder {
 
    if (colConstraints.nonEmpty) {
      throw operationNotAllowed("Pipeline datasets do not currently support column constraints. " +
-        "Please remove and CHECK, UNIQUE, PK, and FK constraints specified on the pipeline " +
+        "Please remove any CHECK, UNIQUE, PK, and FK constraints specified on the pipeline " +
        "dataset.", ctx)
    }
 

sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreatePipelineDatasetAsSelectParserSuiteBase.scala

Lines changed: 1 addition & 1 deletion
@@ -110,7 +110,7 @@ trait CreatePipelineDatasetAsSelectParserSuiteBase extends CommandSuiteBase {
      exception = ex,
      condition = "_LEGACY_ERROR_TEMP_0035",
      parameters = Map("message" -> ("Pipeline datasets do not currently support column " +
-        "constraints. Please remove and CHECK, UNIQUE, PK, and FK constraints specified on the " +
+        "constraints. Please remove any CHECK, UNIQUE, PK, and FK constraints specified on the " +
        "pipeline dataset.")),
      queryContext = ex.getQueryContext.map(toExpectedContext)
    )

0 commit comments
