Commit 4ea5ed4

Added Spark-Parquet-KDF example (#1391)

* Added a prototype for Spark-Parquet-KDF example
* Improved Spark-Parquet-KDF example: added pipeline export, updated Arrow/Parquet JDK compatibility, and refactored file handling.
* Enhanced Spark-Parquet-KDF example: added linear model plotting with Kandy, updated regression parameters, and extended README with step-by-step walkthrough.
* Refined Spark-Parquet-KDF example: removed unused imports, improved README formatting, updated image reference, and added SparkSession configuration for Hadoop-free setup with Arrow support.
* Updated Spark-Parquet-KDF example: adjusted step numbering, improved README clarity, added clarifications in comments, refined Hadoop configuration handling, and enhanced pipeline export/inspection functionality.
1 parent 85fdd63 commit 4ea5ed4

6 files changed: 21230 additions & 0 deletions
Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
# spark-parquet-dataframe

This example shows how to:
- Load a CSV (California Housing) with local Apache Spark
- Write it to Parquet, then read the Parquet back with Kotlin DataFrame (Arrow-based reader)
- Train a simple Linear Regression model with Spark MLlib
- Export the model in two ways and explain why we do both
- Inspect the saved Spark model artifacts
- Build a 2D plot for a single model coefficient

Below is a faithful, step-by-step walkthrough matching the code in `SparkParquetDataframe.kt`.

## The 11 steps of the example (with explanations)

1. Start local Spark
   - A local `SparkSession` is created. The example configures Spark to work against the local filesystem and sets the Java options required by Arrow/Parquet.

2. Read `housing.csv` with Spark
   - Spark loads the CSV with a header and automatic schema inference into a Spark DataFrame.

3. Show the Spark DataFrame and write it to Parquet
   - `show(10, false)` prints the first rows for inspection.
   - The DataFrame is written to a temporary directory in Parquet format.

4. Read this Parquet with Kotlin DataFrame (Arrow backend)
   - Kotlin DataFrame reads the concrete `part-*.parquet` files produced by Spark using the Arrow-based Parquet reader.

5. Print `head()` of the Kotlin DataFrame
   - A quick glance at the loaded data in Kotlin DataFrame form (steps 1–5 are sketched in the first code block after this list).

6. Train a regression model with Spark MLlib
   - Numeric features are assembled with `VectorAssembler` (the categorical `ocean_proximity` is excluded).
   - A `LinearRegression` model (no intercept in the code, elasticNet=0.5, maxIter=10) is trained on the training split.

7. Export model summary to Parquet (tabular, portable)
   - The learned coefficients are paired with their feature names, plus a special row for the intercept.
   - This small, explicit summary table is written to Parquet. It’s easy to exchange and read without Spark.

8. Read the model-summary Parquet with Kotlin DataFrame
   - Kotlin DataFrame reads the summary Parquet and prints its head. This is the portable path for analytics/visualization (steps 6–8 are sketched in the second code block after this list).

9. Save the full fitted PipelineModel
   - The entire fitted `PipelineModel` is saved using Spark’s native ML writer. This produces a directory with both JSON metadata and Parquet data (steps 9–10 are sketched in the third code block after this list).

10. Inspect pipeline internals using Kotlin DataFrame
    - For exploration, the example then reads some of those JSON and Parquet files back using Kotlin DataFrame.
    - Notes:
      - Internal folder names contain stage indices and UIDs (e.g., `0_...`, `1_...`) and may vary across Spark versions.
      - This inspection method is for exploration only. For reuse in Spark, you should load the model with `PipelineModel.load(...)`.
    - Sub-steps:
      - 10.1 Root metadata (JSON): read each file under `.../metadata/` and print the heads.
      - 10.2 Stage 0 (VectorAssembler): read the JSON metadata and Parquet data under `.../stages/0_*/{metadata,data}` if present.
      - 10.3 Stage 1 (LinearRegressionModel): read the JSON metadata and Parquet data under `.../stages/1_*/{metadata,data}` if present.

11. Build a 2D plot using one coefficient
    - We choose the feature `median_income` and the label `median_house_value` to produce a 2D scatter plot.
    - From the summary table, we extract the slope for `median_income` and the intercept, and draw the line `y = slope*x + intercept`.
    - Sub-steps:
      - 11.1 Concatenate any metadata JSON frames that were successfully read (optional, for inspection).
      - 11.2 Use the model-summary table (coefficients + intercept) as the unified model data source.
      - 11.3 Compute the slope/intercept for the chosen feature from the summary table.
      - 11.4 Create a Kandy plot (points + abLine) and save it to `linear_model_plot.jpg`.
    - The plot is saved as `linear_model_plot.jpg` (an example image is committed at `lets-plot-images/linear_model_plot.jpg`).
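
The first sketch below is a minimal, illustrative version of steps 1–5. The paths, variable names, and the Kotlin DataFrame `readParquet` call (and its exact signature) are assumptions for illustration; the exact code lives in `SparkParquetDataframe.kt`, and the full Hadoop-free `SparkSession` configuration is shown later in this README.

```kotlin
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.dataframe.io.readParquet
import java.nio.file.Files
import kotlin.io.path.listDirectoryEntries

// Step 1: a minimal local SparkSession (see the full configuration later in this README).
val spark = SparkSession.builder()
    .appName("spark-parquet-dataframe")
    .master("local[*]")
    .getOrCreate()

// Step 2: read the CSV with a header row and automatic schema inference.
val housing = spark.read()
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("examples/housing.csv")

// Step 3: inspect the first rows and write the data to a temporary Parquet directory.
housing.show(10, false)
val parquetDir = Files.createTempDirectory("housing-parquet")
housing.write().mode("overwrite").parquet(parquetDir.toString())

// Step 4: read a part file back with Kotlin DataFrame's Arrow-based Parquet reader
// (readParquet and its exact signature are assumed here).
val partFile = parquetDir.listDirectoryEntries("part-*.parquet").first()
val df = DataFrame.readParquet(partFile)

// Step 5: a quick look at the data on the Kotlin DataFrame side.
df.head().print()
```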
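
The second sketch covers steps 6–8, continuing with the `spark` and `housing` values from the previous sketch. The split ratio, the seed, the `na().drop()` cleanup, and the summary schema are illustrative assumptions; the column names come from the walkthrough above.

```kotlin
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.types.DataTypes
import java.nio.file.Files

// Step 6: assemble the numeric features (the categorical ocean_proximity is excluded) and train.
val labelCol = "median_house_value"
val featureCols = housing.columns().filter { it != labelCol && it != "ocean_proximity" }.toTypedArray()

val cleaned = housing.na().drop() // assumption: drop rows with missing values before assembling

val assembler = VectorAssembler()
    .setInputCols(featureCols)
    .setOutputCol("features")
val lr = LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol(labelCol)
    .setFitIntercept(false) // no intercept, as in the example code
    .setElasticNetParam(0.5)
    .setMaxIter(10)

val (train, test) = cleaned.randomSplit(doubleArrayOf(0.8, 0.2), 42L).let { it[0] to it[1] }
val pipelineModel = Pipeline().setStages(arrayOf<PipelineStage>(assembler, lr)).fit(train)

// Step 7: pair each coefficient with its feature name, add an "intercept" row,
// and write the small summary table to Parquet.
val lrModel = pipelineModel.stages().last() as LinearRegressionModel
val summaryRows = featureCols.zip(lrModel.coefficients().toArray().toList())
    .map { (feature, coefficient) -> RowFactory.create(feature, coefficient) } +
    RowFactory.create("intercept", lrModel.intercept())
val summarySchema = DataTypes.createStructType(
    listOf(
        DataTypes.createStructField("feature", DataTypes.StringType, false),
        DataTypes.createStructField("coefficient", DataTypes.DoubleType, false),
    )
)
val summaryDir = Files.createTempDirectory("model-summary").toString()
spark.createDataFrame(summaryRows, summarySchema).write().mode("overwrite").parquet(summaryDir)

// Step 8: the summary Parquet can now be read back with Kotlin DataFrame, exactly like step 4.
```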
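
The third sketch covers steps 9–10, continuing with the `pipelineModel` from the previous sketch. The temporary directory and the `readJson` exploration are illustrative; the stage-folder layout follows the notes above and may differ across Spark versions.

```kotlin
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.dataframe.io.readJson
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.listDirectoryEntries

// Step 9: save the full fitted PipelineModel with Spark's native ML writer.
val pipelineDir = Files.createTempDirectory("pipeline-model").toString()
pipelineModel.write().overwrite().save(pipelineDir)

// Step 10: for exploration only, peek at the saved artifacts with Kotlin DataFrame.
// The root metadata is JSON; the stage folders contain their own metadata (JSON) and data (Parquet).
Path.of(pipelineDir, "metadata")
    .listDirectoryEntries("part-*")
    .forEach { metadataFile -> DataFrame.readJson(metadataFile.toFile()).head().print() }

// For actual reuse in Spark, load the model instead: PipelineModel.load(pipelineDir)
```
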
![Linear model plot](src/main/resources/linear_model_plot.jpg)

## Why two ways to serialize the model?

We deliberately show both because they serve different goals:

- Tabular summary (Parquet):
  - A small, human- and tool-friendly table of coefficients + intercept.
  - Portable across tools; easy to read directly in Kotlin DataFrame, pandas, SQL engines, etc.
  - Great for analytics, reporting, and plotting.
- Full Spark ML writer (`PipelineModel.save`):
  - Contains everything needed to reuse the trained model inside Spark (including metadata and internal data).
  - Directory layout and file names aren’t guaranteed to be stable across versions; the intended way to consume is `PipelineModel.load(...)` in Spark.
  - Not ideal as a cross-tool tabular export, but perfect for production use in Spark pipelines.
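
For the Spark-native path, reuse is a one-liner with `PipelineModel.load`. A small sketch, assuming the `pipelineDir` and `test` values from the sketches above:

```kotlin
import org.apache.spark.ml.PipelineModel

// Reload the full pipeline (assembler + regression model) and apply it to held-out data.
val reloaded = PipelineModel.load(pipelineDir)
reloaded.transform(test)
    .select("median_income", "median_house_value", "prediction")
    .show(5, false)
```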

## Why do we plot only one coefficient?

The linear model has multiple coefficients (one per feature). A 2D chart can only show two axes. To visualize the learned relationship, we pick a single feature (here, `median_income`) and the target (`median_house_value`) and draw the corresponding fitted line. You can repeat the procedure with any other feature to obtain a different 2D projection of the multi-dimensional model.
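
Concretely, picking `median_income` boils down to reading two numbers from the model-summary table and drawing the line `y = slope * x + intercept`. A sketch, assuming the summary Parquet from step 7 has been read into `summaryDf` with Kotlin DataFrame (the untyped column access is illustrative):

```kotlin
import org.jetbrains.kotlinx.dataframe.api.*

// summaryDf has two columns: feature (String) and coefficient (Double).
val slope = summaryDf.filter { "feature"<String>() == "median_income" }["coefficient"][0] as Double
val intercept = summaryDf.filter { "feature"<String>() == "intercept" }["coefficient"][0] as Double

// The fitted 2D projection for this feature; with Kandy, these two numbers feed the
// abLine layer drawn over the median_income vs. median_house_value scatter plot (step 11.4).
fun predictedHouseValue(medianIncome: Double) = slope * medianIncome + intercept
```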

## About the dataset (`housing.csv`)

1. __longitude:__ How far west a house is; higher values are farther west
2. __latitude:__ How far north a house is; higher values are farther north
3. __housingMedianAge:__ Median age of a house within a block; lower means newer
4. __totalRooms:__ Total number of rooms within a block
5. __totalBedrooms:__ Total number of bedrooms within a block
6. __population:__ Total number of people residing within a block
7. __households:__ Total number of households within a block
8. __medianIncome:__ Median household income (in tens of thousands of USD)
9. __medianHouseValue:__ Median house value (in USD)
10. __oceanProximity:__ Location of the house with respect to the ocean/sea

The CSV file is located at `examples/housing.csv` in the repository root.

## Windows note

<details>
<summary>Running on Windows: install winutils and set Hadoop environment variables</summary>

On Windows, Spark may require Hadoop native helpers. If you see errors like "winutils.exe not found" or permission/filesystem issues, do the following:

1. Install a `winutils.exe` that matches your Spark/Hadoop version and place it under a Hadoop directory, e.g. `C:\hadoop\bin\winutils.exe`.
2. Set environment variables:
   - `HADOOP_HOME=C:\hadoop`
   - Add `%HADOOP_HOME%\bin` to your `PATH`
3. Restart your IDE/terminal so the variables are picked up, then re-run the example.

This ensures Spark can operate correctly with Hadoop on Windows.

</details>

## SparkSession configuration to bypass Hadoop/winutils and enable Arrow

Use the following SparkSession builder if you want to completely avoid native Hadoop libraries (including winutils on Windows) and enable the Arrow-related add-opens:

```kotlin
val spark = SparkSession.builder()
    .appName("spark-parquet-dataframe")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", Files.createTempDirectory("spark-warehouse").toString())
    // Completely bypass native Hadoop libraries and winutils
    .config("spark.hadoop.fs.defaultFS", "file:///")
    .config("spark.hadoop.fs.AbstractFileSystem.file.impl", "org.apache.hadoop.fs.local.LocalFs")
    .config("spark.hadoop.fs.file.impl.disable.cache", "true")
    // Disable Hadoop native library requirements and native warnings
    .config("spark.hadoop.hadoop.native.lib", "false")
    .config("spark.hadoop.io.native.lib.available", "false")
    .config(
        "spark.driver.extraJavaOptions",
        "--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    )
    .config(
        "spark.executor.extraJavaOptions",
        "--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    )
    .getOrCreate()
```

Notes:

- This configuration uses the pure-Java local filesystem (`file://`) and disables Hadoop native library checks, making winutils unnecessary.
- If you rely on HDFS or native Hadoop tooling, omit these overrides and configure Hadoop as usual.

## What each Spark config does (and why it matters on JDK 21 and the Java module system)

- `spark.sql.warehouse.dir = Files.createTempDirectory("spark-warehouse").toString()`
  - Points Spark SQL’s warehouse to an ephemeral, writable temp directory.
  - Avoids permission issues and clutter in the project directory, especially on Windows.
- `spark.hadoop.fs.defaultFS = file:///`
  - Forces Hadoop to use the local filesystem instead of HDFS.
  - Bypasses native Hadoop bits and makes winutils unnecessary on Windows for this example.
- `spark.hadoop.fs.AbstractFileSystem.file.impl = org.apache.hadoop.fs.local.LocalFs`
  - Ensures the AbstractFileSystem implementation resolves to the pure-Java `LocalFs`.
- `spark.hadoop.fs.file.impl.disable.cache = true`
  - Disables filesystem implementation caching so the `LocalFs` override is applied immediately within the current JVM.
- `spark.hadoop.hadoop.native.lib = false` and `spark.hadoop.io.native.lib.available = false`
  - Tell Hadoop not to load native libraries and suppress the related warnings.
  - Prevents errors stemming from missing native binaries (e.g., winutils) when you only need local file IO.
- `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` with
  `--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED`
  - Why it is needed: starting with the Java Platform Module System (JDK 9+), and especially under JDK 17/21 (JEP 403, strong encapsulation), reflective access to JDK internals is restricted. Apache Arrow (used by the vectorized Parquet reader in Kotlin DataFrame) may need reflective access to `java.nio` internals for memory management and buffer handling. Without opening the package, you can get errors like:
    - `java.lang.reflect.InaccessibleObjectException: module java.base does not open java.nio to org.apache.arrow.memory.core`
    - `... does not open java.nio to unnamed module @xxxx`
  - What it does: opens the `java.nio` package of module `java.base` at runtime to both the named module `org.apache.arrow.memory.core` (when Arrow is on the module path) and to `ALL-UNNAMED` (when Arrow is on the classpath). This lets Arrow’s memory code work on modern JDKs.
  - Driver vs. executor: in `local[*]` both options apply to the same process, but keeping them symmetric makes the snippet cluster-ready (executors are separate JVMs).
  - When you might not need it: on JDK 8 (no module system) or if your stack does not use Arrow’s vectorized path. On JDK 17/21+, keep it if you see `InaccessibleObjectException` referencing `java.nio`.
  - Other packages: some environments/libraries (e.g., Netty) may require additional opens such as `--add-opens=java.base/sun.nio.ch=ALL-UNNAMED`. Only add the opens that your error messages explicitly mention.
  - Security note: `--add-opens` affects only the current JVM process at runtime; it doesn’t change compile-time checks or system-wide settings.
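
When you launch the example through Gradle’s `application` plugin (rather than `spark-submit`), the JVM that Gradle starts needs the same flag. The module’s build script, shown further down in this commit, passes it like this:

```kotlin
// build.gradle.kts
application {
    applicationDefaultJvmArgs = listOf(
        "--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    )
}

tasks.withType<JavaExec> {
    jvmArgs("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
}
```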

## Troubleshooting on JDK 17+

- Symptom: `InaccessibleObjectException` mentioning `java.nio`, or “illegal reflective access” warnings.
  - Fix: Ensure both `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` include the exact `--add-opens` string shown above.
- Symptom: Works in the IDE, fails with `spark-submit`.
  - Fix: Pass the options with `--conf spark.driver.extraJavaOptions=...` and `--conf spark.executor.extraJavaOptions=...` (or via `SPARK_SUBMIT_OPTS`), not only in IDE settings.
- Symptom: On Windows, “winutils.exe not found”.
  - Fix: Either use the configuration block above (bypassing native Hadoop) or install winutils as described in the Windows note.

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    application
    kotlin("jvm")

    id("org.jetbrains.kotlinx.dataframe")

    // only mandatory if `kotlin.dataframe.add.ksp=false` in gradle.properties
    id("com.google.devtools.ksp")
}

repositories {
    mavenCentral()
    mavenLocal() // in case of local dataframe development
}

application.mainClass = "org.jetbrains.kotlinx.dataframe.examples.spark.parquet.SparkParquetDataframeKt"

dependencies {
    implementation(project(":"))

    // Spark SQL + MLlib (Spark 4.0.0)
    implementation("org.apache.spark:spark-sql_2.13:4.0.0")
    implementation("org.apache.spark:spark-mllib_2.13:4.0.0")

    // Kandy (Lets-Plot backend) for plotting
    implementation(libs.kandy) {
        // Avoid pulling transitive kotlinx-dataframe from Kandy — we use the monorepo modules
        exclude("org.jetbrains.kotlinx", "dataframe")
    }

    // Logging to keep Spark quiet
    implementation(libs.log4j.core)
    implementation(libs.log4j.api)
}

// for Java 17+, and Arrow/Parquet support
application {
    applicationDefaultJvmArgs = listOf(
        "--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    )
}

kotlin {
    jvmToolchain(11)
    compilerOptions {
        jvmTarget = JvmTarget.JVM_11
        freeCompilerArgs.add("-Xjdk-release=11")
    }
}

java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(11))
    }
}

tasks.withType<JavaExec> {
    jvmArgs(
        "--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    )
}

tasks.withType<JavaCompile> {
    sourceCompatibility = JavaVersion.VERSION_11.toString()
    targetCompatibility = JavaVersion.VERSION_11.toString()
    options.release.set(11)
}

// Configure KSP tasks to use the same JVM target
tasks.withType<KotlinCompile> {
    compilerOptions {
        jvmTarget.set(JvmTarget.JVM_11)
        freeCompilerArgs.add("-Xjdk-release=11")
    }
}
