diff --git a/pyest/filters/Gmkf.py b/pyest/filters/Gmkf.py
index 192fc3f..a716ad3 100644
--- a/pyest/filters/Gmkf.py
+++ b/pyest/filters/Gmkf.py
@@ -1,7 +1,7 @@
 import numpy as np
 import pyest.gm as pygm
 from pyest.filters import GaussianMixturePredict, GaussianMixtureUpdate
-from pyest.filters import KfdPredict, KfdUpdate, EkfdPredict
+from pyest.filters import KfdPredict, KfdUpdate, EkfdPredict, EkfdUpdate
 from pyest.utils import make_tuple
 
 
@@ -250,3 +250,142 @@ def lin_gauss_likelihood_agreement(z, m, P, H, R, L=None, h_args=()):
     h_args = make_tuple(h_args)
     Hk = H(*h_args)
     return pygm.eval_mvnpdf(z, Hk@m, Hk@P@Hk.T + LRLt)
+
+class GmekfUpdate(EkfdUpdate, GaussianMixtureUpdate):
+    """ Discrete Gaussian Mixture Extended Kalman Filter Update
+
+    Parameters
+    ----------
+    h : callable
+        measurement function of the form :math:`h(x, ...)`
+    H : callable
+        (nz,nx) measurement Jacobian matrix of the form :math:`H(x)`
+    R : ndarray
+        (ny,ny) measurement noise covariance matrix
+    L : (optional) ndarray
+        (nz,ny) matrix mapping measurement noise into measurement space
+    cov_method : (optional) string
+        method to use for covariance update. Valid options are 'general'
+        (default), 'Joseph', 'standard', and 'KWK'.
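+
+    Examples
+    --------
+    A minimal usage sketch (the scalar range measurement model and all
+    numbers here are illustrative only)::
+
+        import numpy as np
+        import pyest.gm as pygm
+        from pyest.filters import GmekfUpdate
+
+        # range measurement of a 2D state, and its Jacobian
+        h = lambda x: np.array([np.linalg.norm(x)])
+        H = lambda x: np.atleast_2d(x/np.linalg.norm(x))
+        R = np.array([[0.1**2]])
+
+        upd = GmekfUpdate(h=h, H=H, R=R)
+        pkm = pygm.GaussianMixture([0.5, 0.5],
+                                   [np.array([1., 1.]), np.array([3., 3.])],
+                                   [np.eye(2), np.eye(2)])
+        pkp = upd.update(pkm, np.array([1.5]))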
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def __lin_gauss_likelihood_agreement(self, z, zhat, W):
+        return pygm.eval_mvnpdf(z, zhat, W)
+
+    def lin_gauss_cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False):
+        """ compute the product of the linearized Gaussian likelihood function
+        and another Gaussian pdf
+
+        Parameters
+        ----------
+        m : ndarray
+            (nx,) prior mean
+        P : ndarray
+            (nx,nx) prior covariance matrix
+        z : ndarray
+            (nz,) measurement
+        h_args : (optional) tuple
+            deterministic parameters to be passed to measurement function
+        interm_vals : (optional) bool
+            if True, returns intermediate values from computation. False by
+            default
+
+        Returns
+        -------
+        mp : ndarray
+            (nx,) posterior mean
+        Pp : ndarray
+            (nx,nx) posterior state error covariance
+        q : float
+            likelihood agreement, :math:`q = N(z; h(m), HPH' + R)`
+
+        If interm_vals is true, additionally returns a dictionary containing:
+        W : ndarray
+            (nz,nz) innovations covariance
+        C : ndarray
+            (nx,nz) cross-covariance
+        K : ndarray
+            gain matrix
+        zhat : ndarray
+            predicted measurement
+        """
+        mp, Pp, interm = super().update(m, P, z, interm_vals=True, h_args=h_args)
+        q = self.__lin_gauss_likelihood_agreement(z, interm['zhat'], interm['W'])
+
+        if not interm_vals:
+            return mp, Pp, q
+        else:
+            return mp, Pp, q, interm
+
+    def cond_likelihood_prod(self, m, P, z, h_args=(), interm_vals=False):
+        """ compute the product of the linearized Gaussian likelihood function
+        and another Gaussian pdf
+
+        Parameters
+        ----------
+        m : ndarray
+            (nx,) prior mean
+        P : ndarray
+            (nx,nx) prior covariance matrix
+        z : ndarray
+            (nz,) measurement
+        h_args : (optional) tuple
+            deterministic parameters to be passed to measurement function
+        interm_vals : (optional) bool
+            if True, returns intermediate values from computation. False by
+            default
+
+        Returns
+        -------
+        mp : ndarray
+            (nx,) posterior mean
+        Pp : ndarray
+            (nx,nx) posterior state error covariance
+        q : float
+            likelihood agreement, :math:`q = N(z; h(m), HPH' + R)`
+
+        If interm_vals is true, additionally returns a dictionary containing:
+        W : ndarray
+            (nz,nz) innovations covariance
+        C : ndarray
+            (nx,nz) cross-covariance
+        K : ndarray
+            gain matrix
+        zhat : ndarray
+            predicted measurement
+        """
+        return self.lin_gauss_cond_likelihood_prod(m, P, z, h_args=h_args,
+                                                   interm_vals=interm_vals)
+
+    def update(self, pkm, zk, unnormalized=False, h_args=(), *args, **kwargs):
+        """ measurement update of a Gaussian mixture
+
+        Parameters
+        ----------
+        pkm : GaussianMixture
+            prior density at time tk
+        zk : ndarray
+            (nz,) measurement at time k
+        unnormalized : (optional) bool
+            if True, returns unnormalized distribution. False by default
+        h_args : (optional) tuple
+            deterministic parameters to be passed to measurement function
+
+        Returns
+        -------
+        GaussianMixture
+            posterior density at time tk
+        """
+        wkp = np.empty_like(pkm.w, dtype=float)
+        mkp = np.empty_like(pkm.m, dtype=float)
+        Pkp = np.empty_like(pkm.P, dtype=float)
+        for i, (wm, mm, Pm) in enumerate(pkm):
+            # condition each mixand on the measurement and weight it by its
+            # likelihood agreement
+            mkp[i], Pkp[i], q = self.lin_gauss_cond_likelihood_prod(
+                mm, Pm, zk, h_args=h_args)
+            # TODO: consider residual editing via (z-zhat).T @ inv(W) @ (z-zhat)
+            wkp[i] = wm*q
+
+        if not unnormalized:
+            wkp /= np.sum(wkp)
+
+        return pygm.GaussianMixture(wkp, mkp, Pkp)
\ No newline at end of file
diff --git a/pyest/filters/KalmanFilter.py b/pyest/filters/KalmanFilter.py
index 562116e..057ff51 100644
--- a/pyest/filters/KalmanFilter.py
+++ b/pyest/filters/KalmanFilter.py
@@ -625,4 +625,262 @@ def _set_F(self, F):
     def _get_F(self):
         return self._F
 
-    F = property(_get_F, _set_F)
\ No newline at end of file
+    F = property(_get_F, _set_F)
+
+class EkfdUpdate(KalmanDiscreteUpdate):
+    """ EkfdUpdate Discrete extended Kalman filter update
+
+    Parameters
+    ----------
+    h : callable
+        measurement function of the form :math:`h(x, ...)`
+    R : ndarray
+        (ny,ny) measurement noise covariance matrix
+    H : callable
+        (nz,nx) measurement Jacobian matrix of the form :math:`H(x)`
+    L : (optional) ndarray
+        (nz,ny) matrix mapping measurement noise into measurement space
+    cov_method : (optional) string
+        method to use for covariance update. Valid options are 'general'
+        (default), 'Joseph', 'standard', and 'KWK'.
+    p : (optional) scalar
+        underweighting factor. p=1 results in no underweighting. p -> 0
+        results in no covariance update
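+
+    Examples
+    --------
+    A minimal sketch using the polar (range/bearing) measurement model
+    from the test suite; the numbers are illustrative only::
+
+        import numpy as np
+        from pyest.filters import EkfdUpdate
+
+        h = lambda x: np.array([np.linalg.norm(x), np.arctan2(x[1], x[0])])
+        # Jacobian of h; rows are the gradients of range and bearing
+        H = lambda x: np.array(
+            [[x[0], x[1]],
+             [-x[1]/np.linalg.norm(x), x[0]/np.linalg.norm(x)]]
+        )/np.linalg.norm(x)
+        R = 0.1**2*np.eye(2)
+
+        upd = EkfdUpdate(h=h, H=H, R=R)
+        mkp, Pkp = upd.update(np.array([1., 1.]), np.eye(2),
+                              np.array([1.5, np.pi/4]))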
+    """
+
+    def __init__(self, h, R, H, L=None, h_args=(), cov_method='general', p=None):
+
+        super().__init__(R=R, L=L, p=p)
+
+        # TODO: validate cov_method
+        # TODO: residual editing
+
+        self.cov_method = cov_method
+        self.h = h
+        self.H = H
+
+    def __set_h(self, h):
+        self._h = h
+
+    def __set_H(self, H):
+        # allow a constant measurement matrix by wrapping it in a function
+        if isinstance(H, np.ndarray):
+            H = convert_mat_to_fun(H)
+        self._H = H
+
+    def __get_H(self):
+        return self._H
+
+    def __get_h(self):
+        return self._h
+
+    H = property(__get_H, __set_H)
+    h = property(__get_h, __set_h)
+
+    @staticmethod
+    def gain(C, W):
+        """ compute filter gain
+
+        Parameters
+        ----------
+        C : ndarray
+            (nx,nz) cross-covariance
+        W : ndarray
+            (nz,nz) innovations covariance
+
+        Returns
+        -------
+        K : ndarray
+            gain matrix
+        """
+        # K = C @ inv(W), computed without forming the inverse explicitly
+        return np.linalg.solve(W.T, C.T).T
+
+    def __innovations_cov(self, Pkm, Hk):
+        """ compute innovations covariance
+
+        Parameters
+        ----------
+        Pkm : ndarray
+            (nx,nx) prior covariance matrix at time k
+        Hk : ndarray
+            (nz,nx) measurement function Jacobian evaluated at mkm
+
+        Returns
+        -------
+        W : ndarray
+            (nz,nz) innovations covariance
+        """
+        return Hk @ Pkm @ Hk.T + self._LRLt
+
+    def innovations_cov(self, Pkm, h_args=()):
+        """ compute innovations covariance
+
+        Parameters
+        ----------
+        Pkm : ndarray
+            (nx,nx) prior covariance matrix at time k
+        h_args : (optional) tuple
+            arguments passed to the measurement Jacobian; for an extended
+            Kalman filter these should include the linearization state
+
+        Returns
+        -------
+        W : ndarray
+            (nz,nz) innovations covariance
+        """
+        h_args = make_tuple(h_args)
+        Hk = self._H(*h_args)
+        return self.__innovations_cov(Pkm, Hk)
+
+    def __cross_cov(self, Pkm, Hk):
+        """ compute cross covariance
+
+        Parameters
+        ----------
+        Pkm : ndarray
+            (nx,nx) prior covariance matrix at time k
+        Hk : ndarray
+            (nz,nx) measurement function Jacobian evaluated at mkm
+
+        Returns
+        -------
+        C : ndarray
+            (nx,nz) cross covariance
+        """
+        return Pkm @ Hk.T
+
+    def cross_cov(self, mkm, Pkm, h_args=()):
+        """ compute cross covariance
+
+        Parameters
+        ----------
+        mkm : ndarray
+            (nx,) prior mean at time k
+        Pkm : ndarray
+            (nx,nx) prior covariance matrix at time k
+        h_args : (optional) tuple
+            deterministic parameters to be passed to measurement function
+
+        Returns
+        -------
+        C : ndarray
+            (nx,nz) cross covariance
+        """
+        h_args = make_tuple(h_args)
+        Hk = self._H(mkm, *h_args)
+        return self.__cross_cov(Pkm, Hk)
+
+    def __expected_meas(self, mkm, h_func):
+        """ compute measurement expectation
+
+        Parameters
+        ----------
+        mkm : ndarray
+            (nx,) prior mean at time k
+        h_func : callable
+            measurement function to evaluate at mkm
+
+        Returns
+        -------
+        w : ndarray
+            (nz,) predicted measurement at time k
+        """
+        return h_func(mkm)
+
+    def expected_meas(self, mkm, h_args=()):
+        """ compute measurement expectation
+
+        Parameters
+        ----------
+        mkm : ndarray
+            (nx,) prior mean at time k
+        h_args : (optional) tuple
+            deterministic parameters to be passed to measurement function
+
+        Returns
+        -------
+        w : ndarray
+            (nz,) predicted measurement at time k
+        """
+        h_args = make_tuple(h_args)
+        return self.h(mkm, *h_args)
+
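+    # The update below implements the standard EKF measurement update,
+    # linearizing h about the prior mean mkm:
+    #   zhat = h(mkm),  W = Hk Pkm Hk' + L R L',  C = Pkm Hk',
+    #   K = C inv(W),   mkp = mkm + K (z - zhat),
+    # with the posterior covariance formed by the selected cov_method.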
+    def update(self, mkm, Pkm, z, h=None, R=None, H=None, h_args=(), interm_vals=False):
+        """ perform extended Kalman filter update
+
+        Parameters
+        ----------
+        mkm : ndarray
+            (nx,) prior mean at time k
+        Pkm : ndarray
+            (nx,nx) prior covariance matrix at time k
+        z : ndarray
+            (nz,) measurement at time k
+        h : (optional) callable
+            measurement function of the form :math:`h(x, ...)`. If not
+            provided, self.h will be used.
+        R : (optional) ndarray
+            (ny,ny) measurement noise covariance matrix. If not provided,
+            self.R will be used.
+        H : (optional) callable
+            (nz,nx) measurement Jacobian matrix of the form :math:`H(x)`.
+            If not provided, self.H will be used.
+        h_args : (optional) tuple
+            deterministic parameters to be passed to measurement function
+        interm_vals : (optional) bool
+            if True, additionally returns intermediate values from
+            computation. False by default
+
+        Returns
+        -------
+        mkp : ndarray
+            (nx,) posterior mean
+        Pkp : ndarray
+            (nx,nx) posterior state error covariance
+
+        If interm_vals is true, additionally returns a dictionary containing:
+        W : ndarray
+            (nz,nz) innovations covariance
+        C : ndarray
+            (nx,nz) cross-covariance
+        K : ndarray
+            gain matrix
+        zhat : ndarray
+            predicted measurement
+        """
+        # TODO: check measurement size
+        if h is not None:
+            self.h = h
+        if H is not None:
+            self.H = H
+        if R is not None:
+            self.R = R
+
+        h_args = make_tuple(h_args)
+        Hk = self.H(mkm, *h_args)
+        hk_func = lambda x: self._h(x, *h_args)
+
+        # predicted measurement and update quantities
+        w = self.__expected_meas(mkm, hk_func)
+        W = self.__innovations_cov(Pkm, Hk)
+        C = self.__cross_cov(Pkm, Hk)
+
+        K = KfdUpdate.gain(C, W)
+        mkp = mkm + K@(z - w)
+
+        if self.cov_method == 'general':
+            # general form. does not require the gain to be the Kalman
+            # gain, but the gain must be linear
+            Pkp = Pkm - C@K.T - K@C.T + K@W@K.T
+        elif self.cov_method == 'Joseph':
+            # Joseph form. does not require the gain to be the Kalman
+            # gain. measurements must be linear
+            I = np.eye(Hk.shape[1])
+            ImKH = I - K@Hk
+            Pkp = ImKH@Pkm@ImKH.T + K@self._LRLt@K.T
+        elif self.cov_method == 'standard':
+            # standard form. requires the Kalman gain and linear
+            # measurements
+            I = np.eye(Hk.shape[1])
+            Pkp = (I - K@Hk)@Pkm
+        elif self.cov_method == 'KWK':
+            # yet another form. requires the Kalman gain but does not
+            # require linear measurements
+            Pkp = Pkm - K@W@K.T
+        else:
+            raise ValueError('unrecognized cov_method {}'.format(self.cov_method))
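+        # Note: when K is the Kalman gain C inv(W) and the measurement
+        # model is linear, the four covariance forms above agree in exact
+        # arithmetic; 'general' and 'Joseph' are often preferred in
+        # practice because they preserve the symmetry of Pkp.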
+
+        if not interm_vals:
+            return mkp, Pkp
+        else:
+            return mkp, Pkp, {'W': W, 'C': C, 'K': K, 'zhat': w}
\ No newline at end of file
diff --git a/tests/test_filters.py b/tests/test_filters.py
index b5dbc82..dd36045 100644
--- a/tests/test_filters.py
+++ b/tests/test_filters.py
@@ -90,6 +90,76 @@ def f(tkm1, tk, xkm1): return np.array([np.sin(xkm1[1]), xkm1[0]])
     npt.assert_array_equal(m_prior, m_des)
     npt.assert_array_equal(P_prior, P_des)
 
+def test_extendedkalmandiscreteupdate():
+    """ test discrete time extended Kalman filter update """
+    ## Polar transformation test case
+    def h(m): return np.array([np.linalg.norm(m), np.arctan2(m[1], m[0])])
+    def H(m): return np.array([[m[0]/np.linalg.norm(m), m[1]/np.linalg.norm(m)],
+                               [-m[1]/(m[0]**2*(1+(m[1]/m[0])**2)), 1/(m[0]*(1+(m[1]/m[0])**2))]])
+    R = np.array([[0.1**2, 0],
+                  [0, 0.1**2]])
+
+    ekfdupd = filters.EkfdUpdate(h=h, H=H, R=R)
+
+    mkm = np.array([np.cos(np.pi/4), np.sin(np.pi/4)])
+    Pkm = np.eye(2)
+
+    # checking that z = h(x) yields no mean update
+    zk = h(mkm)
+    mkp, Pkp = ekfdupd.update(mkm, Pkm, zk)
+    mkp_des = mkm
+    npt.assert_array_equal(mkp, mkp_des)
+
+    # checking that z != h(x) updates the mean accordingly
+    zk = np.array([np.cos(np.pi/4), 0])
+    mkp, Pkp = ekfdupd.update(mkm, Pkm, zk)
+
+    # manual EKF update for comparison
+    Wk = H(mkm)@Pkm@H(mkm).T + R
+    Ck = Pkm@H(mkm).T
+    Kk = np.linalg.solve(Wk.T, Ck.T).T  # Kk = Ck @ inv(Wk)
+    zk_hat = h(mkm)
+    mkp_des = mkm + Kk@(zk - zk_hat)
+    Pkp_des = Pkm - Ck@Kk.T - Kk@Ck.T + Kk@Wk@Kk.T
+
+    npt.assert_almost_equal(mkp_des, mkp, decimal=15)
+    npt.assert_almost_equal(Pkp_des, Pkp, decimal=15)
+
+    ## Diagonal measurement Jacobian test case (H diagonal implies W diagonal)
+    def h(m): return np.array([0.5*m[0]**2, 0.5*m[1]**2])
+    def H(m): return np.array([[m[0], 0],
+                               [0, m[1]]])
+    R = np.array([[0.1**2, 0],
+                  [0, 0.1**2]])
+    ekfdupd = filters.EkfdUpdate(h=h, H=H, R=R)
+
+    mkm = np.array([1, 1])
+    Pkm = np.eye(2)
+
+    Wk = ekfdupd.innovations_cov(Pkm, h_args=(mkm,))
+    off_diag = Wk[~np.eye(Wk.shape[0], dtype=bool)]
+
+    npt.assert_almost_equal(off_diag, 0.0, decimal=15)
+
+    ## Linear measurement test case
+    def h(m): return np.array([m[1], m[0]])
+    H = np.array([[0, 1],
+                  [1, 0]])
+    R = np.array([[0.1**2, 0],
+                  [0, 0.1**2]])
+    kfdupd = filters.KfdUpdate(H=H, R=R)
+    ekfdupd = filters.EkfdUpdate(h=h, H=H, R=R)
+
+    mkm = np.array([1, -1])
+    Pkm = np.eye(2)
+    zk = np.array([-1.1, 0.9])
+
+    # with a linear h and constant H, the EKF update reduces to the KF update
+    mkp_kfd, Pkp_kfd = kfdupd.update(mkm, Pkm, zk)
+    mkp_ekfd, Pkp_ekfd = ekfdupd.update(mkm, Pkm, zk)
+
+    npt.assert_almost_equal(mkp_kfd, mkp_ekfd, decimal=15)
+    npt.assert_almost_equal(Pkp_kfd, Pkp_ekfd, decimal=15)
+
 def test_badunderweighting():
     R = np.eye(2)
     H = np.eye(2)
@@ -308,6 +378,82 @@ def test_GmkfUpdate():
         rtol=1e-8
     )
 
+def test_GmekfUpdate():
+    """ test Gaussian mixture extended Kalman filter update """
+    ## Single mixand to make sure it matches EkfdUpdate()
+    def h(m): return np.array([np.linalg.norm(m), np.arctan2(m[1], m[0])])
+    def H(m): return np.array([[m[0]/np.linalg.norm(m), m[1]/np.linalg.norm(m)],
+                               [-m[1]/(m[0]**2*(1+(m[1]/m[0])**2)), 1/(m[0]*(1+(m[1]/m[0])**2))]])
+    R = np.array([[0.1**2, 0],
+                  [0, 0.1**2]])
+
+    ekfdupd = filters.EkfdUpdate(h=h, H=H, R=R)
+    gmekfupd = filters.GmekfUpdate(h=h, H=H, R=R)
+
+    mkm = np.array([np.cos(np.pi/4), np.sin(np.pi/4)])
+    Pkm = np.eye(2)
+
+    pkm = pygm.GaussianMixture(1, mkm, Pkm)
+
+    zk = np.array([np.cos(np.pi/4), 0])
+
+    mkp_ekfd, Pkp_ekfd = ekfdupd.update(mkm, Pkm, zk)
+    pkp = gmekfupd.update(pkm, zk)
+    mkp_gmekf = pkp.get_m().squeeze()
+    Pkp_gmekf = pkp.get_P().squeeze()
+
+    npt.assert_almost_equal(mkp_ekfd, mkp_gmekf, decimal=15)
+    npt.assert_almost_equal(Pkp_ekfd, Pkp_gmekf, decimal=15)
+
+    ## Bimodal distribution with measurement in agreement with only one mixand
+    m1 = np.array([np.cos(np.pi/4), np.sin(np.pi/4)])
+    m2 = np.array([-10, -10])
+    P1 = np.eye(2)
+    P2 = np.eye(2)
+    pkm = pygm.GaussianMixture([0.5, 0.5], [m1, m2], [P1, P2])
+    zk = np.array([np.cos(np.pi/4), np.sin(np.pi/4)])
+    pkp = gmekfupd.update(pkm, zk)
+    wkp = pkp.get_w().squeeze()
+
+    npt.assert_almost_equal(wkp[1], 0, decimal=15)
+
+    ## Linear measurements to compare against GmkfUpdate()
+    def h(m): return np.array([m[1], m[0]])
+    H = np.array([[0, 1],
+                  [1, 0]])
+    R = np.array([[0.1**2, 0],
+                  [0, 0.1**2]])
+
+    gmkfupd = filters.GmkfUpdate(H=H, R=R)
+    gmekfupd = filters.GmekfUpdate(h=h, H=H, R=R)
+
+    wkm = np.array([0.8, 0.1, 0.1])
+    mkm = np.array([[1, -1],
+                    [0, 0],
+                    [1, 1]])
+    Pkm = np.array([np.eye(2),
+                    np.eye(2),
+                    np.eye(2)])
+    pkm = pygm.GaussianMixture(wkm, mkm, Pkm)
+
+    zk = np.array([-1.1, 0.9])
+
+    pkp_gmkf = gmkfupd.update(pkm, zk)
+    pkp_gmekf = gmekfupd.update(pkm, zk)
+
+    wkp_gmkf = pkp_gmkf.get_w()
+    mkp_gmkf = pkp_gmkf.get_m()
+    Pkp_gmkf = pkp_gmkf.get_P()
+
+    wkp_gmekf = pkp_gmekf.get_w()
+    mkp_gmekf = pkp_gmekf.get_m()
+    Pkp_gmekf = pkp_gmekf.get_P()
+
+    npt.assert_almost_equal(wkp_gmkf, wkp_gmekf, decimal=15)
+    npt.assert_almost_equal(mkp_gmkf, mkp_gmekf, decimal=15)
+    npt.assert_almost_equal(Pkp_gmkf, Pkp_gmekf, decimal=15)
+
 def test_GmukfUpdate():
     """ test Gaussian mixture unscented Kalman filter update """