Skip to content

Commit 5242f1d

Browse files
committed
make neural prophet parallel mdoel training
1 parent 177fa40 commit 5242f1d

File tree

1 file changed

+196
-35
lines changed

1 file changed

+196
-35
lines changed

ads/opctl/operator/lowcode/forecast/model/neuralprophet.py

Lines changed: 196 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import numpy as np
88
import optuna
99
import pandas as pd
10+
from joblib import Parallel, delayed
1011
from torch import Tensor
1112
from torchmetrics.regression import (
1213
MeanAbsoluteError,
@@ -71,33 +72,31 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
7172
self.train_metrics = True
7273
self.forecast_col_name = "yhat1"
7374

74-
def _build_model(self) -> pd.DataFrame:
75-
from neuralprophet import NeuralProphet
7675

77-
full_data_dict = self.datasets.full_data_dict
78-
models = []
79-
outputs = dict()
80-
outputs_legacy = []
81-
82-
# Extract the Confidence Interval Width and
83-
# convert to neural prophets equivalent - quantiles
84-
model_kwargs = self.spec.model_kwargs
85-
86-
if self.spec.confidence_interval_width is None:
87-
quantiles = model_kwargs.get("quantiles", [0.05, 0.95])
88-
self.spec.confidence_interval_width = float(quantiles[1]) - float(
89-
quantiles[0]
90-
)
91-
else:
92-
boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2)
93-
quantiles = [boundaries, self.spec.confidence_interval_width + boundaries]
76+
def _train_model(self, i, target, df):
9477

95-
model_kwargs["quantiles"] = quantiles
96-
self.forecast_output = ForecastOutput(
97-
confidence_interval_width=self.spec.confidence_interval_width
98-
)
78+
try:
79+
from neuralprophet import NeuralProphet
80+
81+
# Extract the Confidence Interval Width and
82+
# convert to neural prophets equivalent - quantiles
83+
model_kwargs = self.spec.model_kwargs
84+
85+
if self.spec.confidence_interval_width is None:
86+
self.quantiles = model_kwargs.get("quantiles", [0.05, 0.95])
87+
self.spec.confidence_interval_width = float(self.quantiles[1]) - float(
88+
self.quantiles[0]
89+
)
90+
else:
91+
boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2)
92+
self.quantiles = [boundaries, self.spec.confidence_interval_width + boundaries]
93+
94+
model_kwargs["quantiles"] = self.quantiles
95+
self.forecast_output = ForecastOutput(
96+
confidence_interval_width=self.spec.confidence_interval_width
97+
)
9998

100-
for i, (target, df) in enumerate(full_data_dict.items()):
99+
# for i, (target, df) in enumerate(full_data_dict.items()):
101100
le, df_encoded = utils._label_encode_dataframe(
102101
df, no_encode={self.spec.datetime_column.name, target}
103102
)
@@ -212,14 +211,176 @@ def objective(trial):
212211
forecast = model.predict(future)
213212
logger.debug(f"-----------------Model {i}----------------------")
214213
logger.debug(forecast.tail())
215-
models.append(model)
216-
outputs[target] = forecast
217-
outputs_legacy.append(forecast)
214+
# models.append(model)
215+
self.outputs_dict[target] = forecast
216+
self.outputs_legacy.append(forecast)
217+
218+
self.models_dict[target] = model
219+
self.outputs = self.outputs_legacy
220+
221+
logger.debug("===========Done===========")
222+
except Exception as e:
223+
self.errors_dict[target] = {"model_name": self.spec.model, "error": str(e)}
224+
225+
def _build_model(self) -> pd.DataFrame:
226+
# from neuralprophet import NeuralProphet
218227

219-
self.models = models
220-
self.outputs = outputs_legacy
228+
full_data_dict = self.datasets.full_data_dict
229+
self.models_dict = dict()
230+
self.outputs_dict = dict()
231+
self.outputs_legacy = []
232+
self.errors_dict = dict()
233+
234+
Parallel(n_jobs=-1, require="sharedmem")(
235+
delayed(NeuralProphetOperatorModel._train_model)(self, i, target, df)
236+
for self, (i, (target, df)) in zip(
237+
[self] * len(full_data_dict), enumerate(full_data_dict.items())
238+
)
239+
)
221240

222-
logger.debug("===========Done===========")
241+
self.models = [self.models_dict[target] for target in self.target_columns]
242+
243+
# # Extract the Confidence Interval Width and
244+
# # convert to neural prophets equivalent - quantiles
245+
# model_kwargs = self.spec.model_kwargs
246+
247+
# if self.spec.confidence_interval_width is None:
248+
# quantiles = model_kwargs.get("quantiles", [0.05, 0.95])
249+
# self.spec.confidence_interval_width = float(quantiles[1]) - float(
250+
# quantiles[0]
251+
# )
252+
# else:
253+
# boundaries = round((1 - self.spec.confidence_interval_width) / 2, 2)
254+
# quantiles = [boundaries, self.spec.confidence_interval_width + boundaries]
255+
256+
# model_kwargs["quantiles"] = quantiles
257+
# self.forecast_output = ForecastOutput(
258+
# confidence_interval_width=self.spec.confidence_interval_width
259+
# )
260+
261+
# for i, (target, df) in enumerate(full_data_dict.items()):
262+
# le, df_encoded = utils._label_encode_dataframe(
263+
# df, no_encode={self.spec.datetime_column.name, target}
264+
# )
265+
# model_kwargs_i = model_kwargs.copy()
266+
267+
# # format the dataframe for this target. Dropping NA on target[df] will remove all future data
268+
# df_clean = self._preprocess(
269+
# df_encoded,
270+
# self.spec.datetime_column.name,
271+
# self.spec.datetime_column.format,
272+
# )
273+
# data_i = df_clean[df_clean[target].notna()]
274+
# data_i.rename({target: "y"}, axis=1, inplace=True)
275+
276+
# # Assume that all columns passed in should be used as additional data
277+
# additional_regressors = set(data_i.columns) - {"y", "ds"}
278+
# training_data = data_i[["y", "ds"] + list(additional_regressors)]
279+
280+
# if self.perform_tuning:
281+
282+
# def objective(trial):
283+
# params = {
284+
# # 'seasonality_mode': trial.suggest_categorical('seasonality_mode', ['additive', 'multiplicative']),
285+
# # 'seasonality_reg': trial.suggest_float('seasonality_reg', 0.1, 500, log=True),
286+
# # 'learning_rate': trial.suggest_float('learning_rate', 0.0001, 0.1, log=True),
287+
# "newer_samples_start": trial.suggest_float(
288+
# "newer_samples_start", 0.001, 0.999
289+
# ),
290+
# "newer_samples_weight": trial.suggest_float(
291+
# "newer_samples_weight", 0, 100
292+
# ),
293+
# "changepoints_range": trial.suggest_float(
294+
# "changepoints_range", 0.8, 0.95
295+
# ),
296+
# }
297+
# # trend_reg, trend_reg_threshold, ar_reg, impute_rolling/impute_linear,
298+
# params.update(model_kwargs_i)
299+
300+
# folds = NeuralProphet(**params).crossvalidation_split_df(
301+
# data_i, k=3
302+
# )
303+
# test_metrics_total_i = []
304+
# for df_train, df_test in folds:
305+
# m, accepted_regressors = _fit_model(
306+
# data=df_train,
307+
# params=params,
308+
# additional_regressors=additional_regressors,
309+
# select_metric=self.spec.metric,
310+
# )
311+
# df_test = df_test[["y", "ds"] + accepted_regressors]
312+
313+
# test_forecast_i = m.predict(df=df_test)
314+
# fold_metric_i = (
315+
# m.metrics[self.spec.metric]
316+
# .forward(
317+
# Tensor(test_forecast_i["yhat1"]),
318+
# Tensor(test_forecast_i["y"]),
319+
# )
320+
# .item()
321+
# )
322+
# test_metrics_total_i.append(fold_metric_i)
323+
# logger.debug(
324+
# f"----------------------{np.asarray(test_metrics_total_i).mean()}----------------------"
325+
# )
326+
# return np.asarray(test_metrics_total_i).mean()
327+
328+
# study = optuna.create_study(direction="minimize")
329+
# m_params = NeuralProphet().parameters()
330+
# study.enqueue_trial(
331+
# {
332+
# # 'seasonality_mode': m_params['seasonality_mode'],
333+
# # 'seasonality_reg': m_params['seasonality_reg'],
334+
# # 'learning_rate': m_params['learning_rate'],
335+
# "newer_samples_start": m_params["newer_samples_start"],
336+
# "newer_samples_weight": m_params["newer_samples_weight"],
337+
# "changepoints_range": m_params["changepoints_range"],
338+
# }
339+
# )
340+
# study.optimize(
341+
# objective,
342+
# n_trials=self.spec.tuning.n_trials
343+
# if self.spec.tuning
344+
# else DEFAULT_TRIALS,
345+
# n_jobs=-1,
346+
# )
347+
348+
# selected_params = study.best_params
349+
# selected_params.update(model_kwargs_i)
350+
# model_kwargs_i = selected_params
351+
352+
# # Build and fit model
353+
# model, accepted_regressors = _fit_model(
354+
# data=training_data,
355+
# params=model_kwargs_i,
356+
# additional_regressors=additional_regressors,
357+
# select_metric=self.spec.metric,
358+
# )
359+
# logger.debug(
360+
# f"Found the following additional data columns: {additional_regressors}"
361+
# )
362+
# logger.debug(
363+
# f"While fitting the model, some additional data may have been "
364+
# f"discarded. Only using the columns: {accepted_regressors}"
365+
# )
366+
367+
# # Build future dataframe
368+
# future = df_clean.reset_index(drop=True)
369+
# future["y"] = None
370+
# future = future[["y", "ds"] + list(accepted_regressors)]
371+
372+
# # Forecast model and collect outputs
373+
# forecast = model.predict(future)
374+
# logger.debug(f"-----------------Model {i}----------------------")
375+
# logger.debug(forecast.tail())
376+
# models.append(model)
377+
# outputs[target] = forecast
378+
# outputs_legacy.append(forecast)
379+
380+
# self.models = models
381+
# self.outputs = outputs_legacy
382+
383+
# logger.debug("===========Done===========")
223384

224385
# Merge the outputs from each model into 1 df with all outputs by target and category
225386
col = self.original_target_column
@@ -229,7 +390,7 @@ def objective(trial):
229390
for cat in self.categories:
230391
output_i = pd.DataFrame()
231392

232-
output_i["Date"] = outputs[f"{col}_{cat}"]["ds"]
393+
output_i["Date"] = self.outputs_dict[f"{col}_{cat}"]["ds"]
233394
output_i["Series"] = cat
234395
output_i[f"input_value"] = full_data_dict[f"{col}_{cat}"][f"{col}_{cat}"]
235396

@@ -240,26 +401,26 @@ def objective(trial):
240401

241402
output_i.iloc[
242403
: -self.spec.horizon, output_i.columns.get_loc(f"fitted_value")
243-
] = (outputs[f"{col}_{cat}"]["yhat1"].iloc[: -self.spec.horizon].values)
404+
] = (self.outputs_dict[f"{col}_{cat}"]["yhat1"].iloc[: -self.spec.horizon].values)
244405
output_i.iloc[
245406
-self.spec.horizon :,
246407
output_i.columns.get_loc(f"forecast_value"),
247408
] = (
248-
outputs[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values
409+
self.outputs_dict[f"{col}_{cat}"]["yhat1"].iloc[-self.spec.horizon :].values
249410
)
250411
output_i.iloc[
251412
-self.spec.horizon :,
252413
output_i.columns.get_loc(yhat_upper_name),
253414
] = (
254-
outputs[f"{col}_{cat}"][f"yhat1 {quantiles[1]*100}%"]
415+
self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[1]*100}%"]
255416
.iloc[-self.spec.horizon :]
256417
.values
257418
)
258419
output_i.iloc[
259420
-self.spec.horizon :,
260421
output_i.columns.get_loc(yhat_lower_name),
261422
] = (
262-
outputs[f"{col}_{cat}"][f"yhat1 {quantiles[0]*100}%"]
423+
self.outputs_dict[f"{col}_{cat}"][f"yhat1 {self.quantiles[0]*100}%"]
263424
.iloc[-self.spec.horizon :]
264425
.values
265426
)

0 commit comments

Comments
 (0)