Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ volestipy.egg-info
venv
lp_solve_5.5/
.devcontainer/
.github/dependabot.yml
.github/dependabot.yml.venv/
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.8.19
169 changes: 154 additions & 15 deletions dingo/PolytopeSampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# Licensed under GNU LGPL.3, see LICENCE file

# Contributed and/or modified by Iva Janković, as part of Google Summer of Code 2025 program.

import numpy as np
import warnings
Expand All @@ -29,12 +30,12 @@ def __init__(self, metabol_net):
raise Exception("An unknown input object given for initialization.")

self._metabolic_network = metabol_net
self._A = []
self._b = []
self._N = []
self._N_shift = []
self._T = []
self._T_shift = []
self._A = None
self._b = None
self._N = None
self._N_shift = None
self._T = None
self._T_shift = None
self._parameters = {}
self._parameters["nullspace_method"] = "sparseQR"
self._parameters["opt_percentage"] = self.metabolic_network.parameters[
Expand All @@ -46,21 +47,23 @@ def __init__(self, metabol_net):

self._parameters["tol"] = 1e-06
self._parameters["solver"] = None
self._last_hpoly = None

def get_polytope(self):
"""A member function to derive the corresponding full dimensional polytope
and a isometric linear transformation that maps the latter to the initial space.
"""

if (
self._A == []
or self._b == []
or self._N == []
or self._N_shift == []
or self._T == []
or self._T_shift == []
self._A is None
or self._b is None
or self._N is None
or self._N_shift is None
or self._T is None
or self._T_shift is None
):


(
max_flux_vector,
max_objective,
Expand Down Expand Up @@ -166,7 +169,7 @@ def generate_steady_states_no_multiphase(
"""A member function to sample steady states.

Keyword arguments:
method -- An MCMC method to sample, i.e. {'billiard_walk', 'cdhr', 'rdhr', 'ball_walk', 'dikin_walk', 'john_walk', 'vaidya_walk', 'gaussian_hmc_walk', 'exponential_hmc_walk', 'hmc_leapfrog_gaussian', 'hmc_leapfrog_exponential'}
method -- An MCMC method to sample, i.e. {'billiard_walk', 'cdhr', 'rdhr', 'ball_walk', 'dikin_walk', 'john_walk', 'vaidya_walk', 'gaussian_hmc_walk', 'exponential_hmc_walk', 'hmc_leapfrog_gaussian', 'hmc_leapfrog_exponential}
n -- the number of steady states to sample
burn_in -- the number of points to burn before sampling
thinning -- the walk length of the chain
Expand All @@ -190,6 +193,107 @@ def generate_steady_states_no_multiphase(

return steady_states

def generate_steady_states_sb_once(self, n=1000, burn_in=0, sampler="sb", nreflections=None):
"""
Single-phase boundary sampler.
- sampler: "sb"/"shake_and_bake" or "bsb"/"billiard_shake_and_bake"
- nreflections: only for BSB; defaults to ceil(sqrt(d)) if None
"""
import numpy as np

# Build H-polytope from current A, b
self.get_polytope()
P = HPolytope(self._A, self._b)

# Normalize sampler keyword
s = (sampler or "sb").lower()
if s in ("sb", "shake_and_bake"):
method = b"shake_and_bake"
use_bsb = False
elif s in ("bsb", "billiard_shake_and_bake"):
method = b"billiard_shake_and_bake"
use_bsb = True
else:
raise ValueError(f"Unknown sampler '{sampler}'")

d = int(self._A.shape[1])

# IMPORTANT: walk_len must be modest; too large often causes poor mixing or NaNs.
# A robust default is O(sqrt(d)) with a small lower bound.
walk_len = max(5, int(np.sqrt(d)))

# bias_vector must be a 1D array (never None)
bias_vec = np.ones(d, dtype=np.float64)

# Reflections for BSB (if not provided, use ceil(sqrt(d)))
if use_bsb:
nref = int(nreflections) if nreflections is not None else int(np.ceil(np.sqrt(d)))
else:
nref = 0

# Call into Cython in the exact argument order it expects:
# (method, number_of_points, number_of_points_to_burn, walk_len,
# variance_value, bias_vector, solver=None, ess=1000, nreflections)
samples = P.generate_samples(
method,
int(n),
int(burn_in),
int(walk_len),
1.0,
bias_vec,
self._parameters.get("solver"),
0,
int(nref),
)

# Pull diagnostics from C++ (could contain NaN if samples had NaNs)
self._last_hpoly = P

diag_buf = np.empty(5, dtype=np.float64)
P.get_sb_diagnostics(diag_buf)
minESS, maxPSRF, Ncpp_reported, phases, seconds = diag_buf

# Cython returns samples as (N, d); convert to (d, N)
S = samples.T # shape (d, N)

# Sanitize columns: drop any column with NaN/Inf to avoid propagating NaNs
finite_cols = np.isfinite(S).all(axis=0)
if not finite_cols.all():
S = S[:, finite_cols]

N_kept = S.shape[1]
if N_kept == 0:
raise RuntimeError(
"All generated samples are NaN/Inf; check walk_len, inner ball, and constraints."
)

# Map to full steady-state space
steady_states = map_samples_to_steady_states(S, self._N, self._N_shift)

# Diagnostics: report actual kept N; guard PSRF if NaN
diagnostics = {
"minESS": float(minESS),
"maxPSRF": float(maxPSRF) if np.isfinite(maxPSRF) else 1.0,
"N": int(N_kept),
"phases": int(phases),
"seconds": float(seconds) if np.isfinite(seconds) else None,
}

return steady_states, diagnostics

def sb_scaling_ratio(self, tol=1e-10, min_ratio=0.01):
"""
Prosleđuje na C++ get_sb_scaling_ratio preko POSLEDNJEG HPolytope
koji je generisao SB/BSB uzorke. Moraš prethodno pozvati
generate_steady_states_sb_once(...).
"""
if self._last_hpoly is None:
raise RuntimeError("sb_scaling_ratio: nema keširanog HPolytope sa SB/BSB uzorcima; pozovi sampler prvo.")
# ovo zove Cython metodu koja pakuje numpy bafer-e i zove C++ get_sb_scaling_ratio(...)
return self._last_hpoly.sb_scaling_ratio(tol=tol, min_ratio=min_ratio)



@staticmethod
def sample_from_polytope(
A, b, ess=1000, psrf=False, parallel_mmcs=False, num_threads=1, solver=None
Expand Down Expand Up @@ -223,7 +327,7 @@ def sample_from_polytope_no_multiphase(
Keyword arguments:
A -- an mxn matrix that contains the normal vectors of the facets of the polytope row-wise
b -- a m-dimensional vector, s.t. A*x <= b
method -- An MCMC method to sample, i.e. {'billiard_walk', 'cdhr', 'rdhr', 'ball_walk', 'dikin_walk', 'john_walk', 'vaidya_walk', 'gaussian_hmc_walk', 'exponential_hmc_walk', 'hmc_leapfrog_gaussian', 'hmc_leapfrog_exponential'}
method -- An MCMC method to sample, i.e. {'billiard_walk', 'cdhr', 'rdhr', 'ball_walk', 'dikin_walk', 'john_walk', 'vaidya_walk', 'gaussian_hmc_walk', 'exponential_hmc_walk', 'hmc_leapfrog_gaussian', 'hmc_leapfrog_exponential', 'shake_and_bake', 'billiard_shake_and_bake'}
n -- the number of steady states to sample
burn_in -- the number of points to burn before sampling
thinning -- the walk length of the chain
Expand All @@ -239,6 +343,41 @@ def sample_from_polytope_no_multiphase(

samples_T = samples.T
return samples_T

@staticmethod
def sample_from_polytope_sb_once(
A, b, n=1000, burn_in=0, solver=None, sampler="sb"
):
"""
One boundary-sampling phase for polytope A,b using SB or BSB.
Returns (samples_T_d_by_N, diagnostics_dict).
"""
P = HPolytope(A, b)

# choose method inline, no extra vars
method = b"billiard_shake_and_bake" if sampler == "bsb" else b"shake_and_bake"

# call binding with or without solver depending on signature
try:
samples = P.generate_samples(method, int(n), int(burn_in), solver)
except TypeError:
samples = P.generate_samples(method, int(n), int(burn_in))

# shared diagnostics for SB/BSB
diag = np.zeros(5, dtype=np.float64)
P.get_sb_diagnostics(diag)
minESS, maxPSRF, Ncpp, phases, seconds = diag

diagnostics = {
"minESS": float(minESS),
"maxPSRF": float(maxPSRF),
"N": int(Ncpp),
"phases": int(phases),
"seconds": float(seconds) if not np.isnan(seconds) else None,
}
return samples.T, diagnostics



@staticmethod
def round_polytope(
Expand Down Expand Up @@ -352,4 +491,4 @@ def set_tol(self, value):

def set_opt_percentage(self, value):

self._parameters["opt_percentage"] = value
self._parameters["opt_percentage"] = value
Loading
Loading