From 3235846a88ec0147ad3a1e0b3168e94b5bacacba Mon Sep 17 00:00:00 2001 From: nefrathenrici Date: Sat, 9 Aug 2025 20:14:35 -0700 Subject: [PATCH] Add subseasonal calibration pipeline --- .buildkite/longruns/pipeline.yml | 2 +- .gitignore | 1 + .../atmos_configs/climaatmos_wx_progedmf.yml | 2 +- .../subseasonal_configs/wxquest_diagedmf.yml | 35 +-- experiments/calibration/README.md | 18 +- experiments/calibration/api.jl | 154 ++++++++++ .../coarse_amip/generate_observations.jl | 145 +++++++++ .../coarse_amip/model_interface.jl | 49 +++ .../coarse_amip/observation_map.jl | 281 ++++++++++++++++++ .../coarse_amip/observation_utils.jl | 25 ++ .../coarse_amip/run_calibration.jl | 96 ++++++ experiments/calibration/model_config.yml | 36 --- experiments/calibration/model_interface.jl | 26 -- experiments/calibration/run_calibration.jl | 154 ---------- experiments/calibration/run_calibration.sh | 13 - toml/wxquest_diagedmf.toml | 50 ++++ 16 files changed, 824 insertions(+), 263 deletions(-) create mode 100644 experiments/calibration/api.jl create mode 100644 experiments/calibration/coarse_amip/generate_observations.jl create mode 100644 experiments/calibration/coarse_amip/model_interface.jl create mode 100644 experiments/calibration/coarse_amip/observation_map.jl create mode 100644 experiments/calibration/coarse_amip/observation_utils.jl create mode 100644 experiments/calibration/coarse_amip/run_calibration.jl delete mode 100644 experiments/calibration/model_config.yml delete mode 100644 experiments/calibration/model_interface.jl delete mode 100644 experiments/calibration/run_calibration.jl delete mode 100644 experiments/calibration/run_calibration.sh create mode 100644 toml/wxquest_diagedmf.toml diff --git a/.buildkite/longruns/pipeline.yml b/.buildkite/longruns/pipeline.yml index 01ef08dc3e..1ab3c127ac 100644 --- a/.buildkite/longruns/pipeline.yml +++ b/.buildkite/longruns/pipeline.yml @@ -183,7 +183,7 @@ steps: - label: "Perfect model calibration test" key: "amip_pm_calibration" command: - - "julia --color=yes --project=experiments/ClimaEarth experiments/calibration/run_calibration.jl" + - "julia --color=yes --project=experiments/ClimaEarth experiments/calibration/coarse_amip/run_calibration.jl" artifact_paths: "experiments/calibration/output/*" env: CLIMACOMMS_DEVICE: "CUDA" diff --git a/.gitignore b/.gitignore index 4f4dd88cd6..715d882ce1 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ docs/src/generated/ # Experiments !experiments/ClimaEarth/**/Manifest.toml !experiments/ClimaCore/**/Manifest.toml +experiments/calibration/coarse_amip/output*/ # Output output/ diff --git a/config/atmos_configs/climaatmos_wx_progedmf.yml b/config/atmos_configs/climaatmos_wx_progedmf.yml index a84069a25d..5c2ed67aa7 100644 --- a/config/atmos_configs/climaatmos_wx_progedmf.yml +++ b/config/atmos_configs/climaatmos_wx_progedmf.yml @@ -51,4 +51,4 @@ moist: equil cloud_model: "quadrature_sgs" use_itime: true -deep_atmosphere: false \ No newline at end of file +deep_atmosphere: false diff --git a/config/subseasonal_configs/wxquest_diagedmf.yml b/config/subseasonal_configs/wxquest_diagedmf.yml index 18e1a27171..6d02b13cd3 100644 --- a/config/subseasonal_configs/wxquest_diagedmf.yml +++ b/config/subseasonal_configs/wxquest_diagedmf.yml @@ -1,36 +1,33 @@ FLOAT_TYPE: "Float32" albedo_model: "CouplerAlbedo" atmos_config_file: "config/atmos_configs/climaatmos_wx_diagedmf.yml" -coupler_toml: ["toml/amip.toml"] -mode_name: "subseasonal" -era5_initial_condition_dir: "/net/sampo/data1/wxquest_data/initial_conditions" +coupler_toml: ["toml/amip.toml", "toml/wxquest_diagedmf.toml"] initial_condition: "WeatherModel" -checkpoint_dt: "7days" +checkpoint_dt: "93days" co2_model: "fixed" -dt: "30secs" -dt_cpl: "30secs" +coupler_output_dir: "output_quick" +dt: "90secs" +dt_cpl: "90secs" energy_check: false h_elem: 16 - -### integrated land ### -# land_model: "integrated" - -### bucket land ### +mode_name: "subseasonal" +era5_initial_condition_dir: /net/sampo/data1/wxquest_data/initial_conditions land_model: "bucket" bucket_albedo_type: "map_temporal" -bucket_initial_condition: "/net/sampo/data1/wxquest_data/initial_conditions/era5_bucket_processed_20250907_0000.nc" - land_spun_up_ic: false land_temperature_anomaly: "nothing" -use_land_diagnostics: true radiation_reset_rng_seed: true -start_date: "20250907" +start_date: "20180901" surface_setup: "PrescribedSurface" topo_smoothing: true topography: "Earth" -t_end: "7days" +t_end: "700days" netcdf_output_at_levels: true +output_default_diagnostics: false +use_coupler_diagnostics: false +use_land_diagnostics: true +strict_params: false extra_atmos_diagnostics: - - short_name: [tas, mslp, pr, ua, va, rhoa, pfull, hur, hus, clw, cli, clivi, clwvi, cl, arup, tke] - period: 1hours - reduction_time: average \ No newline at end of file + - short_name: [hfls, hfss, mslp, tas, pr, ta, pfull, hur, rsut, rsds, rsus, rsuscs, rsutcs, rlutcs, rlut, rlds, rlus, rlucs, ts, evspsbl, hfes] + period: 1months + reduction_time: average diff --git a/experiments/calibration/README.md b/experiments/calibration/README.md index 7f626b5109..7cb3f0f8cd 100644 --- a/experiments/calibration/README.md +++ b/experiments/calibration/README.md @@ -1,18 +1,10 @@ # ClimaCoupler Calibration Experiments -This folder contains a trivial perfect-model calibration of the atmosphere coupled with the bucket model. -The calibration uses 30-day and lat/lon averages of top-of-atmosphere shortwave -radiation to calibrate the `total_solar_irradiance` parameter in a perfect model setting. -The current run script uses the `ClimaCalibrate.SlurmManager` to add Slurm workers - which run each ensemble member in parallel. - -To run this calibration on a Slurm cluster, ensure that `run_calibration.sh` is -configured for your cluster and run `sbatch run_calibration.sh`. The output will -be generated in `experiments/calibration/output`. +This folder contains coupled calibration experiment scripts. +The `coarse_amip` calibration uses the October monthly surface fluxes to calibrate the inverse entrainment timescale. Components: -- run_calibration.sh: SBATCH script used to instantiate the project and run the calibration on a Slurm cluster. -- run_calibration.jl: Julia script for the overall calibration and postprocessing. Contains the expriment configuration, such as ensemble size and number of iterations. +- generate_observations.jl: Script to generate observations for the calibration pipeline +- run_calibration.jl: Script for the overall calibration and postprocessing. Contains the expriment configuration, such as ensemble size and number of iterations. - model_interface.jl: Contains `forward_model`, the function that gets run during calibration. This basically just uses the `setup_run` function. -- model_config.yml: Contains the configuration for the coupler -- \ No newline at end of file +- observation_map.jl: Contains the loss function and generalized plotting utilities. diff --git a/experiments/calibration/api.jl b/experiments/calibration/api.jl new file mode 100644 index 0000000000..73699efc7c --- /dev/null +++ b/experiments/calibration/api.jl @@ -0,0 +1,154 @@ +import Dates + +""" + struct CalibrateConfig{SPINUP <: Dates.Period, EXTEND <: Dates.Period} + short_names::Vector{String} + minibatch_size::Int64 + n_iterations::Int64 + sample_date_ranges::Vector{NTuple{2, DATE}} + extend::EXTEND + spinup::SPINUP + nelements::Tuple{Int64, Int64} + output_dir::String + rng_seed::Int64 + end + +A configuration struct for keeping track of multiple fields that are of interest +to a user running calibration, or that are needed in multiple places (e.g., for +ensemble members and generating observations). +""" +struct CalibrateConfig{SPINUP <: Dates.Period, EXTEND <: Dates.Period} + "Configuration file to use for ClimaCoupler simulation" + config_file::String + + "The short names of the observations used for calibration. The short names + should match the same names used for the diagnostics." + short_names::Vector{String} + + "The size of the minibatch for each iteration" + minibatch_size::Int64 + + "The number of iterations to run the calibration for" + n_iterations::Int64 + + "The date ranges of the samples for calibration and used to determine the + start and end dates of a simulation for each iteration of calibration" + sample_date_ranges::Vector{NTuple{2, Dates.DateTime}} + + "The amount of time to run a simulation after the last date of the + minibatch" + extend::EXTEND + + "The amount of time to run a simulation before the first date of the + minibatch" + spinup::SPINUP + + "The directory to store the iterations and members of the calibration." + output_dir::String + + "An integer value for ensuring calibrations are the same between multiple + calibrations with the same settings" + rng_seed::Int64 +end + +""" + CalibrateConfig(; + config_file, + short_names, + sample_date_ranges, + extend, + spinup = Dates.Month(3), + minibatch_size, + n_iterations, + output_dir = "calibration/weatherquest",, + rng_seed = 42, + ) + +Initializes a CalibrateConfig, which is of interest to a user running +calibration or contains values needed in multiple places during calibration. + +Keyword arguments +===================== + +- `config_file`: Configuration file to use for ClimaCoupler simulation. + +- `short_names`: Short names of the observations. The currently supported short + names are `pr`, `tas`, and `mslp`. + +- `minibatch_size`: The size of the minibatch for each iteration. + +- `n_iterations`: The number of iterations to run the calibration for. + +- `sample_date_ranges`: The date ranges for each sample. The dates should be the + same as found in the time series data of the observations. + +- `extend`: The amount of time to run the simulation after the end date + determined by `sample_date_ranges`. For seasonal averages, `extend` should be + `Dates.Month(3)` and for monthly averages, `extend` should be + `Dates.Month(1)`. + +- `spinup`: The amount of time to run the simulation before the start date + determined by `sample_date_ranges`. + +- `nelements`: The resolution of the model. This is also used to determine the + mask of the observations. + +- `output_dir`: The location to save the calibration at. + +- `rng_seed`: An integer to ensure that calibration runs with the same settings + are the same. +""" +function CalibrateConfig(; + config_file, + short_names, + minibatch_size, + n_iterations, + sample_date_ranges, + extend, + spinup = Dates.Month(3), + output_dir = "calibration/weatherquest", + rng_seed = 42, +) + isempty(short_names) && error("Cannot run calibration with no short names") + isempty(sample_date_ranges) && + error("Cannot run calibration with no date ranges for the samples") + + sample_date_ranges = [ + (Dates.DateTime(date_pair[1]), Dates.DateTime(date_pair[2])) for + date_pair in sample_date_ranges + ] + + for (start_date, stop_date) in sample_date_ranges + start_date <= stop_date || error( + "The start date ($start_date) should be before the stop date ($stop_date)", + ) + end + issorted(sample_date_ranges) || + error("The samples in $sample_date_ranges should be sorted") + + minibatch_size > 0 || error("The minibatch size ($minibatch_size) should be positive") + n_iterations > 0 || error("The number of iterations ($n_iterations) should be positive") + + num_samples = length(sample_date_ranges) + minibatch_size > num_samples && error( + "The minibatch size is $minibatch_size, but the number of samples is $num_samples", + ) + + remaining = num_samples % minibatch_size + remaining == 0 || @warn( + "Number of samples is not divisible by the minibatch size; the last $remaining samples may be missing when running the calibration" + ) + + return CalibrateConfig( + config_file, + short_names, + minibatch_size, + n_iterations, + sample_date_ranges, + extend, + spinup, + output_dir, + rng_seed, + ) + +end diff --git a/experiments/calibration/coarse_amip/generate_observations.jl b/experiments/calibration/coarse_amip/generate_observations.jl new file mode 100644 index 0000000000..38bbd5495e --- /dev/null +++ b/experiments/calibration/coarse_amip/generate_observations.jl @@ -0,0 +1,145 @@ +import ClimaAnalysis +import ClimaAnalysis: OutputVar +import ClimaCalibrate +import Dates +import EnsembleKalmanProcesses as EKP +import JLD2 +import ClimaDiagnostics +import ClimaCore +import ClimaUtilities.ClimaArtifacts.@clima_artifact +# Access CalibrateConfig +include(joinpath(@__DIR__, "run_calibration.jl")) +include(joinpath(@__DIR__, "observation_utils.jl")) + +""" + load_var(filepath, short_name; varname=nothing, flip_sign=false, transform_dates=false) + +Helper function to load and process an ERA5 variable with common transformations. +""" +function load_var( + filepath, + short_name; + varname = nothing, + flip_sign = false, + transform_dates = nothing, +) + # TODO: Don't assume monthly data + var = + isnothing(varname) ? OutputVar(filepath) : + OutputVar(filepath, varname; shift_by = Dates.firstdayofmonth) + + flip_sign && (var = -var) + + var.attributes["short_name"] = short_name + + if !isnothing(transform_dates) + var = ClimaAnalysis.Var.transform_dates(var, date -> date - transform_dates) + end + + if issorted(latitudes(var)) + var = reverse_dim(var, latitude_name(var)) + end + + return var +end + +""" + load_vars(obsdir, short_names) + +Load NetCDF files belonging to `short_names` in `obsdir` as `OutputVar`s. +""" +function load_vars() + # TODO: Use era5_monthly_averages_surface_single_level_1979_2024 artifact + flux_file = joinpath( + @clima_artifact("era5_monthly_averages_surface_single_level_1979_2024"), + "era5_monthly_averages_surface_single_level_197901-202410.nc", + ) + + lhf = load_var(flux_file, "hfls"; varname = "mslhf") + shf = load_var(flux_file, "hfss"; varname = "msshf") + rsus = load_var(flux_file, "rsus"; varname = "msuwswrf") + rlus = load_var(flux_file, "rlus"; varname = "msuwlwrf") + + return [lhf, shf, rsus, rlus] +end + +""" + preprocess_vars(vars) + +Preprocess each OutputVar in `vars` by keeping the relevant dates in +`sample_date_ranges`. +""" +function preprocess_vars(vars) + resample_var = resampled_lonlat(CALIBRATE_CONFIG.config_file) + vars = map(vars) do var + var = resample_var(var) + var = set_units(var, var_units[short_name(var)]) + var = apply_oceanmask(var) + var = remove_global_mean(var) + end + + return vars +end + +function make_observation_vector(vars, sample_date_ranges) + covar_estimator = ClimaCalibrate.ObservationRecipe.ScalarCovariance(; + scalar = 5.0, + use_latitude_weights = true, + ) + obs_vec = map(sample_date_ranges) do sample_date_range + ClimaCalibrate.ObservationRecipe.observation( + covar_estimator, + vars, + first(sample_date_range), + last(sample_date_range), + ) + end + return obs_vec +end + +""" + resampled_lonlat(config_file) + +Return a function to resample longitude and latitudes according to the model +grid specified by `config_file`. +""" +function resampled_lonlat(config_file) + config_dict = get_coupler_config_dict(CALIBRATE_CONFIG.config_file) + if !isnothing(config_dict["netcdf_interpolation_num_points"]) + (nlon, nlat, nlev) = tuple(config_dict["netcdf_interpolation_num_points"]...) + else + cs = CoupledSimulation(config_file) + center_space = cs.model_sims.atmos_sim.domain.center_space + (nlon, nlat, nlev) = ClimaDiagnostics.Writers.default_num_points(center_space) + stretch = center_space.grid.vertical_grid.topology.mesh.stretch + dz_bottom = center_space.grid.vertical_grid.topology.mesh.faces[2].z + z = range(dz_bottom, ClimaCore.Spaces.z_max(center_space), nlev) + end + lon = range(-180, 180, nlon) + lat = range(-90, 90, nlat) + # TODO: Generalize to 3D vars, account for stretch for 3D variables + return var -> resampled_as(var; lon, lat) +end + +if abspath(PROGRAM_FILE) == @__FILE__ + ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON" + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges + short_names = CALIBRATE_CONFIG.short_names + config_file = CALIBRATE_CONFIG.config_file + @info "Generating observations for $short_names" + @info "The number of samples is $(length(sample_date_ranges)) over $sample_date_ranges" + + unprocessed_vars = load_vars() + + preprocessed_vars = preprocess_vars(unprocessed_vars) + + JLD2.save_object( + joinpath(pkgdir(ClimaCoupler), "experiments/calibration/preprocessed_vars.jld2"), + preprocessed_vars, + ) + observation_vector = make_observation_vector(preprocessed_vars, sample_date_ranges) + JLD2.save_object( + joinpath(pkgdir(ClimaCoupler), "experiments/calibration/obs_vec.jld2"), + observation_vector, + ) +end diff --git a/experiments/calibration/coarse_amip/model_interface.jl b/experiments/calibration/coarse_amip/model_interface.jl new file mode 100644 index 0000000000..20b3320ba6 --- /dev/null +++ b/experiments/calibration/coarse_amip/model_interface.jl @@ -0,0 +1,49 @@ +ENV["CLIMACOMMS_DEVICE"] = "CUDA" +ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON" +import ClimaCoupler +import ClimaCalibrate +import CUDA +import EnsembleKalmanProcesses as EKP +include(joinpath(pkgdir(ClimaCoupler), "experiments", "ClimaEarth", "setup_run.jl")) +include( + joinpath( + pkgdir(ClimaCoupler), + "experiments", + "calibration", + "coarse_amip", + "run_calibration.jl", + ), +) +using Pkg +function ClimaCalibrate.forward_model(iter, member) + Pkg.status() + config_dict = get_coupler_config_dict(CALIBRATE_CONFIG.config_file) + output_dir_root = CALIBRATE_CONFIG.output_dir + start_date = + first(CALIBRATE_CONFIG.sample_date_ranges[iter + 1]) - CALIBRATE_CONFIG.spinup + start_date_str = replace(string(Date(start_date)), "-" => "") + end_date = last(CALIBRATE_CONFIG.sample_date_ranges[iter + 1]) + CALIBRATE_CONFIG.extend + sim_length = Second(end_date - start_date) + + config_dict["start_date"] = start_date_str + config_dict["bucket_initial_condition"] = "/net/sampo/data1/wxquest_data/initial_conditions/era5_bucket_processed_$(start_date_str)_0000.nc" + config_dict["t_end"] = "$(sim_length.value)secs" + + # Set member parameter file + sampled_parameter_file = ClimaCalibrate.parameter_path(output_dir_root, iter, member) + if haskey(config_dict, "coupler_toml") + config_dict["coupler_toml"] = + [config_dict["coupler_toml"]..., sampled_parameter_file] + else + config_dict["coupler_toml"] = [sampled_parameter_file] + end + # Set member output directory + member_output_dir = + ClimaCalibrate.path_to_ensemble_member(output_dir_root, iter, member) + config_dict["coupler_output_dir"] = member_output_dir + + @info "Simulation dates" start_date end_date + setup_and_run(config_dict) + @info "Completed member $member" + return nothing +end diff --git a/experiments/calibration/coarse_amip/observation_map.jl b/experiments/calibration/coarse_amip/observation_map.jl new file mode 100644 index 0000000000..4fdf787f98 --- /dev/null +++ b/experiments/calibration/coarse_amip/observation_map.jl @@ -0,0 +1,281 @@ +using Statistics +import JLD2 +# import GeoMakie +# import Makie +import Dates +using ClimaAnalysis +import ClimaCalibrate +import ClimaAnalysis.Utils: kwargs as ca_kwargs +import ClimaCoupler +import ClimaCalibrate: EnsembleBuilder + +include(joinpath(@__DIR__, "observation_utils.jl")) + +""" + ClimaCalibrate.observation_map(iteration) + +Return G ensemble for an `iteration`. + +G ensemble represents the concatenated forward model evaluations from all +ensemble members, arranged horizontally. Each individual forward model +evaluation corresponds to preprocessed, flattened simulation data from a single +ensemble member that has been matched to the corresponding observational data. +""" +function ClimaCalibrate.observation_map(iteration) + output_dir = CALIBRATE_CONFIG.output_dir + ekp = JLD2.load_object(ClimaCalibrate.ekp_path(output_dir, iteration)) + + g_ens_builder = EnsembleBuilder.GEnsembleBuilder(ekp) + + for m in 1:EKP.get_N_ens(ekp) + member_path = ClimaCalibrate.path_to_ensemble_member(output_dir, iteration, m) + simdir_path = joinpath(member_path, "wxquest_diagedmf/output_active") + @info "Processing member $m: $simdir_path" + try + process_member_data!(g_ens_builder, simdir_path, m, iteration) + catch e + @error "Ensemble member $m failed" exception = (e, catch_backtrace()) + EnsembleBuilder.fill_g_ens_col!(g_ens_builder, m, NaN) + end + end + if count(isnan, g_ens_builder.g_ens) > 0.9 * length(g_ens_builder.g_ens) + error("Too many NaNs") + end + return g_ens_builder.g_ens +end + +""" + process_member_data!(diagnostics_folder_path, short_names, current_minibatch) + +Process the data of a single ensemble member and return a single column of the +G ensemble matrix. +""" +function process_member_data!(g_ens_builder, diagnostics_folder_path, col_idx, iteration) + short_names = EnsembleBuilder.missing_short_names(g_ens_builder, col_idx) + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges[iteration + 1] + @info "Short names: $short_names" + + simdir = ClimaAnalysis.SimDir(diagnostics_folder_path) + for short_name in short_names + var = get_var(short_name, simdir) + var = preprocess_var(var, sample_date_ranges) + + EnsembleBuilder.fill_g_ens_col!(g_ens_builder, col_idx, var; verbose = true) + end + + return nothing +end + +function largest_period(sample_date_range) + span = maximum(sample_date_range) - minimum(sample_date_range) + span = Millisecond(span) + span.value == 0 && return Month(1) + day_in_ms = 8.64e7 + period = + span.value >= day_in_ms * 365 ? Year(1) : + span.value >= day_in_ms * 30 ? Month(1) : + span.value >= day_in_ms * 7 ? Week(1) : Day(1) + return period +end + +function get_var(short_name, simdir) + if short_name == "tas - ta" + tas = get(simdir; short_name = "tas") + ta = get(simdir; short_name = "ta") + # TODO: figure out why this doesn't work + # pfull = get(simdir; short_name = "pfull") + # ta_900 = ClimaAnalysis.Atmos.to_pressure_coordinates(ta, pfull; target_pressure=[900]) + ta_900hpa = slice(ta; z = 1000) + var = tas - ta_900hpa + else + # TODO: support multiple periods/reductions of same variable + var = get(simdir; short_name) + end + + var.attributes["short_name"] = short_name + return var +end + +""" + preprocess_var(var::ClimaAnalysis.OutputVar, reference_date) + +Preprocess `var` before flattening for G ensemble matrix. + +For "pr", weekly sums are computed. For "tas" and "mslp", weekly means are +computed from daily means. The daily means are computing starting from +`reference_date`. + +This function assumes that the data is monthly. +""" +function preprocess_var(var, sample_date_range) + period = largest_period(sample_date_range) + var = ClimaAnalysis.Var.transform_dates(var, date -> date - period) + var = set_units(var, var_units[short_name(var)]) + # TODO: Match dates instead of just windowing + var = window(var, "time"; left = sample_date_range[1], right = sample_date_range[2]) + + var = apply_oceanmask(var) + var = remove_global_mean(var) + return var +end + +""" + ClimaCalibrate.analyze_iteration(ekp, + g_ensemble, + prior, + output_dir, + iteration) + +Analyze an iteration by plotting the bias plots, constrained parameters over +iterations, and errors over iterations and time. +""" +function ClimaCalibrate.analyze_iteration(ekp, g_ensemble, prior, output_dir, iteration) + plot_output_path = ClimaCalibrate.path_to_iteration(output_dir, iteration) + plot_constrained_params_and_errors(plot_output_path, ekp, prior) + + simdir = SimDir(ClimaCalibrate.path_to_ensemble_member(output_dir, iteration, 1)) + plot_bias(ekp, simdir, iteration; output_dir = plot_output_path) + + @info "Ensemble spread: $(scalar_spread(ekp))" +end + +""" + plot_constrained_params_and_errors(output_dir, ekp, prior) + +Plot the constrained parameters and errors from `ekp` and `prior` and save +them to `output_dir`. +""" +function plot_constrained_params_and_errors(output_dir, ekp, prior) + dim_size = sum(length.(EKP.batch(prior))) + fig = CairoMakie.Figure(size = ((dim_size + 1) * 500, 500)) + for i in 1:dim_size + EKP.Visualize.plot_ϕ_over_iters(fig[1, i], ekp, prior, i) + end + EKP.Visualize.plot_error_over_iters(fig[1, dim_size + 1], ekp, error_metric = "loss") + EKP.Visualize.plot_error_over_time(fig[1, dim_size + 2], ekp, error_metric = "loss") + CairoMakie.save(joinpath(output_dir, "constrained_params_and_error.png"), fig) + return nothing +end + +bias_plot_extrema = Dict( + "tas" => (-6, 6), + "tas - ta" => (-6, 6), + "hfls" => (-50, 50), + "hfss" => (-25, 25), + "rsus" => (-50, 50), + "rlus" => (-50, 50), + "mslp" => (-1000, 1000), + "pr" => (-1e-4, 1e-4), +) + +""" + plot_bias(ekp, simdir, iteration; output_dir) + +Plot bias maps comparing simulation output to observations for all variables in +`CALIBRATE_CONFIG.short_names`. + +Uses observations from the EKP object via `reconstruct_vars` and compares +them against simulation variables for each date in the sample date ranges. +""" +function plot_bias(ekp, simdir, iteration; output_dir = simdir.simulation_path) + # Get observations for this iteration + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges[iteration + 1] + obs_series = EKP.get_observation_series(ekp) + minibatch_obs = ClimaCalibrate.ObservationRecipe.get_observations_for_nth_iteration( + obs_series, + iteration + 1, + ) + + # Reconstruct OutputVars from observations (ERA5 data) + era5_vars = [] + for obs in minibatch_obs + obs_vars = ClimaCalibrate.ObservationRecipe.reconstruct_vars(obs) + append!(era5_vars, obs_vars) + end + + # Get simulation variables for all short_names + sim_vars = [] + for short_name in CALIBRATE_CONFIG.short_names + var = get_var(short_name, simdir) + var = preprocess_var(var, sample_date_ranges) + push!(sim_vars, var) + end + + # Match sim_vars with era5_vars by short_name + var_pairs = [] + for sim_var in sim_vars + sim_short_name = ClimaAnalysis.short_name(sim_var) + era5_idx = findfirst(v -> ClimaAnalysis.short_name(v) == sim_short_name, era5_vars) + if !isnothing(era5_idx) + push!(var_pairs, (sim_var, era5_vars[era5_idx])) + else + @warn "No ERA5 data found for $(sim_short_name)" + end + end + + if isempty(var_pairs) + @warn "No matching variable pairs found for bias plotting" + return nothing + end + + # Get sample dates for this iteration + sample_dates = unique(CALIBRATE_CONFIG.sample_date_ranges[iteration + 1]) + + # Create figure + fig = GeoMakie.Figure(size = (1500, 500 * length(var_pairs))) + for (j, date) in enumerate(sample_dates) + for (i, (sim_var, era5_var)) in enumerate(var_pairs) + sim_var_t = slice(sim_var, time = date) + era5_var_t = slice(era5_var, time = date) + + # Calculate biases + global_bias = ClimaAnalysis.global_bias(sim_var_t, era5_var_t) + global_mean = weighted_average_lonlat(sim_var_t).data[1] + relative_global_bias = global_bias / global_mean + + land_bias = ClimaAnalysis.global_bias( + sim_var_t, + era5_var_t; + mask = ClimaAnalysis.apply_oceanmask, + ) + land_mean = + weighted_average_lonlat(ClimaAnalysis.apply_oceanmask(sim_var_t)).data[1] + relative_land_bias = land_bias / land_mean + + ocean_bias = ClimaAnalysis.global_bias( + sim_var_t, + era5_var_t; + mask = ClimaAnalysis.apply_landmask, + ) + ocean_mean = + weighted_average_lonlat(ClimaAnalysis.apply_landmask(sim_var_t)).data[1] + relative_ocean_bias = ocean_bias / ocean_mean + + @info short_name(sim_var_t) relative_global_bias global_bias global_mean + @info short_name(sim_var_t) relative_land_bias land_bias land_mean + @info short_name(sim_var_t) relative_ocean_bias ocean_bias ocean_mean + + cmap_extrema = + get(bias_plot_extrema, short_name(sim_var_t), extrema(sim_var_t.data)) + + # Plot bias + ax = ClimaAnalysis.Visualize.plot_bias_on_globe!( + fig[i, j], + sim_var_t, + era5_var_t; + cmap_extrema, + ) + end + end + + GeoMakie.save(joinpath(output_dir, "bias_sample_dates.png"), fig) + return nothing +end + + +function scalar_spread(ekp) + g_mean_final = EKP.get_g_mean_final(ekp) + g_final = EKP.get_g_final(ekp) + sq_dists = [sum((col .- g_mean_final) .^ 2) for col in eachcol(g_final)] + return mean(sq_dists) +end diff --git a/experiments/calibration/coarse_amip/observation_utils.jl b/experiments/calibration/coarse_amip/observation_utils.jl new file mode 100644 index 0000000000..ef15ba77b6 --- /dev/null +++ b/experiments/calibration/coarse_amip/observation_utils.jl @@ -0,0 +1,25 @@ +import ClimaCoupler +using Statistics +import Dates +using ClimaAnalysis + +include(joinpath(pkgdir(ClimaCoupler), "experiments/ClimaEarth/setup_run.jl")) +ext = Base.get_extension(ClimaCalibrate, :ClimaAnalysisExt) + + +var_units = Dict( + "pr" => "kg m^-2 s^-1", + "mslp" => "Pa", + "tas" => "K", + "tas - ta" => "K", + "hfls" => "W m^-2", + "hfss" => "W m^-2", + "rsus" => "W m^-2", + "rlus" => "W m^-2", +) + +function remove_global_mean(var) + mean_var = ClimaAnalysis.Var.average_lonlat(var; weighted = true) + mean_data = mean_var.data + return ClimaAnalysis.replace(val -> val - mean_data[1], var) +end diff --git a/experiments/calibration/coarse_amip/run_calibration.jl b/experiments/calibration/coarse_amip/run_calibration.jl new file mode 100644 index 0000000000..638d256627 --- /dev/null +++ b/experiments/calibration/coarse_amip/run_calibration.jl @@ -0,0 +1,96 @@ +using Dates +using Distributed +import Random +import ClimaCalibrate +import ClimaAnalysis +import ClimaComms +import ClimaCoupler +import EnsembleKalmanProcesses as EKP +import EnsembleKalmanProcesses.ParameterDistributions as PD +import JLD2 + + +include(joinpath(pkgdir(ClimaCoupler), "experiments", "calibration", "api.jl")) +include( + joinpath( + pkgdir(ClimaCoupler), + "experiments/calibration/coarse_amip/observation_map.jl", + ), +) +model_interface = joinpath( + pkgdir(ClimaCoupler), + "experiments", + "calibration", + "coarse_amip", + "model_interface.jl", +) + +years = 2018:2021 +sample_date_ranges = [(DateTime(yr, 9, 1), DateTime(yr, 9, 1)) for yr in years] +const CALIBRATE_CONFIG = CalibrateConfig(; + config_file = joinpath( + pkgdir(ClimaCoupler), + "config/subseasonal_configs/wxquest_diagedmf.yml", + ), + short_names = ["hfls", "hfss", "rsus", "rlus"], + minibatch_size = 1, + n_iterations = 3, + sample_date_ranges, + extend = Dates.Month(1), + spinup = Dates.Month(0), + output_dir = "output/output_quick", + rng_seed = 42, +) + +if abspath(PROGRAM_FILE) == @__FILE__ + priors = [PD.constrained_gaussian("entr_inv_tau", 0.1, 0.08, 0, 1)] + prior = EKP.combine_distributions(priors) + + observation_vector = JLD2.load_object( + joinpath(pkgdir(ClimaCoupler), "experiments/calibration/obs_vec.jld2"), + ) + + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges + minibatch_size = CALIBRATE_CONFIG.minibatch_size + obs_series = EKP.ObservationSeries( + Dict( + "observations" => observation_vector, + "names" => [ + string(Dates.year(start_date)) for + (start_date, stop_date) in sample_date_ranges + ], + "minibatcher" => ClimaCalibrate.minibatcher_over_samples( + length(observation_vector), + minibatch_size, + ), + ), + ) + + rng_seed = CALIBRATE_CONFIG.rng_seed + rng = Random.MersenneTwister(rng_seed) + + ekp = EKP.EnsembleKalmanProcess( + obs_series, + EKP.TransformUnscented(prior, impose_prior = true); + verbose = true, + rng, + scheduler = EKP.DataMisfitController(terminate_at = 100), + ) + + hpc_kwargs = ClimaCalibrate.kwargs(gpus = 1, time = 60 * 6) + verbose = true + backend = ClimaCalibrate.ClimaGPUBackend( + ClimaCalibrate.project_dir(), + model_interface, + hpc_kwargs, + verbose, + ) + + eki = ClimaCalibrate.calibrate( + backend, + ekp, + CALIBRATE_CONFIG.n_iterations, + prior, + CALIBRATE_CONFIG.output_dir, + ) +end diff --git a/experiments/calibration/model_config.yml b/experiments/calibration/model_config.yml deleted file mode 100644 index 5904454d84..0000000000 --- a/experiments/calibration/model_config.yml +++ /dev/null @@ -1,36 +0,0 @@ -FLOAT_TYPE: "Float32" -albedo_model: "CouplerAlbedo" -atmos_config_file: "config/atmos_configs/climaatmos_diagedmf.yml" -checkpoint_dt: "90000days" -coupler_toml: ["toml/amip.toml"] -deep_atmosphere: false -dt: "480secs" -dt_cpl: "480secs" -dz_bottom: 100.0 -energy_check: false -h_elem: 4 -bucket_albedo_type: "map_temporal" -mode_name: "amip" -netcdf_interpolation_num_points: [90, 45, 31] -netcdf_output_at_levels: true -output_default_diagnostics: false -use_coupler_diagnostics: false -override_precip_timescale: false -rayleigh_sponge: true -start_date: "20100101" -surface_setup: "PrescribedSurface" -coupler_output_dir: "experiments/calibration/output" -t_end: "30days" -topo_smoothing: true -topography: "Earth" -viscous_sponge: true -z_elem: 39 -z_max: 60000.0 -insolation: timevarying -dt_rad: 6hours -rad: clearsky -extra_atmos_diagnostics: - - reduction_time: average - short_name: rsut - period: 30days - writer: nc diff --git a/experiments/calibration/model_interface.jl b/experiments/calibration/model_interface.jl deleted file mode 100644 index 2813c4f201..0000000000 --- a/experiments/calibration/model_interface.jl +++ /dev/null @@ -1,26 +0,0 @@ -import ClimaCoupler -import ClimaCalibrate -include(joinpath(pkgdir(ClimaCoupler), "experiments", "ClimaEarth", "setup_run.jl")) - -function ClimaCalibrate.forward_model(iter, member) - config_file = - joinpath(pkgdir(ClimaCoupler), "experiments", "calibration", "model_config.yml") - config_dict = get_coupler_config_dict(config_file) - - # Run for a shorter time if SHORT_RUN is set - if SHORT_RUN - config_dict["t_end"] = "480secs" - config_dict["dt_rad"] = "480secs" - map(diag -> diag["period"] = "480secs", config_dict["extra_atmos_diagnostics"]) - end - - output_dir_root = config_dict["coupler_output_dir"] - # Set member parameter file - sampled_parameter_file = ClimaCalibrate.parameter_path(output_dir_root, iter, member) - config_dict["coupler_toml"] = [sampled_parameter_file] - # Set member output directory - member_output_dir = - ClimaCalibrate.path_to_ensemble_member(output_dir_root, iter, member) - config_dict["coupler_output_dir"] = member_output_dir - return setup_and_run(config_dict) -end diff --git a/experiments/calibration/run_calibration.jl b/experiments/calibration/run_calibration.jl deleted file mode 100644 index b9879c7d51..0000000000 --- a/experiments/calibration/run_calibration.jl +++ /dev/null @@ -1,154 +0,0 @@ -using Distributed -import ClimaCalibrate as CAL -using ClimaCalibrate -using ClimaAnalysis -import ClimaAnalysis: SimDir, get, slice, average_xy -import ClimaComms -import EnsembleKalmanProcesses: I, ParameterDistributions.constrained_gaussian -import EnsembleKalmanProcesses as EKP -using Statistics - -# Ensure ClimaComms doesn't use MPI -ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON" -ClimaComms.@import_required_backends - -single_member_dims = 1 -function CAL.observation_map(iteration) - G_ensemble = Array{Float64}(undef, single_member_dims, ensemble_size) - - for m in 1:ensemble_size - member_path = CAL.path_to_ensemble_member(output_dir, iteration, m) - simdir_path = joinpath(member_path, "model_config/output_active/clima_atmos") - if isdir(simdir_path) - simdir = SimDir(simdir_path) - G_ensemble[:, m] .= process_member_data(simdir) - else - @info "No data found for member $m." - G_ensemble[:, m] .= NaN - end - end - return G_ensemble -end - -function process_member_data(simdir::SimDir) - output = zeros(single_member_dims) - days = 86_400 - minutes = 3_600 - - period = SHORT_RUN ? "8m" : "30d" - time = SHORT_RUN ? 8minutes : 30days - rsut = get(simdir; short_name = "rsut", reduction = "average", period) - rsut_slice = slice(average_lon(average_lat(rsut)); time).data - return rsut_slice -end - -addprocs(CAL.SlurmManager()) -# Make variables and the forward model available on the worker sessions -@everywhere import ClimaComms, CUDA, ClimaCoupler -@everywhere import ClimaCalibrate as CAL -@everywhere import JLD2 -@everywhere begin - # Run for a shorter time if SHORT_RUN is set - const SHORT_RUN = haskey(ENV, "SHORT_RUN") ? true : false - - ENV["CLIMACOMMS_DEVICE"] = "CUDA" - ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON" - - experiment_dir = joinpath(pkgdir(ClimaCoupler), "experiments", "calibration") - include(joinpath(experiment_dir, "model_interface.jl")) - output_dir = joinpath(experiment_dir, "output") - obs_path = joinpath(experiment_dir, "observations.jld2") -end - -# Experiment Configuration -n_iterations = 5 -noise = 0.1 * I -prior = constrained_gaussian("total_solar_irradiance", 1000, 500, 250, 2000) - -# Generate observations if needed -if !isfile(obs_path) - import JLD2 - @info "Generating observations" - obs_output_dir = CAL.path_to_ensemble_member(output_dir, 0, 0) - mkpath(obs_output_dir) - touch(joinpath(obs_output_dir, "parameters.toml")) - CAL.forward_model(0, 0) - observations = Vector{Float64}(undef, 1) - observations .= process_member_data( - SimDir(joinpath(obs_output_dir, "model_config/output_active/clima_atmos")), - ) - JLD2.save_object(obs_path, observations) -end -observations = JLD2.load_object(obs_path) -@show observations -u0_mean = [mean(prior)] -uu0_cov = cov(prior) -eki = EKP.EnsembleKalmanProcess( - EKP.ObservationSeries(EKP.Observation(observations, noise, "rsut")), - EKP.Unscented(u0_mean, uu0_cov), -) -ensemble_size = EKP.get_N_ens(eki) - -# Allow 100% failure rate for short run testing -if SHORT_RUN - eki = CAL.calibrate( - CAL.WorkerBackend, - eki, - n_iterations, - prior, - output_dir; - failure_rate = 1, - ) -else - eki = CAL.calibrate(CAL.WorkerBackend, eki, n_iterations, prior, output_dir) -end - -# Postprocessing -import EnsembleKalmanProcesses as EKP -import Statistics: var, mean -using Test -import CairoMakie - -function scatter_plot(eki::EKP.EnsembleKalmanProcess) - f = CairoMakie.Figure(resolution = (800, 600)) - ax = CairoMakie.Axis( - f[1, 1], - ylabel = "Parameter Value", - xlabel = "Top of atmosphere radiative SW flux", - ) - - g = vec.(EKP.get_g(eki; return_array = true)) - params = vec.((EKP.get_ϕ(prior, eki))) - - for (gg, uu) in zip(g, params) - CairoMakie.scatter!(ax, gg, uu) - end - - CairoMakie.vlines!(ax, observations, linestyle = :dash) - - output = joinpath(output_dir, "scatter.png") - CairoMakie.save(output, f) - return output -end - -function param_versus_iter_plot(eki::EKP.EnsembleKalmanProcess) - f = CairoMakie.Figure(resolution = (800, 600)) - ax = CairoMakie.Axis(f[1, 1], ylabel = "Parameter Value", xlabel = "Iteration") - params = EKP.get_ϕ(prior, eki) - for (i, param) in enumerate(params) - CairoMakie.scatter!(ax, fill(i, length(param)), vec(param)) - end - - output = joinpath(output_dir, "param_vs_iter.png") - CairoMakie.save(output, f) - return output -end - -scatter_plot(eki) -param_versus_iter_plot(eki) - -params = EKP.get_ϕ(prior, eki) -spread = map(var, params) - -# Spread should be heavily decreased as particles have converged -SHORT_RUN || @test last(spread) / first(spread) < 0.15 diff --git a/experiments/calibration/run_calibration.sh b/experiments/calibration/run_calibration.sh deleted file mode 100644 index 2f8e6ebb8a..0000000000 --- a/experiments/calibration/run_calibration.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -#SBATCH --partition=a3 -#SBATCH --output="run_calibration.txt" -#SBATCH --time=05:00:00 -#SBATCH --ntasks=10 -#SBATCH --gpus-per-task=1 -#SBATCH --cpus-per-task=4 - -julia --project=experiments/ClimaEarth -e 'using Pkg; Pkg.develop(;path="."); Pkg.instantiate(;verbose=true)' - -julia --project=experiments/ClimaEarth experiments/calibration/run_calibration.jl - diff --git a/toml/wxquest_diagedmf.toml b/toml/wxquest_diagedmf.toml new file mode 100644 index 0000000000..36f1cdfa55 --- /dev/null +++ b/toml/wxquest_diagedmf.toml @@ -0,0 +1,50 @@ +[zd_rayleigh] +value = 40000.0 + +[min_area_limiter_scale] +value = 0 + +[zd_viscous] +value = 40000.0 + +[max_area_limiter_scale] +value = 0 + +[entr_param_vec] +value = [-45.97815905276861, -20.29437694493032, -22.269204417686513, 3.470895262420817, 0.8421581647335002, -0.8534958362066539] +type = "float" + +[alpha_rayleigh_w] +value = 10.0 + +[pressure_normalmode_drag_coeff] +value = 0.22685176632582243 +type = "float" + +[mixing_length_Prandtl_number_0] +value = 2.909243754318121 +type = "float" + +[entr_mult_limiter_coeff] +value = 0.4641383877134538 +type = "float" + +[entr_inv_tau] +value = 0.008362201402729065 +type = "float" + +[mixing_length_static_stab_coeff] +value = 0.29736273245468714 +type = "float" + +[pressure_normalmode_buoy_coeff1] +value = 0.024795188237279258 +type = "float" + +[mixing_length_Prandtl_maximum] +value = 4.2 +type = "float" + +[land_bucket_capacity] +value = 0.8 +type = "float"