
Commit ddb01d4

Add 6 window functions from blog post
1 parent 0536879 commit ddb01d4

File tree

8 files changed: +407 −0 lines changed

README.md

Lines changed: 73 additions & 0 deletions
# Introduction

This repository contains the source code for a blog post about window functions in PySpark: [Blog Post](https://mitchellvanrijkom.com). In that post, I describe how to use the 6 most commonly used window functions in PySpark.

# Window functions

When we work with data in Spark, we commonly use the SQL module. With it, we can easily create DataFrames through the DataFrame API, which relies on Spark's optimizers to support a wide range of data sources and algorithms tuned for big data workloads.

In SQL, there is a particular type of operation called a window function. For each row, a window frame of related rows is determined, a function is calculated over the rows in that frame, and the result is returned as a value for that row.

Because Spark builds on SQL, we also have window functions at our disposal. When we combine the power of DataFrames with window functions, we can express some unique, optimized calculations!
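As a rough, minimal sketch (not one of the repository's files), this is what a window function looks like in PySpark: each row gets a running total computed over the rows of its own product, up to its own date. The column names here are only illustrative.

```python
from datetime import datetime

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Window Function Sketch").getOrCreate()

# A tiny illustrative dataset: two products with a few sales each
df = spark.createDataFrame(
    [
        ["A", datetime(2022, 1, 1), 10],
        ["A", datetime(2022, 1, 2), 20],
        ["B", datetime(2022, 1, 1), 5],
        ["B", datetime(2022, 1, 3), 15],
    ],
    ["product", "date", "amount"],
)

# The frame: rows of the same product, ordered by date, up to the current row
w = Window.partitionBy("product").orderBy("date")

# For every row, sum the amounts in its frame: a running total per product
df.withColumn("running_total", F.sum("amount").over(w)).show()
```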
# Repository

## Getting started

```bash
# Create virtualenv
python -m venv .venv

# Activate virtualenv
. .venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Run the code
python most_recent.py
```

The repository contains the following files:
## Aggregate Functions

### How to calculate a cumulative sum (running total) 📈

Very easy with a SQL window function! 👇🏻

[cumulative_sum.py](cumulative_sum.py)

### How to calculate a moving average 📈

Filter out the noise to determine the direction of a trend!

[moving_average.py](moving_average.py)

## Ranking Functions

### Select only the most recent records

An easy way to remove duplicate entries

[most_recent.py](most_recent.py)

### Break your dataset into equal groups

Rank each value in your dataset

[rank.py](rank.py)
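rank.py is not expanded in this diff. As a loose sketch of the idea only (not the repository's actual code), PySpark's ranking functions such as `F.ntile` and `F.rank` can split and order a dataset like this:

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Rank Sketch").getOrCreate()

# Hypothetical scores, just to illustrate the ranking functions
df = spark.createDataFrame(
    [["a", 10], ["b", 35], ["c", 22], ["d", 47], ["e", 15], ["f", 29], ["g", 41], ["h", 8]],
    ["id", "score"],
)

w = Window.orderBy("score")

(
    df
    # ntile(4) splits the ordered rows into 4 roughly equal groups (quartiles)
    .withColumn("quartile", F.ntile(4).over(w))
    # rank() gives each value its position within the ordering
    .withColumn("rank", F.rank().over(w))
    .show()
)
```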
## Value/Analytical Functions

### Calculate the difference from preceding rows

Very easy to select preceding or following rows

[difference.py](difference.py)

### Get the first and last value of the month

Quickly analyze the start and end of each month

[first_last.py](first_last.py)

cumulative_sum.py

Lines changed: 78 additions & 0 deletions
```python
from datetime import datetime

import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Cumulative Sum").getOrCreate()

headers = ["date", "sales"]

data = [
    [datetime(2022, 1, 1), 100],
    [datetime(2022, 1, 2), 1543],
    [datetime(2022, 1, 3), 756],
    [datetime(2022, 1, 4), 2223],
    [datetime(2022, 1, 5), 765],
    [datetime(2022, 1, 6), 734],
    [datetime(2022, 1, 7), 762],
    [datetime(2022, 1, 8), 3422],
    [datetime(2022, 1, 9), 1500],
    [datetime(2022, 1, 10), 7332],
    [datetime(2022, 1, 11), 4200],
    [datetime(2022, 1, 12), 1121],
    [datetime(2022, 1, 13), 448],
    [datetime(2022, 1, 14), 1198],
    [datetime(2022, 1, 15), 1500],
    [datetime(2022, 1, 16), 4200],
    [datetime(2022, 1, 17), 1121],
    [datetime(2022, 1, 18), 448],
    [datetime(2022, 1, 19), 1198],
    [datetime(2022, 1, 20), 1198],
    [datetime(2022, 1, 21), 7653],
    [datetime(2022, 1, 22), 2345],
    [datetime(2022, 1, 23), 1246],
    [datetime(2022, 1, 24), 888],
    [datetime(2022, 1, 25), 2653],
    [datetime(2022, 1, 26), 8445],
    [datetime(2022, 1, 27), 1198],
    [datetime(2022, 1, 28), 3211],
    [datetime(2022, 1, 29), 2745],
    [datetime(2022, 1, 30), 1234],
    [datetime(2022, 1, 31), 6542],
]

# A window over the whole dataset (no partition key), ordered by date.
# Summing "sales" over this window gives a running total up to the current row.
df = spark.createDataFrame(data, headers).withColumn(
    "cumsum", F.sum("sales").over(Window.partitionBy().orderBy("date"))
)
df.show()
# +-------------------+-----+------+
# |               date|sales|cumsum|
# +-------------------+-----+------+
# |2022-01-01 00:00:00|  100|   100|
# |2022-01-02 00:00:00| 1543|  1643|
# |2022-01-03 00:00:00|  756|  2399|
# |2022-01-04 00:00:00| 2223|  4622|
# |2022-01-05 00:00:00|  765|  5387|
# |2022-01-06 00:00:00|  734|  6121|
# |2022-01-07 00:00:00|  762|  6883|
# |2022-01-08 00:00:00| 3422| 10305|
# |2022-01-09 00:00:00| 1500| 11805|
# |2022-01-10 00:00:00| 7332| 19137|
# |2022-01-11 00:00:00| 4200| 23337|
# |2022-01-12 00:00:00| 1121| 24458|
# |2022-01-13 00:00:00|  448| 24906|
# |2022-01-14 00:00:00| 1198| 26104|
# |2022-01-15 00:00:00| 1500| 27604|
# |2022-01-16 00:00:00| 4200| 31804|
# |2022-01-17 00:00:00| 1121| 32925|
# |2022-01-18 00:00:00|  448| 33373|
# |2022-01-19 00:00:00| 1198| 34571|
# |2022-01-20 00:00:00| 1198| 35769|
# +-------------------+-----+------+
# only showing top 20 rows

# Plot the daily sales next to the running total (uses pandas/matplotlib)
df.toPandas().plot.line(x="date", y=["sales", "cumsum"], rot=45)
plt.show()
```

difference.py

Lines changed: 42 additions & 0 deletions
```python
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Differences with lag").getOrCreate()

headers = ["date", "product", "price"]

data = [
    [datetime(2022, 1, 10), "Bose Revolve+", 330],
    [datetime(2022, 1, 11), "JBL Partybox", 299],
    [datetime(2022, 1, 12), "Bose Revolve+", 299],
    [datetime(2022, 1, 12), "Sonos Move", 399],
    [datetime(2022, 1, 13), "JBL Partybox", 275],
    [datetime(2022, 2, 10), "Bose Revolve+", 360],
    [datetime(2022, 2, 12), "Sonos Move", 359],
    [datetime(2022, 2, 13), "JBL Partybox", 269],
    [datetime(2022, 2, 16), "Bose Revolve+", 330],
]
df = spark.createDataFrame(data, headers)

# One frame per product, ordered by date, so lag() can look at the previous price
window_spec = Window.partitionBy("product").orderBy("date")

difference_df = (
    df.withColumn("previous_price", F.lag("price").over(window_spec))
    # The first row of each product has no previous price, so drop it
    .filter(F.col("previous_price").isNotNull())
    .withColumn("difference", F.col("price") - F.col("previous_price"))
)
difference_df.show()
# +-------------------+-------------+-----+--------------+----------+
# |               date|      product|price|previous_price|difference|
# +-------------------+-------------+-----+--------------+----------+
# |2022-01-12 00:00:00|Bose Revolve+|  299|           330|       -31|
# |2022-02-10 00:00:00|Bose Revolve+|  360|           299|        61|
# |2022-02-16 00:00:00|Bose Revolve+|  330|           360|       -30|
# |2022-01-13 00:00:00| JBL Partybox|  275|           299|       -24|
# |2022-02-13 00:00:00| JBL Partybox|  269|           275|        -6|
# |2022-02-12 00:00:00|   Sonos Move|  359|           399|       -40|
# +-------------------+-------------+-----+--------------+----------+
```

first_last.py

Lines changed: 48 additions & 0 deletions
```python
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("First Last").getOrCreate()

headers = ["date", "product", "price"]

data = [
    [datetime(2022, 1, 10), "Bose Revolve+", 330],
    [datetime(2022, 1, 11), "JBL Partybox", 299],
    [datetime(2022, 1, 12), "Bose Revolve+", 299],
    [datetime(2022, 1, 14), "Bose Revolve+", 399],
    [datetime(2022, 1, 18), "JBL Partybox", 300],
    [datetime(2022, 1, 29), "Bose Revolve+", 450],
    [datetime(2022, 1, 13), "JBL Partybox", 275],
    [datetime(2022, 2, 10), "Bose Revolve+", 360],
    [datetime(2022, 2, 13), "JBL Partybox", 269],
    [datetime(2022, 2, 10), "Bose Revolve+", 200],
    [datetime(2022, 2, 16), "Bose Revolve+", None],
]
df = spark.createDataFrame(data, headers)

# One frame per product per month, covering every row in that group.
# The "year" and "month" columns are added below, before the window is applied.
window_spec = (
    Window.partitionBy("year", "month", "product")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

first_last_df = (
    df.withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    # first/last non-null price within the month, ordered by date
    .withColumn("first_value", F.first("price", ignorenulls=True).over(window_spec))
    .withColumn("last_value", F.last("price", ignorenulls=True).over(window_spec))
    .select(["year", "month", "product", "first_value", "last_value"])
    .distinct()
)
first_last_df.show()
# +----+-----+-------------+-----------+----------+
# |year|month|      product|first_value|last_value|
# +----+-----+-------------+-----------+----------+
# |2022|    1|Bose Revolve+|        330|       450|
# |2022|    1| JBL Partybox|        299|       300|
# |2022|    2|Bose Revolve+|        360|       200|
# |2022|    2| JBL Partybox|        269|       269|
# +----+-----+-------------+-----------+----------+
```

most_recent.py

Lines changed: 42 additions & 0 deletions
```python
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Most Recent Record").getOrCreate()

headers = ["date", "product", "price"]

data = [
    [datetime(2022, 1, 10), "Bose Revolve+", 330],
    [datetime(2022, 1, 11), "JBL Partybox", 299],
    [datetime(2022, 1, 12), "Bose Revolve+", 299],
    [datetime(2022, 1, 12), "Sonos Move", 399],
    [datetime(2022, 1, 13), "JBL Partybox", 275],
    [datetime(2022, 2, 10), "Bose Revolve+", 360],
    [datetime(2022, 2, 12), "Sonos Move", 359],
    [datetime(2022, 2, 13), "JBL Partybox", 269],
    [datetime(2022, 2, 16), "Bose Revolve+", 330],
]
df = spark.createDataFrame(data, headers)


product_window = Window.partitionBy("product").orderBy(F.col("date").desc())

recent_df = (
    df.withColumn("row_num", F.row_number().over(product_window))
    # We now have a sequential numbering per product, with the newest date first
    # Simply filter the rows with row_num = 1 to select the most recent record
    .filter(F.col("row_num") == 1)
    # We do not need the row_number column anymore, so we drop it
    .drop("row_num")
)
recent_df.show()
# +-------------------+-------------+-----+
# |               date|      product|price|
# +-------------------+-------------+-----+
# |2022-02-16 00:00:00|Bose Revolve+|  330|
# |2022-02-13 00:00:00| JBL Partybox|  269|
# |2022-02-12 00:00:00|   Sonos Move|  359|
# +-------------------+-------------+-----+
```

moving_average.py

Lines changed: 81 additions & 0 deletions
```python
from datetime import datetime

import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Moving Average").getOrCreate()

headers = ["date", "sales"]

data = [
    [datetime(2022, 1, 1), 100],
    [datetime(2022, 1, 2), 1543],
    [datetime(2022, 1, 3), 756],
    [datetime(2022, 1, 4), 2223],
    [datetime(2022, 1, 5), 765],
    [datetime(2022, 1, 6), 734],
    [datetime(2022, 1, 7), 762],
    [datetime(2022, 1, 8), 3422],
    [datetime(2022, 1, 9), 1500],
    [datetime(2022, 1, 10), 7332],
    [datetime(2022, 1, 11), 4200],
    [datetime(2022, 1, 12), 1121],
    [datetime(2022, 1, 13), 448],
    [datetime(2022, 1, 14), 1198],
    [datetime(2022, 1, 15), 1500],
    [datetime(2022, 1, 16), 4200],
    [datetime(2022, 1, 17), 1121],
    [datetime(2022, 1, 18), 448],
    [datetime(2022, 1, 19), 1198],
    [datetime(2022, 1, 20), 1198],
    [datetime(2022, 1, 21), 7653],
    [datetime(2022, 1, 22), 2345],
    [datetime(2022, 1, 23), 1246],
    [datetime(2022, 1, 24), 888],
    [datetime(2022, 1, 25), 2653],
    [datetime(2022, 1, 26), 8445],
    [datetime(2022, 1, 27), 1198],
    [datetime(2022, 1, 28), 3211],
    [datetime(2022, 1, 29), 2745],
    [datetime(2022, 1, 30), 1234],
    [datetime(2022, 1, 31), 6542],
]


def days(i):
    # rangeBetween works on the numeric ordering value, so express days in seconds
    return i * 86400


# Frame: rows from 7 days before the current row up to and including the current row
moving_7_day_window = Window.orderBy(
    F.col("date").cast("timestamp").cast("long")
).rangeBetween(-days(7), Window.currentRow)

df = spark.createDataFrame(data, headers).withColumn(
    "mov_avg", F.avg("sales").over(moving_7_day_window)
)

df.show()
# +-------------------+-----+------------------+
# |               date|sales|           mov_avg|
# +-------------------+-----+------------------+
# |2022-01-01 00:00:00|  100|             100.0|
# |2022-01-02 00:00:00| 1543|             821.5|
# |2022-01-03 00:00:00|  756| 799.6666666666666|
# |2022-01-04 00:00:00| 2223|            1155.5|
# |2022-01-05 00:00:00|  765|            1077.4|
# |2022-01-06 00:00:00|  734|1020.1666666666666|
# |2022-01-07 00:00:00|  762| 983.2857142857143|
# |2022-01-08 00:00:00| 3422|          1288.125|
# |2022-01-09 00:00:00| 1500|          1463.125|
# |2022-01-10 00:00:00| 7332|           2186.75|
# |2022-01-11 00:00:00| 4200|           2617.25|
# |2022-01-12 00:00:00| 1121|            2479.5|
# |2022-01-13 00:00:00|  448|          2439.875|
# |2022-01-14 00:00:00| 1198|          2497.875|
# |2022-01-15 00:00:00| 1500|          2590.125|
# |2022-01-16 00:00:00| 4200|          2687.375|
# |2022-01-17 00:00:00| 1121|            2640.0|
# |2022-01-18 00:00:00|  448|            1779.5|
# |2022-01-19 00:00:00| 1198|           1404.25|
# |2022-01-20 00:00:00| 1198|          1413.875|
# +-------------------+-----+------------------+
# only showing top 20 rows

# Plot the raw sales next to the moving average (uses pandas/matplotlib)
df.toPandas().plot.line(x="date", y=["sales", "mov_avg"], rot=45)
plt.show()
```
