132 changes: 132 additions & 0 deletions spark/spark-scylla-fullscan/README.md
This demo shows how to run a simple Spark 3.5 job to enrich a simple ScyllaDB 2025.1 table.

Prerequisites:
---------------
Read and run the steps below carefully; if any step is skipped or broken, the whole app will error out with weird messages ;-)
The steps below work well on Fedora 34; on Debian-based systems, replace the dnf/yum commands with their apt equivalents.

Make sure you have Docker installed:
https://docs.docker.com/engine/install/

Start a Scylla container:
```
./start-scylla-container.sh
```

Scylla should now be listening on port 9044; verify with `docker ps`.

Make sure you have OpenJDK 21 installed in /usr/lib/jvm/java-21.
If it lives elsewhere, fix the paths in the *.sh scripts!

```
sudo dnf -y install java-21-openjdk-devel
```

Get Spark 3.5.5 built WITH Scala 2.13:
https://spark.apache.org/downloads.html

```
wget https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3-scala2.13.tgz
```

Extract it locally and symlink a `spark3` directory to it, e.g.

```
tar xzvf spark-3.5.5-bin-hadoop3-scala2.13.tgz
ln -s spark-3.5.5-bin-hadoop3-scala2.13 spark3
```

Get a local cqlsh:

```
sudo dnf -y install git

git clone https://github.com/scylladb/scylla-tools-java.git

```

Make sure you have at least Python 2 installed (the bundled cqlsh needs it).
After the checkout, check that the Scylla container is really running:

```
./scylla-tools-java/bin/cqlsh $HOSTNAME 9044
```

Make sure you have the latest sbt:
https://www.scala-sbt.org/1.x/docs/Installing-sbt-on-Linux.html


Running the demo:
-----------------

Populate the DB:

```
./scylla-tools-java/bin/cqlsh $HOSTNAME 9044 -f samplespark3.cql
```
(See the Extra commands section below for showing the table contents after the data load; note the null values in the "lettersinname" column.)

Build the project:

```
java --version
```

should report OpenJDK 21 (see Prerequisites).
Then build:

```
sbt assembly
```

Verify you have the jar built:
```
ls -la target/scala-2.13/spark-scylla-fullscan-assembly-0.1.jar
```

Start Spark 3:

```
./start-spark3.sh
```

The Spark UI should be listening on $HOSTNAME:8080
(or the next free port above it, e.g. 8081).

Submit the app:

```
./submit_job_spark3.sh
```

Near the end of the output, just before "SparkUI: Stopped Spark web UI", you should see:

`Accumulator: "Changed Row Count" is set to: 4`

That means the first run was successful and updated the rows that were missing the value.

If you run it a second time, the job is smart enough to skip rows that no longer have null values in the "lettersinname" column:

`Accumulator: "Changed Row Count" is set to: 0`
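
The second-run behavior comes from only selecting rows whose "lettersinname" is still null before writing. The core of that check looks roughly like this (a hedged sketch: the exact logic lives in the app's Scala sources, and the `dataset` name is assumed):

```
import org.apache.spark.sql.functions.col

// Only rows still missing the enrichment value are selected for rewriting
val toEnrich = dataset.filter(col("lettersinname").isNull)
```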

And that's it :-)

Extra commands:
---------------

To drop the keyspace:
```
./scylla-tools-java/bin/cqlsh $HOSTNAME 9044 -e "drop keyspace myOwnKeyspace"
```

To show the current rows:
```
./scylla-tools-java/bin/cqlsh $HOSTNAME 9044 -e "select * from myOwnKeyspace.sparseTable"
```

If you want to run the commands from the app interactively, use Spark REPL:
```
./spark3-shell.sh
```
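
From the shell you can read the demo table through the connector directly, for example (a hedged sketch; note that unquoted CQL identifiers are stored lower-cased):

```
// Read the demo table via the Cassandra data source
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "myownkeyspace", "table" -> "sparsetable"))
  .load()
df.show()
```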

23 changes: 23 additions & 0 deletions spark/spark-scylla-fullscan/build.sbt

ThisBuild / version := "0.1"

lazy val root = (project in file("."))
.settings(
name := "spark-scylla-fullscan"
)

inThisBuild(
List(
idePackagePrefix := Some("com.scylladb"),
organization := "com.scylladb",
scalaVersion := "2.13.14",
scalacOptions ++= Seq("-release:8", "-deprecation", "-unchecked", "-feature"),
)
)

ThisBuild / libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.5.5" % "provided",
"org.apache.spark" %% "spark-sql" % "3.5.5" % "provided",

"com.datastax.spark" %% "spark-cassandra-connector" % "3.5.1"
)
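
// If `sbt assembly` ever fails on duplicate META-INF entries pulled in by the
// connector's transitive dependencies, a merge strategy along these lines can
// help (a hedged sketch, not needed for the stock build):
// ThisBuild / assembly / assemblyMergeStrategy := {
//   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
//   case _                             => MergeStrategy.first
// }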
1 change: 1 addition & 0 deletions spark/spark-scylla-fullscan/project/build.properties
sbt.version = 1.10.0
4 changes: 4 additions & 0 deletions spark/spark-scylla-fullscan/project/plugins.sbt
addSbtPlugin("org.jetbrains.scala" % "sbt-ide-settings" % "1.1.2")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0")

54 changes: 54 additions & 0 deletions spark/spark-scylla-fullscan/samplespark3.cql
CREATE KEYSPACE IF NOT EXISTS datastore WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1} ;

use datastore;

DROP INDEX IF EXISTS owner_id_index;
DROP TABLE IF EXISTS data;
DROP TABLE IF EXISTS data_lookup;

CREATE TABLE data ( id uuid,
group_id uuid,
owner_id uuid,
data blob,
changed_at timestamp,
PRIMARY KEY ((id, group_id)) ) WITH compaction = {'class': 'IncrementalCompactionStrategy', 'space_amplification_goal': '1.75'}
AND compression = {'compression_level': '3', 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor'};
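
-- Note: IncrementalCompactionStrategy and space_amplification_goal are
-- ScyllaDB-specific options; stock Apache Cassandra will reject this statement.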

CREATE INDEX owner_id_index ON datastore.data(owner_id);

insert into data(id, group_id, owner_id, data, changed_at) values (e7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data1'), '2017-04-01');
insert into data(id, group_id, owner_id, data, changed_at) values (f7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data2'), '2017-04-02');
insert into data(id, group_id, owner_id, data, changed_at) values (a7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data3'), '2017-04-03');
insert into data(id, group_id, owner_id, data, changed_at) values (b7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data4'), '2017-04-04');

alter table datastore.data WITH default_time_to_live = 300000;

insert into data(id, group_id, owner_id, data, changed_at) values (17ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data5'), '2017-04-05');
insert into data(id, group_id, owner_id, data, changed_at) values (27ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data6'), '2017-04-06');
insert into data(id, group_id, owner_id, data, changed_at) values (37ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data7'), '2017-04-07');
insert into data(id, group_id, owner_id, data, changed_at) values (47ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data8'), '2017-04-08');

select * from datastore.data;

-- lookup table
CREATE TABLE data_lookup ( id uuid,
group_id uuid,
owner_id uuid,
bucket smallint,
changed_at timestamp,
PRIMARY KEY ((owner_id, group_id, bucket)) ) WITH compaction = {'class': 'IncrementalCompactionStrategy', 'space_amplification_goal': '1.75'}
AND compression = {'compression_level': '3', 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor'};
insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (e7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 0, '2017-04-01');
insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (f7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 1, '2017-04-02');
insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (a7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 2, '2017-04-03');
insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (b7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 3, '2017-04-04');

alter table datastore.data_lookup WITH default_time_to_live = 300000;

insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (17ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 4, '2017-04-05');
insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (27ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 5, '2017-04-06');
insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (37ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 6, '2017-04-07');
insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (47ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 7, '2017-04-08');

select * from datastore.data_lookup;

select id,group_id,owner_id,ttl(data),changed_at from datastore.data;
14 changes: 14 additions & 0 deletions spark/spark-scylla-fullscan/spark3-shell.sh
#!/bin/bash

set -x

export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64/
export PATH=$JAVA_HOME/bin:$PATH

mkdir -p /tmp/savepoints
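
# Note: keep the connector version below in sync with build.sbt (3.5.1 at the time of writing).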

./spark3/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.13:3.5.1 \
--conf spark.cassandra.connection.host=127.0.0.1 \
--conf spark.cassandra.connection.port=9044 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
# --master spark://tublat:7077 \
78 changes: 78 additions & 0 deletions spark/spark-scylla-fullscan/src/main/scala/FullScanDelete.scala
package com.scylladb

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, IpBasedContactInfo, Schema}
import com.datastax.spark.connector.writer.SqlRowWriter
import org.apache.log4j.LogManager
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.cassandra._

import java.net.InetSocketAddress
import scala.collection.immutable.Seq

// Do a full scan, filter rows on a UUID column,
// then delete the matching rows as part of cleanup.

object FullScanDelete {

def main(args: Array[String]): Unit = {

val log = LogManager.getLogger("com.scylladb.FullScan")

implicit val spark: SparkSession = SparkSession
.builder()
.appName("FullScanApp")
.config("spark.task.maxFailures", "4")
.config("spark.stage.maxConsecutiveAttempts", "6")
.withExtensions(new CassandraSparkExtensions)
.getOrCreate()
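// Retry limits: bound task and stage retries so a persistently failing scan
// fails the job instead of looping indefinitely.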

val scyllaHost = "127.0.0.1"
val scyllaPort = 9044

// Alternative: point at a remote node instead of the local container
// val scyllaHost = "192.168.1.177"
// val scyllaPort = 9042

val scyllaKeyspace = "datastore" // Cassandra lower-cases unquoted identifiers by default, so keep names lower-case
val scyllaTable = "data"

spark.conf.set(s"spark.cassandra.connection.host", scyllaHost)
spark.conf.set(s"spark.cassandra.connection.port", scyllaPort) // for some unknown reason the catalog below doesn't accept a port

spark.conf.set(s"spark.sql.catalog.localScylla", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set(s"spark.sql.catalog.localScylla.spark.cassandra.connection.host", scyllaHost)
spark.conf.set(s"spark.sql.catalog.localScylla.spark.cassandra.connection.port", scyllaPort) //is this really used?
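// "localScylla" is an arbitrary catalog name; once registered, tables become
// addressable as localScylla.<keyspace>.<table> in Spark SQL and in spark.read below.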

val connector = new CassandraConnector(CassandraConnectorConf (spark.sparkContext.getConf.clone()).copy(
contactInfo = IpBasedContactInfo(
hosts = Set(new InetSocketAddress(scyllaHost, scyllaPort))
)
))

val compareFieldName = "owner_id" // the field to filter on
val filterValue = "d46cb523-4202-4212-a8e9-024d46529907" // we are only interested in this ID

val dataset = spark.read.cassandraFormat.table(s"localScylla.$scyllaKeyspace.$scyllaTable")

val accum = spark.sparkContext.longAccumulator("Deleted Row Count")
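// Caveat: accumulators updated inside transformations can over-count when
// Spark retries failed tasks, so treat the value as approximate.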

val filtered_dataset = dataset.filter(row =>
row.getString(row.fieldIndex(compareFieldName)) == filterValue
)

val deleteRows = filtered_dataset.map(
row => {
//print(row)
accum.add(1)
val frow = Row.fromSeq(Seq(row.get(0),row.get(1))) //TODO rewrite in dynamic way to match just PK for deletes
frow
}
)(filtered_dataset.encoder)
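
// A dynamic alternative to the positional projection above (a hedged sketch):
// select the primary-key columns by name so the delete rows don't depend on
// column order; assumes the PK is (id, group_id) as in samplespark3.cql.
//   import org.apache.spark.sql.functions.col
//   val pkOnly = filtered_dataset.select(Seq("id", "group_id").map(col): _*)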

deleteRows.rdd.deleteFromCassandra(scyllaKeyspace,scyllaTable)(connector, SqlRowWriter.Factory)
println("Accumulator: \""+accum.name.getOrElse("NoName")+"\" is set to: "+accum.value)

}


}