From 220f193dd5245b9598dc5dfd4f39eed36ab50cf1 Mon Sep 17 00:00:00 2001 From: Mats Rydberg Date: Mon, 3 Nov 2025 15:08:40 +0100 Subject: [PATCH] Add Spark+BQ notebook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Max Kießling --- examples/aga-standalone-bq-spark.ipynb | 363 +++++++++++++++++++++++++ 1 file changed, 363 insertions(+) create mode 100644 examples/aga-standalone-bq-spark.ipynb diff --git a/examples/aga-standalone-bq-spark.ipynb b/examples/aga-standalone-bq-spark.ipynb new file mode 100644 index 000000000..002c5e6d0 --- /dev/null +++ b/examples/aga-standalone-bq-spark.ipynb @@ -0,0 +1,363 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "source": [ + "# BigQuery + Spark + Neo4j AGA\n" + ], + "metadata": { + "id": "OlAd6uQkNIU3" + } + }, + { + "cell_type": "markdown", + "source": [ + "## Setup" + ], + "metadata": { + "id": "ACEMXLnpGFuh" + } + }, + { + "cell_type": "markdown", + "source": [ + "We need to do a little setup before we can run this notebook.\n", + "In order to allow the Spark workers to connect to our session, we need to create a new `NAT` network router that routes the workers' traffic to the internet.\n", + "\n", + "```shell\n", + "# 1) Cloud Router\n", + "gcloud compute routers create nat-router --network=YOUR_VPC --region=REGION\n", + "\n", + "# 2) (Optional) reserve a static egress IP to allowlist at the third-party\n", + "gcloud compute addresses create spark-egress --region=REGION\n", + "\n", + "# 3) Cloud NAT config (use static IP if you reserved one)\n", + "gcloud compute routers nats create spark-nat \\\n", + " --router=nat-router --router-region=REGION \\\n", + " --nat-all-subnet-ip-ranges \\\n", + " --auto-allocate-nat-external-ips\n", + "```" + ], + "metadata": { + "id": "pQOufV-lEczj" + } + }, + { + "cell_type": "code", + "source": [ + "%pip install graphdatascience==1.16" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": 
"AcD9BL1hb04_", + "executionInfo": { + "status": "ok", + "timestamp": 1761742505180, + "user_tz": -60, + "elapsed": 5476, + "user": { + "displayName": "", + "userId": "" + } + }, + "outputId": "044002a0-292b-4d60-dcc9-3cef2f8d293a" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "## Create a Spark session" + ], + "metadata": { + "id": "sBUd0iF5YsbS" + } + }, + { + "cell_type": "code", + "source": [ + "from google.cloud.dataproc_spark_connect import DataprocSparkSession\n", + "from google.cloud.dataproc_v1 import Session\n", + "\n", + "session = Session()\n", + "session.environment_config.execution_config.subnetwork_uri = \"projects/team-graph-analytics/regions/europe-west2/subnetworks/default\"\n", + "spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()\n", + "spark.addArtifacts(\"graphdatascience==1.16\", pypi=True)" + ], + "metadata": { + "id": "p2NN8wwF7YRa", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 177 + }, + "executionInfo": { + "status": "ok", + "timestamp": 1761742286558, + "user_tz": -60, + "elapsed": 108523, + "user": { + "displayName": "", + "userId": "" + } + }, + "outputId": "8e0dfb94-3c64-46bf-cba8-5516591db9c1" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "## Load data\n", + "\n", + "Connect to the BigQuery dataset and make it accessible to PySpark" + ], + "metadata": { + "id": "TFrNuh06V3i7" + } + }, + { + "cell_type": "code", + "source": [ + "# Load data from BigQuery\n", + "trips_table = spark.read.format('bigquery') \\\n", + " .option('table', 'bigquery-public-data.new_york.citibike_trips') \\\n", + " .load()\n", + "trips_table.createOrReplaceTempView('trips')" + ], + "metadata": { + "id": "9AyZuQMpZE92", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 46 + }, + "executionInfo": { + "status": "ok", + "timestamp": 1761742475388, + "user_tz": -60, + "elapsed": 9423, + 
"user": { + "displayName": "", + "userId": "" + } + }, + "outputId": "b6246315-6582-41e3-e61a-9f373ffbb5cd" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "## Creating a GDS session\n", + "\n" + ], + "metadata": { + "id": "VCRfM48LZ9px" + } + }, + { + "cell_type": "code", + "source": [ + "from graphdatascience.session import AuraAPICredentials, GdsSessions, CloudLocation, SessionMemory\n", + "from datetime import timedelta\n", + "\n", + "# you can also use AuraAPICredentials.from_env() to load credentials from environment variables\n", + "api_credentials = AuraAPICredentials(\n", + " client_id=\"\",\n", + " client_secret=\"\",\n", + " # If your account is a member of several projects, you must also specify the project ID to use\n", + " project_id=\"\",\n", + ")\n", + "\n", + "sessions = GdsSessions(api_credentials=api_credentials)\n", + "\n", + "# Create a GDS session!\n", + "gds = sessions.get_or_create(\n", + " session_name=\"trips\",\n", + " memory=SessionMemory.m_16GB,\n", + " ttl=timedelta(minutes=30),\n", + " cloud_location=CloudLocation(\"gcp\", \"europe-west1\"),\n", + ")" + ], + "metadata": { + "id": "ioUJZdSjcFJf" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "## Graph projections" + ], + "metadata": { + "id": "CeTSJz_JGW16" + } + }, + { + "cell_type": "code", + "source": [ + "arrow_client = gds._query_runner._query_runner._gds_arrow_client\n", + "arrow_client.create_graph_from_triplets(\"trips\", \"neo4j\")\n" + ], + "metadata": { + "id": "amReY8Xactah" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "code", + "source": [ + "import pyarrow\n", + "def upload_batch(iterator):\n", + " for batch in iterator:\n", + " arrow_client.upload_triplets(\"trips\", [batch])\n", + " yield pyarrow.RecordBatch.from_pydict({})" + ], + "metadata": { + "id": "88-t13CxkkkE" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": 
"code", + "source": [ + "# Select up to 10,000,000 (start station, end station) pairs as sourceNode/targetNode and pass each Arrow batch to upload_batch\n", + "spark.sql(\"\"\"\n", + " SELECT start_station_id AS sourceNode, end_station_id AS targetNode FROM trips LIMIT 10000000\n", + "\"\"\").mapInArrow(upload_batch, \"\").show()" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 140 + }, + "id": "yPnxlpyFlrHe", + "executionInfo": { + "status": "ok", + "timestamp": 1761743727293, + "user_tz": -60, + "elapsed": 28772, + "user": { + "displayName": "", + "userId": "" + } + }, + "outputId": "e6566086-43c2-4585-9179-5096d3c1f8b2" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "code", + "source": [ + "arrow_client.triplet_load_done(\"trips\")" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "ujdFtl5iE2Gb", + "executionInfo": { + "status": "ok", + "timestamp": 1761743185856, + "user_tz": -60, + "elapsed": 1003, + "user": { + "displayName": "", + "userId": "" + } + }, + "outputId": "1dec7c0f-3a62-4eea-d4f5-4738b3a4611c" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "## Running an algorithm" + ], + "metadata": { + "id": "3n1JHbbPAXD-" + } + }, + { + "cell_type": "code", + "source": [ + "from graphdatascience import Graph\n", + "G = gds.graph.get(\"trips\")\n", + "gds.degree.stream(G)" + ], + "metadata": { + "id": "UXzIHwHb8PWB", + "executionInfo": { + "status": "ok", + "timestamp": 1761743455379, + "user_tz": -60, + "elapsed": 145, + "user": { + "displayName": "", + "userId": "" + } + }, + "colab": { + "base_uri": "https://localhost:8080/", + "height": 680 + }, + "outputId": "3bc49938-66e7-49dd-8945-228b9f4dd43e" + }, + "outputs": [], + "execution_count": null + }, + { + "cell_type": "markdown", + "source": [ + "# Cleaning up\n", + "\n", + "To clean up all Google Cloud resources used in this project, you can [shut down the Google Cloud 
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n", + "\n", + "Otherwise, you can delete the individual resources you created in this tutorial:" + ], + "metadata": { + "id": "Szt3be79N53A" + } + }, + { + "cell_type": "code", + "source": [ + "# Delete the \"trips\" GDS session, then stop the Spark session to release all resources\n", + "sessions.delete(session_name=\"trips\")\n", + "spark.stop()" + ], + "metadata": { + "id": "CcUvIx6-N7TN" + }, + "outputs": [], + "execution_count": null + } + ], + "metadata": { + "colab": { + "provenance": [], + "toc_visible": true, + "cell_execution_strategy": "setup", + "name": "BigQuery + Spark + AGA" + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}