Skip to content

Commit e48f0f8

Browse files
authored
Switch to SimpleBulletRecord (#3)
1 parent 9dd8e73 commit e48f0f8

File tree

2 files changed

+36
-27
lines changed

2 files changed

+36
-27
lines changed

examples/spark/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
<scala.version>2.11.7</scala.version>
1111
<scala.dep.version>2.11</scala.dep.version>
1212
<spark.version>2.3.0</spark.version>
13-
<bullet.spark.version>0.1.0</bullet.spark.version>
13+
<bullet.spark.version>0.1.1</bullet.spark.version>
14+
<bullet.record.version>0.2.0</bullet.record.version>
1415
</properties>
1516

1617
<repositories>
@@ -44,11 +45,17 @@
4445
<version>${spark.version}</version>
4546
<scope>provided</scope>
4647
</dependency>
48+
4749
<dependency>
4850
<groupId>com.yahoo.bullet</groupId>
4951
<artifactId>bullet-spark</artifactId>
5052
<version>${bullet.spark.version}</version>
5153
</dependency>
54+
<dependency>
55+
<groupId>com.yahoo.bullet</groupId>
56+
<artifactId>bullet-record</artifactId>
57+
<version>${bullet.record.version}</version>
58+
</dependency>
5259
</dependencies>
5360
<build>
5461
<sourceDirectory>src/main/scala</sourceDirectory>

examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
package com.yahoo.bullet.spark.examples.receiver
77

88
import java.util.UUID
9+
import java.util.HashMap
10+
import java.util.Arrays.asList
911

10-
import scala.collection.JavaConverters._
1112
import scala.util.Random
1213

13-
import com.yahoo.bullet.record.BulletRecord
14+
import com.yahoo.bullet.record.{BulletRecord, SimpleBulletRecord}
1415
import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkLogger}
1516
import org.apache.spark.storage.StorageLevel
1617
import org.apache.spark.streaming.receiver.Receiver
@@ -90,36 +91,37 @@ class RandomReceiver(val config: BulletSparkConfig)
9091
}
9192

9293
private def generateRecord(): BulletRecord = {
93-
val record = new BulletRecord()
94+
val record = new SimpleBulletRecord()
9495
val uuid = UUID.randomUUID().toString
9596
record.setString(RandomReceiver.STRING, uuid)
9697
record.setLong(RandomReceiver.LONG, generatedThisPeriod)
9798
record.setDouble(RandomReceiver.DOUBLE, Random.nextDouble())
9899
record.setString(RandomReceiver.TYPE, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
99100
record.setLong(RandomReceiver.DURATION, System.nanoTime() % RandomReceiver.INTEGER_POOL(Random.nextInt(RandomReceiver.INTEGER_POOL.length)))
100-
val booleanMap = Map[java.lang.String, java.lang.Boolean](
101-
uuid.substring(0, 8) -> Random.nextBoolean(),
102-
uuid.substring(9, 13) -> Random.nextBoolean(),
103-
uuid.substring(14, 18) -> Random.nextBoolean(),
104-
uuid.substring(19, 23) -> Random.nextBoolean()
105-
)
106-
record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap.asJava)
107-
val statsMap = Map[java.lang.String, java.lang.Long](
108-
RandomReceiver.PERIOD_COUNT -> periodCount,
109-
RandomReceiver.RECORD_NUMBER -> (periodCount * maxPerPeriod + generatedThisPeriod),
110-
RandomReceiver.NANO_TIME -> System.nanoTime(),
111-
RandomReceiver.TIMESTAMP -> System.nanoTime()
112-
)
113-
record.setLongMap(RandomReceiver.STATS_MAP, statsMap.asJava)
114-
val randomMapA = Map[java.lang.String, java.lang.String](
115-
RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
116-
RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
117-
)
118-
val randomMapB = Map[java.lang.String, java.lang.String](
119-
RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
120-
RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
121-
)
122-
record.setListOfStringMap(RandomReceiver.LIST, List(randomMapA.asJava, randomMapB.asJava).asJava)
101+
102+
// Don't use Scala Map and convert it by asJava when calling setxxxMap method in BulletRecord.
103+
// It converts Scala Map to scala.collection.convert.Wrappers$MapWrapper which is not serializable in scala 2.11.x (https://issues.scala-lang.org/browse/SI-8911).
104+
val booleanMap = new HashMap[java.lang.String, java.lang.Boolean](4)
105+
booleanMap.put(uuid.substring(0, 8), Random.nextBoolean())
106+
booleanMap.put(uuid.substring(9, 13), Random.nextBoolean())
107+
booleanMap.put(uuid.substring(14, 18), Random.nextBoolean())
108+
booleanMap.put(uuid.substring(19, 23), Random.nextBoolean())
109+
record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap)
110+
111+
val statsMap = new HashMap[java.lang.String, java.lang.Long](4)
112+
statsMap.put(RandomReceiver.PERIOD_COUNT, periodCount)
113+
statsMap.put(RandomReceiver.RECORD_NUMBER, periodCount * maxPerPeriod + generatedThisPeriod)
114+
statsMap.put(RandomReceiver.NANO_TIME, System.nanoTime())
115+
statsMap.put(RandomReceiver.TIMESTAMP, System.nanoTime())
116+
record.setLongMap(RandomReceiver.STATS_MAP, statsMap)
117+
118+
val randomMapA = new HashMap[java.lang.String, java.lang.String](2)
119+
randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
120+
randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
121+
val randomMapB = new HashMap[java.lang.String, java.lang.String](2)
122+
randomMapB.put(RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
123+
randomMapB.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
124+
record.setListOfStringMap(RandomReceiver.LIST, asList(randomMapA, randomMapB))
123125
record
124126
}
125127
}

0 commit comments

Comments
 (0)