From fab6bb2629d7f86f650c112ae9d68e267e103dde Mon Sep 17 00:00:00 2001
From: Massimiliano Lincetto
Date: Tue, 17 Jun 2025 17:20:47 +0200
Subject: [PATCH 1/6] attempt at decoupling bg

---
 flarestack/core/results.py | 179 +++++++++++++++++++++++++++++++------
 1 file changed, 151 insertions(+), 28 deletions(-)

diff --git a/flarestack/core/results.py b/flarestack/core/results.py
index dc78ae25..78662265 100644
--- a/flarestack/core/results.py
+++ b/flarestack/core/results.py
@@ -37,6 +37,113 @@ class OverfluctuationError(Exception):
     pass
 
+class PickleCache:
+    def __init__(self, pickle_path: Path, background_only: bool = False):
+        self.path = pickle_path
+        # self.pickle_path.mkdir(parents=True, exist_ok=True)
+        self.merged_path = self.path / "merged"
+        self.merged_path.mkdir(parents=True, exist_ok=True)
+        self.background_only = background_only
+
+    def clean_merged_data(self):
+        """Function to clear cache of all data"""
+        # remove all files inside merged_path using Pathlib
+        if self.merged_path.exists():
+            for file in self.merged_path.iterdir():
+                if file.is_file():
+                    file.unlink()
+            logger.debug(f"Removed all files from {self.merged_path}")
+
+    def get_subdirs(self):
+        return [
+            x
+            for x in os.listdir(self.path)
+            if x[0] != "." and x != "merged"
+        ]
+
+    def merge_datadict(self, merged: dict[str, list | dict], pending_data: dict[str, list | dict]):
+        """Merge the content of pending_data into merged."""
+        for key, info in pending_data.items():
+            if isinstance(info, list):
+                # Append the list to the existing one.
+                merged[key] += info
+            elif isinstance(info, dict):
+                for param_name, params in info.items():
+                    try:
+                        merged[key][param_name] += params
+                    except KeyError as m:
+                        logger.warning(
+                            f"Keys [{key}][{param_name}] not found in \n {merged}"
+                        )
+                        raise KeyError(m)
+            else:
+                raise TypeError(
+                    f"Unexpected type for key {key}: {type(info)}. Expected list or dict."
+                )
+
+
+    def merge_and_load_subdir(self, subdir_name):
+        """Merge and load data from a single subdirectory."""
+        subdir = os.path.join(self.path, subdir_name)
+
+        files = os.listdir(subdir)
+
+        # Map one dir to one pickle
+        merged_file_path = os.path.join(self.merged_path, subdir_name + ".pkl")
+        # Load previously merged data, if it exists.
+        if os.path.isfile(merged_file_path):
+            logger.debug(f"loading merged data from {merged_file_path}")
+            with open(merged_file_path, "rb") as mp:
+                merged_data = Pickle.load(mp)
+        else:
+            merged_data = {}
+
+        for filename in files:
+            pending_file = os.path.join(subdir, filename)
+
+            try:
+                with open(pending_file, "rb") as f:
+                    data = Pickle.load(f)
+            except (EOFError, IsADirectoryError):
+                logger.warning("Failed loading: {0}".format(pending_file))
+                continue
+            # Remove file immediately. This can lead to undesired results because if the program crashes or gets terminated in the process, we will have removed files before writing the merged data. However, delaying the removal would create the opposite problem: the file is merged but not removed, and potentially merged a second time at the next run.
+            os.remove(pending_file)
+
+            if merged_data == {}:
+                merged_data = data
+            else:
+                self.merge_datadict(merged_data, data)
+
+        # Save merged data.
+        with open(merged_file_path, "wb") as mp:
+            Pickle.dump(merged_data, mp)
+
+        return merged_data
+
+    def merge_and_load(self, output_dict: dict):
+        # Loop over all injection scale subdirectories.
+        scales_subdirs = self.get_subdirs()
+
+        background_label = scale_shortener(0.0)
+
+        for subdir_name in scales_subdirs:
+            scale_label = scale_shortener(float(subdir_name))
+
+            if self.background_only and scale_label != background_label:
+                # skip non-background trials
+                continue
+
+            pending_data = self.merge_and_load_subdir(subdir_name)
+
+            if pending_data:
+                if scale_label == background_label and background_label in output_dict:
+                    self.merge_datadict(output_dict[background_label], pending_data)
+                else:
+                    output_dict[scale_label] = pending_data
+
+
 
 class ResultsHandler(object):
     def __init__(
         self,
@@ -45,10 +152,17 @@ def __init__(
         do_disc=True,
         bias_error="std",
         sigma_thresholds=[3.0, 5.0],
+        background_from=None
     ):
         self.sources = load_catalogue(rh_dict["catalogue"])
 
         self.name = rh_dict["name"]
+
+        if background_from is not None:
+            self.background_from = background_from
+        else:
+            self.background_from = rh_dict["name"]
+
         self.mh_name = rh_dict["mh_name"]
 
         self._inj_dict = rh_dict["inj_dict"]
@@ -59,11 +173,14 @@ def __init__(
         self.maxfev = rh_dict.get("maxfev", 800)
 
         self.results = dict()
+
         self.pickle_output_dir = name_pickle_output_dir(self.name)
+        self.pickle_output_dir_bg = name_pickle_output_dir(self.background_from)
 
-        self.plot_path = Path(plot_output_dir(self.name))
+        self.pickle_cache = PickleCache(Path(self.pickle_output_dir))
+        self.pickle_cache_bg = PickleCache(Path(self.pickle_output_dir_bg))
 
-        self.merged_dir = os.path.join(self.pickle_output_dir, "merged")
+        self.plot_path = Path(plot_output_dir(self.name))
 
         self.allow_extrapolation = rh_dict.get("allow_extrapolated_sensitivity", True)
 
@@ -101,7 +218,8 @@ def __init__(
             "extrapolated": False,
         }
 
-        # Load injection ladder values
+        # Load injection ladder values.
+        # Builds a dictionary mapping the injection scale to the content of the trials.
         try:
             self.inj = self.load_injection_values()
         except FileNotFoundError as err:
@@ -112,17 +230,17 @@ def __init__(
             self.valid = False
             return
 
-        # Load and merge the trial results
         try:
-            self.merge_pickle_data()
+            self.pickle_cache.merge_and_load(output_dict=self.results)
+            # Load the background trials, merging them into any already loaded.
+            self.pickle_cache_bg.merge_and_load(output_dict=self.results)
         except FileNotFoundError:
             logger.warning(f"No files found at {self.pickle_output_dir}")
 
         # auxiliary parameters
-        # self.sorted_scales = sorted(self.results.keys())
         self.scale_values = sorted(
             [float(j) for j in self.results.keys()]
-        )  # replaces self.scales_float
+        )
         self.scale_labels = [scale_shortener(i) for i in self.scale_values]
 
         logger.info(f"Injection scales: {self.scale_values}")
@@ -131,6 +249,7 @@ def __init__(
         # Determine the injection scales
         try:
             self.find_ns_scale()
+            self.plot_bias()
         except ValueError as e:
             logger.warning(f"RuntimeError for ns scale factor: \n {e}")
         except IndexError as e:
@@ -142,10 +261,6 @@ def __init__(
         if len(self.scale_values) == 1 and self.scale_values[0] == 0:
             self.make_plots(self.scale_labels[0])
 
-        # Create fit bias plots
-        # this expects flux_to_ns to be set
-        self.plot_bias()
-
         if do_sens:
             try:
                 self.find_sensitivity()
@@ -282,12 +397,9 @@ def nu_astronomy(self, flux, e_pdf_dict):
         return calculate_astronomy(flux, e_pdf_dict)
 
     def clean_merged_data(self):
-        """Function to clear cache of all data"""
-        try:
-            for f in os.listdir(self.merged_dir):
-                os.remove(self.merged_dir + f)
-        except OSError:
-            pass
+        """Clean merged data from pickle cache, only for main analysis. Do not touch the background cache."""
+        self.pickle_cache.clean_merged_data()
+
 
     def load_injection_values(self):
         """Function to load the values used in injection, so that a
@@ -315,25 +427,33 @@ def load_injection_values(self):
 
         return inj_values
 
-    def merge_pickle_data(self):
+
+    def merge_and_load_pickle_data(self):
+        # NOTE:
+        # self.pickle_output_path
+        # self.merged_dir = self.pickle_output_path / "merged"
+
+
+        # Loop over all subdirectories, one for each injection scale, containing one pickle per trial.
         all_sub_dirs = [
             x
-            for x in os.listdir(self.pickle_output_dir)
+            for x in os.listdir(self.path)
            if x[0] != "." and x != "merged"
         ]
-
+        # Create a "merged" directory, that will contain a single pickle with many trials per injection scale.
         try:
             os.makedirs(self.merged_dir)
         except OSError:
             pass
 
         for sub_dir_name in all_sub_dirs:
-            sub_dir = os.path.join(self.pickle_output_dir, sub_dir_name)
+            sub_dir = os.path.join(self.path, sub_dir_name)
 
             files = os.listdir(sub_dir)
 
+            # Map one dir to one pickle
             merged_path = os.path.join(self.merged_dir, sub_dir_name + ".pkl")
-
+            # Load previously merged data, if it exists.
             if os.path.isfile(merged_path):
                 logger.debug(f"loading merged data from {merged_path}")
                 with open(merged_path, "rb") as mp:
@@ -342,15 +462,16 @@ def merge_pickle_data(self):
                 merged_data = {}
 
             for filename in files:
-                path = os.path.join(sub_dir, filename)
+                pending_file = os.path.join(sub_dir, filename)
 
                 try:
-                    with open(path, "rb") as f:
+                    with open(pending_file, "rb") as f:
                         data = Pickle.load(f)
                 except (EOFError, IsADirectoryError):
-                    logger.warning("Failed loading: {0}".format(path))
+                    logger.warning("Failed loading: {0}".format(pending_file))
                     continue
-                os.remove(path)
+                # This can be "dangerous" because if the program crashes or gets terminated, we will have removed files before writing the merged data.
+                os.remove(pending_file)
 
                 if merged_data == {}:
                     merged_data = data
@@ -368,13 +489,15 @@ def merge_pickle_data(self):
                             )
                             raise KeyError(m)
 
+            # Save merged data.
             with open(merged_path, "wb") as mp:
                 Pickle.dump(merged_data, mp)
 
-            if len(list(merged_data.keys())) > 0:
+            # Load merged data in results.
+            if merged_data:
                 self.results[scale_shortener(float(sub_dir_name))] = merged_data
 
-        if len(list(self.results.keys())) == 0:
+        if not self.results:
             logger.warning("No data was found by ResultsHandler object! \n")
             logger.warning(
                 "Tried root directory: \n {0} \n ".format(self.pickle_output_dir)
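
A minimal sketch of how the decoupled cache can be driven on its own, assuming the pickle directories already exist (the paths and the standalone scenario are illustrative, not part of the patch):

    from pathlib import Path

    from flarestack.core.results import PickleCache

    results: dict = {}

    # Merge and load this analysis' own trials, one subdirectory per injection scale.
    own_cache = PickleCache(Path("/path/to/pickles/my_analysis"))
    own_cache.merge_and_load(output_dict=results)

    # Reuse another run's trials, restricted to the background (scale 0) entry.
    bg_cache = PickleCache(Path("/path/to/pickles/shared_background"), background_only=True)
    bg_cache.merge_and_load(output_dict=results)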
From 6af7c9bc7f40093bbcc601866ce6aecfe2b83362 Mon Sep 17 00:00:00 2001
From: Massimiliano Lincetto
Date: Tue, 17 Jun 2025 23:02:19 +0200
Subject: [PATCH 2/6] blackify

---
 flarestack/core/results.py | 26 ++++++++------------------
 1 file changed, 8 insertions(+), 18 deletions(-)

diff --git a/flarestack/core/results.py b/flarestack/core/results.py
index 78662265..75503c6e 100644
--- a/flarestack/core/results.py
+++ b/flarestack/core/results.py
@@ -55,13 +55,11 @@ def clean_merged_data(self):
             logger.debug(f"Removed all files from {self.merged_path}")
 
     def get_subdirs(self):
-        return [
-            x
-            for x in os.listdir(self.path)
-            if x[0] != "." and x != "merged"
-        ]
+        return [x for x in os.listdir(self.path) if x[0] != "." and x != "merged"]
 
-    def merge_datadict(self, merged: dict[str, list | dict], pending_data: dict[str, list | dict]):
+    def merge_datadict(
+        self, merged: dict[str, list | dict], pending_data: dict[str, list | dict]
+    ):
         """Merge the content of pending_data into merged."""
         for key, info in pending_data.items():
             if isinstance(info, list):
@@ -81,7 +79,6 @@ def merge_datadict(
                     f"Unexpected type for key {key}: {type(info)}. Expected list or dict."
                 )
 
-
     def merge_and_load_subdir(self, subdir_name):
         """Merge and load data from a single subdirectory."""
         subdir = os.path.join(self.path, subdir_name)
@@ -138,12 +135,12 @@ def merge_and_load(self, output_dict: dict):
 
             if pending_data:
                 if scale_label == background_label and background_label in output_dict:
+                    logger.info("Appending background data to existing trials.")
                     self.merge_datadict(output_dict[background_label], pending_data)
                 else:
                     output_dict[scale_label] = pending_data
 
 
-
 class ResultsHandler(object):
     def __init__(
         self,
@@ -152,7 +149,7 @@ def __init__(
         do_disc=True,
         bias_error="std",
         sigma_thresholds=[3.0, 5.0],
-        background_from=None
+        background_from=None,
     ):
         self.sources = load_catalogue(rh_dict["catalogue"])
 
@@ -238,9 +235,7 @@ def __init__(
             logger.warning(f"No files found at {self.pickle_output_dir}")
 
         # auxiliary parameters
-        self.scale_values = sorted(
-            [float(j) for j in self.results.keys()]
-        )
+        self.scale_values = sorted([float(j) for j in self.results.keys()])
         self.scale_labels = [scale_shortener(i) for i in self.scale_values]
 
         logger.info(f"Injection scales: {self.scale_values}")
@@ -400,7 +395,6 @@ def clean_merged_data(self):
         """Clean merged data from pickle cache, only for main analysis. Do not touch the background cache."""
         self.pickle_cache.clean_merged_data()
-
 
     def load_injection_values(self):
         """Function to load the values used in injection, so that a
@@ -427,18 +421,14 @@ def load_injection_values(self):
 
         return inj_values
 
-
     def merge_and_load_pickle_data(self):
         # NOTE:
         # self.pickle_output_path
         # self.merged_dir = self.pickle_output_path / "merged"
 
-
         # Loop over all subdirectories, one for each injection scale, containing one pickle per trial.
         all_sub_dirs = [
-            x
-            for x in os.listdir(self.path)
-            if x[0] != "." and x != "merged"
+            x for x in os.listdir(self.path) if x[0] != "." and x != "merged"
         ]
         # Create a "merged" directory, that will contain a single pickle with many trials per injection scale.
         try:

From 5944d7653453072d0f3c31434e2239fc8aef8066 Mon Sep 17 00:00:00 2001
From: Massimiliano Lincetto
Date: Tue, 17 Jun 2025 23:13:42 +0200
Subject: [PATCH 3/6] give up on type hints here

---
 flarestack/core/results.py | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/flarestack/core/results.py b/flarestack/core/results.py
index 75503c6e..d4ae23ff 100644
--- a/flarestack/core/results.py
+++ b/flarestack/core/results.py
@@ -57,13 +57,11 @@ def clean_merged_data(self):
     def get_subdirs(self):
         return [x for x in os.listdir(self.path) if x[0] != "." and x != "merged"]
 
-    def merge_datadict(
-        self, merged: dict[str, list | dict], pending_data: dict[str, list | dict]
-    ):
+    def merge_datadict(self, merged, pending_data):
         """Merge the content of pending_data into merged."""
         for key, info in pending_data.items():
             if isinstance(info, list):
-                # Append the list to the existing one.
+                # Append the list to the existing one. We assume that on the left-hand side we always have a list, so we ignore the type checking.
                 merged[key] += info
             elif isinstance(info, dict):
                 for param_name, params in info.items():
@@ -149,14 +147,14 @@ def __init__(
         do_disc=True,
         bias_error="std",
         sigma_thresholds=[3.0, 5.0],
-        background_from=None,
+        background_source=None,
     ):
         self.sources = load_catalogue(rh_dict["catalogue"])
 
         self.name = rh_dict["name"]
 
-        if background_from is not None:
-            self.background_from = background_from
+        if background_source is not None:
+            self.background_from = background_source
         else:
             self.background_from = rh_dict["name"]
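
The dictionaries merge_datadict handles keep the shape used elsewhere in results.py: list entries (e.g. "TS") are concatenated, while parameter dictionaries are merged key by key. A worked example (key names from this module, numbers made up, cache being any PickleCache instance):

    merged = {"TS": [0.1, 2.3], "Parameters": {"n_s": [1.2, 0.8]}}
    pending = {"TS": [4.5], "Parameters": {"n_s": [2.1]}}

    cache.merge_datadict(merged, pending)
    # merged == {"TS": [0.1, 2.3, 4.5], "Parameters": {"n_s": [1.2, 0.8, 2.1]}}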
From 1ff3f82a99cc6c89a78fa88b9d0f711e5cb662da Mon Sep 17 00:00:00 2001
From: Massimiliano Lincetto
Date: Tue, 17 Jun 2025 23:36:54 +0200
Subject: [PATCH 4/6] cleanup dead code

---
 flarestack/core/results.py | 80 ++++----------------------------------
 1 file changed, 7 insertions(+), 73 deletions(-)

diff --git a/flarestack/core/results.py b/flarestack/core/results.py
index d4ae23ff..03e8d48e 100644
--- a/flarestack/core/results.py
+++ b/flarestack/core/results.py
@@ -229,6 +229,13 @@ def __init__(
             self.pickle_cache.merge_and_load(output_dict=self.results)
             # Load the background trials, merging them into any already loaded.
             self.pickle_cache_bg.merge_and_load(output_dict=self.results)
+            if not self.results:
+                logger.warning("No data was found by ResultsHandler object! \n")
+                logger.warning(
+                    "Tried root directory: \n {0} \n ".format(self.pickle_output_dir)
+                )
+                sys.exit()
+
         except FileNotFoundError:
             logger.warning(f"No files found at {self.pickle_output_dir}")
 
@@ -419,79 +426,6 @@ def load_injection_values(self):
 
         return inj_values
 
-    def merge_and_load_pickle_data(self):
-        # NOTE:
-        # self.pickle_output_path
-        # self.merged_dir = self.pickle_output_path / "merged"
-
-        # Loop over all subdirectories, one for each injection scale, containing one pickle per trial.
-        all_sub_dirs = [
-            x for x in os.listdir(self.path) if x[0] != "." and x != "merged"
-        ]
-        # Create a "merged" directory, that will contain a single pickle with many trials per injection scale.
-        try:
-            os.makedirs(self.merged_dir)
-        except OSError:
-            pass
-
-        for sub_dir_name in all_sub_dirs:
-            sub_dir = os.path.join(self.path, sub_dir_name)
-
-            files = os.listdir(sub_dir)
-
-            # Map one dir to one pickle
-            merged_path = os.path.join(self.merged_dir, sub_dir_name + ".pkl")
-            # Load previously merged data, if it exists.
-            if os.path.isfile(merged_path):
-                logger.debug(f"loading merged data from {merged_path}")
-                with open(merged_path, "rb") as mp:
-                    merged_data = Pickle.load(mp)
-            else:
-                merged_data = {}
-
-            for filename in files:
-                pending_file = os.path.join(sub_dir, filename)
-
-                try:
-                    with open(pending_file, "rb") as f:
-                        data = Pickle.load(f)
-                except (EOFError, IsADirectoryError):
-                    logger.warning("Failed loading: {0}".format(pending_file))
-                    continue
-                # This can be "dangerous" because if the program crashes or gets terminated, we will have removed files before writing the merged data.
-                os.remove(pending_file)
-
-                if merged_data == {}:
-                    merged_data = data
-                else:
-                    for key, info in data.items():
-                        if isinstance(info, list):
-                            merged_data[key] += info
-                        else:
-                            for param_name, params in info.items():
-                                try:
-                                    merged_data[key][param_name] += params
-                                except KeyError as m:
-                                    logger.warning(
-                                        f"Keys [{key}][{param_name}] not found in \n {merged_data}"
-                                    )
-                                    raise KeyError(m)
-
-            # Save merged data.
-            with open(merged_path, "wb") as mp:
-                Pickle.dump(merged_data, mp)
-
-            # Load merged data in results.
-            if merged_data:
-                self.results[scale_shortener(float(sub_dir_name))] = merged_data
-
-        if not self.results:
-            logger.warning("No data was found by ResultsHandler object! \n")
-            logger.warning(
-                "Tried root directory: \n {0} \n ".format(self.pickle_output_dir)
-            )
-            sys.exit()
-
     def find_ns_scale(self):
         """Find the number of neutrinos corresponding to flux"""
         try:
From 3327e7fa49d7ec1849efda8f289297ee77efa010 Mon Sep 17 00:00:00 2001
From: Massimiliano Lincetto
Date: Tue, 1 Jul 2025 18:02:05 +0200
Subject: [PATCH 5/6] better org/3

---
 flarestack/core/results.py | 42 ++++++++++++++++++++++++++++----------
 1 file changed, 31 insertions(+), 11 deletions(-)

diff --git a/flarestack/core/results.py b/flarestack/core/results.py
index 03e8d48e..b3ef4fa8 100644
--- a/flarestack/core/results.py
+++ b/flarestack/core/results.py
@@ -123,20 +123,33 @@ def merge_and_load(self, output_dict: dict):
         background_label = scale_shortener(0.0)
 
         for subdir_name in scales_subdirs:
-            scale_label = scale_shortener(float(subdir_name))
+            try:
+                scale_label = scale_shortener(float(subdir_name))
+            except ValueError as e:
+                # If analysis paths are nested, i.e. we have analyses ana1 and ana1/sub1, the ana1/sub1 directory will be scanned, but should be skipped. Ideally the user should avoid nesting analysis directories, but there is no safeguard against this behaviour.
+                logger.debug(
+                    f"Skipping subdirectory {subdir_name} as it does not represent a valid scale. Parent directory: {self.path}"
+                )
+                continue
 
             if self.background_only and scale_label != background_label:
-                # skip non-background trials
+                # skip non-background trials in background_only mode
                 continue
 
             pending_data = self.merge_and_load_subdir(subdir_name)
 
             if pending_data:
+                n_pending = len(pending_data["TS"])
+
                 if scale_label == background_label and background_label in output_dict:
-                    logger.info("Appending background data to existing trials.")
+                    logger.info(f"Appending {n_pending} background trials to {len(output_dict[background_label]['TS'])} existing trials ({scale_label=})")
                     self.merge_datadict(output_dict[background_label], pending_data)
                 else:
                     output_dict[scale_label] = pending_data
+                if self.background_only:
+                    logger.info(
+                        f"Loading {n_pending} background trials ({scale_label=})"
+                    )
 
 
 class ResultsHandler(object):
@@ -158,10 +171,7 @@ def __init__(
         self.sources = load_catalogue(rh_dict["catalogue"])
 
         self.name = rh_dict["name"]
 
-        if background_source is not None:
-            self.background_from = background_source
-        else:
-            self.background_from = rh_dict["name"]
+        self.background_source = background_source
 
         self.mh_name = rh_dict["mh_name"]
@@ -168,11 +178,12 @@ def __init__(
         self.maxfev = rh_dict.get("maxfev", 800)
 
         self.results = dict()
 
         self.pickle_output_dir = name_pickle_output_dir(self.name)
-        self.pickle_output_dir_bg = name_pickle_output_dir(self.background_from)
+        self.pickle_output_dir_bg = name_pickle_output_dir(self.background_source) if self.background_source else None
 
         self.pickle_cache = PickleCache(Path(self.pickle_output_dir))
-        self.pickle_cache_bg = PickleCache(Path(self.pickle_output_dir_bg))
+
+        self.pickle_cache_bg = PickleCache(Path(self.pickle_output_dir_bg), background_only=True) if self.background_source else None
 
         self.plot_path = Path(plot_output_dir(self.name))
@@ -228,13 +239,21 @@ def __init__(
         try:
             self.pickle_cache.merge_and_load(output_dict=self.results)
             # Load the background trials, merging them into any already loaded.
-            self.pickle_cache_bg.merge_and_load(output_dict=self.results)
+            if self.pickle_cache_bg is not None:
+                logger.debug("Loading background trials from the background cache.")
+                self.pickle_cache_bg.merge_and_load(output_dict=self.results)
+            else:
+                logger.debug("No background pickle cache configured.")
             if not self.results:
                 logger.warning("No data was found by ResultsHandler object! \n")
                 logger.warning(
                     "Tried root directory: \n {0} \n ".format(self.pickle_output_dir)
                 )
                 sys.exit()
+            if scale_shortener(0.0) not in self.results:
+                logger.error(f"No key equal to '0' in results! Keys are {self.results.keys()}")
+
+                sys.exit()
 
         except FileNotFoundError:
             logger.warning(f"No files found at {self.pickle_output_dir}")
@@ -730,7 +749,7 @@ def find_disc_potential(self):
                 )
 
             logger.info(
-                f"Scale: {scale}, TS_threshold: {disc_threshold}, n_trials: {len(ts_array)} => overfluctuations {frac=}"
+                f"Scale: {scale}, TS_threshold: {disc_threshold:.1f}, n_trials: {len(ts_array)} => overfluctuations {frac=:.4f}"
             )
             y[zval].append(frac)
 
@@ -1013,6 +1032,7 @@ def plot_bias(self):
 
         for scale in raw_x:
             vals = self.results[scale]["Parameters"][param]
+
             if self.bias_error == "std":
                 med = np.median(vals)
                 meds.append(med)
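
The try/except above relies on every valid scale subdirectory name parsing as a float; anything else, such as a nested analysis directory, is skipped (the directory names here are examples):

    float("0.5")   # valid scale subdirectory, processed
    float("sub1")  # raises ValueError, subdirectory skipped with a debug message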
From dc4976217b447fb20803f77952bdc87ff233f843 Mon Sep 17 00:00:00 2001
From: Massimiliano Lincetto
Date: Tue, 1 Jul 2025 18:06:23 +0200
Subject: [PATCH 6/6] black

---
 flarestack/core/results.py | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/flarestack/core/results.py b/flarestack/core/results.py
index b3ef4fa8..0b8429a9 100644
--- a/flarestack/core/results.py
+++ b/flarestack/core/results.py
@@ -142,7 +142,9 @@ def merge_and_load(self, output_dict: dict):
             n_pending = len(pending_data["TS"])
 
             if scale_label == background_label and background_label in output_dict:
-                logger.info(f"Appending {n_pending} background trials to {len(output_dict[background_label]['TS'])} existing trials ({scale_label=})")
+                logger.info(
+                    f"Appending {n_pending} background trials to {len(output_dict[background_label]['TS'])} existing trials ({scale_label=})"
+                )
                 self.merge_datadict(output_dict[background_label], pending_data)
             else:
                 output_dict[scale_label] = pending_data
@@ -180,11 +182,19 @@ def __init__(
         self.results = dict()
 
         self.pickle_output_dir = name_pickle_output_dir(self.name)
-        self.pickle_output_dir_bg = name_pickle_output_dir(self.background_source) if self.background_source else None
+        self.pickle_output_dir_bg = (
+            name_pickle_output_dir(self.background_source)
+            if self.background_source
+            else None
+        )
 
         self.pickle_cache = PickleCache(Path(self.pickle_output_dir))
 
-        self.pickle_cache_bg = PickleCache(Path(self.pickle_output_dir_bg), background_only=True) if self.background_source else None
+        self.pickle_cache_bg = (
+            PickleCache(Path(self.pickle_output_dir_bg), background_only=True)
+            if self.background_source
+            else None
+        )
 
         self.plot_path = Path(plot_output_dir(self.name))
@@ -251,7 +261,9 @@ def __init__(
                 )
                 sys.exit()
             if scale_shortener(0.0) not in self.results:
-                logger.error(f"No key equal to '0' in results! Keys are {self.results.keys()}")
+                logger.error(
+                    f"No key equal to '0' in results! Keys are {self.results.keys()}"
+                )
 
                 sys.exit()
@@ -1032,7 +1044,6 @@ def plot_bias(self):
 
         for scale in raw_x:
             vals = self.results[scale]["Parameters"][param]
-
             if self.bias_error == "std":
                 med = np.median(vals)
                 meds.append(med)
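
With the series applied, reusing a shared background-trial campaign comes down to a single constructor argument; a hypothetical call (rh_dict and the run name are illustrative, not defined by these patches):

    from flarestack.core.results import ResultsHandler

    rh = ResultsHandler(rh_dict, background_source="shared_background_run")

    # With background_source=None (the default), no background cache is attached
    # and the handler loads only its own trials, exactly as before this series.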