From f838b06c3b64f889e4f6a3f87399cd383395de6a Mon Sep 17 00:00:00 2001
From: Lubos Kosco
Date: Wed, 2 Apr 2025 12:26:52 +0200
Subject: [PATCH] [spark] full scan delete and ttl examples

---
 spark/spark-scylla-fullscan/README.md         | 132 +++++++++++++
 spark/spark-scylla-fullscan/build.sbt         |  23 +++
 .../project/build.properties                  |   1 +
 .../spark-scylla-fullscan/project/plugins.sbt |   4 +
 spark/spark-scylla-fullscan/samplespark3.cql  |  54 ++++++
 spark/spark-scylla-fullscan/spark3-shell.sh   |  14 ++
 .../src/main/scala/FullScanDelete.scala       |  78 ++++++++
 .../src/main/scala/FullScanTTL.scala          | 173 ++++++++++++++++++
 .../start-scylla-container.sh                 |  13 ++
 spark/spark-scylla-fullscan/start-spark3.sh   |  21 +++
 spark/spark-scylla-fullscan/stop-spark3.sh    |  17 ++
 .../submit_job_spark3_delete.sh               |  16 ++
 .../submit_job_spark3_ttl.sh                  |  17 ++
 13 files changed, 563 insertions(+)
 create mode 100644 spark/spark-scylla-fullscan/README.md
 create mode 100644 spark/spark-scylla-fullscan/build.sbt
 create mode 100644 spark/spark-scylla-fullscan/project/build.properties
 create mode 100644 spark/spark-scylla-fullscan/project/plugins.sbt
 create mode 100644 spark/spark-scylla-fullscan/samplespark3.cql
 create mode 100644 spark/spark-scylla-fullscan/spark3-shell.sh
 create mode 100644 spark/spark-scylla-fullscan/src/main/scala/FullScanDelete.scala
 create mode 100644 spark/spark-scylla-fullscan/src/main/scala/FullScanTTL.scala
 create mode 100644 spark/spark-scylla-fullscan/start-scylla-container.sh
 create mode 100644 spark/spark-scylla-fullscan/start-spark3.sh
 create mode 100644 spark/spark-scylla-fullscan/stop-spark3.sh
 create mode 100644 spark/spark-scylla-fullscan/submit_job_spark3_delete.sh
 create mode 100644 spark/spark-scylla-fullscan/submit_job_spark3_ttl.sh

diff --git a/spark/spark-scylla-fullscan/README.md b/spark/spark-scylla-fullscan/README.md
new file mode 100644
index 0000000..a4145af
--- /dev/null
+++ b/spark/spark-scylla-fullscan/README.md
@@ -0,0 +1,132 @@
+This demo shows how to run simple Spark 3.5 jobs against a ScyllaDB 2025.1 table: one does a full scan and deletes the matching rows, the other does a full scan and updates the TTL of the matching rows.
+
+Prerequisites:
+---------------
+Read and run the steps below carefully; if any step is skipped or broken, the whole app will error out with weird messages ;-)
+The steps below work well on Fedora 34; replace the dnf/yum commands with their apt equivalents on Debian systems.
+
+Make sure you have Docker installed.
+https://docs.docker.com/engine/install/
+
+Start a Scylla container:
+```
+./start-scylla-container.sh
+```
+
+Now you should have Scylla listening on port 9044.
+Verify using `docker ps`.
+
+Make sure you have OpenJDK 21 installed in /usr/lib/jvm/java-21
+If it's elsewhere, fix the paths in the *.sh scripts !!!
+
+```
+sudo dnf -y install java-21-openjdk-devel
+```
+
+Get Spark 3.5.5 WITH Scala 2.13:
+https://spark.apache.org/downloads.html
+
+```
+wget https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3-scala2.13.tgz
+```
+
+Extract it locally and symlink a spark3 dir to it, e.g.
+
+```
+tar xzvf spark-3.5.5-bin-hadoop3-scala2.13.tgz
+ln -s spark-3.5.5-bin-hadoop3-scala2.13 spark3
+```
+
+Get a local cqlsh:
+
+```
+sudo dnf -y install git
+
+git clone https://github.com/scylladb/scylla-tools-java.git
+
+```
+
+Make sure you have at least python2 installed.
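+If python2 is missing, something like this usually works on Fedora (the exact package name varies by release; on newer releases it may be `python2.7`):
+
+```
+sudo dnf -y install python2
+```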
+
+Check that the Scylla container is really running, using the cqlsh you just checked out:
+
+```
+./scylla-tools-java/bin/cqlsh $HOSTNAME 9044
+```
+
+Make sure you have the latest sbt:
+https://www.scala-sbt.org/1.x/docs/Installing-sbt-on-Linux.html
+
+
+Running the demo:
+-----------------
+
+Populate the DB:
+
+```
+./scylla-tools-java/bin/cqlsh $HOSTNAME 9044 -f samplespark3.cql
+```
+(see Extra commands below for showing the contents after the data load; note the null ttl(data) values on the rows inserted before the default TTL was set)
+
+Build the project:
+
+```
+java --version
+```
+
+should say OpenJDK 21
+then build:
+
+```
+sbt assembly
+```
+
+Verify you have the jar built:
+```
+ls -la target/scala-2.13/spark-scylla-fullscan-assembly-0.1.jar
+```
+
+Start Spark 3:
+
+```
+./start-spark3.sh
+```
+
+The UI should be listening on $HOSTNAME:8080
+(or the next bigger free port, e.g. 8081)
+
+Submit the TTL job:
+
+```
+./submit_job_spark3_ttl.sh
+```
+
+Ideally, close to the end before "SparkUI: Stopped Spark web UI" you will see:
+
+`Accumulator: "Changed Row Count" is set to: 2`
+
+That means the first run was successful and set a TTL on the matching rows that did not have one yet.
+
+If you go for a second round, the job is smart enough to skip rows whose ttl(data) is no longer null:
+
+`Accumulator: "Changed Row Count" is set to: 0`
+
+The delete job works the same way, but removes every row with the matching owner_id:
+
+```
+./submit_job_spark3_delete.sh
+```
+
+On a freshly loaded data set it reports `Accumulator: "Deleted Row Count" is set to: 4`, and 0 on a second run.
+
+And that's it :-)
+
+Extra commands:
+---------------
+
+To trash the keyspace easily do:
+```
+./scylla-tools-java/bin/cqlsh $HOSTNAME 9044 -e "drop keyspace datastore"
+```
+
+To show the current rows, including their TTL:
+```
+./scylla-tools-java/bin/cqlsh $HOSTNAME 9044 -e "select id,group_id,owner_id,ttl(data),changed_at from datastore.data"
+```
+
+If you want to run the commands from the app interactively, use the Spark REPL:
+```
+./spark3-shell.sh
+```
+
diff --git a/spark/spark-scylla-fullscan/build.sbt b/spark/spark-scylla-fullscan/build.sbt
new file mode 100644
index 0000000..55316ea
--- /dev/null
+++ b/spark/spark-scylla-fullscan/build.sbt
@@ -0,0 +1,23 @@
+
+ThisBuild / version := "0.1"
+
+lazy val root = (project in file("."))
+  .settings(
+    name := "spark-scylla-fullscan"
+  )
+
+inThisBuild(
+  List(
+    idePackagePrefix := Some("com.scylladb"),
+    organization := "com.scylladb",
+    scalaVersion := "2.13.14",
+    scalacOptions ++= Seq("-release:8", "-deprecation", "-unchecked", "-feature"),
+  )
+)
+
+ThisBuild / libraryDependencies ++= Seq(
+    "org.apache.spark" %% "spark-core" % "3.5.5" % "provided",
+    "org.apache.spark" %% "spark-sql" % "3.5.5" % "provided",
+
+    "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.1"
+  )
diff --git a/spark/spark-scylla-fullscan/project/build.properties b/spark/spark-scylla-fullscan/project/build.properties
new file mode 100644
index 0000000..ee06c39
--- /dev/null
+++ b/spark/spark-scylla-fullscan/project/build.properties
@@ -0,0 +1 @@
+sbt.version = 1.10.0
\ No newline at end of file
diff --git a/spark/spark-scylla-fullscan/project/plugins.sbt b/spark/spark-scylla-fullscan/project/plugins.sbt
new file mode 100644
index 0000000..eac603d
--- /dev/null
+++ b/spark/spark-scylla-fullscan/project/plugins.sbt
@@ -0,0 +1,4 @@
+addSbtPlugin("org.jetbrains.scala" % "sbt-ide-settings" % "1.1.2")
+
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0")
+
diff --git a/spark/spark-scylla-fullscan/samplespark3.cql b/spark/spark-scylla-fullscan/samplespark3.cql
new file mode 100644
index 0000000..4d81a68
--- /dev/null
+++ b/spark/spark-scylla-fullscan/samplespark3.cql
@@ -0,0 +1,54 @@
+CREATE KEYSPACE IF NOT EXISTS datastore WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1} ;
+
+use datastore;
+
+DROP INDEX IF EXISTS owner_id_index;
+DROP TABLE IF EXISTS data;
+DROP TABLE IF EXISTS data_lookup;
+
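+-- 'data' is the table the Spark jobs scan: a composite partition key (id, group_id), a blob payload,
+-- incremental compaction (ICS) and Zstd compression.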
+CREATE TABLE data ( id uuid,
+    group_id uuid,
+    owner_id uuid,
+    data blob,
+    changed_at timestamp,
+    PRIMARY KEY ((id, group_id)) ) WITH compaction = {'class': 'IncrementalCompactionStrategy', 'space_amplification_goal': '1.75'}
+    AND compression = {'compression_level': '3', 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor'};
+
+CREATE INDEX owner_id_index ON datastore.data(owner_id);
+
+insert into data(id, group_id, owner_id, data, changed_at) values (e7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data1'), '2017-04-01');
+insert into data(id, group_id, owner_id, data, changed_at) values (f7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data2'), '2017-04-02');
+insert into data(id, group_id, owner_id, data, changed_at) values (a7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data3'), '2017-04-03');
+insert into data(id, group_id, owner_id, data, changed_at) values (b7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data4'), '2017-04-04');
+
+alter table datastore.data WITH default_time_to_live = 300000;
+
+insert into data(id, group_id, owner_id, data, changed_at) values (17ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data5'), '2017-04-05');
+insert into data(id, group_id, owner_id, data, changed_at) values (27ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data6'), '2017-04-06');
+insert into data(id, group_id, owner_id, data, changed_at) values (37ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data7'), '2017-04-07');
+insert into data(id, group_id, owner_id, data, changed_at) values (47ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, TextAsBlob('data8'), '2017-04-08');
+
+select * from datastore.data;
+
+-- lookup table, keyed by (owner_id, group_id, bucket)
+CREATE TABLE data_lookup ( id uuid,
+    group_id uuid,
+    owner_id uuid,
+    bucket smallint,
+    changed_at timestamp,
+    PRIMARY KEY ((owner_id, group_id, bucket)) ) WITH compaction = {'class': 'IncrementalCompactionStrategy', 'space_amplification_goal': '1.75'}
+    AND compression = {'compression_level': '3', 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor'};
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (e7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 0, '2017-04-01');
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (f7ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 1, '2017-04-02');
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (a7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 2, '2017-04-03');
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (b7ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 3, '2017-04-04');
+
+alter table datastore.data_lookup WITH default_time_to_live = 300000;
+
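+-- as with 'data' above, rows inserted after this ALTER inherit the default TTL, while the rows inserted before it keep a null ttl()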
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (17ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 4, '2017-04-05');
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (27ae5cf3-d358-4d99-b900-85902fda9bb0, 27dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 5, '2017-04-06');
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (37ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, d46cb523-4202-4212-a8e9-024d46529907, 6, '2017-04-07');
+insert into data_lookup(id, group_id, owner_id, bucket, changed_at) values (47ae5cf3-d358-4d99-b900-85902fda9bb0, 37dc4604-3d36-4b69-b053-954db147fd03, c46cb523-4202-4212-a8e9-024d46529907, 7, '2017-04-08');
+
+select * from datastore.data_lookup;
+
+select id,group_id,owner_id,ttl(data),changed_at from datastore.data;
diff --git a/spark/spark-scylla-fullscan/spark3-shell.sh b/spark/spark-scylla-fullscan/spark3-shell.sh
new file mode 100644
index 0000000..7371d49
--- /dev/null
+++ b/spark/spark-scylla-fullscan/spark3-shell.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+set -x
+
+export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64/
+export PATH=$JAVA_HOME/bin:$PATH
+
+mkdir -p /tmp/savepoints
+
+./spark3/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.13:3.5.1 \
+--conf spark.cassandra.connection.host=127.0.0.1 \
+--conf spark.cassandra.connection.port=9044 \
+--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
+# --master spark://$HOSTNAME:7077 \
diff --git a/spark/spark-scylla-fullscan/src/main/scala/FullScanDelete.scala b/spark/spark-scylla-fullscan/src/main/scala/FullScanDelete.scala
new file mode 100644
index 0000000..23d99d6
--- /dev/null
+++ b/spark/spark-scylla-fullscan/src/main/scala/FullScanDelete.scala
@@ -0,0 +1,78 @@
+package com.scylladb
+
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, IpBasedContactInfo, Schema}
+import com.datastax.spark.connector.writer.SqlRowWriter
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.cassandra._
+
+import java.net.InetSocketAddress
+import scala.collection.immutable.Seq
+
+// Do a full scan, filter on a UUID column
+// and delete the matching rows as part of a cleanup
+
+object FullScanDelete {
+
+  def main(args: Array[String]): Unit = {
+
+    val log = LogManager.getLogger("com.scylladb.FullScan")
+
+    implicit val spark: SparkSession = SparkSession
+      .builder()
+      .appName("FullScanApp")
+      .config("spark.task.maxFailures", "4")
+      .config("spark.stage.maxConsecutiveAttempts", "6")
+      .withExtensions(new CassandraSparkExtensions)
+      .getOrCreate()
+
+    val scyllaHost = "127.0.0.1"
+    val scyllaPort = 9044
+
+    // point these at your own cluster instead of the local demo container if needed
+    // val scyllaHost = "192.168.1.177"
+    // val scyllaPort = 9042
+
+    val scyllaKeyspace = "datastore" // Cassandra/Scylla lower-case unquoted identifiers, so keep the names lower-case
+    val scyllaTable = "data"
+
+    spark.conf.set(s"spark.cassandra.connection.host", scyllaHost)
+    spark.conf.set(s"spark.cassandra.connection.port", scyllaPort) // the catalog below does not seem to pick up the port on its own, so set it globally as well
+
+    spark.conf.set(s"spark.sql.catalog.localScylla", "com.datastax.spark.connector.datasource.CassandraCatalog")
+    spark.conf.set(s"spark.sql.catalog.localScylla.spark.cassandra.connection.host", scyllaHost)
+    spark.conf.set(s"spark.sql.catalog.localScylla.spark.cassandra.connection.port", scyllaPort)
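+
+    // The DataFrame read below goes through the "localScylla" catalog registered above, while the final
+    // deleteFromCassandra call uses the RDD API, which ignores the catalog settings. That is why an explicit
+    // CassandraConnector is built from the same host/port and passed to it.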
+
+    val connector = new CassandraConnector(CassandraConnectorConf(spark.sparkContext.getConf.clone()).copy(
+      contactInfo = IpBasedContactInfo(
+        hosts = Set(new InetSocketAddress(scyllaHost, scyllaPort))
+      )
+    ))
+
+    val compareFieldName = "owner_id" // the field we filter on
+    val filterValue = "d46cb523-4202-4212-a8e9-024d46529907" // we are only interested in this ID
+
+    val dataset = spark.read.cassandraFormat.table(s"localScylla.$scyllaKeyspace.$scyllaTable")
+
+    val accum = spark.sparkContext.longAccumulator("Deleted Row Count")
+
+    val filtered_dataset = dataset.filter(row =>
+      row.getString(row.fieldIndex(compareFieldName)) == filterValue
+    )
+
+    val deleteRows = filtered_dataset.map(
+      row => {
+        //print(row)
+        accum.add(1)
+        val frow = Row.fromSeq(Seq(row.get(0), row.get(1))) //TODO rewrite dynamically to pick just the PK columns for the delete
+        frow
+      }
+    )(filtered_dataset.encoder)
+
+    deleteRows.rdd.deleteFromCassandra(scyllaKeyspace, scyllaTable)(connector, SqlRowWriter.Factory)
+    println("Accumulator: \"" + accum.name.getOrElse("NoName") + "\" is set to: " + accum.value)
+
+  }
+
+
+}
diff --git a/spark/spark-scylla-fullscan/src/main/scala/FullScanTTL.scala b/spark/spark-scylla-fullscan/src/main/scala/FullScanTTL.scala
new file mode 100644
index 0000000..9e7a4e4
--- /dev/null
+++ b/spark/spark-scylla-fullscan/src/main/scala/FullScanTTL.scala
@@ -0,0 +1,173 @@
+package com.scylladb
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, IpBasedContactInfo, NoAuthConf, PasswordAuthConf, Schema}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.cassandra._
+import org.apache.spark.sql.{Column, Row, SparkSession}
+import com.datastax.spark.connector.rdd.CassandraTableScanRDD
+import com.datastax.spark.connector.writer.{SqlRowWriter, TTLOption, WriteConf}
+import org.apache.spark.rdd.RDD
+
+import java.net.InetSocketAddress
+
+// Do a full scan, filter on a UUID column
+// and update the TTL of the matching rows
+
+object FullScanTTL {
+
+  def main(args: Array[String]): Unit = {
+
+    val log = LogManager.getLogger("com.scylladb.FullScan")
+
+    implicit val spark: SparkSession = SparkSession
+      .builder()
+      .appName("FullScanApp")
+      .config("spark.task.maxFailures", "4")
+      .config("spark.stage.maxConsecutiveAttempts", "6")
+      .withExtensions(new CassandraSparkExtensions)
+      .getOrCreate()
+
+    val scyllaHost = "127.0.0.1"
+    val scyllaPort = 9044
+
+    // point these at your own cluster instead of the local demo container if needed
+    // val scyllaHost = "192.168.1.177"
+    // val scyllaPort = 9042
+
+    val scyllaKeyspace = "datastore" // Cassandra/Scylla lower-case unquoted identifiers, so keep the names lower-case
+    val scyllaTable = "data"
+
+    spark.conf.set(s"spark.cassandra.connection.host", scyllaHost)
+    spark.conf.set(s"spark.cassandra.connection.port", scyllaPort) // the catalog below does not seem to pick up the port on its own, so set it globally as well
+
+    spark.conf.set(s"spark.sql.catalog.localScylla", "com.datastax.spark.connector.datasource.CassandraCatalog")
+    spark.conf.set(s"spark.sql.catalog.localScylla.spark.cassandra.connection.host", scyllaHost)
+    spark.conf.set(s"spark.sql.catalog.localScylla.spark.cassandra.connection.port", scyllaPort)
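+
+    // The RDD-level calls used below (cassandraTable, saveToCassandra) do not read the SQL catalog settings,
+    // so an explicit CassandraConnector carrying the same host/port contact info is built for them here.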
+
+    val connector = new CassandraConnector(CassandraConnectorConf(spark.sparkContext.getConf.clone()).copy(
+      contactInfo = IpBasedContactInfo(
+        hosts = Set(new InetSocketAddress(scyllaHost, scyllaPort))
+      )
+    ))
+
+    val tableDef =
+      connector.withSessionDo(Schema.tableFromCassandra(_, scyllaKeyspace, scyllaTable))
+    log.info("TableDef retrieved for source:")
+    log.info(tableDef)
+
+    val columnRefs =
+      tableDef.partitionKey.map(_.ref) ++
+        tableDef.clusteringColumns.map(_.ref) ++
+        tableDef.regularColumns//.map(_.ref)
+          .flatMap { column =>
+            val colName = column.columnName
+            List(
+              column.ref,
+              colName.ttl as s"${colName}_ttl",
+              colName.writeTime as s"${colName}_writetime"
+            )
+          }
+    log.info("ColumnRefs generated for selection:")
+    log.info(columnRefs.mkString("\n"))
+
+    val compareFieldName = "owner_id" // the field we filter on
+    val compareFieldNameTTL = "data_ttl"
+    val compareFieldNameTTLCQL = "ttl(data) as " + compareFieldNameTTL
+
+    // below is not needed if we don't convert to text
+    val cols = columnRefs.map(
+      e => e.cqlValueName
+    )
+    val first = cols.head
+    var restc = cols.slice(2, cols.length)
+//    println(first)
+//    println(restc)
+    restc = restc ++ Seq(compareFieldNameTTLCQL)
+
+//    log.info("list of selectors:")
+//    log.info(restc.mkString("\n"))
+
+    //val colsmanualbase = Seq("id","group_id")
+    val colsmanualbase = tableDef.partitionKey.map(_.ref) ++
+      Seq(tableDef.regularColumns.map(_.ref).find(p => p.columnName=="data").get)
+
+    val ttlstabledef = tableDef.regularColumns.flatMap { column =>
+      val colName = column.columnName
+      List(
+        column.ref,
+        colName.ttl as s"${colName}_ttl",
+        colName.writeTime as s"${colName}_writetime"
+      )
+    }
+
+    val colsmanual = colsmanualbase ++
+      Seq(tableDef.regularColumns.map(_.ref).find(p => p.columnName=="owner_id").get) ++
+      Seq(ttlstabledef.find(p => p.cqlValueName=="ttl(data)").get)
+
+
+    val dataset = spark.sparkContext
+      .cassandraTable[CassandraSQLRow](
+        scyllaKeyspace,
+        scyllaTable)
+      .withConnector(connector)
+      //.read.cassandraFormat.table(s"localScylla.$scyllaKeyspace.$scyllaTable").rdd.asInstanceOf[CassandraTableScanRDD[CassandraSQLRow]]
+      .select(colsmanual: _*)
+// TODO the selection above is a hardcoded schema, so the code below uses hardcoded positions based on it; fix it to derive everything from the schema
+
+      //.select(first,cols:_*)
+
+//      .select(first,restc:_*)
+//      .cassandraTable[CassandraSQLRow]
+
+    val accum = spark.sparkContext.longAccumulator("Changed Row Count")
+
+    val filterValue = "d46cb523-4202-4212-a8e9-024d46529907" // we are only interested in this ID
+
+    val filtered_dataset = dataset.filter(row =>
+      row.getString(3) == filterValue //row.fieldIndex(compareFieldName) // IF we have our subset of data found
+      && row.get(4) == null // IF TTL is null (i.e. old data written before the ALTER set the default TTL; we could also check whether the TTL is too low, etc.)
+    )
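+
+    // The map below keeps only (id, group_id, data) for the matching rows; the new TTL itself is applied
+    // through WriteConf.ttl when the rows are written back with saveToCassandra.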
+
+    //TODO fetch the full rows whose TTL needs updating in a second step, to save network costs
+
+    val updatedTTL = filtered_dataset.map(
+      row => {
+//        print(row)
+//        val ttl = row.getLong(4)
+//        val ttl = row.getLong(compareFieldNameTTL)
+//        print(ttl);
+//        println(row.get(row.fieldIndex(compareFieldNameTTL)))
+        accum.add(1)
+        val frow = Row.fromSeq(Seq(row.get(0), row.get(1), row.get(2))) //TODO rewrite to match the schema / colsmanualbase
+        frow
+      }
+    )
+    //(filtered_dataset.encoder)
+
+    val tempWriteConf = WriteConf
+      .fromSparkConf(spark.sparkContext.getConf)
+      .copy(consistencyLevel = ConsistencyLevel.LOCAL_QUORUM)
+
+    val writeConf =
+      tempWriteConf.copy(ttl = TTLOption.constant(200000));
+
+    val columnSelector = SomeColumns(colsmanualbase: _*) // this has to match the columns selected into the original RDD !!!
+
+//    updatedTTL.collect()
+
+    updatedTTL.asInstanceOf[RDD[Row]]
+      .saveToCassandra(
+        scyllaKeyspace,
+        scyllaTable,
+        columnSelector,
+        writeConf
+      )(connector, SqlRowWriter.Factory)
+
+    println("Accumulator: \"" + accum.name.getOrElse("NoName") + "\" is set to: " + accum.value)
+
+  }
+
+
+}
diff --git a/spark/spark-scylla-fullscan/start-scylla-container.sh b/spark/spark-scylla-fullscan/start-scylla-container.sh
new file mode 100644
index 0000000..bdb2b0e
--- /dev/null
+++ b/spark/spark-scylla-fullscan/start-scylla-container.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+set -x
+
+docker pull scylladb/scylla:2025.1
+
+docker rm -f scylla-spark
+
+#EXPOSE 10000 9042 9160 9180 7000 7001 22
+#--hostname some-scylla
+docker run --name scylla-spark -p 10001:10000 -p 24:22 -p 7004:7000 -p 7005:7001 -p 9181:9180 -p 9044:9042 -p 9162:9160 -d scylladb/scylla:2025.1
+
+docker start scylla-spark
+
diff --git a/spark/spark-scylla-fullscan/start-spark3.sh b/spark/spark-scylla-fullscan/start-spark3.sh
new file mode 100644
index 0000000..810a220
--- /dev/null
+++ b/spark/spark-scylla-fullscan/start-spark3.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+set -x
+
+export HOSTIP=$HOSTNAME
+export SPARK_LOCAL_IP=$HOSTIP
+
+export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
+export PATH=$JAVA_HOME/bin:$PATH
+
+cd spark3/sbin
+
+./start-master.sh
+
+/bin/mkdir -p /tmp/spark-events
+
+./start-history-server.sh
+
+#./start-worker.sh spark://$HOSTIP:7077 -c 8 -m 32G
+./start-worker.sh spark://$HOSTIP:7077 -c 2 -m 2G
+
+#./start-shuffle-service.sh
diff --git a/spark/spark-scylla-fullscan/stop-spark3.sh b/spark/spark-scylla-fullscan/stop-spark3.sh
new file mode 100644
index 0000000..a46c922
--- /dev/null
+++ b/spark/spark-scylla-fullscan/stop-spark3.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+set -x
+
+export HOSTIP=$HOSTNAME
+export SPARK_LOCAL_IP=$HOSTIP
+
+export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
+export PATH=$JAVA_HOME/bin:$PATH
+
+cd spark3/sbin
+
+./stop-worker.sh
+
+./stop-history-server.sh
+
+./stop-master.sh
diff --git a/spark/spark-scylla-fullscan/submit_job_spark3_delete.sh b/spark/spark-scylla-fullscan/submit_job_spark3_delete.sh
new file mode 100644
index 0000000..eccdc09
--- /dev/null
+++ b/spark/spark-scylla-fullscan/submit_job_spark3_delete.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+set -x
+
+export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64/
+export PATH=$JAVA_HOME/bin:$PATH
+
+mkdir -p /tmp/savepoints
+
+./spark3/bin/spark-submit --class com.scylladb.FullScanDelete \
+    --master spark://$HOSTNAME:7077 \
+    --conf spark.eventLog.enabled=true \
+    target/scala-2.13/spark-scylla-fullscan-assembly-0.1.jar
+
+
diff --git a/spark/spark-scylla-fullscan/submit_job_spark3_ttl.sh b/spark/spark-scylla-fullscan/submit_job_spark3_ttl.sh
new file mode 100644
index 0000000..f5148d3
--- /dev/null
+++ b/spark/spark-scylla-fullscan/submit_job_spark3_ttl.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+set -x
+
+export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64/
+export PATH=$JAVA_HOME/bin:$PATH
+
+mkdir -p /tmp/savepoints
+
+./spark3/bin/spark-submit --class com.scylladb.FullScanTTL \
+    --driver-java-options -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=65009 \
+    --master spark://$HOSTNAME:7077 \
+    --conf spark.eventLog.enabled=true \
+    target/scala-2.13/spark-scylla-fullscan-assembly-0.1.jar
+
+