Skip to content

Commit e69c5df

Browse files
committed
Tweaks to time-series.pymd to lower memory pressure and remove SQL.
1 parent bac4f07 commit e69c5df

File tree

5 files changed

+31
-28
lines changed

5 files changed

+31
-28
lines changed

core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,11 @@ trait RasterFunctions {
280280
udf(F.rasterize(_: Geometry, _: Geometry, _: Int, cols, rows)).apply(geometry, bounds, value)
281281
)
282282

283+
def rf_rasterize(geometry: Column, bounds: Column, value: Column, cols: Column, rows: Column): TypedColumn[Any, Tile] =
284+
withTypedAlias("rf_rasterize", geometry)(
285+
udf(F.rasterize).apply(geometry, bounds, value, cols, rows)
286+
)
287+
283288
/** Reproject a column of geometry from one CRS to another.
284289
* @param sourceGeom Geometry column to reproject
285290
* @param srcCRS Native CRS of `sourceGeom` as a literal

pyrasterframes/src/main/python/docs/time-series.pymd

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ from IPython.display import display
77
import pyrasterframes
88
from pyrasterframes.rasterfunctions import *
99
import pyrasterframes.rf_ipython
10-
11-
spark = pyrasterframes.get_spark_session()
10+
# This job is memory-bound, so limit Spark to four concurrent local tasks.
11+
spark = pyrasterframes.get_spark_session("local[4]")
1212
```
1313

1414
In this example, we will show how the flexibility of the DataFrame concept for raster data allows a simple and intuitive way to extract a time series from Earth observation data. We will start with our @ref:[built-in MODIS data catalog](raster-catalogs.md#using-built-in-experimental-catalogs).
@@ -35,6 +35,7 @@ park_geo = park_df[park_df.UNIT_CODE=='CUVA'].geometry[0]
3535
park_geo.wkt[:100]
3636
```
3737

38+
3839
The entire park boundary is contained in MODIS granule h11 v4. We will simply filter on this granule, rather than using a @ref:[spatial relation](vector-data.md#geomesa-functions-and-spatial-relations). The time period selected should show the change in plant vigor as leaves emerge over the spring and into early summer.
3940

4041
```python query_catalog
@@ -53,34 +54,30 @@ Now we have a catalog with several months of MODIS data for a single granule. Ho
5354

5455
```python read_catalog
5556
raster_cols = ['B01', 'B02',] # red and near-infrared respectively
56-
park_rf = spark.read.raster(catalog=park_cat.select(['acquisition_date', 'granule_id'] + raster_cols),
57-
catalog_col_names=raster_cols,
58-
) \
59-
.withColumn('park', st_geomFromWKT(lit(park_geo.wkt)))
60-
park_rf.createOrReplaceTempView('catalog_read')
57+
park_rf = spark.read.raster(
58+
catalog=park_cat.select(['acquisition_date', 'granule_id'] + raster_cols),
59+
catalog_col_names=raster_cols
60+
) \
61+
.withColumn('park', st_geomFromWKT(lit(park_geo.wkt)))
6162
park_rf.printSchema()
63+
park_rf.persist()
6264
```
6365

6466
## Vector and Raster Data Interaction
6567

6668
Now we have the vector representation of the park boundary alongside the _tiles_ of the red and near-infrared bands. Next, we need to create a _tile_ representation of the park to allow us to limit the time series analysis to pixels within the park. This is similar to the masking operation demonstrated in @ref:[NoData handling](nodata-handling.md#masking).
6769

68-
We do this using two SQL transformations. The first one will reproject the park boundary from coordinates to the MODIS sinusoidal projection. The second one will create a new _tile_ aligned with the imagery containing a value of 1 where the pixels are contained within the park and NoData elsewhere. The code demonstrates that we can use any `rf_` or `st_` function in SQL. For a more thorough demonstration of how Python and SQL codes compare in RasterFrames, see the @ref:[language support section](languages.md).
70+
We do this using two transformations. The first one will reproject the park boundary from longitude/latitude coordinates (EPSG:4326) to the MODIS sinusoidal projection. The second one will create a new _tile_ aligned with the imagery, containing a value of 1 where the pixels fall within the park and NoData elsewhere.
6971

7072
```python burn_in
71-
cr_1 = spark.sql("""
72-
SELECT *,
73-
st_reproject(park, 'EPSG:4326', rf_crs(B01).crsProj4) as park_native
74-
FROM catalog_read
75-
""")
76-
cr_1.createOrReplaceTempView('cr_1')
77-
78-
cr_2 = spark.sql("""
79-
SELECT *,
80-
rf_rasterize(park_native, rf_geometry(B01), 1, rf_dimensions(B01).cols, rf_dimensions(B01).rows) as park_tile
81-
FROM cr_1
82-
""").where(rf_tile_sum('park_tile') > 0)
73+
cr_1 = park_rf.withColumn('park_native', st_reproject('park', lit('EPSG:4326'), rf_crs('B01')))
74+
75+
cr_2 = cr_1 \
76+
.withColumn('dims', rf_dimensions('B01')) \
77+
.withColumn('park_tile', rf_rasterize('park_native', rf_geometry('B01'), lit(1), 'dims.cols', 'dims.rows')) \
78+
.where(rf_tile_sum('park_tile') > 0)
8379
cr_2.printSchema()
80+
cr_2.persist()
8481
```
8582

8683
## Create Time Series
@@ -100,7 +97,8 @@ time_series = rf_ndvi \
10097
.groupby(year('acquisition_date').alias('year'), weekofyear('acquisition_date').alias('week')) \
10198
.agg(sql_sum('ndvi_wt').alias('ndvi_wt'), sql_sum('wt').alias('wt')) \
10299
.withColumn('ndvi', col('ndvi_wt') / col('wt'))
103-
100+
time_series.printSchema()
101+
time_series.persist()
104102
```
105103

106104
Finally, we will take a look at the NDVI over time.

pyrasterframes/src/main/python/pyrasterframes/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ def _kryo_init(builder):
5555
return builder
5656

5757

58-
def get_spark_session():
58+
def get_spark_session(master="local[*]"):
5959
""" Create a SparkSession with pyrasterframes enabled and configured. """
6060
from pyrasterframes.utils import create_rf_spark_session
6161

62-
return create_rf_spark_session()
62+
return create_rf_spark_session(master)
6363

6464

6565
def _convert_df(df, sp_key=None, metadata=None):

pyrasterframes/src/main/python/pyrasterframes/rasterfunctions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ def rf_make_ones_tile(num_cols, num_rows, cell_type=CellType.float64()):
9191
return Column(jfcn(num_cols, num_rows, _parse_cell_type(cell_type)))
9292

9393

94-
def rf_rasterize(geometry_col, bounds_col, value_col, num_cols, num_rows):
94+
def rf_rasterize(geometry_col, bounds_col, value_col, num_cols_col, num_rows_col):
9595
"""Create a tile where cells in the grid defined by cols, rows, and bounds are filled with the given value."""
9696
jfcn = RFContext.active().lookup('rf_rasterize')
97-
return Column(jfcn(_to_java_column(geometry_col), _to_java_column(bounds_col), _to_java_column(value_col), num_cols,
98-
num_rows))
97+
return Column(jfcn(_to_java_column(geometry_col), _to_java_column(bounds_col),
98+
_to_java_column(value_col), _to_java_column(num_cols_col), _to_java_column(num_rows_col)))
9999

100100

101101
def st_reproject(geometry_col, src_crs, dst_crs):

pyrasterframes/src/main/python/pyrasterframes/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,12 @@ def find_pyrasterframes_assembly():
7676
return jarpath[0]
7777

7878

79-
def create_rf_spark_session():
79+
def create_rf_spark_session(master="local[*]"):
8080
""" Create a SparkSession with pyrasterframes enabled and configured. """
8181
jar_path = find_pyrasterframes_assembly()
8282

8383
spark = (SparkSession.builder
84-
.master("local[*]")
84+
.master(master)
8585
.appName("RasterFrames")
8686
.config('spark.jars', jar_path)
8787
.withKryoSerialization()

0 commit comments

Comments
 (0)