Skip to content

Commit aecf4d6

Browse files
authored
Forecasting model parallelization (#511)
2 parents 7f5d5ab + bccdd30 commit aecf4d6

File tree

6 files changed

+290
-190
lines changed

6 files changed

+290
-190
lines changed

ads/opctl/operator/lowcode/forecast/model/arima.py

Lines changed: 68 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
import pandas as pd
88
import numpy as np
99
import pmdarima as pm
10+
from joblib import Parallel, delayed
1011

1112
from ads.opctl import logger
1213

@@ -29,32 +30,31 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
2930
self.formatted_global_explanation = None
3031
self.formatted_local_explanation = None
3132

32-
def _build_model(self) -> pd.DataFrame:
33-
full_data_dict = self.datasets.full_data_dict
34-
35-
# Extract the Confidence Interval Width and convert to arima's equivalent - alpha
36-
if self.spec.confidence_interval_width is None:
37-
self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
38-
"alpha", 0.05
39-
)
40-
model_kwargs = self.spec.model_kwargs
41-
model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width
42-
if "error_action" not in model_kwargs.keys():
43-
model_kwargs["error_action"] = "ignore"
33+
def _train_model(self, i, target, df):
34+
"""Trains the ARIMA model for a given target.
4435
45-
models = []
46-
self.datasets.datetime_col = self.spec.datetime_column.name
47-
self.forecast_output = ForecastOutput(
48-
confidence_interval_width=self.spec.confidence_interval_width
49-
)
36+
Parameters
37+
----------
38+
i: int
39+
The index of the target
40+
target: str
41+
The name of the target
42+
df: pd.DataFrame
43+
The dataframe containing the target data
44+
"""
45+
try:
46+
# Extract the Confidence Interval Width and convert to arima's equivalent - alpha
47+
if self.spec.confidence_interval_width is None:
48+
self.spec.confidence_interval_width = 1 - self.spec.model_kwargs.get(
49+
"alpha", 0.05
50+
)
51+
model_kwargs = self.spec.model_kwargs
52+
model_kwargs["alpha"] = 1 - self.spec.confidence_interval_width
53+
if "error_action" not in model_kwargs.keys():
54+
model_kwargs["error_action"] = "ignore"
5055

51-
outputs = dict()
52-
outputs_legacy = []
53-
fitted_values = dict()
54-
actual_values = dict()
55-
dt_columns = dict()
56+
# models = []
5657

57-
for i, (target, df) in enumerate(full_data_dict.items()):
5858
# format the dataframe for this target. Dropping NA on df[target] will remove all future data
5959
le, df_encoded = utils._label_encode_dataframe(
6060
df, no_encode={self.spec.datetime_column.name, target}
@@ -72,9 +72,7 @@ def _build_model(self) -> pd.DataFrame:
7272
target,
7373
self.spec.datetime_column.name,
7474
}
75-
logger.debug(
76-
f"Additional Regressors Detected {list(additional_regressors)}"
77-
)
75+
logger.debug(f"Additional Regressors Detected {list(additional_regressors)}")
7876

7977
# Split data into X and y for arima tune method
8078
y = data_i[target]
@@ -85,19 +83,17 @@ def _build_model(self) -> pd.DataFrame:
8583
# Build and fit model
8684
model = pm.auto_arima(y=y, X=X_in, **self.spec.model_kwargs)
8785

88-
fitted_values[target] = model.predict_in_sample(X=X_in)
89-
actual_values[target] = y
90-
actual_values[target].index = pd.to_datetime(y.index)
86+
self.fitted_values[target] = model.predict_in_sample(X=X_in)
87+
self.actual_values[target] = y
88+
self.actual_values[target].index = pd.to_datetime(y.index)
9189

9290
# Build future dataframe
9391
start_date = y.index.values[-1]
9492
n_periods = self.spec.horizon
9593
if len(additional_regressors):
9694
X = df_clean[df_clean[target].isnull()].drop(target, axis=1)
9795
else:
98-
X = pd.date_range(
99-
start=start_date, periods=n_periods, freq=self.spec.freq
100-
)
96+
X = pd.date_range(start=start_date, periods=n_periods, freq=self.spec.freq)
10197

10298
# Predict and format forecast
10399
yhat, conf_int = model.predict(
@@ -108,7 +104,7 @@ def _build_model(self) -> pd.DataFrame:
108104
)
109105
yhat_clean = pd.DataFrame(yhat, index=yhat.index, columns=["yhat"])
110106

111-
dt_columns[target] = df_encoded[self.spec.datetime_column.name]
107+
self.dt_columns[target] = df_encoded[self.spec.datetime_column.name]
112108
conf_int_clean = pd.DataFrame(
113109
conf_int, index=yhat.index, columns=["yhat_lower", "yhat_upper"]
114110
)
@@ -117,15 +113,42 @@ def _build_model(self) -> pd.DataFrame:
117113
logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail())
118114

119115
# Collect all outputs
120-
models.append(model)
121-
outputs_legacy.append(
116+
# models.append(model)
117+
self.outputs_legacy.append(
122118
forecast.reset_index().rename(columns={"index": "ds"})
123119
)
124-
outputs[target] = forecast
120+
self.outputs[target] = forecast
121+
122+
self.models_dict[target] = model
125123

126-
self.models = models
124+
logger.debug("===========Done===========")
125+
except Exception as e:
126+
self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}
127+
128+
def _build_model(self) -> pd.DataFrame:
129+
full_data_dict = self.datasets.full_data_dict
130+
131+
self.datasets.datetime_col = self.spec.datetime_column.name
132+
self.forecast_output = ForecastOutput(
133+
confidence_interval_width=self.spec.confidence_interval_width
134+
)
135+
136+
self.outputs = dict()
137+
self.outputs_legacy = []
138+
self.fitted_values = dict()
139+
self.actual_values = dict()
140+
self.dt_columns = dict()
141+
self.models_dict = dict()
142+
self.errors_dict = dict()
143+
144+
Parallel(n_jobs=-1, require="sharedmem")(
145+
delayed(ArimaOperatorModel._train_model)(self, i, target, df)
146+
for self, (i, (target, df)) in zip(
147+
[self] * len(full_data_dict), enumerate(full_data_dict.items())
148+
)
149+
)
127150

128-
logger.debug("===========Done===========")
151+
self.models = [self.models_dict[target] for target in self.target_columns]
129152

130153
# Merge the outputs from each model into 1 df with all outputs by target and category
131154
col = self.original_target_column
@@ -134,15 +157,15 @@ def _build_model(self) -> pd.DataFrame:
134157
yhat_lower_name = ForecastOutputColumns.LOWER_BOUND
135158
for cat in self.categories:
136159
output_i = pd.DataFrame()
137-
output_i["Date"] = dt_columns[f"{col}_{cat}"]
160+
output_i["Date"] = self.dt_columns[f"{col}_{cat}"]
138161
output_i["Series"] = cat
139162
output_i = output_i.set_index("Date")
140163

141-
output_i["input_value"] = actual_values[f"{col}_{cat}"]
142-
output_i["fitted_value"] = fitted_values[f"{col}_{cat}"]
143-
output_i["forecast_value"] = outputs[f"{col}_{cat}"]["yhat"]
144-
output_i[yhat_upper_name] = outputs[f"{col}_{cat}"]["yhat_upper"]
145-
output_i[yhat_lower_name] = outputs[f"{col}_{cat}"]["yhat_lower"]
164+
output_i["input_value"] = self.actual_values[f"{col}_{cat}"]
165+
output_i["fitted_value"] = self.fitted_values[f"{col}_{cat}"]
166+
output_i["forecast_value"] = self.outputs[f"{col}_{cat}"]["yhat"]
167+
output_i[yhat_upper_name] = self.outputs[f"{col}_{cat}"]["yhat_upper"]
168+
output_i[yhat_lower_name] = self.outputs[f"{col}_{cat}"]["yhat_lower"]
146169

147170
output_i = output_i.reset_index(drop=False)
148171
output_col = pd.concat([output_col, output_i])
@@ -252,7 +275,7 @@ def _custom_predict_arima(self, data):
252275
253276
"""
254277
date_col = self.spec.datetime_column.name
255-
data[date_col] = pd.to_datetime(data[date_col], unit='s')
278+
data[date_col] = pd.to_datetime(data[date_col], unit="s")
256279
data = data.set_index(date_col)
257280
# Get the index of the current series id
258281
series_index = self.target_columns.index(self.series_id)

ads/opctl/operator/lowcode/forecast/model/automlx.py

Lines changed: 82 additions & 73 deletions
Original file line number | Diff line number | Diff line change
@@ -45,10 +45,15 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
4545
),
4646
)
4747
def _build_model(self) -> pd.DataFrame:
48+
4849
from automl import init
4950
from sktime.forecasting.model_selection import temporal_train_test_split
5051

51-
init(engine="local", check_deprecation_warnings=False)
52+
init(
53+
engine="local",
54+
engine_opts={"n_jobs": -1, "model_n_jobs": -1},
55+
check_deprecation_warnings=False,
56+
)
5257

5358
full_data_dict = self.datasets.full_data_dict
5459

@@ -63,6 +68,7 @@ def _build_model(self) -> pd.DataFrame:
6368
self.forecast_output = ForecastOutput(
6469
confidence_interval_width=self.spec.confidence_interval_width
6570
)
71+
self.errors_dict = dict()
6672

6773
# Clean up kwargs for pass through
6874
model_kwargs_cleaned = self.spec.model_kwargs.copy()
@@ -80,81 +86,84 @@ def _build_model(self) -> pd.DataFrame:
8086
] = self.spec.preprocessing or model_kwargs_cleaned.get("preprocessing", True)
8187

8288
for i, (target, df) in enumerate(full_data_dict.items()):
83-
logger.debug("Running automl for {} at position {}".format(target, i))
84-
series_values = df[df[target].notna()]
85-
# drop NaNs for the time period where data wasn't recorded
86-
series_values.dropna(inplace=True)
87-
df[date_column] = pd.to_datetime(
88-
df[date_column], format=self.spec.datetime_column.format
89-
)
90-
df = df.set_index(date_column)
91-
# if len(df.columns) > 1:
92-
# when additional columns are present
93-
y_train, y_test = temporal_train_test_split(df, test_size=horizon)
94-
forecast_x = y_test.drop(target, axis=1)
95-
# else:
96-
# y_train = df
97-
# forecast_x = None
98-
logger.debug(
99-
"Time Index is" + ""
100-
if y_train.index.is_monotonic
101-
else "NOT" + "monotonic."
102-
)
103-
model = automl.Pipeline(
104-
task="forecasting",
105-
**model_kwargs_cleaned,
106-
)
107-
model.fit(
108-
X=y_train.drop(target, axis=1),
109-
y=pd.DataFrame(y_train[target]),
110-
time_budget=time_budget,
111-
)
112-
logger.debug("Selected model: {}".format(model.selected_model_))
113-
logger.debug(
114-
"Selected model params: {}".format(model.selected_model_params_)
115-
)
116-
summary_frame = model.forecast(
117-
X=forecast_x,
118-
periods=horizon,
119-
alpha=1 - (self.spec.confidence_interval_width / 100),
120-
)
121-
input_values = pd.Series(
122-
y_train[target].values,
123-
name="input_value",
124-
index=y_train.index,
125-
)
126-
fitted_values_raw = model.predict(y_train.drop(target, axis=1))
127-
fitted_values = pd.Series(
128-
fitted_values_raw[target].values,
129-
name="fitted_value",
130-
index=y_train.index,
131-
)
89+
try:
90+
logger.debug("Running automl for {} at position {}".format(target, i))
91+
series_values = df[df[target].notna()]
92+
# drop NaNs for the time period where data wasn't recorded
93+
series_values.dropna(inplace=True)
94+
df[date_column] = pd.to_datetime(
95+
df[date_column], format=self.spec.datetime_column.format
96+
)
97+
df = df.set_index(date_column)
98+
# if len(df.columns) > 1:
99+
# when additional columns are present
100+
y_train, y_test = temporal_train_test_split(df, test_size=horizon)
101+
forecast_x = y_test.drop(target, axis=1)
102+
# else:
103+
# y_train = df
104+
# forecast_x = None
105+
logger.debug(
106+
"Time Index is" + ""
107+
if y_train.index.is_monotonic
108+
else "NOT" + "monotonic."
109+
)
110+
model = automl.Pipeline(
111+
task="forecasting",
112+
**model_kwargs_cleaned,
113+
)
114+
model.fit(
115+
X=y_train.drop(target, axis=1),
116+
y=pd.DataFrame(y_train[target]),
117+
time_budget=time_budget,
118+
)
119+
logger.debug("Selected model: {}".format(model.selected_model_))
120+
logger.debug(
121+
"Selected model params: {}".format(model.selected_model_params_)
122+
)
123+
summary_frame = model.forecast(
124+
X=forecast_x,
125+
periods=horizon,
126+
alpha=1 - (self.spec.confidence_interval_width / 100),
127+
)
128+
input_values = pd.Series(
129+
y_train[target].values,
130+
name="input_value",
131+
index=y_train.index,
132+
)
133+
fitted_values_raw = model.predict(y_train.drop(target, axis=1))
134+
fitted_values = pd.Series(
135+
fitted_values_raw[target].values,
136+
name="fitted_value",
137+
index=y_train.index,
138+
)
132139

133-
summary_frame = pd.concat(
134-
[input_values, fitted_values, summary_frame], axis=1
135-
)
140+
summary_frame = pd.concat(
141+
[input_values, fitted_values, summary_frame], axis=1
142+
)
136143

137-
# Collect Outputs
138-
selected_models[target] = {
139-
"series_id": target,
140-
"selected_model": model.selected_model_,
141-
"model_params": model.selected_model_params_,
142-
}
143-
models[target] = model
144-
summary_frame = summary_frame.rename_axis("ds").reset_index()
145-
summary_frame = summary_frame.rename(
146-
columns={
147-
f"{target}_ci_upper": "yhat_upper",
148-
f"{target}_ci_lower": "yhat_lower",
149-
f"{target}": "yhat",
144+
# Collect Outputs
145+
selected_models[target] = {
146+
"series_id": target,
147+
"selected_model": model.selected_model_,
148+
"model_params": model.selected_model_params_,
150149
}
151-
)
152-
# In case of Naive model, model.forecast function call does not return confidence intervals.
153-
if "yhat_upper" not in summary_frame:
154-
summary_frame["yhat_upper"] = np.NAN
155-
summary_frame["yhat_lower"] = np.NAN
156-
outputs[target] = summary_frame
157-
# outputs_legacy[target] = summary_frame
150+
models[target] = model
151+
summary_frame = summary_frame.rename_axis("ds").reset_index()
152+
summary_frame = summary_frame.rename(
153+
columns={
154+
f"{target}_ci_upper": "yhat_upper",
155+
f"{target}_ci_lower": "yhat_lower",
156+
f"{target}": "yhat",
157+
}
158+
)
159+
# In case of Naive model, model.forecast function call does not return confidence intervals.
160+
if "yhat_upper" not in summary_frame:
161+
summary_frame["yhat_upper"] = np.NAN
162+
summary_frame["yhat_lower"] = np.NAN
163+
outputs[target] = summary_frame
164+
# outputs_legacy[target] = summary_frame
165+
except Exception as e:
166+
self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}
158167

159168
logger.debug("===========Forecast Generated===========")
160169
outputs_merged = pd.DataFrame()

0 commit comments

Comments
 (0)