From 058a27ee5a44e02806499f6aaec0bea703808627 Mon Sep 17 00:00:00 2001 From: Ben Schneiderheinze Date: Tue, 28 Oct 2025 09:01:20 -0400 Subject: [PATCH 1/4] Added EkfdUpdate to KalmanFilter.py and GmekfUpdate to Gmkf.py --- pyest/filters/Gmkf.py | 145 ++++++++++++++++++- pyest/filters/KalmanFilter.py | 264 +++++++++++++++++++++++++++++++++- 2 files changed, 407 insertions(+), 2 deletions(-) diff --git a/pyest/filters/Gmkf.py b/pyest/filters/Gmkf.py index 192fc3f..7a53be0 100644 --- a/pyest/filters/Gmkf.py +++ b/pyest/filters/Gmkf.py @@ -1,7 +1,7 @@ import numpy as np import pyest.gm as pygm from pyest.filters import GaussianMixturePredict, GaussianMixtureUpdate -from pyest.filters import KfdPredict, KfdUpdate, EkfdPredict +from pyest.filters import KfdPredict, KfdUpdate, EkfdPredict, EkfdUpdate from pyest.utils import make_tuple @@ -250,3 +250,146 @@ def lin_gauss_likelihood_agreement(z, m, P, H, R, L=None, h_args=()): h_args = make_tuple(h_args) Hk = H(*h_args) return pygm.eval_mvnpdf(z, Hk@m, Hk@P@Hk.T + LRLt) + +class GmekfUpdate(EkfdUpdate, GaussianMixtureUpdate): + """ Discrete Gaussian Mixture Extended Kalman Filter Update + + Parameters + ---------- + H : ndarray or callable + (nz,nx) measurement Jacobian matrix + z_k = H(tk, xk, *args) @ x. If provided an ndarray instead, H will + automatically be recast as a callable. + R : ndarray + (ny,ny) measurement noise covariance matrix + L : (optional) ndarray + (nz,ny) mapping matrix mapping measurement noise into + measurement space + cov_method : (optional) string + method to use for covariance update. Valid options include 'general' + (default), 'Joseph', 'standard', and 'KWK'. + + Written by Keith LeGrand, March 2019 + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def __lin_gauss_likelihood_agreement(self, z, zhat, W): + return pygm.eval_mvnpdf(z, zhat, W) + + def lin_gauss_cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False): + """ compute the product of the linear Gaussian likelihood function and + another Gaussian pdf + + Parameters + ---------- + m : ndarray + (nx,) prior mean + P : ndarray + (nx,nx) prior covariance matrix + z : ndarray + (nz,) measurement + h_args : (optional) tuple + deterministic parameters to be passed to measurement function + interm_vals : (optional) Boolean + if True, returns intermediate values from computation. 
False by default + + Returns + ------- + mp : ndarray + (nx,) posterior mean + Pp : ndarray + (nx,nx) posterior state error covariance + q : float + likelihood agreement, :math:`q = N(z; Hm, HPH' + R)` + + If interm_vals is true, additionally returns a dictionary containing: + W : ndarray + (nz,nz) innovatations covariance + C : (ndarray) + (nx,nz) cross-covariance + K : (ndarray) + gain matrix + zhat : (ndarray) + predicted measurement + + """ + mp, Pp, interm = super().update(m, P, z, interm_vals=True, h_args=h_args) + q = self.__lin_gauss_likelihood_agreement(z, interm['zhat'], interm['W']) + + if not interm_vals: + return mp, Pp, q + else: + return mp, Pp, q, interm + + def cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False): + """ compute the product of the linear Gaussian likelihood function and + another Gaussian pdf + + Parameters + ---------- + m : ndarray + (nx,) prior mean + P : ndarray + (nx,nx) prior covariance matrix + z : ndarray + (nz,) measurement + h_args : (optional) tuple + deterministic parameters to be passed to measurement function + interm_vals : (optional) Boolean + if True, returns intermediate values from computation. False by default + + Returns + ------- + mp : ndarray + (nx,) posterior mean + Pp : ndarray + (nx,nx) posterior state error covariance + q : float + likelihood agreement, :math:`q = N(z; Hm, HPH' + R)` + + If interm_vals is true, additionally returns a dictionary containing: + W : ndarray + (nz,nz) innovatations covariance + C : ndarray + (nx,nz) cross-covariance + K : ndarray + gain matrix + zhat : (ndarray) + predicted measurement + + """ + return self.lin_gauss_cond_likelihood_prod(m, P, z, h_args=h_args, interm_vals=interm_vals) + + def update(self, pkm, zk, unnormalized=False, h_args=(), *args, **kwargs): + """ measurement-update of gm + + Parameters + ---------- + pkm : GaussianMixture + prior density at time tk + zk : ndarray + (nz,) measurement at time k + unnormalized : optional + if True, returns unnormalized distribution. False by default + h_args : (optional) tuple + deterministic parameters to be passed to measurement function + """ + wkp = np.empty_like(pkm.w, dtype=float) + mkp = np.empty_like(pkm.m, dtype=float) + Pkp = np.empty_like(pkm.P, dtype=float) + for i, (wm,mm,Pm) in enumerate(pkm): + print(f"updating mixand {i+1} out of {len(pkm)}") + mkp[i], Pkp[i], q, interm_vals = self.lin_gauss_cond_likelihood_prod(mm, Pm, zk, h_args=h_args, interm_vals=True) + print(f"Q values: {q}") + print(f"true z: {zk}") + print(f"z_hat: {interm_vals['zhat']}") + # print(f"W: {interm_vals["W"]}") + # TODO: test (z-zhat).T @ inv(W) @ (z-zhat) + wkp[i] = wm*q + + if not unnormalized: + wkp /= np.sum(wkp) + + return pygm.GaussianMixture(wkp, mkp, Pkp) \ No newline at end of file diff --git a/pyest/filters/KalmanFilter.py b/pyest/filters/KalmanFilter.py index 562116e..810f630 100644 --- a/pyest/filters/KalmanFilter.py +++ b/pyest/filters/KalmanFilter.py @@ -625,4 +625,266 @@ def _set_F(self, F): def _get_F(self): return self._F - F = property(_get_F, _set_F) \ No newline at end of file + F = property(_get_F, _set_F) + +class EkfdUpdate(KalmanDiscreteUpdate): + """ EkfdUpdate Discrete extended Kalman filter update + + Parameters + ---------- + R : ndarray + (ny,ny) measurement noise covariance matrix + H : ndarray or callable + (nz,nx) measurement Jacobian matrix + z_k = H(tk, xk, *args) @ x. If provided an ndarray instead, H will + automatically be recast as a callable. 
+ L : (optional) ndarray + (nz,ny) mapping matrix mapping measurement noise into + measurement space + cov_method : (optional) string + method to use for covariance update. Valid options include 'general' + (default), 'Joseph', 'standard', and 'KWK'. + p : (optional) scalar + underweighting factor. p=1 results in no underweighting. p-->0 results + in no covariance update + + Originally written by Keith LeGrand, March 2019, + Adapted for extended Kf by Aditya Desai, September 2025 + """ + + def __init__(self, h, R, H, L=None, h_args=(), cov_method='general', p=None): + + super().__init__(R=R, L=L, p=p) + + #TODO: check cov_update method + #TODO: residual editing + + self.cov_method = cov_method + self.h = h + self.H = H + + def __set_h(self, h): + self._h = h + + def __set_H(self, H): + if isinstance(H, np.ndarray): + H = convert_mat_to_fun(H) + self._H = H + + + def __get_H(self): + return self._H + + def __get_h(self): + return self._h + + H = property(__get_H, __set_H) + h = property(__get_h, __set_h) + + @staticmethod + def gain(C, W): + """ compute filter gain + + Parameters + ---------- + C : ndarray + (nx,nz) cross-covariance + W : ndarray + (nz,nz) innovations covariance + + Returns + ------- + K : ndarray + gain matrix + + """ + return np.linalg.solve(W.T, C.T).T + + def __innovations_cov(self, Pkm, Hk): + """ compute inovations covariance + Parameters + ---------- + Pkm : ndarray + (nx,nx) prior covariance matrix at time k + Hk : ndarray + h_args : (optional) tuple + deterministic parameters to be passed to measurement function + + Returns + ------- + W : ndarray + (nz,nz) innovations covariance + """ + return Hk @ Pkm @ Hk.T + self._LRLt + + def innovations_cov(self, Pkm, h_args=()): + """ compute inovations covariance + Parameters + ---------- + Pkm : ndarray + (nx,nx) prior covariance matrix at time k + h_args : (optional) tuple + deterministic parameters to be passed to measurement function + + Returns + ------- + W : ndarray + (nz,nz) innovations covariance + """ + h_args = make_tuple(h_args) + Hk = self._H(*h_args) + return self.__innovations_cov(Pkm, Hk) + + def __cross_cov(self, Pkm, Hk): + """ compute cross covariance + Parameters + ---------- + Pkm : ndarray + (nx,nx) prior covariance matrix at time k + Hk : ndarray + (nz,nx) measurement matrix + + Returns + ------- + C : ndarray + (nx,nz) cross covariance + """ + return Pkm @ Hk.T + + def cross_cov(self, Pkm, h_args=()): + """ compute cross covariance + Parameters + ---------- + Pkm : ndarray + (nx,nx) prior covariance matrix at time k + h_args : (optional) tuple + deterministic parameters to be passed to measurement function + + Returns + ------- + C : ndarray + (nx,nz) cross covariance + """ + h_args = make_tuple(h_args) + Hk = self._H(*h_args) + return self.__cross_cov(Pkm, Hk) + + + def __expected_meas(self, mkm, h_func): + """ compute measurement expectation + Parameters + ---------- + mkm : ndarray + (nx,) prior mean at time k + Hk : ndarray + (nz,nx) measurement matrix + + Returns + ------- + w : ndarray + (nz,) predicted measurement at time k + """ + return h_func(mkm) + + def expected_meas(self, mkm, h_args=()): + """ compute measurement expectation + Parameters + ---------- + mkm : ndarray + (nx,) prior mean at time k + h_args : (optional) tuple + deterministic parameters to be passed to measurement function + + Returns + ------- + w : ndarray + (nz,) predicted measurement at time k + """ + h_args = make_tuple(h_args) + hk_func = lambda mkm: self._h(mkm, *h_args) + print(hk_func) + return 
self.__expected_meas(mkm, hk_func)
+
+    def update(self, mkm, Pkm, z, R=None, H=None, h_args=(), interm_vals=False):
+        """ perform Kalman filter update
+
+        Parameters
+        ----------
+        mkm : ndarray
+            (nx,) prior mean at time k
+        Pkm : ndarray
+            (nx,nx) prior covariance matrix at time k
+        z : ndarray
+            (nz,) measurement at time k
+        R : (optional) ndarray
+            (ny,ny) measurement noise covariance matrix
+        H : (optional) ndarray or callable
+            (nz,nx) measurement Jacobian matrix
+            z_k = H(tk, xk, *args) @ x. If provided an ndarray instead, H will
+            automatically be recast as a callable.
+        h_args : (optional) tuple
+            deterministic parameters to be passed to measurement function
+
+        Returns
+        -------
+        mkp : ndarray
+            (nx,) posterior mean
+        Pkp : ndarray
+            (nx,nx) posterior state error covariance
+
+        If interm_vals is true, additionally returns a dictionary containing:
+        W : ndarray
+            (nz,nz) innovations covariance
+        C : ndarray
+            (nx,nz) cross-covariance
+        K : ndarray
+            gain matrix
+        zhat : ndarray
+            predicted measurement
+
+        Written by Keith LeGrand, March 2019
+        """
+        #TODO: check measurement size
+        if H is not None:
+            self.H = H
+
+        if R is not None:
+            self.R = R
+
+        h_args = make_tuple(h_args)
+        Hk = self._H(mkm, *h_args)
+        hk_func = lambda mkm: self._h(mkm, *h_args)
+
+        assert(self._H is self.H), "H and _H are not the same somehow..."
+        # predicted measurement
+        w = self.__expected_meas(mkm, hk_func)
+        W = self.__innovations_cov(Pkm, Hk)
+        C = self.__cross_cov(Pkm, Hk)
+
+        K = KfdUpdate.gain(C, W)
+        mkp = mkm + K@(z - w)
+
+        if self.cov_method == 'general':
+            # general form. does not require gain be Kalman gain. gain must
+            # be linear
+            Pkp = Pkm - C@K.T - K@C.T + K@W@K.T
+        elif self.cov_method == 'Joseph':
+            # Joseph form. does not require gain be Kalman gain.
+            # measurements must be linear
+            I = np.eye(Pkm.shape[0])
+            ImKH = I - K@Hk
+            Pkp = ImKH@Pkm@ImKH.T + K@self._LRLt@K.T
+        elif self.cov_method == 'standard':
+            # standard form. requires gain be Kalman gain and linear
+            # measurements
+            I = np.eye(Pkm.shape[0])
+            Pkp = (I - K@Hk)@Pkm
+        elif self.cov_method == 'KWK':
+            # yet another form. requires gain be Kalman gain but does not
+            # require linear measurements
+            Pkp = Pkm - K@W@K.T
+
+        if not interm_vals:
+            return mkp, Pkp
+        else:
+            return mkp, Pkp, {'W':W, 'C':C, 'K':K, 'zhat':w}
\ No newline at end of file

From e5a0f3b28b374b79fec088dad73c041a61688310 Mon Sep 17 00:00:00 2001
From: Ben Schneiderheinze
Date: Tue, 28 Oct 2025 12:43:11 -0400
Subject: [PATCH 2/4] Added tests for EkfdUpdate and GmekfUpdate to
 test_filters.py, which both pass. EkfdUpdate test checked against a manual
 EKF update, checked that a diagonal H implies a diagonal W, and checked
 against KfdUpdate for linear measurements. GmekfUpdate test checked against
 EkfdUpdate for a single mixand, checked that weights get heavily scaled down
 for non-agreeing measurements, and checked against GmkfUpdate for linear
 measurements.
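
Illustrative sketch, not part of the patch: the weight re-scaling that the
"non-agreeing measurement" check exercises. GmekfUpdate multiplies each prior
weight by the likelihood agreement q_i = N(z; zhat_i, W_i) and re-normalizes,
so a mixand whose predicted measurement disagrees with z is driven toward zero
weight. scipy.stats.multivariate_normal stands in here for pygm.eval_mvnpdf;
the numbers below are made up for the example.

    import numpy as np
    from scipy.stats import multivariate_normal

    def reweight(w, zhats, Ws, z):
        # q_i = N(z; zhat_i, W_i); posterior weights are w_i * q_i, re-normalized
        q = np.array([multivariate_normal.pdf(z, mean=zh, cov=W)
                      for zh, W in zip(zhats, Ws)])
        wp = w * q
        return wp / wp.sum()

    # mixand 1 agrees with z, mixand 2 (predicted far away) does not
    z = np.array([np.cos(np.pi/4), np.sin(np.pi/4)])
    zhats = [z, np.array([-10.0, -10.0])]
    Ws = [np.eye(2), np.eye(2)]
    print(reweight(np.array([0.5, 0.5]), zhats, Ws, z))  # second weight is ~0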
--- tests/test_filters.py | 146 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/tests/test_filters.py b/tests/test_filters.py index b5dbc82..dd36045 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -90,6 +90,76 @@ def f(tkm1, tk, xkm1): return np.array([np.sin(xkm1[1]), xkm1[0]]) npt.assert_array_equal(m_prior, m_des) npt.assert_array_equal(P_prior, P_des) +def test_extendedkalmandiscreteupdate(): + """ test discrete time extended Kalman filter update """ + ## Polar transformation test case + def h(m): return np.array([np.linalg.norm(m), np.arctan2(m[1], m[0])]) + def H(m): return np.array([[m[0]/np.linalg.norm(m), m[1]/np.linalg.norm(m)], + [-m[1]/(m[0]**2*(1+(m[1]/m[0])**2)),1/(m[0]*(1+(m[1]/m[0])**2))]]) + R = np.array([[0.1**2, 0], + [0, 0.1**2]]) + + ekfdupd = filters.EkfdUpdate(h = h, H = H, R = R) + + mkm = np.array([np.cos(np.pi/4), np.sin(np.pi/4)]) + Pkm = np.eye(2) + + # checking that z = h(x) yields no mean update + zk = h(mkm) + mkp, Pkp = ekfdupd.update(mkm, Pkm, zk) + mkp_des = mkm + npt.assert_array_equal(mkp, mkp_des) + + # checking that z =/= h(x) updates the mean accordingly + zk = np.array([np.cos(np.pi/4),0]) + mkp, Pkp = ekfdupd.update(mkm, Pkm, zk) + + # Manual Ekfd update + Wk = H(mkm)@Pkm@H(mkm).T + R + Ck = Pkm@H(mkm).T + Kk = np.linalg.solve(Wk,Ck) + zk_hat = h(mkm) + mkp_des = mkm + Kk@(zk-zk_hat) + Pkp_des = Pkm - Ck@Kk.T - Kk@Ck.T + Kk@Wk@Kk.T + + npt.assert_almost_equal(mkp_des,mkp,decimal=15) + npt.assert_almost_equal(Pkp_des,Pkp,decimal=15) + + ## Diagonal measurement jacobian test case (H diagonal implies W diagonal) + def h(m): return np.array([0.5*m[0]**2, 0.5*m[1]**2]) + def H(m): return np.array([[m[0], 0], + [0, m[1]]]) + R = np.array([[0.1**2, 0], + [0, 0.1**2]]) + ekfdupd = filters.EkfdUpdate(h = h, H = H, R = R) + + mkm = np.array([1, 1]) + Pkm = np.eye(2) + + Wk = ekfdupd.innovations_cov(Pkm, h_args = (mkm)) + off_diag = Wk[~np.eye(Wk.shape[0], dtype=bool)] + + npt.assert_almost_equal(off_diag, 0.0, decimal=15) + + ## Linear measurement test case + def h(m): return np.array([m[1], m[0]]) + H = np.array([[0,1], + [1,0]]) + R = np.array([[0.1**2, 0], + [0, 0.1**2]]) + kfdupd = filters.KfdUpdate(H = H, R = R) + ekfdupd = filters.EkfdUpdate(h = h, H = H, R = R) + + mkm = np.array([1, -1]) + Pkm = np.eye(2) + zk = np.array([-1.1, 0.9]) + + mkp_kfd, Pkp_kfd = kfdupd.update(mkm, Pkm, zk) + mkp_ekfd, Pkp_ekfd = ekfdupd.update(mkm, Pkm, zk) + + npt.assert_almost_equal(mkp_kfd,mkp_ekfd,decimal=15) + npt.assert_almost_equal(Pkp_kfd,Pkp_ekfd,decimal=15) + def test_badunderweighting(): R = np.eye(2) H = np.eye(2) @@ -308,6 +378,82 @@ def test_GmkfUpdate(): rtol=1e-8 ) +def test_GmekfUpdate(): + """ test Gaussian mixture extended Kalman filter update """ + ## Single mixand to make sure it is the same as EkfdUpdate() + def h(m): return np.array([np.linalg.norm(m), np.arctan2(m[1], m[0])]) + def H(m): return np.array([[m[0]/np.linalg.norm(m), m[1]/np.linalg.norm(m)], + [-m[1]/(m[0]**2*(1+(m[1]/m[0])**2)),1/(m[0]*(1+(m[1]/m[0])**2))]]) + R = np.array([[0.1**2, 0], + [0, 0.1**2]]) + + ekfdupd = filters.EkfdUpdate(h = h, H = H, R = R) + gmekfupd = filters.GmekfUpdate(h = h, H = H, R = R) + + mkm = np.array([np.cos(np.pi/4), np.sin(np.pi/4)]) + Pkm = np.eye(2) + + pkm = pygm.GaussianMixture(1,mkm,Pkm) + + zk = np.array([np.cos(np.pi/4), 0]) + + mkp_ekfd, Pkp_ekfd = ekfdupd.update(mkm,Pkm,zk) + pkp = gmekfupd.update(pkm,zk) + mkp_gmekf = pkp.get_m().squeeze() + Pkp_gmekf = pkp.get_P().squeeze() + + 
npt.assert_almost_equal(mkp_ekfd,mkp_gmekf,decimal=15) + npt.assert_almost_equal(Pkp_ekfd,Pkp_gmekf,decimal=15) + + ## Bimodal distribution with measurement in agreement with only one mixand + m1 = np.array([np.cos(np.pi/4), np.sin(np.pi/4)]) + m2 = np.array([-10,-10]) + P1 = np.eye(2) + P2 = np.eye(2) + pkm = pygm.GaussianMixture([0.5,0.5],[m1,m2],[P1,P2]) + zk = np.array([np.cos(np.pi/4), np.sin(np.pi/4)]) + pkp = gmekfupd.update(pkm,zk) + wkp = pkp.get_w().squeeze() + + npt.assert_almost_equal(wkp[1],0,decimal=15) + + ## Linear measurements to compare against GmkfUpdate() + def h(m): return np.array([m[1], m[0]]) + H = np.array([[0,1], + [1,0]]) + R = np.array([[0.1**2, 0], + [0, 0.1**2]]) + + gmkfupd = filters.GmkfUpdate(H = H, R = R) + gmekfupd = filters.GmekfUpdate(h = h, H = H, R = R) + + wkm = np.array([0.8, + 0.1, + 0.1]) + mkm = np.array([[1,-1], + [0,0], + [1,1]]) + Pkm = np.array([np.eye(2), + np.eye(2), + np.eye(2)]) + pkm = pygm.GaussianMixture(wkm,mkm,Pkm) + + zk = np.array([-1.1, 0.9]) + + pkp_gmkf = gmkfupd.update(pkm, zk) + pkp_gmekf = gmekfupd.update(pkm, zk) + + wkp_gmkf = pkp_gmkf.get_w() + mkp_gmkf = pkp_gmkf.get_m() + Pkp_gmkf = pkp_gmkf.get_P() + + wkp_gmekf = pkp_gmekf.get_w() + mkp_gmekf = pkp_gmekf.get_m() + Pkp_gmekf = pkp_gmekf.get_P() + + npt.assert_almost_equal(wkp_gmkf,wkp_gmekf,decimal=15) + npt.assert_almost_equal(mkp_gmkf,mkp_gmekf,decimal=15) + npt.assert_almost_equal(Pkp_gmkf,Pkp_gmekf,decimal=15) def test_GmukfUpdate(): """ test Gaussian mixture unscented Kalman filter update """ From 762f9f405b7aa8a53c66cfe725fe42b3dc2d10d8 Mon Sep 17 00:00:00 2001 From: Ben Schneiderheinze Date: Tue, 28 Oct 2025 15:24:13 -0400 Subject: [PATCH 3/4] Added comments defining h function to EkfdUpdate and GmekfUpdate --- pyest/filters/Gmkf.py | 2 ++ pyest/filters/KalmanFilter.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pyest/filters/Gmkf.py b/pyest/filters/Gmkf.py index 7a53be0..faa33fb 100644 --- a/pyest/filters/Gmkf.py +++ b/pyest/filters/Gmkf.py @@ -256,6 +256,8 @@ class GmekfUpdate(EkfdUpdate, GaussianMixtureUpdate): Parameters ---------- + h : callable + measurement function of the form :math:`h(x, ...)` H : ndarray or callable (nz,nx) measurement Jacobian matrix z_k = H(tk, xk, *args) @ x. If provided an ndarray instead, H will diff --git a/pyest/filters/KalmanFilter.py b/pyest/filters/KalmanFilter.py index 810f630..2ef18c3 100644 --- a/pyest/filters/KalmanFilter.py +++ b/pyest/filters/KalmanFilter.py @@ -632,6 +632,8 @@ class EkfdUpdate(KalmanDiscreteUpdate): Parameters ---------- + h : callable + measurement function of the form :math:`h(x, ...)` R : ndarray (ny,ny) measurement noise covariance matrix H : ndarray or callable From 21b98ccddc3ce49698a77f8af36ff1ef42ea1893 Mon Sep 17 00:00:00 2001 From: Ben Schneiderheinze Date: Wed, 29 Oct 2025 15:53:52 -0400 Subject: [PATCH 4/4] Fixes and clairfications in response to pull request comments. --- pyest/filters/Gmkf.py | 10 ++------ pyest/filters/KalmanFilter.py | 46 +++++++++++++++-------------------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/pyest/filters/Gmkf.py b/pyest/filters/Gmkf.py index faa33fb..a716ad3 100644 --- a/pyest/filters/Gmkf.py +++ b/pyest/filters/Gmkf.py @@ -258,10 +258,8 @@ class GmekfUpdate(EkfdUpdate, GaussianMixtureUpdate): ---------- h : callable measurement function of the form :math:`h(x, ...)` - H : ndarray or callable - (nz,nx) measurement Jacobian matrix - z_k = H(tk, xk, *args) @ x. 
If provided an ndarray instead, H will - automatically be recast as a callable. + H : callable + (nz,nx) measurement Jacobian matrix of the form :math:`H(x)` R : ndarray (ny,ny) measurement noise covariance matrix L : (optional) ndarray @@ -270,8 +268,6 @@ class GmekfUpdate(EkfdUpdate, GaussianMixtureUpdate): cov_method : (optional) string method to use for covariance update. Valid options include 'general' (default), 'Joseph', 'standard', and 'KWK'. - - Written by Keith LeGrand, March 2019 """ def __init__(self, *args, **kwargs): @@ -315,7 +311,6 @@ def lin_gauss_cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False): gain matrix zhat : (ndarray) predicted measurement - """ mp, Pp, interm = super().update(m, P, z, interm_vals=True, h_args=h_args) q = self.__lin_gauss_likelihood_agreement(z, interm['zhat'], interm['W']) @@ -360,7 +355,6 @@ def cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False): gain matrix zhat : (ndarray) predicted measurement - """ return self.lin_gauss_cond_likelihood_prod(m, P, z, h_args=h_args, interm_vals=interm_vals) diff --git a/pyest/filters/KalmanFilter.py b/pyest/filters/KalmanFilter.py index 2ef18c3..057ff51 100644 --- a/pyest/filters/KalmanFilter.py +++ b/pyest/filters/KalmanFilter.py @@ -636,10 +636,8 @@ class EkfdUpdate(KalmanDiscreteUpdate): measurement function of the form :math:`h(x, ...)` R : ndarray (ny,ny) measurement noise covariance matrix - H : ndarray or callable - (nz,nx) measurement Jacobian matrix - z_k = H(tk, xk, *args) @ x. If provided an ndarray instead, H will - automatically be recast as a callable. + H : callable + (nz,nx) measurement Jacobian matrix of the form :math:`H(x)` L : (optional) ndarray (nz,ny) mapping matrix mapping measurement noise into measurement space @@ -649,9 +647,6 @@ class EkfdUpdate(KalmanDiscreteUpdate): p : (optional) scalar underweighting factor. p=1 results in no underweighting. 
p-->0 results in no covariance update - - Originally written by Keith LeGrand, March 2019, - Adapted for extended Kf by Aditya Desai, September 2025 """ def __init__(self, h, R, H, L=None, h_args=(), cov_method='general', p=None): @@ -709,6 +704,7 @@ def __innovations_cov(self, Pkm, Hk): Pkm : ndarray (nx,nx) prior covariance matrix at time k Hk : ndarray + (nz,nx) measurement function Jacobian evaluated at mkm h_args : (optional) tuple deterministic parameters to be passed to measurement function @@ -744,7 +740,7 @@ def __cross_cov(self, Pkm, Hk): Pkm : ndarray (nx,nx) prior covariance matrix at time k Hk : ndarray - (nz,nx) measurement matrix + (nz,nx) measurement function Jacobian evaluated at mkm Returns ------- @@ -753,10 +749,12 @@ def __cross_cov(self, Pkm, Hk): """ return Pkm @ Hk.T - def cross_cov(self, Pkm, h_args=()): + def cross_cov(self, mkm, Pkm, h_args=()): """ compute cross covariance Parameters ---------- + mkm : ndarray + (nx,) prior mean at time k Pkm : ndarray (nx,nx) prior covariance matrix at time k h_args : (optional) tuple @@ -768,7 +766,7 @@ def cross_cov(self, Pkm, h_args=()): (nx,nz) cross covariance """ h_args = make_tuple(h_args) - Hk = self._H(*h_args) + Hk = self._H(mkm,*h_args) return self.__cross_cov(Pkm, Hk) @@ -779,7 +777,7 @@ def __expected_meas(self, mkm, h_func): mkm : ndarray (nx,) prior mean at time k Hk : ndarray - (nz,nx) measurement matrix + (nz,nx) measurement function Jacobian evaluated at mkm Returns ------- @@ -803,12 +801,10 @@ def expected_meas(self, mkm, h_args=()): (nz,) predicted measurement at time k """ h_args = make_tuple(h_args) - hk_func = lambda mkm: self._h(mkm, *h_args) - print(hk_func) - return self.__expected_meas(mkm, hk_func) + return self.h(mkm, *h_args) - def update(self, mkm, Pkm, z, R=None, H=None, h_args=(), interm_vals=False): - """ perform Kalman filter update + def update(self, mkm, Pkm, z, h=None, R=None, H=None, h_args=(), interm_vals=False): + """ perform Extended Kalman filter update Parameters ---------- @@ -818,12 +814,12 @@ def update(self, mkm, Pkm, z, R=None, H=None, h_args=(), interm_vals=False): (nx,nx) prior covariance matrix at time k z : ndarray (nz,) measurement at time k + h : callable + measurement function of the form :math:`h(x, ...)`. If not provided, self.h will be used. R : (optional) ndarray - (ny,ny) measurement noise covariance matrix - H : (optional) ndarray or callable - (nz,nx) measurement Jacobian matrix - z_k = H(tk, xk, *args) @ x. If provided an ndarray instead, H will - automatically be recast as a callable. + (ny,ny) measurement noise covariance matrix. If not provided, self.R will be used. + H : (optional) callable + (nz,nx) measurement Jacobian matrix measurement Jacobian matrix of the form :math:`H(x)`. If not provided, self.H will be used. h_args : (optional) tuple deterministic parameters to be passed to measurement function @@ -843,21 +839,19 @@ def update(self, mkm, Pkm, z, R=None, H=None, h_args=(), interm_vals=False): gain matrix zhat : ndarray predicted measurement - - Written by Keith LeGrand, March 2019 """ #TODO: check measurement size + if h is not None: + self.h = h if H is not None: self.H = H - if R is not None: self.R = R h_args = make_tuple(h_args) - Hk = self._H(mkm, *h_args) + Hk = self.H(mkm, *h_args) hk_func = lambda mkm: self._h(mkm, *h_args) - assert(self._H is self.H), "H and _H are not the same somehow..." # predicted measurement w = self.__expected_meas(mkm, hk_func) W = self.__innovations_cov(Pkm, Hk)
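
Usage sketch, not part of the patch series: how the new EkfdUpdate and
GmekfUpdate classes are exercised in the tests above, using the range/bearing
measurement model. The "from pyest import filters" import path is an
assumption based on how the tests reference filters.EkfdUpdate; the Jacobian
below is an algebraically simplified form of the one in the tests, and the
measurement value is made up.

    import numpy as np
    import pyest.gm as pygm
    from pyest import filters  # assumed import path

    def h(m):  # range/bearing measurement
        return np.array([np.linalg.norm(m), np.arctan2(m[1], m[0])])

    def H(m):  # analytic Jacobian of h
        r = np.linalg.norm(m)
        return np.array([[ m[0]/r,    m[1]/r],
                         [-m[1]/r**2, m[0]/r**2]])

    R = 0.1**2 * np.eye(2)
    mkm, Pkm = np.array([1.0, 1.0]), np.eye(2)
    zk = np.array([1.4, np.pi/4])

    ekf_upd = filters.EkfdUpdate(h=h, H=H, R=R)
    mkp, Pkp = ekf_upd.update(mkm, Pkm, zk)  # single-Gaussian EKF measurement update

    gmekf_upd = filters.GmekfUpdate(h=h, H=H, R=R)
    pkp = gmekf_upd.update(pygm.GaussianMixture(1, mkm, Pkm), zk)  # GM EKF measurement update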