|
| 1 | +# Performance of Bullet on Spark |
| 2 | + |
| 3 | +This section describes how we tune the performance of Bullet on Spark. This is not meant to be a rigorous benchmark. |
| 4 | + |
| 5 | +## Prerequisites |
| 6 | + |
| 7 | +You should be familiar with [Spark Streaming](https://spark.apache.org/streaming/), [Kafka](http://kafka.apache.org) and the [Bullet on Spark architecture](spark-architecture.md). |
| 8 | + |
| 9 | +## How was this tested? |
| 10 | + |
| 11 | +All tests run here were using [Bullet-Spark 0.1.2](https://github.com/bullet-db/bullet-spark/releases/tag/bullet-spark-0.1.2). |
| 12 | + |
| 13 | +### Tools used |
| 14 | + |
| 15 | + * [jq](https://stedolan.github.io/jq/) - a nice tool to parse Bullet JSON responses |
| 16 | + * curl, bash and python - for running and analyzing Bullet queries |
| 17 | + * [Apache JMeter](https://jmeter.apache.org/) - a load testing tool to send queries to the server simultaneously |
| 18 | + |
| 19 | +### Cluster |
| 20 | + |
| 21 | +* Hadoop YARN cluster with Apache Spark 2.1.2.12 installed |
| 22 | +* The spec for the machines we were running on: |
| 23 | + - 2 x Intel E5530(4 cores, 8 Threads) |
| 24 | + - 24 GB RAM |
| 25 | + - 3 TB SATA Disk |
| 26 | + - 10 G Network Interface |
| 27 | + |
| 28 | +### Data |
| 29 | + |
| 30 | +* Our data was read from a Kafka cluster. Kafka version is 0.10.2.1 |
| 31 | +* The Kafka cluster was located within the same datacenter as the Hadoop YARN cluster - close network proximity gives us some measure of confidence that large data transmission delays aren't a factor. |
| 32 | +* Our data schema contained ```92``` fields with ```62``` Strings, ```4``` Longs, ```23``` Maps and ```3``` Lists of Maps. Most of the data is generally present in the Maps and Lists of Maps. |
| 33 | +* We tested 2 set of data: |
| 34 | + - The smaller data was about 36,000 records/s and 43 MB/s |
| 35 | + - The larger data was about 124,700 records/s and 150 MB/s |
| 36 | + |
| 37 | +### Configuration |
| 38 | + |
| 39 | +Here are the configurations we used to launch instances of Bullet Spark. |
| 40 | + |
| 41 | +* For the smaller data: |
| 42 | + |
| 43 | +Settings: |
| 44 | +```YAML |
| 45 | +bullet.spark.batch.duration.ms: 2000 |
| 46 | +bullet.spark.receiver.query.block.size: 1 |
| 47 | +bullet.result.metadata.enable: true |
| 48 | +bullet.spark.metrics.enabled: true |
| 49 | +bullet.spark.filter.parallel.enabled: true |
| 50 | +bullet.spark.filter.parallelism: 16 |
| 51 | +bullet.spark.filter.parallel.query.min.size: 10 |
| 52 | +bullet.spark.query.union.checkpoint.duration.multiplier: 20 |
| 53 | +bullet.spark.join.checkpoint.duration.multiplier: 20 |
| 54 | +``` |
| 55 | +Command line: |
| 56 | +```bash |
| 57 | +./spark-submit \ |
| 58 | + --master yarn \ |
| 59 | + --deploy-mode cluster \ |
| 60 | + --queue default \ |
| 61 | + --executor-memory 12g \ |
| 62 | + --executor-cores 2 \ |
| 63 | + --num-executors 100 \ |
| 64 | + --driver-cores 2 \ |
| 65 | + --driver-memory 12g \ |
| 66 | + --conf spark.streaming.backpressure.enabled=true \ |
| 67 | + --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \ |
| 68 | + --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \ |
| 69 | + --conf spark.shuffle.consolidateFiles=true \ |
| 70 | + --conf spark.dynamicAllocation.enabled=false \ |
| 71 | + --conf spark.storage.memoryFraction=0.1 \ |
| 72 | + --conf spark.shuffle.memoryFraction=0.8 \ |
| 73 | + --conf spark.default.parallelism=20 \ |
| 74 | + ... |
| 75 | +``` |
| 76 | + |
| 77 | +* For larger Data: |
| 78 | + |
| 79 | +Settings: |
| 80 | +```YAML |
| 81 | +bullet.spark.batch.duration.ms: 5000 |
| 82 | +bullet.spark.receiver.query.block.size: 1 |
| 83 | +bullet.result.metadata.enable: true |
| 84 | +bullet.spark.metrics.enabled: true |
| 85 | +bullet.spark.filter.parallel.enabled: true |
| 86 | +bullet.spark.filter.parallelism: 64 |
| 87 | +bullet.spark.filter.parallel.query.min.size: 10 |
| 88 | +bullet.spark.query.union.checkpoint.duration.multiplier: 20 |
| 89 | +bullet.spark.join.checkpoint.duration.multiplier: 20 |
| 90 | +``` |
| 91 | +Command line: |
| 92 | +```bash |
| 93 | +./spark-submit \ |
| 94 | + --master yarn \ |
| 95 | + --deploy-mode cluster \ |
| 96 | + --queue default \ |
| 97 | + --executor-memory 12g \ |
| 98 | + --executor-cores 2 \ |
| 99 | + --num-executors 400 \ |
| 100 | + --driver-cores 2 \ |
| 101 | + --driver-memory 12g \ |
| 102 | + --conf spark.streaming.backpressure.enabled=true \ |
| 103 | + --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \ |
| 104 | + --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \ |
| 105 | + --conf spark.shuffle.consolidateFiles=true \ |
| 106 | + --conf spark.dynamicAllocation.enabled=false \ |
| 107 | + --conf spark.storage.memoryFraction=0.1 \ |
| 108 | + --conf spark.shuffle.memoryFraction=0.8 \ |
| 109 | + --conf spark.default.parallelism=50 \ |
| 110 | + ... |
| 111 | +``` |
| 112 | + |
| 113 | +## Test 1: Latency of Bullet Spark |
| 114 | + |
| 115 | +This test was done on the smaller data. We used a [RAW query without any filtering](../ws/examples/#simplest-query) to measure the latency added by Bullet Spark. This is not the end-to-end latency for a query. It is the latency from receiving the query to finishing the query, not includes the time spent in Kafka. We ran this query 100 times. |
| 116 | + |
| 117 | +### Result |
| 118 | + |
| 119 | +This graph shows the latency of each attempts: |
| 120 | + |
| 121 | + |
| 122 | + |
| 123 | +### Conclusion |
| 124 | + |
| 125 | +The average latency was 1173 ms. This result shows that this is the fastest Bullet Spark can be. It cannot return data any faster than this for meaningful queries. |
| 126 | + |
| 127 | +## Test 2: Scalability for smaller data |
| 128 | + |
| 129 | +This test was done on the smaller data. We want to measure how many of queries we can have running simultaneously on Bullet Spark. We ran 400, 800, 1500 and 1100 queries each for 10 minutes. |
| 130 | + |
| 131 | +### Result |
| 132 | + |
| 133 | +#### Figure 1. Spark Streaming UI |
| 134 | + |
| 135 | + |
| 136 | + |
| 137 | +#### Figure 2. Queries running |
| 138 | + |
| 139 | + |
| 140 | + |
| 141 | +#### Figure 3. CPU time |
| 142 | + |
| 143 | + |
| 144 | + |
| 145 | +#### Figure 4. Heap usage |
| 146 | + |
| 147 | + |
| 148 | + |
| 149 | +#### Figure 5. Garbage collection time |
| 150 | + |
| 151 | + |
| 152 | + |
| 153 | +#### Figure 6. Garbage collection count |
| 154 | + |
| 155 | + |
| 156 | + |
| 157 | +[Figure 1](#figure-1-spark-streaming-ui) shows the Spark Streaming UI when running the test. |
| 158 | + |
| 159 | +[Figure 2](#figure-2-queries-running) shows the simultaneous queries we ran. |
| 160 | + |
| 161 | +[Figure 3](#figure-3-cpu-time) shows the milliseconds of CPU time used per minute. For example, a value of ```300K ms``` ms for a line (worker) means that the worker used ```300K ms/min``` or ```300s/60s``` or ```5``` CPU cores (virtual) in that minute. |
| 162 | + |
| 163 | +[Figure 4](#figure-4-heap-usage) shows raw numbers for Heap utilizations in bytes. |
| 164 | + |
| 165 | +[Figure 5](#figure-5-garbage-collection-time) shows the time in milliseconds spent for garbage collection per minute. |
| 166 | + |
| 167 | +[Figure 6](#figure-6-garbage-collection-count) shows the count of garbage collection events per minute. |
| 168 | + |
| 169 | +### Conclusion |
| 170 | + |
| 171 | +The average processing time for each batch was 1 second 143 ms which was below the batch duration 2 seconds. For average, 1 CPU core and 3GB memory were used in this experiment. CPU and memory usages go slowly up while queries number goes up but they are still within resource limits. We can easily run up to 1500 RAW queries simultaneously in this test. |
| 172 | + |
| 173 | +## Test 3: Scalability for larger data |
| 174 | + |
| 175 | +This test was done on the larger data. We ran 100, 400, 800 and 600 queries each for 10 minutes. |
| 176 | + |
| 177 | +### Result |
| 178 | + |
| 179 | +#### Figure 7. Spark stream UI |
| 180 | + |
| 181 | + |
| 182 | + |
| 183 | +#### Figure 8. Queries running |
| 184 | + |
| 185 | + |
| 186 | + |
| 187 | +#### Figure 9. CPU time |
| 188 | + |
| 189 | + |
| 190 | + |
| 191 | +#### Figure 10. Heap usage |
| 192 | + |
| 193 | + |
| 194 | + |
| 195 | +#### Figure 11. Garbage collection time |
| 196 | + |
| 197 | + |
| 198 | + |
| 199 | +#### Figure 12. Garbage collection count |
| 200 | + |
| 201 | + |
| 202 | + |
| 203 | +### Conclusion |
| 204 | + |
| 205 | +The average processing time for each batch was 3 second 97 ms which was below the batch duration 5 seconds. For average, 1.2 CPU core and average 5GB memory were used in this experiment. But with queries number goes up, some of the executors memory usage were up to 8-10GB which is close to our resource limits. With more queries running, OOM may happen. So in this experiment, we can only afford up to 800 queries simultaneously. |
0 commit comments