Source code for kf.KF_class

# -*- coding: utf-8 -*-
############################################################################@
#Assimilation of measured phase : Kalman Filter for (synthetic) INSAR data 
#
#
#Date : February 2018
#Author : Manon Dalaison & Romain Jolivet
###########################################################################@
from __future__ import print_function
from __future__ import division
from builtins import str
from builtins import range
from past.utils import old_div
import numpy as np
import matplotlib.pyplot as plt
import operator
import os        #using operating system dependent functionality.
import h5py
import time as TIME

# Local stuff
#import insar_fct
#from timefunction import TimeFct

###############################################################################     
# A class for the Kalman filter
    
[docs]class Kalman(object): def __init__(self, data, fctmod, j=0, i=0, verbose=True): ''' Class for a Kalman filter for an InSAR time series analysis Initialize the object * data : object observations/measurements in class from readimput_mpi.py * fctmod : object functional model in class from timefunction.py * j,i : integers indexes for 2-D image used for storage * verbose : boolean print stuffs ''' self.verbose = verbose ### Extract model from class self.modelobj = fctmod self.model = fctmod.model self.L = fctmod.L ### Store essential information #self.dataobj = data self.data = data.igram[:,j,i] self.t = data.time self.link = data.links[:] self.Rdat = data.R self.link[:,0] = 0 # For 2D store index of pixels self.xi = i self.yi = data.miny +j self.rank = data.rank assert (np.shape(self.link)[0] == np.shape(data.igram[:,j,i])[-1]), 'rows of links should correspond to numb of interfero'
[docs] def restart_from_file(self,fin,pasttime,indxs): ''' Extract initial condition from OPENED infile (fin) which stores previously computed mk and Pk for all pixels including pixel[i,j] * fin : object opened H5 file containing formely computed state * pasttime : array already loaded time array in fin * indxs : array already loaded index array ''' i = self.xi j = self.yi # Load Previously computed state self.m = fin['state'][j,i,:] #3D (y,x,m) self.P = fin['state_cov'][j,i,:,:] #3D (y,x,P) # Keep strack of where we are with respect to first data of time series self.m_indxs = indxs #indexes of phases in m after prediction phase self.t = np.concatenate((pasttime,self.t)) self.t = np.unique(self.t) if len(self.modelobj.t) < len(self.t): self.modelobj.t = self.t self.phases = [] #phases already computed self.std = [] #std of phases already computed # Check consistency len_m0 = self.m.shape[-1] -len(pasttime) #number of parameters in the m stored, define initial model self.Lref = self.L #max number of parameters as predicted by the model self.get_model_from_num_of_param(len_m0) if np.shape(self.link)[1] != len(self.t): #WARNING: columns of links and time must be of same length, modify links self.link = self.link[:,-len(self.t):]
[docs] def start_new(self,m0, P0): ''' Start from skratches * m0 : 1D array (N) Initial Model. The length of the vector will determine how many element of the model will be kept (in the order given in the model vector) * P0 : 2D array (N,N) Initial Covariance ''' self.m = np.concatenate((m0,[0.0])) #first phase fixed to zero self.P = P0 self.m_indxs = [0] #indexes of phases in m after prediction phase self.phases = [] #store phases that converged and removed from m self.std = [] #std of phases already computed self.Lref = self.L #max number of model parameters (take it from TimeFct) self.get_model_from_num_of_param(len(m0)) assert (np.shape(self.link)[1] == len(self.t)), 'columns of links and time must be of same length'
[docs] def get_model_from_num_of_param(self,N): ''' Truncate model if the number of parameters in the input (N) is smaller than the maximum number of parameters as predicted by the initial functional model (self.Lref). N (or equivalently self.L) may increase with latter iterations''' assert N <= self.Lref, 'number of apriori parameters greater than what model predicts' if N < self.Lref: print('Truncate model') self.mod = self.modelobj.cut_model(N) #new truncated model (list of tuples)
#new self.L defined as N
[docs] def create_Q(self, m_err, phi_err, add_err, M): ''' Create process covariance Q from uncertainty on model (m_err) and interferograms (phi_err) at kth time. * m_err : float or an array of length L model uncertainty * phi_err : float systematic error on phases (should be zero) * add_err : float systematic error on last forecast (square of std of mismodeling) * M : integer the state vector length ''' L = self.L if (isinstance(m_err,float) or isinstance(m_err,int)) : #check if float or int self.Q = np.concatenate((m_err*np.eye(M+1)[:L],phi_err*np.eye(M+1)[L:])) elif len(m_err) >= L : Q1 = np.hstack((np.diag(m_err[:L]),np.zeros((L,M+1-L)))) self.Q = np.concatenate((Q1,phi_err*np.eye(M+1)[L:])) else : assert False, 'format m_err not understood' if M >= L+1 : #not first iteration self.Q[-1,-1] = add_err #large error associated to last a priori value
[docs] def create_H_R_and_D(self, k, indxs): ''' Produce the measurement vector (D), the measurement matrix (H), and the measurement covariance matrix (R) at a specific timestep (0≤ k <N) --> if len(D)=n for this timestep, then H will be (n x (L+k+1)) and R (n x n) * k : integer itteration number * indx : integer indexes (with respect to t0) of phases in self.m[L:] ''' # last_column is 1 when youngest-oldest or -1 when oldest-youngest last_column = self.link[:,-1][np.nonzero(self.link[:,-1])][-1] # find interferograms for time k (line of links) involving past dates only ! ind_interf = np.array([i for i,hh in enumerate(self.link[:,k]) if hh== last_column]) #check for NaN in D[ind_interf] if len(ind_interf)> 0 : mask_nan = np.isnan(self.data[ind_interf]) ind_interf = ind_interf[np.invert(mask_nan)] if len(ind_interf)> 0 : #find phase substracted to phase k (column of links) ind_phases = np.array([i for i in np.where(self.link[ind_interf,:]==abs(1))[1] if i!=k]) #If phase involved in interferogram not in state (m) anymore condition = [i not in indxs for i in ind_phases] if any(condition): ind_old,im = ind_phases[condition],np.where(condition)[0] #expand m and P from phase self.m = np.concatenate((self.m[:self.L],self.phases[ind_old],self.m[self.L:])) for i in range(len(ind_old)): var = np.square(self.std[ind_old[i]]) row = np.zeros(np.shape(self.P)[0]) col = np.zeros(np.shape(self.P)[0]+1) #No covariance terms in P gives better results, but approximation col[self.L] = var self.P = np.insert(self.P,self.L+i,row,axis=0) self.P = np.insert(self.P,self.L+i,col,axis=1) #notify new indxs = np.concatenate((ind_old,indxs)) #with respect to idx0 self.m_indxs = np.concatenate((ind_old +self.idx0,self.m_indxs)) #select relevant D and R self.D = self.data[ind_interf] self.R = self.Rdat[ind_interf,:][:,ind_interf] Hsub = self.link[ind_interf,:k+1] #resize if phases deleted Hsub = Hsub[:,indxs] self.H = np.hstack((np.zeros((len(self.D),self.L)),Hsub)) else : self.D = [] self.R = [] self.H = []
[docs] def predict(self,X, P, A, Q): ''' Forecast step * X : array (N) The mean state estimate of the previous step ( k −1). * P : array (N, N) The state covariance of previous step ( k −1). * A : array (N+1, N) The transition matrix. * Q : array (N+1, N+1) The process noise covariance matrix. ''' Xf = np.dot(A, X) Pf = np.dot(A, np.dot(P, A.T)) + Q return(Xf,Pf)
[docs] def update(self,Xf, Pf): ''' * Xf : array forecast mean of the state * Pf : array forecast covariance of the state''' Y = np.array(self.D) # the measurement vector H = np.array(self.H) # the measurement matrix R = np.array(self.R) # the measurement covariance matrix if len(Y) == 0 : #no information for this date self.inov = np.nan #innovation has no meaning here self.K = np.zeros((self.L,1)) #kalman gain has no meaning here return (Xf,Pf) #return forecast else : #Data - predictive distribution of Y self.inov = self.innovation(Xf,Y) #the Covariance or predictive mean of Y IS = R + np.dot(H, np.dot(Pf, H.T)) #the Kalman Gain matrix if np.linalg.det(IS) == 0: print('Pf', Pf) print('H', H) print('determinant is Zero, not invertible') self.K = np.dot(Pf, np.dot(H.T, np.linalg.inv(IS))) #the estimated mean state X = Xf + np.dot(self.K,self.inov) #Y-IM is residual or "innovation vector" #the estimated covariance state P = Pf - np.dot(self.K,np.dot(H,Pf)) if self.verbose : self.check_fit(X,P) return(X,P)
[docs] def innovation(self, Xf, Y): ''' Compute residual or innovation vector Innovation for phases is not informative. After a few steps, reflects noise of data around model. ''' #the Mean of predictive distribution of Y IM = np.dot(self.H, Xf) return(Y-IM)
[docs] def check_fit(self, X, P, eps_interf=10): ''' Check quality of fit of phases if verbose activated. Compute residual weighted by its Covariance for analysed state and print warning if pb * eps_interf : float accepted difference between computed and real interferograms. ''' Cres = self.R + np.dot(self.H, np.dot(P, self.H.T)) res = np.dot(np.linalg.inv(Cres), self.innovation(X, self.D)) if abs(np.mean(res)) > eps_interf : print('WARNING: post-fit residual too big (mean >' +str(eps_interf)+ 'mm)') print(res)
[docs] def reduce_sizes_m_P(self, k ): ''' Remove phases in m if not used to build interferograms and has converged For ulterior long baseline interferograms, phase and associated standard deviation can be recovered but state Covariance terms are lost (too heavy to store) * k : integer number of iteration ''' t_sep = self.t_sep #number of time step to keep in state vector if k >= t_sep : L = self.L #number of parameters # apply to phases not in current interferograms (t > t_sep) sub_P = np.diag(self.P[L:-(t_sep),L:-(t_sep)]) sub_m = self.m[L:-(t_sep)] indx = self.m_indxs[:-(t_sep)] #Look at where to insert old phases relativind = indx -(len(self.phases)+self.idx0) ind_mod,im = indx[relativind<0],np.where(relativind<0)[0] ia = np.where(relativind>=0)[0] if len(im)>0: #phase already stored need to be updated self.phases[ind_mod-self.idx0] = sub_m[im] self.std[ind_mod-self.idx0] = abs(sub_P[im])**(1/2.) #Append phases self.phases = np.append(self.phases,sub_m[ia]) self.std = np.append(self.std,abs(sub_P[ia])**(1/2.)) #sqrt of variance #Cut state self.P = np.delete(self.P,list(range(L,len(self.m)-t_sep)),0) #row self.P = np.delete(self.P,list(range(L,len(self.m)-t_sep)),1) #column self.m = np.concatenate((self.m[:L],self.m[-t_sep:])) self.m_indxs = self.m_indxs[-t_sep:] assert np.shape(self.P)[0]==np.shape(self.P)[1],'ERROR: Pb in reshape, P not square matrix' assert np.shape(self.P)[0]==len(self.m),'ERROR: shape of m and P do not match'
[docs] def expend_m_P(self,L,n,PL): ''' Open state vector and covariance (m and P) to add building parameters * L : integer index at which we open and insert new parameters in m and P * n : integer number of parameters to add * PL : float apriori variance of the new parameters ''' #increase size of m self.m = np.concatenate((self.m[:L],np.zeros(n),self.m[L:])) #extend P for i in range(L,L+n): row = np.zeros(np.shape(self.P)[0]) col = np.zeros(np.shape(self.P)[0]+1) col[L] = PL self.P = np.insert(self.P,i,row,axis=0) self.P = np.insert(self.P,i,col,axis=1)
[docs] def detect_event(self,k,kmod,m_all): ''' IN PROGRESS TESTED ON SYNTHETIC DATA Add model parameter for unexpected events not in model * k : integer iteration * kmod : integer minimum k at which modification can be applied ''' # Test for sharp variations in model parameters params = [i[:self.L] for i in m_all[-5:-1]] condition = np.sum(abs(np.mean(np.diff(params,axis=0)))) > 1/2. *np.sum(self.m[:self.L]) #sig_y = 15.0 #vel_all = [i[1] for i in m_all[-5:-1]] #condition = abs(np.mean(np.diff(vel_all))) > sig_y/self.t[k]*1./(len(vel_all)-1) if k > kmod and condition: print('They may be an unexpected event') print(np.sum(abs(np.mean(np.diff(params,axis=0))))) print(1/2. *np.sum(self.m[:self.L])) ''' if k >= kmod_min-1 and self.Lss > 1 : #reevaluate length of Lss Amps = self.m[self.L+self.Leq:self.L+self.Leq+self.Lss] mask = Amps > 1/100.*max(Amps) #want to keep significant terms only rem_i = np.array(range(len(Amps)))[np.invert(mask)] #indexs to remove Amps = Amps[mask] #to keep print 'clean phase' print self.st self.st = self.st[mask] self.m = np.concatenate((self.m[:(self.L+self.Leq)],Amps,self.m[(self.L+self.Leq+self.Lss):])) print self.st self.P = np.delete(self.P,self.L+self.Leq+rem_i,0) #row self.P = np.delete(self.P,self.L+self.Leq+rem_i,1) #column self.Lss = len(Amps) if k > kmod_min and conditions : #compare velocity terms print 'diff between vel', np.diff(vel_all) print 'mean change in vel', abs(np.mean(np.diff(vel_all))) print 'max crit', sig_y/self.t[k]*1./(len(vel_all)-1) dt = int(round(2*self.swidth/6)) #how early can the ss be detected? add_times = [self.t[k+dt-5],self.t[k+dt-3]] #times to test self.st = np.append(self.st,add_times) L = self.L + self.Leq + self.Lss #where we split m to insert new param print 'add slowslip', self.t[k] self.expend_m_P(L,len(add_times),100.**2.) self.m[1] = m_all[-3][1] self.Lss += len(add_times) kmod_min = k + dt + 2 #time to wait for adjustement of amplitude before chacking for new ss if kmod_min >= len(self.t): kmod_min = len(self.t)'''
[docs] def kf(self, m_err, phi_err, add_err, t_sep=6, plots=True, cm='jet', ax1=0, ax2=0): ''' Run kalman filter combining other functions of class (i.e. MAIN) * m_err : array systematic error on model (should be 0) * phi_err : float systematic error on interferograms (should be 0) * t_sep : integer maximum time separation between interferograms, fix the minimum number of phases that must be kept in the state vector. Constrain the maximum length of the state vector * plots : boolean, optional WARNING - activate only if one instance of KF (=one pixel), then subsequent parameters must be specified - ax1 : pyplot axis in which plot evolution of parameters - ax2 : pyplot axis in which plot evolution of predicted value and model - cm : string or colormap the colormap of reference later discretised ''' #Prepare storage array kmod = 13 #int from which parameters can be added m_all = [] #store state vector at each k in list of lists self.t_sep = t_sep #caution if large may be slow and heavy #Get where to start itterations assert len(self.m_indxs)<= len(self.t),'ERROR: more phases computed than dates to work on' assert len(self.m_indxs) < len(self.t),'ERROR: from array size, no NEW phase to compute' self.idx0 = self.m_indxs[0] #indx of first phase (in this KF update) wrt the initial reference at t0 k_start = len(self.m_indxs) #number of phases in m from start (that will be reanalysed/updated) k_end = len(self.t) #number of dates in time series (final number of phase estimates) #Initialize self.A = self.modelobj.create_A(k_start-1, len(self.m)) self.create_Q(m_err, phi_err, add_err, len(self.m)) Innov = [] Gain = [] #Loop on time for k in range(k_start,k_end): self.m_indxs = np.append(self.m_indxs, self.idx0+k) #add last phase index self.create_H_R_and_D(k, self.m_indxs-self.idx0) #Update matrices self.create_Q(m_err,phi_err,add_err,len(self.m)) self.A = self.modelobj.create_A(k-1,len(self.m)) (mf,Pf) = self.predict(self.m, self.P, self.A, self.Q) (self.m,self.P) = self.update(mf, Pf) #store info Gain.append( np.linalg.norm(self.K[:self.L,:], axis=1) ) #Gain of model parameters Innov.extend( [np.mean(self.inov)] ) if (k%5==0) or (k_end-1): #every 5th k (to save time) #Reduce size of m (part with phases) self.reduce_sizes_m_P(k) #OPTION ONLY TESTED ON SYNTHETICS #Add building parameters based on observations #m_all.append(self.m.copy()) #self.detect_event(k,kmod,m_all) if k < k_end-1: #not last step #if number of parameters smaller than in ref if self.L < self.Lref : #Add model element when getting close to relevant date self.mod,n = self.modelobj.expend_model(k,2,verbose=False) self.expend_m_P(self.L-n,n,70.**2.) #L already increased in timefunction #Update matrices #self.create_Q(m_err,phi_err,add_err,len(self.m)) #self.A = self.modelobj.create_A(k,len(self.m)) #Plot if plots == True : cmap = [cm(1.*i/k_end) for i in range(k_end)] if ax1.ndim == 2: self.plot_params(k,ax1[0,:],cmap) self.plot_gain(k,ax1[1,:],cmap) else: self.plot_params(k,ax1,cmap) if k > 5 : #do not plot first poorly fitting models self.plot_model(k,ax2,cmap) ###END loop self.phases = np.concatenate((self.phases, self.m[self.L:])) std_all = np.concatenate((self.std, np.diag(abs(self.P[self.L:,self.L:]))**(1/2.))) self.std = std_all if plots == True : ax2.errorbar(self.t[:],self.phases,yerr=std_all,\ fmt='.',c='pink',label='retrieved phases',markersize=10) ax2.set_xlabel('Time (days)') ax2.set_ylabel('Displacement (mm)') self.title_labels(ax1) self.Gain = np.array(Gain) self.Innov = np.array(Innov) return
[docs] def write_output(self, outstates, outphase, outupdate=None): ''' Save outputs of kalman filter for next run * outstates : h5py file Open h5file for state storage * outphase : h5py file Open h5file for phase storage * outupdate : h5py file Open h5file for gain and innovation (Optional) ''' i = self.xi j = self.yi if np.count_nonzero(outstates['indx'][:]) == 0 : #check if empty #Store pixel independent information outstates['indx'][:] = self.m_indxs outstates['tims'][:] = self.t[self.m_indxs-self.idx0] outphase['idx0'][...] = self.idx0 outphase['tims'][:] = self.t #Fill in with new data outstates['state'][j,i,:] = self.m outstates['state_cov'][j,i,:,:] = self.P outphase['rawts'][j,i,:] = self.phases outphase['rawts_std'][j,i,:] = self.std #Store Gain and Innovation outupdate["mean_innov"][j,i,:] = self.Innov outupdate["param_gain"][j,i,:,:] = self.Gain # All done return
[docs] def plot_params(self,k,ax,cmap) : ''' Plot each parameter over time with its uncertainty in subplots of size len(ax)''' m_plt,P_plt = self.modelobj.comp_phase_shift(self.m[:self.L],P=self.P[:self.L,:self.L]) P_plt = abs(np.diag(P_plt))**(1/2.) #plot for i in range(self.L): ax[i].errorbar(self.t[k],m_plt[i],yerr= P_plt[i],fmt='o',markersize=4.5,\ c=cmap[k],elinewidth=0.9,markeredgecolor='k',markeredgewidth=0.3)
[docs] def plot_gain(self,k,ax,cmap) : ''' Plot gain for each parameter over time''' g_l = np.linalg.norm(self.K[:self.L,:],axis=1) g_plt,tmp = self.modelobj.comp_phase_shift(g_l) for i in range(self.L): ax[i].plot(self.t[k],g_plt[i],'o',c=cmap[k],markersize=3)
[docs] def plot_model(self,k,ax,cmap) : ''' Plot resulting model''' Model= self.draw_model(self.m[:self.L]) ax.plot(self.t,Model,'-',c=cmap[k],linewidth=0.8)
[docs] def title_labels(self,ax1) : ''' Add axes label and titles for subplots from plot_params and plot gain functions''' if ax1.ndim == 1: ax1[0].set_ylabel('parameters') #for i in range(len(ax1)): # ax1[i].set_xlabel('time (days)') if ax1.ndim == 2: ax1[1,0].set_ylabel('gains') ax1[0,0].set_ylabel('parameters') for i in range(np.shape(ax1)[1]): ax1[1,i].set_xlabel('time (days)') ax1 = ax1[0] #select first row of plots #Titles of subplots in fig1 tltsize = 12 labels = self.modelobj.get_label(self.L, 'mm', phase=True) i=0 for lab in labels: ax1[i].set_title(lab,fontsize=tltsize) i +=1
#EOF