
Commit 9dd8e73

Spark Docs (#2)
1 parent f8b9707 commit 9dd8e73

File tree

5 files changed: +236 −57 lines

docs/backend/spark-architecture.md

Lines changed: 64 additions & 0 deletions

# Spark architecture

This section describes how the [Backend architecture](../index.md#backend) is implemented in Spark Streaming.

## Data Flow Graph

Bullet Spark implements the backend piece from the full [Architecture](../index.md#architecture) using Spark Streaming:

![Bullet Spark DAG](../img/spark-dag.png)

The components in the [Architecture](../index.md#architecture) have direct counterparts here. The Query Receiver, which reads from the PubSub layer using plugged-in PubSub consumers, and the Query Unioning make up the Request Processor. The Filter Streaming and your plugin for your source of data make up the Data Processor. The Join Streaming and the Result Emitter make up the Combiner.

The red lines are the path for queries that come in through the PubSub, the orange lines are the path for signals, and the blue lines are for the data from your data source. The shape of each box denotes the type of transformation or action it executes.

### Data processing

Bullet can accept arbitrary sources of data as long as they can be ingested by Spark. These can be Kafka, Flume, Kinesis, TCP sockets, etc. To hook up your data to Bullet Spark, you just need to implement the [Data Producer Trait](https://github.com/bullet-db/bullet-spark/blob/master/src/main/scala/com/yahoo/bullet/spark/DataProducer.scala). In your implementation, you can either:

* Use [Spark Streaming built-in sources](https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers) to receive data. Below is a quick example for a direct Kafka source in Scala. You can also write it in Java:

```scala
import com.yahoo.bullet.spark.DataProducer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
// import all other necessary packages

class DirectKafkaProducer extends DataProducer {
  override def getBulletRecordStream(ssc: StreamingContext, config: BulletSparkConfig): DStream[BulletRecord] = {
    val topics = Array("test")
    val kafkaParams = Map[String, AnyRef](
      "bootstrap.servers" -> "server1, server2",
      "group.id" -> "mygroup",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer]
      // Other kafka params
    )

    val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte]](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, Array[Byte]](topics, kafkaParams))

    directKafkaStream.map(record => {
      // Convert your record to BulletRecord
    })
  }
}
```

* Write a [custom receiver](https://spark.apache.org/docs/latest/streaming-custom-receivers.html) to receive data from any arbitrary data source beyond the ones Spark Streaming supports out of the box (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.). See this [example](https://github.com/bullet-db/bullet-db.github.io/tree/src/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples) and the sketch after this list.
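
As a rough illustration of the second option, here is a minimal sketch of a receiver-based producer. It assumes a hypothetical source that you poll for raw events; the `MySource` names are illustrative and not part of Bullet Spark, and the conversion to `BulletRecord` is elided just as in the Kafka example above.

```scala
import com.yahoo.bullet.spark.DataProducer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
// import BulletRecord, BulletSparkConfig and any other classes you need

// A custom receiver that polls some hypothetical source and hands raw events to Spark.
class MySourceReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
  override def onStart(): Unit = {
    new Thread("MySource Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // Read one raw event from your source and store it in Spark's memory.
          store("a raw event from your source")
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // Close any connections to your source here.
  }
}

class MySourceProducer extends DataProducer {
  override def getBulletRecordStream(ssc: StreamingContext, config: BulletSparkConfig): DStream[BulletRecord] = {
    ssc.receiverStream(new MySourceReceiver()).map(raw => {
      // Convert your raw event to a BulletRecord, as in the Kafka example above
    })
  }
}
```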

After receiving your data, you can do any transformations like joins or type conversions in your implementation before emitting to the Filter Streaming stage.

The Filter Streaming stage checks every record from your data source against every query from the Query Unioning stage to see if it matches, and emits partial results to the Join Streaming stage.

### Request processing

The Query Receiver fetches Bullet queries and signals through the PubSub layer using the Subscribers provided by the plugged-in PubSub layer. The queries received through the PubSub also carry information such as the query's unique identifier and potentially other metadata and signals. The Query Unioning collects all active queries using the stateful transformation [updateStateByKey](https://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation) and broadcasts the collected queries to every executor for the Filter Streaming stage.
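
To make the mechanism concrete, here is a minimal, illustrative sketch of how `updateStateByKey` can keep a running set of active queries across batches. The `(queryId, serializedQuery)` pairing and the types are assumptions for illustration, not Bullet Spark's actual internal representation.

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical stream of (queryId, serializedQuery) pairs read from the PubSub.
def unionQueries(queries: DStream[(String, Array[Byte])]): DStream[(String, Array[Byte])] =
  // Requires checkpointing to be enabled on the StreamingContext.
  queries.updateStateByKey[Array[Byte]]((newValues: Seq[Array[Byte]], existing: Option[Array[Byte]]) =>
    // Keep the most recent query seen for this ID, or whatever was already stored.
    newValues.lastOption.orElse(existing)
  )
```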

The Query Unioning also sends all active queries and signals to the Join Streaming stage.

### Combining

The Join Streaming stage combines all the partial results from the Filter Streaming stage using the stateful transformation [mapWithState](https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions@mapWithState[StateType,MappedType](spec:org.apache.spark.streaming.StateSpec[K,V,StateType,MappedType])(implicitevidence$2:scala.reflect.ClassTag[StateType],implicitevidence$3:scala.reflect.ClassTag[MappedType]):org.apache.spark.streaming.dstream.MapWithStateDStream[K,V,StateType,MappedType]) and produces the final results.
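
Again purely as an illustrative sketch (the key and value types here are assumptions, not Bullet Spark's actual internals), `mapWithState` lets the combining stage accumulate partial results per query across batches:

```scala
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical stream of (queryId, partialResult) pairs emitted by the Filter Streaming stage.
def combinePartials(partials: DStream[(String, Array[Byte])]): DStream[(String, List[Array[Byte]])] = {
  // Append each new partial result to what has been accumulated for that query so far.
  val mappingFunc = (queryId: String, partial: Option[Array[Byte]], state: State[List[Array[Byte]]]) => {
    val combined = partial.toList ++ state.getOption.getOrElse(Nil)
    state.update(combined)
    (queryId, combined)
  }
  // Requires checkpointing to be enabled on the StreamingContext.
  partials.mapWithState(StateSpec.function(mappingFunc))
}
```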

The Result Emitter uses the particular Publisher from the plugged-in PubSub layer to send back results or loop signals.

docs/backend/spark-setup.md

Lines changed: 93 additions & 0 deletions

# Bullet on Spark

This section explains how to set up and run Bullet on Spark.

## Configuration

Bullet is configured at run-time using settings defined in a file. Settings not overridden will default to the values in [bullet_spark_defaults.yaml](https://github.com/bullet-db/bullet-spark/blob/master/src/main/resources/bullet_spark_defaults.yaml). The comments in that file explain what each setting does.
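
For instance, a minimal settings file that plugs in a PubSub implementation might look like the sketch below. The two keys shown are assumed to be bullet-core's standard PubSub settings; treat them as illustrative and check the defaults file above for the authoritative names and values.

```yaml
# Illustrative overrides only; anything not set here falls back to bullet_spark_defaults.yaml.
bullet.pubsub.context.name: "QUERY_PROCESSING"
bullet.pubsub.class.name: "com.yahoo.bullet.kafka.KafkaPubSub"
```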

## Installation

Download the Bullet Spark standalone jar from [JCenter](http://jcenter.bintray.com/com/yahoo/bullet/bullet-spark/).

If you are using Bullet Kafka as the pluggable PubSub, you can download the fat jar from [JCenter](http://jcenter.bintray.com/com/yahoo/bullet/bullet-kafka/). Otherwise, you need to plug in your own PubSub jar or use the RESTPubSub that is built into bullet-core and can be turned on in the API.

To use Bullet Spark, you need to implement your own [Data Producer Trait](https://github.com/bullet-db/bullet-spark/blob/master/src/main/scala/com/yahoo/bullet/spark/DataProducer.scala) in a JVM-based project. There are two ways to implement it, as described in the [Spark Architecture](spark-architecture.md#data-processing) section. Include the Bullet artifact and the Spark dependencies in your pom.xml or the equivalent for your build tool. The artifacts are available through JCenter. Here is an example if you use Scala and Maven:

```xml
<repositories>
    <repository>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <id>central</id>
        <name>bintray</name>
        <url>http://jcenter.bintray.com</url>
    </repository>
</repositories>
```

```xml
<properties>
    <scala.version>2.11.7</scala.version>
    <scala.dep.version>2.11</scala.dep.version>
    <spark.version>2.3.0</spark.version>
    <bullet.spark.version>0.1.1</bullet.spark.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.dep.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.dep.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>com.yahoo.bullet</groupId>
    <artifactId>bullet-spark</artifactId>
    <version>${bullet.spark.version}</version>
</dependency>
```

You can also add ```<classifier>sources</classifier>``` or ```<classifier>javadoc</classifier>``` if you want the sources or javadoc.
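
For example, a dependency entry that pulls the sources jar would look like this (standard Maven classifier usage, shown with the same coordinates as above):

```xml
<dependency>
    <groupId>com.yahoo.bullet</groupId>
    <artifactId>bullet-spark</artifactId>
    <version>${bullet.spark.version}</version>
    <classifier>sources</classifier>
</dependency>
```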

## Launch

After you have implemented your own data producer and built a jar, you can launch your Bullet Spark application. Here is an example command for a [YARN cluster](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html):

```bash
./bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class com.yahoo.bullet.spark.BulletSparkStreamingMain \
    --queue <your queue> \
    --executor-memory 12g \
    --executor-cores 2 \
    --num-executors 200 \
    --driver-cores 2 \
    --driver-memory 12g \
    --conf spark.streaming.backpressure.enabled=true \
    --conf spark.default.parallelism=20 \
    ... # other Spark settings
    --jars /path/to/your-data-producer.jar,/path/to/your-pubsub.jar \
    /path/to/downloaded-bullet-spark-standalone.jar \
    --bullet-spark-conf /path/to/your-settings.yaml
```

You can pass other Spark settings by adding ```--conf key=value``` to the command. For more settings, refer to the [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html) documentation.

For other platforms, you can find the submit commands in the [Spark Documentation](https://spark.apache.org/docs/latest/submitting-applications.html).

docs/img/spark-dag.png

149 KB
