Skip to content

Commit f8b9707

Browse files
authored
add spark random receiver (#1)
1 parent c0bad2d commit f8b9707

File tree

5 files changed

+231
-0
lines changed

5 files changed

+231
-0
lines changed

examples/spark/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target/**
2+

examples/spark/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Bullet-Spark Example
2+
3+
This example generates fake Bullet Records for use in an example.
4+

examples/spark/pom.xml

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<groupId>com.yahoo.bullet</groupId>
6+
<artifactId>bullet-spark-example</artifactId>
7+
<version>0.0.1-SNAPSHOT</version>
8+
<packaging>jar</packaging>
9+
<properties>
10+
<scala.version>2.11.7</scala.version>
11+
<scala.dep.version>2.11</scala.dep.version>
12+
<spark.version>2.3.0</spark.version>
13+
<bullet.spark.version>0.1.0</bullet.spark.version>
14+
</properties>
15+
16+
<repositories>
17+
<repository>
18+
<snapshots>
19+
<enabled>false</enabled>
20+
</snapshots>
21+
<id>jcenter</id>
22+
<name>bintray</name>
23+
<url>http://jcenter.bintray.com</url>
24+
</repository>
25+
</repositories>
26+
27+
<dependencies>
28+
<!-- Provided -->
29+
<dependency>
30+
<groupId>org.scala-lang</groupId>
31+
<artifactId>scala-library</artifactId>
32+
<version>${scala.version}</version>
33+
<scope>provided</scope>
34+
</dependency>
35+
<dependency>
36+
<groupId>org.apache.spark</groupId>
37+
<artifactId>spark-streaming_${scala.dep.version}</artifactId>
38+
<version>${spark.version}</version>
39+
<scope>provided</scope>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.spark</groupId>
43+
<artifactId>spark-core_${scala.dep.version}</artifactId>
44+
<version>${spark.version}</version>
45+
<scope>provided</scope>
46+
</dependency>
47+
<dependency>
48+
<groupId>com.yahoo.bullet</groupId>
49+
<artifactId>bullet-spark</artifactId>
50+
<version>${bullet.spark.version}</version>
51+
</dependency>
52+
</dependencies>
53+
<build>
54+
<sourceDirectory>src/main/scala</sourceDirectory>
55+
<plugins>
56+
<plugin>
57+
<groupId>org.scala-tools</groupId>
58+
<artifactId>maven-scala-plugin</artifactId>
59+
<executions>
60+
<execution>
61+
<goals>
62+
<goal>compile</goal>
63+
<goal>testCompile</goal>
64+
</goals>
65+
</execution>
66+
</executions>
67+
<configuration>
68+
<scalaVersion>${scala.version}</scalaVersion>
69+
<args>
70+
<arg>-target:jvm-1.8</arg>
71+
</args>
72+
</configuration>
73+
</plugin>
74+
</plugins>
75+
</build>
76+
</project>
77+
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2018, Oath Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.spark.examples
7+
8+
import com.yahoo.bullet.record.BulletRecord
9+
import com.yahoo.bullet.spark.DataProducer
10+
import com.yahoo.bullet.spark.examples.receiver.RandomReceiver
11+
import com.yahoo.bullet.spark.utils.BulletSparkConfig
12+
import org.apache.spark.streaming.StreamingContext
13+
import org.apache.spark.streaming.dstream.DStream
14+
15+
class RandomProducer extends DataProducer {
16+
override def getBulletRecordStream(ssc: StreamingContext, config: BulletSparkConfig): DStream[BulletRecord] = {
17+
// Bullet record input stream.
18+
val bulletReceiver = new RandomReceiver(config)
19+
ssc.receiverStream(bulletReceiver).asInstanceOf[DStream[BulletRecord]]
20+
}
21+
}
22+
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2018, Oath Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.spark.examples.receiver
7+
8+
import java.util.UUID
9+
10+
import scala.collection.JavaConverters._
11+
import scala.util.Random
12+
13+
import com.yahoo.bullet.record.BulletRecord
14+
import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkLogger}
15+
import org.apache.spark.storage.StorageLevel
16+
import org.apache.spark.streaming.receiver.Receiver
17+
18+
19+
object RandomReceiver {
20+
// Fields in BulletRecord
21+
private val STRING = "uuid"
22+
private val LONG = "tuple_number"
23+
private val DOUBLE = "probability"
24+
private val BOOLEAN_MAP = "tags"
25+
private val STATS_MAP = "stats"
26+
private val LIST = "classifiers"
27+
private val DURATION = "duration"
28+
private val TYPE = "type"
29+
private val RANDOM_MAP_KEY_A = "field_A"
30+
private val RANDOM_MAP_KEY_B = "field_B"
31+
private val PERIOD_COUNT = "period_count"
32+
private val RECORD_NUMBER = "record_number"
33+
private val NANO_TIME = "nano_time"
34+
private val TIMESTAMP = "timestamp"
35+
// Some static values in BulletRecord for the fields
36+
private val STRING_POOL = Array("foo", "bar", "baz", "qux", "quux", "norf")
37+
private val INTEGER_POOL = Array(2057, 13, 10051, 2, 1059, 187)
38+
}
39+
40+
/**
41+
* Constructor that takes a configuration to use.
42+
*
43+
* @param config The BulletSparkConfig to load settings from.
44+
*/
45+
class RandomReceiver(val config: BulletSparkConfig)
46+
extends Receiver[BulletRecord](StorageLevel.MEMORY_AND_DISK_SER) with BulletSparkLogger {
47+
// Number of tuples to emit
48+
private val maxPerPeriod = 100L
49+
// Period in milliseconds. Default 1000 ms
50+
private val period = 1000
51+
private var periodCount = 0L
52+
private var generatedThisPeriod = 0L
53+
private var nextIntervalStart = 0L
54+
55+
override def onStart(): Unit = {
56+
new Thread() {
57+
override def run(): Unit = {
58+
receive()
59+
}
60+
}.start()
61+
logger.info("Random receiver started.")
62+
}
63+
64+
override def onStop(): Unit = {
65+
logger.info("Random receiver stopped.")
66+
}
67+
68+
private def receive(): Unit = {
69+
nextIntervalStart = System.currentTimeMillis()
70+
while (!isStopped) {
71+
val timeNow = System.currentTimeMillis()
72+
// Only emit if we are still in the interval and haven't gone over our per period max
73+
if (timeNow <= nextIntervalStart && generatedThisPeriod < maxPerPeriod) {
74+
store(generateRecord())
75+
generatedThisPeriod += 1
76+
}
77+
if (timeNow > nextIntervalStart) {
78+
logger.info("Generated {} tuples out of {}", generatedThisPeriod, maxPerPeriod)
79+
nextIntervalStart = timeNow + period
80+
generatedThisPeriod = 0
81+
periodCount += 1
82+
}
83+
// It is courteous to sleep for a short time.
84+
try {
85+
Thread.sleep(1)
86+
} catch {
87+
case e: InterruptedException => logger.error("Error: ", e)
88+
}
89+
}
90+
}
91+
92+
private def generateRecord(): BulletRecord = {
93+
val record = new BulletRecord()
94+
val uuid = UUID.randomUUID().toString
95+
record.setString(RandomReceiver.STRING, uuid)
96+
record.setLong(RandomReceiver.LONG, generatedThisPeriod)
97+
record.setDouble(RandomReceiver.DOUBLE, Random.nextDouble())
98+
record.setString(RandomReceiver.TYPE, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
99+
record.setLong(RandomReceiver.DURATION, System.nanoTime() % RandomReceiver.INTEGER_POOL(Random.nextInt(RandomReceiver.INTEGER_POOL.length)))
100+
val booleanMap = Map[java.lang.String, java.lang.Boolean](
101+
uuid.substring(0, 8) -> Random.nextBoolean(),
102+
uuid.substring(9, 13) -> Random.nextBoolean(),
103+
uuid.substring(14, 18) -> Random.nextBoolean(),
104+
uuid.substring(19, 23) -> Random.nextBoolean()
105+
)
106+
record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap.asJava)
107+
val statsMap = Map[java.lang.String, java.lang.Long](
108+
RandomReceiver.PERIOD_COUNT -> periodCount,
109+
RandomReceiver.RECORD_NUMBER -> (periodCount * maxPerPeriod + generatedThisPeriod),
110+
RandomReceiver.NANO_TIME -> System.nanoTime(),
111+
RandomReceiver.TIMESTAMP -> System.nanoTime()
112+
)
113+
record.setLongMap(RandomReceiver.STATS_MAP, statsMap.asJava)
114+
val randomMapA = Map[java.lang.String, java.lang.String](
115+
RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
116+
RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
117+
)
118+
val randomMapB = Map[java.lang.String, java.lang.String](
119+
RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
120+
RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
121+
)
122+
record.setListOfStringMap(RandomReceiver.LIST, List(randomMapA.asJava, randomMapB.asJava).asJava)
123+
record
124+
}
125+
}
126+

0 commit comments

Comments
 (0)