Commit ae62ee6

Removed File wrapper around input path to read files from local and S3
1 parent b2ebb76 commit ae62ee6

6 files changed: +38 -19 lines changed

README.md

Lines changed: 2 additions & 2 deletions
@@ -31,9 +31,9 @@ A Spark bootstrap project written in Scala with gradle as build tool.
 
 #### Run sparkSubmit task
 
-- Runs a `spark-submit` with class `dev.template.spark.RddCollect`
+The Gradle sparkSubmit task is configured to run with the class `dev.template.spark.RddCollect`
 
-    ./gradlew sparkSubmit
+    ./gradlew sparkSubmit
 
 #### Spark Submit commands in shell
 
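The class `dev.template.spark.RddCollect` itself is not part of this diff. As a rough, hypothetical sketch only (the master setting and the data are assumptions, not the repository's actual implementation), an entry point with that name might look like:

// Hypothetical sketch of an RddCollect-style entry point; not taken from this commit.
package dev.template.spark

import org.apache.spark.sql.SparkSession

object RddCollect extends App {
  val spark = SparkSession
    .builder()
    .appName("RddCollect")
    .master("local[*]") // assumption: local master for a quick demo run
    .getOrCreate()

  // Build a small RDD, collect it back to the driver, and print it.
  spark.sparkContext.parallelize(1 to 10).collect().foreach(println)

  spark.stop()
}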

gradle.properties

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ commonsIO=2.13.0
 deltaVersion=2.4.0
 #kafka
 confluentVersion=7.5.0
+kafkaClientVersion=3.4.0
 #logging
 slf4jVersion=1.7.21
 logbackVersion=1.1.7

src/main/scala/dev/template/spark/CovidDataPartitioner.scala

Lines changed: 6 additions & 5 deletions
@@ -4,8 +4,6 @@ import dev.template.spark.sink.Writer
 import dev.template.spark.source.Reader
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
 
-import java.io.File
-
 object CovidDataPartitioner
     extends App
     with SparkSessionWrapper
@@ -26,7 +24,6 @@ object CovidDataPartitioner
       | fips,
       | cases,
      | deaths from covid
-      | group by all
      |
      |""".stripMargin)
    .cache()
@@ -54,8 +51,12 @@ object CovidDataPartitioner
    throw new RuntimeException("Requires input file us-counties-recent.csv")
  }
 
-  private val inputFilePath = new File(args(0)).toString
-  private val outputPath = new File(args(1)).toString
+  var inputFilePath = args(0)
+  var outputPath: String = args(1)
+
+  log.info("Input path " + inputFilePath)
+  log.info("Output path " + outputPath)
+
  writeParquet(spark, inputFilePath, outputPath)
 
 }
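The reason for dropping the java.io.File wrapper is that File normalizes the path string, which mangles URI-style inputs such as s3a:// locations; passing args(0) straight through lets Spark resolve local and S3 paths alike. A minimal sketch of the difference on a Unix-like filesystem (not part of the commit; the bucket and key are made up):

import java.io.File

object FileWrapperDemo extends App {
  val s3Path = "s3a://my-bucket/covid/us-counties-recent.csv" // hypothetical bucket/key

  // File collapses the "//" after the scheme, so the string no longer parses as an s3a URI.
  println(new File(s3Path).toString) // s3a:/my-bucket/covid/us-counties-recent.csv
  // Passing the raw argument keeps the URI intact for Spark/Hadoop to resolve.
  println(s3Path)                    // s3a://my-bucket/covid/us-counties-recent.csv
}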

src/main/scala/dev/template/spark/Main.scala

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ object Main extends App {
       |""".stripMargin)
     throw new RuntimeException("Requires input file people-example.csv")
   }
-  private val inputFilePath = new File(args(0)).toString
+  private val inputFilePath = args(0)
   val calculateAverageAge = new CalculateAverageAge()
   calculateAverageAge.calculateAverageAge(inputFilePath)
 }

src/main/scala/dev/template/spark/source/Reader.scala

Lines changed: 24 additions & 6 deletions
@@ -17,10 +17,28 @@ trait Reader {
     .option("inferSchema", true)
     .option("mode", "DROPMALFORMED")
 
-  def readKafka(spark: SparkSession, topic: String, options: Map[String, String]) = spark
-    .read
-    .format("kafka")
-    .options(options)
-    .option("subscribe", topic)
-    .load()
+  def readDelta(spark: SparkSession, path: String, options: Map[String, String] = Map()) =
+    spark.read.format("delta").options(options).load(path)
+
+  /**
+   * Kafka reader requires kafka consumer properties.
+   *
+   * @param spark
+   *   spark session
+   * @param topic
+   *   kafka topic to consume
+   * @param kafkaConfig
+   *   Kafka consumer properties
+   * @return
+   */
+  def readKafka(spark: SparkSession, topic: String, kafkaConfig: Map[String, String] = Map()) =
+    spark
+      .read
+      .format("kafka")
+      .options(kafkaConfig)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .option("endingOffsets", "latest")
+      .load()
+
 }
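A minimal usage sketch of the two readers, assuming a broker at localhost:9092, an example topic name, and a Delta table path (none of which appear in this commit); "kafka.bootstrap.servers" is the standard connection option for Spark's kafka source:

package dev.template.spark

import dev.template.spark.source.Reader
import org.apache.spark.sql.SparkSession

object ReaderUsage extends App with Reader {
  val spark = SparkSession.builder().appName("reader-usage").master("local[*]").getOrCreate()

  // Batch-read a topic between the earliest and latest offsets (defaults set in readKafka).
  val kafkaDf = readKafka(
    spark,
    topic = "covid-events", // assumption: example topic name
    kafkaConfig = Map("kafka.bootstrap.servers" -> "localhost:9092")
  )
  kafkaDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(false)

  // Read a Delta table from a local or s3a:// path.
  readDelta(spark, "/tmp/covid-delta").show(false) // assumption: example table location
}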

src/test/scala/dev/template/spark/CovidDataPartitionerTest.scala

Lines changed: 4 additions & 5 deletions
@@ -21,14 +21,13 @@ class CovidDataPartitionerTest extends AnyFunSpec with SparkSessionTestWrapper {
 
   it("number of reported_date partitions should be 30") {
 
-
     val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     val reportedDateFolderCount = fs
       .listStatus(new Path(outputPath),
-        new PathFilter {
-          override def accept(path: Path): Boolean =
-            path.getName.contains("reported_date")
-        })
+                  new PathFilter {
+                    override def accept(path: Path): Boolean =
+                      path.getName.contains("reported_date")
+                  })
       .length
 
     assertEquals(30, reportedDateFolderCount)
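For context, the directories being counted are the Hive-style partition folders Spark writes when partitioning by reported_date (e.g. reported_date=2023-03-01/). A standalone sketch of the same kind of check outside the test harness, with an assumed output path:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object CountPartitions extends App {
  val spark = SparkSession.builder().appName("count-partitions").master("local[*]").getOrCreate()

  val outputPath = "/tmp/covid-partitioned" // assumption: wherever writeParquet wrote to
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

  // Each distinct reported_date value becomes one "reported_date=<value>" directory.
  val partitionDirs = fs
    .listStatus(new Path(outputPath))
    .filter(s => s.isDirectory && s.getPath.getName.startsWith("reported_date="))

  println(s"reported_date partitions: ${partitionDirs.length}")
  spark.stop()
}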
