Skip to content

Commit 920f697

Browse files
[FEATURE] Integrate prototypes into pipeline
Fixes #6
1 parent ae79228 commit 920f697

File tree

10 files changed

+785
-68
lines changed

10 files changed

+785
-68
lines changed

conf/aggregation/aggregation.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
daily:
2+
function: "numpy.mean"
3+
string: mean
4+
5+
monthly:
6+
function: "numpy.mean"
7+
string: mean
8+
9+
variable: ['t2m', 'd2m']

conf/config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
defaults:
22
- _self_
33
- datapaths: datapaths
4+
- aggregation: aggregation
45

56
development_mode: false
67

@@ -20,7 +21,7 @@ query:
2021
# check precipitation
2122
# variable: ["2m_dewpoint_temperature", "2m_temperature", "skin_temperature", "total_precipitation"]
2223
variable: ["2m_dewpoint_temperature", "2m_temperature"]
23-
year: [2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
24+
year: [2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
2425
month: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2526
day: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
2627
time: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23]

notes/00_core.ipynb

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
"from pydrive2.auth import GoogleAuth\n",
4646
"from pydrive2.drive import GoogleDrive\n",
4747
"from omegaconf import DictConfig, OmegaConf\n",
48-
"from pyprojroot import here"
48+
"from pyprojroot import here\n",
49+
"from importlib import import_module\n"
4950
]
5051
},
5152
{
@@ -98,6 +99,20 @@
9899
" return path"
99100
]
100101
},
102+
{
103+
"cell_type": "code",
104+
"execution_count": null,
105+
"metadata": {},
106+
"outputs": [],
107+
"source": [
108+
"#| exporti\n",
109+
"def _get_callable(func_path):\n",
110+
" \"\"\"Dynamically import a callable from a string path.\"\"\"\n",
111+
" module_name, func_name = func_path.rsplit(\".\", 1)\n",
112+
" module = import_module(module_name)\n",
113+
" return getattr(module, func_name)"
114+
]
115+
},
101116
{
102117
"cell_type": "code",
103118
"execution_count": null,

notes/02_aggregate.ipynb

Lines changed: 590 additions & 50 deletions
Large diffs are not rendered by default.

snakefile

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,17 @@ with hydra.initialize(config_path="conf", version_base=None):
1414
cfg = hydra.compose(config_name="config", overrides=[])
1515
print(OmegaConf.to_yaml(cfg))
1616

17-
# read list of years
17+
# read list of variables to parellelize over
1818
years_cfg = OmegaConf.to_container(cfg.query.year, resolve=True)
1919
months_cfg = OmegaConf.to_container(cfg.query.month, resolve=True)
20-
#day_cfg = OmegaConf.to_container(cfg.query.day, resolve=True)
21-
#time_cfg = OmegaConf.to_container(cfg.query.time, resolve=True)
20+
variable_cfg = OmegaConf.to_container(cfg.query.variable, resolve=True)
21+
agg_variable_cfg = OmegaConf.to_container(cfg.aggregation.variable, resolve=True)
2222

2323
rule all:
2424
input:
25-
expand(data_dir / "input/{year}_{month}.nc", year=years_cfg, month=months_cfg)
25+
expand(data_dir / "input/{year}_{month}.nc", year=years_cfg, month=months_cfg),#, variable=variable_cfg)
26+
expand(data_dir / "intermediate/environmental_exposure-era5_healthshed_{variable}_{year}_{month}.parquet",
27+
variable=agg_variable_cfg, year=years_cfg, month=months_cfg)
2628

2729
rule test_api:
2830
output:
@@ -37,3 +39,13 @@ rule download_raw_era5:
3739
"""
3840
python src/era5_sandbox/download.py "++query.year={wildcards.year}" "++query.month={wildcards.month}"
3941
"""
42+
43+
rule spatial_aggregate_raw_era5:
44+
input:
45+
data_dir / "input/{year}_{month}.nc"
46+
output:
47+
data_dir / "intermediate/environmental_exposure-era5_healthshed_{variable}_{year}_{month}.nc"
48+
params:
49+
variable="{variable}"
50+
script:
51+
"src/era5_sandbox/aggregate.py"
0 Bytes
Binary file not shown.
464 Bytes
Binary file not shown.

src/era5_sandbox/_modidx.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
'era5_sandbox/aggregate.py'),
1313
'era5_sandbox.aggregate.RasterFile.shape': ( 'aggregate.html#rasterfile.shape',
1414
'era5_sandbox/aggregate.py'),
15+
'era5_sandbox.aggregate.aggregate_data': ( 'aggregate.html#aggregate_data',
16+
'era5_sandbox/aggregate.py'),
17+
'era5_sandbox.aggregate.aggregate_to_healthsheds': ( 'aggregate.html#aggregate_to_healthsheds',
18+
'era5_sandbox/aggregate.py'),
19+
'era5_sandbox.aggregate.main': ('aggregate.html#main', 'era5_sandbox/aggregate.py'),
1520
'era5_sandbox.aggregate.netcdf_to_tiff': ( 'aggregate.html#netcdf_to_tiff',
1621
'era5_sandbox/aggregate.py'),
1722
'era5_sandbox.aggregate.polygon_to_raster_cells': ( 'aggregate.html#polygon_to_raster_cells',
@@ -28,6 +33,7 @@
2833
'era5_sandbox.core._create_directory_structure': ( 'core.html#_create_directory_structure',
2934
'era5_sandbox/core.py'),
3035
'era5_sandbox.core._expand_path': ('core.html#_expand_path', 'era5_sandbox/core.py'),
36+
'era5_sandbox.core._get_callable': ('core.html#_get_callable', 'era5_sandbox/core.py'),
3137
'era5_sandbox.core.describe': ('core.html#describe', 'era5_sandbox/core.py'),
3238
'era5_sandbox.core.main': ('core.html#main', 'era5_sandbox/core.py'),
3339
'era5_sandbox.core.testAPI': ('core.html#testapi', 'era5_sandbox/core.py')},

src/era5_sandbox/aggregate.py

Lines changed: 131 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,33 @@
11
# AUTOGENERATED! DO NOT EDIT! File to edit: ../../notes/02_aggregate.ipynb.
22

33
# %% auto 0
4-
__all__ = ['resample_netcdf', 'RasterFile', 'netcdf_to_tiff', 'polygon_to_raster_cells']
4+
__all__ = ['resample_netcdf', 'RasterFile', 'netcdf_to_tiff', 'polygon_to_raster_cells', 'aggregate_to_healthsheds',
5+
'aggregate_data', 'main']
56

67
# %% ../../notes/02_aggregate.ipynb 4
78
import tempfile
89
import rasterio
10+
import hydra
11+
import argparse
12+
13+
import pandas as pd
14+
import geopandas as gpd
915
import numpy as np
1016
import xarray as xr
1117
import matplotlib.pyplot as plt
12-
import cartopy.crs as ccrs
13-
import cartopy.feature as cfeature
18+
1419
from dataclasses import dataclass, field
1520
from typing import Optional, Tuple
1621
from pyprojroot import here
1722
from hydra import initialize, compose
18-
from omegaconf import OmegaConf
23+
from omegaconf import OmegaConf, DictConfig
1924
from tqdm import tqdm
2025
from math import ceil, floor
2126
from rasterstats.io import Raster
2227
from rasterstats.utils import boxify_points, rasterize_geom
2328

24-
try: from era5_sandbox.core import GoogleDriver
25-
except: from core import GoogleDriver
29+
try: from era5_sandbox.core import GoogleDriver, _get_callable, describe
30+
except: from core import GoogleDriver, _get_callable, describe
2631

2732
# %% ../../notes/02_aggregate.ipynb 8
2833
def resample_netcdf(
@@ -58,6 +63,7 @@ class RasterFile:
5863
data: Optional[np.ndarray] = field(default=None, init=False)
5964
transform: Optional[rasterio.Affine] = field(default=None, init=False)
6065
crs: Optional[str] = field(default=None, init=False)
66+
nodata: Optional[float] = field(default=None, init=False)
6167
bounds: Optional[Tuple[float, float, float, float]] = field(default=None, init=False)
6268

6369
def load(self):
@@ -66,6 +72,7 @@ def load(self):
6672
self.data = src.read(1) # first band
6773
self.transform = src.transform
6874
self.crs = src.crs
75+
self.nodata = src.nodata
6976
self.bounds = src.bounds
7077
return self
7178

@@ -180,3 +187,121 @@ def polygon_to_raster_cells(
180187
cell_map.append(indices)
181188

182189
return cell_map
190+
191+
# %% ../../notes/02_aggregate.ipynb 25
192+
def aggregate_to_healthsheds(
193+
res_poly2cell: list, # the result of polygon_to_raster_cells
194+
raster: RasterFile, # the raster data
195+
shapes: gpd.GeoDataFrame, # the shapes of the health sheds
196+
names_column: str = "fs_uid", # the unique identifier column name of the health sheds
197+
aggregation_func: callable = np.nanmean, # the aggregation function
198+
aggregation_name: str = "mean" # the name of the aggregation function
199+
) -> gpd.GeoDataFrame:
200+
"""
201+
Aggregate the raster data to the health sheds.
202+
"""
203+
204+
stats = []
205+
206+
for indices in res_poly2cell:
207+
if len(indices[0]) == 0:
208+
# no cells found for this polygon
209+
stats.append(np.nan)
210+
else:
211+
cells = raster.data[indices]
212+
if sum(~np.isnan(cells)) == 0:
213+
# no valid cells found for this polygon
214+
stats.append(np.nan)
215+
continue
216+
else:
217+
# compute MEAN of valid cells
218+
# but this stat can be ANYTHING
219+
stats.append(aggregation_func(cells))
220+
221+
# clean up the result into a dataframe
222+
stats = pd.Series(stats)
223+
shapes[aggregation_name] = stats
224+
df = pd.DataFrame(
225+
{"healthshed": shapes[names_column], aggregation_name: stats}
226+
)
227+
gdf = gpd.GeoDataFrame(df, geometry=shapes.geometry.values, crs=shapes.crs)
228+
return gdf
229+
230+
231+
# %% ../../notes/02_aggregate.ipynb 35
232+
def aggregate_data(
233+
cfg: DictConfig, # hydra configuration file
234+
input_file: str, # path to the input file
235+
output_file: str, # path to the output file
236+
exposure_variable: str # the variable to aggregate
237+
)->None:
238+
'''
239+
Run the agggregation step of the pipeline.
240+
241+
Note, this function is the second step in the snakemake
242+
pipeline. This means that in order to define the input
243+
file, we use the snakemake.input and snakemake.output variables
244+
injected into the runtime by snakemake.
245+
'''
246+
247+
if cfg.development_mode:
248+
describe(cfg)
249+
return None
250+
251+
# get the healthshed shapefile
252+
driver = GoogleDriver(json_key_path=here() / cfg.GOOGLE_DRIVE_AUTH_JSON.path)
253+
drive = driver.get_drive()
254+
healthsheds = driver.read_healthsheds(cfg.GOOGLE_DRIVE_AUTH_JSON.healthsheds_id)
255+
256+
# get the aggregation configuration
257+
# exposure_variable = cfg.aggregation.variable
258+
agg_func = _get_callable(cfg.aggregation.daily.function)
259+
260+
resampled_nc_file = resample_netcdf(input_file, agg_func=agg_func)
261+
262+
resampled_tiff = netcdf_to_tiff(
263+
ds=resampled_nc_file,
264+
variable=exposure_variable,
265+
crs="EPSG:4326"
266+
)
267+
268+
# run the polygon to raster cell function
269+
result_poly2cell=polygon_to_raster_cells(
270+
vectors = healthsheds.geometry.values, # the geometries of the shapefile of the regions
271+
raster=resampled_tiff.data, # the raster data above
272+
band=1, # the value of the day that we're using
273+
nodata=resampled_tiff.nodata, # any intersections with no data, may have to be np.nan
274+
affine=resampled_tiff.transform, # some math thing need to revise
275+
all_touched=True,
276+
verbose=True
277+
)
278+
279+
result = aggregate_to_healthsheds(
280+
res_poly2cell=result_poly2cell,
281+
raster=resampled_tiff,
282+
shapes=healthsheds,
283+
names_column="fs_uid",
284+
aggregation_func=agg_func,
285+
aggregation_name=exposure_variable
286+
)
287+
288+
# Save the result to a file
289+
result.to_parquet(output_file)
290+
291+
# %% ../../notes/02_aggregate.ipynb 36
292+
@hydra.main(version_base=None, config_path="../../conf", config_name="config")
293+
def main(cfg: DictConfig) -> None:
294+
# Parse command-line arguments
295+
input_file = str(snakemake.input[0]) # First input file
296+
output_file = str(snakemake.output[0])
297+
aggregation_variable = str(snakemake.params.variable)
298+
299+
aggregate_data(cfg, input_file=input_file, output_file=output_file, exposure_variable=aggregation_variable)
300+
301+
# %% ../../notes/02_aggregate.ipynb 37
302+
#| eval: false
303+
try: from nbdev.imports import IN_NOTEBOOK
304+
except: IN_NOTEBOOK=False
305+
306+
if __name__ == "__main__" and not IN_NOTEBOOK:
307+
main()

src/era5_sandbox/core.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from pydrive2.drive import GoogleDrive
1717
from omegaconf import DictConfig, OmegaConf
1818
from pyprojroot import here
19+
from importlib import import_module
20+
1921

2022
# %% ../../notes/00_core.ipynb 5
2123
def describe(
@@ -45,6 +47,13 @@ def _expand_path(
4547
return path
4648

4749
# %% ../../notes/00_core.ipynb 7
50+
def _get_callable(func_path):
51+
"""Dynamically import a callable from a string path."""
52+
module_name, func_name = func_path.rsplit(".", 1)
53+
module = import_module(module_name)
54+
return getattr(module, func_name)
55+
56+
# %% ../../notes/00_core.ipynb 8
4857
def _create_directory_structure(
4958
base_path: str, # The base directory where the structure will be created
5059
structure: dict # A dictionary representing the directory structure
@@ -65,7 +74,7 @@ def _create_directory_structure(
6574
if isinstance(substructure, dict):
6675
_create_directory_structure(current_path, substructure)
6776

68-
# %% ../../notes/00_core.ipynb 9
77+
# %% ../../notes/00_core.ipynb 10
6978
class GoogleDriver:
7079
"""
7180
A class to handle Google Drive authentication and file management.
@@ -96,10 +105,10 @@ def _authenticate(self):
96105
def get_drive(self):
97106
return self.drive
98107

99-
# %% ../../notes/00_core.ipynb 18
108+
# %% ../../notes/00_core.ipynb 19
100109
from fastcore.basics import patch
101110

102-
# %% ../../notes/00_core.ipynb 19
111+
# %% ../../notes/00_core.ipynb 20
103112
@patch
104113
def read_healthsheds(self:GoogleDriver, healthshed_zip_name):
105114

@@ -122,7 +131,7 @@ def read_healthsheds(self:GoogleDriver, healthshed_zip_name):
122131

123132
return gdf
124133

125-
# %% ../../notes/00_core.ipynb 23
134+
# %% ../../notes/00_core.ipynb 24
126135
def testAPI(
127136
cfg: DictConfig=None,
128137
dataset:str="reanalysis-era5-pressure-levels"
@@ -167,7 +176,7 @@ def testAPI(
167176
print("Error: {}".format(e))
168177
return False
169178

170-
# %% ../../notes/00_core.ipynb 27
179+
# %% ../../notes/00_core.ipynb 28
171180
@hydra.main(version_base=None, config_path="../../conf", config_name="config")
172181
def main(cfg: DictConfig) -> None:
173182

@@ -177,7 +186,7 @@ def main(cfg: DictConfig) -> None:
177186
# test the api
178187
testAPI(cfg=cfg)
179188

180-
# %% ../../notes/00_core.ipynb 28
189+
# %% ../../notes/00_core.ipynb 29
181190
#| eval: false
182191
try: from nbdev.imports import IN_NOTEBOOK
183192
except: IN_NOTEBOOK=False

0 commit comments

Comments
 (0)