Source code for kfts

#!/usr/bin/env python

# Global stuff
import numpy as np
import matplotlib.pyplot as plt
import operator
import os 
import sys  
import h5py
import time as TIME 
import random

# Local stuff
import kf
from kf.KF_class import *
import kf.readinput as infmt
from kf.timefunction import TimeFct

# To Parametrize
import configparser
import argparse
from ast import literal_eval

# MPI 
from mpi4py import MPI

class RunKalmanFilter(object):
    ''' Class to run the full KFTS-InSAR processing chain
    Read configuration and data. Setup parameters. Run KF for each pixel.
    '''

    def __init__(self, config):
        plt.close('all')                    # close all figures
        self.start_time = TIME.time()       # record running time
        self.initMpi()
        self.setConfiguration(config)

    def initMpi(self):
        ''' Initiate communication of the Message Passing Interface (MPI)'''

        self.comm = MPI.COMM_WORLD
        self.rank = self.comm.Get_rank()
        self.size = self.comm.Get_size()

        if self.size == 1:
            # disable MPI
            self.comm = False

    def setConfiguration(self, config):
        '''Read configuration file and convert to python objects.
        :config: open config file (.ini format)
        '''

        #---------------
        # INPUT info
        loc = os.path.abspath(config['INPUT'].get('workdir', fallback='./'))
        assert os.path.isdir(loc), "working directory {} defined in {} does not exist".format(loc, config)

        self.infile  = os.path.join(loc, config['INPUT'].get('infile'))
        self.fmtfile = config['INPUT'].get('fmtfile', fallback='ISCE')

        # optional
        self.instate = config['INPUT'].get('instate', fallback=None)
        self.eqinfo  = config['INPUT'].get('eqinfo', fallback=None)
        self.mask    = config['INPUT'].get('maskfile', fallback=None)

        if self.instate is not None:
            self.instate = os.path.join(loc, self.instate)
        if self.eqinfo is not None:
            self.eqinfo = os.path.join(loc, self.eqinfo)
        if self.mask is not None:
            self.mask = os.path.join(loc, self.mask)

        self.outdir = os.path.join(loc, config['OUTPUT'].get('outdir', fallback=''))
        self.figdir = os.path.join(loc, config['OUTPUT'].get('figdir', fallback=''))

        # check if output directories exist, otherwise create them
        os.makedirs(self.outdir, exist_ok=True)
        os.makedirs(self.figdir, exist_ok=True)

        #---------------
        # MODEL info
        secMS = config['MODEL SETUP']
        self.EQ    = secMS.getboolean('EQ', fallback=False)
        freq       = secMS.getfloat('freq', fallback=2*np.pi)
        self.sig_y = secMS.getfloat('sig_y', fallback=10.0)
        self.sig_i = secMS.getfloat('sig_i', fallback=0.01)
        self.dtmax = secMS.getfloat('Dtime_max', fallback=None)

        try:
            self.model = literal_eval(secMS.get('model'))
        except:
            if self.rank == 0:
                print("WARNING: model unreadable in config section [MODEL SETUP], use default")
            self.model = [('POLY', 1), ('SIN', freq), ('COS', freq)]

        self.sig_a = literal_eval(secMS.get('sig_a'))
        self.sig_a = list(self.sig_a)

        #---------------
        # Parameters for KFTS
        secKFS = config['KALMAN FILTER SETUP']
        self.VERBOSE = secKFS.getboolean('VERBOSE', fallback=True)
        self.PLOT    = secKFS.getboolean('PLOT', fallback=False)
        self.UPDT    = secKFS.getboolean('UPDT', fallback=False)
        self.pxlTh   = secKFS.getint('pxlTh', fallback=1)
        self.cohTh   = secKFS.getfloat('cohTh', fallback=None)

        self.ref = (secKFS.getint('refy', fallback=None),
                    secKFS.getint('refx', fallback=None))
        if None in self.ref:
            self.ref = None

        if self.isTraceOn():
            print("Functional model string is: {}".format(self.model))
            print("Exclude pixels with less than {} valid interferograms".format(self.pxlTh))

        #---------------
        ## Optional section
        # initialize
        self.subregion = None

        # read if exists
        if config.has_section('FOR TESTING'):
            secFT = config['FOR TESTING']
            SUBREGION = secFT.getboolean('SUBREGION', fallback=False)
            if SUBREGION:
                x1, x2, y1, y2 = literal_eval(secFT.get('limval', fallback='0,0,0,0'))
                self.subregion = infmt.Subregion(x1, x2, y1, y2)
                if self.rank == 0:
                    print("WARNING: select subregion", x1, x2, y1, y2)

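    # A minimal config sketch for the sections read above. Section and key names
    # follow the .get() calls in setConfiguration(); the values shown are
    # placeholders for illustration only, not defaults shipped with the code:
    #
    #   [INPUT]
    #   workdir = ./
    #   infile  = stack.h5
    #   fmtfile = ISCE
    #
    #   [OUTPUT]
    #   outdir = outputs
    #   figdir = figures
    #
    #   [MODEL SETUP]
    #   EQ    = False
    #   model = [('POLY',1),('SIN',6.2832),('COS',6.2832)]
    #   sig_a = (25.0, 0.1, 0.1)
    #   sig_y = 10.0
    #   sig_i = 0.01
    #
    #   [KALMAN FILTER SETUP]
    #   VERBOSE = True
    #   PLOT    = False
    #   UPDT    = False
    #   pxlTh   = 1
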
    def readData(self):
        '''Initiate the data class dealing with interferograms, time,
        spatial grid and mask. It

            #. reads the data
            #. divides the grid between workers (if MPI)
            #. builds the data covariance and
            #. stores information
        '''

        # Read data file
        data = infmt.SetupKF(
                    self.infile,
                    comm = self.comm,
                    mpiarg = (self.rank, self.size),
                    fmt = self.fmtfile,
                    verbose = self.isTraceOn(),
                    subregion = self.subregion,
                    cohTh = self.cohTh,
                    refyx = self.ref)

        # Choose subset around fault
        #data.select_pxl_band(x, y, 0.48, -750, -650)

        # Load previously defined mask
        if self.mask is not None:
            data.apply_input_mask(self.mask)

        # Record indexes of empty pixels
        data.pxl_with_nodata(thres=self.pxlTh, plot=self.PLOT)

        # Get index pairs used to build interferograms
        data.get_interf_pairs()

        # Build matrix of uncertainty on data
        data.create_R(np.square(self.sig_i))

        # Print properties of data and keep them stored in the class
        data.summary()

        return data

    def earthquakeIntegration(self, data):
        ''' Add a step function to the functional model of deformation
        to model coseismic displacement due to earthquakes.
        Requires a file containing earthquake properties in the track
        reference frame (see earthquake2step.py).
        '''

        def twoD_Gauss(x, y, amp0, center=(0., 0.), sig=1.0):
            '''Compute a 2D Gaussian at coordinates x, y'''
            x0, y0 = center
            Function = amp0 * np.exp(-((x0 - x)**2 + (y0 - y)**2) / (2. * sig**2))
            return Function

        # Get earthquake properties in actual reference frame
        Xeq, Yeq, teq, Rinf, Aeq, sx, sy = np.loadtxt(self.eqinfo, unpack=True)
        Xeq = Xeq.astype('int32')
        Yeq = Yeq.astype('int32')

        if isinstance(teq, float):
            # specific case of one earthquake only
            Leq = 1
            teq = [teq]
            Xeq, Yeq = [Xeq], [Yeq]
            Rinf, Aeq, sx, sy = [Rinf], [Aeq], np.array(sx), np.array(sy)
        elif isinstance(teq, np.ndarray):
            iord = np.argsort(teq)      # verify chronological order
            Xeq, Yeq, Rinf, Aeq, sx, sy = Xeq[iord], Yeq[iord], Rinf[iord], Aeq[iord], sx[iord], sy[iord]
            teq = teq[iord].tolist()
            Leq = len(teq)
        else:
            assert False, "Verify content of {}".format(self.eqinfo)

        teq.insert(0, 'STEP')
        self.model.insert(len(self.model), tuple(teq))
        if self.isTraceOn():
            print('New functional model with earthquakes :', self.model)

        self.sig_a.extend(np.zeros(Leq))

        # Error per earthquake
        P0eq = np.zeros((data.Ny, data.Nx, Leq))
        for i in range(Leq):
            width = Rinf[i] / (np.mean([np.mean(sx), np.mean(sy)]))
            fct = twoD_Gauss(data.xv[data.miny:data.maxy, :],
                             data.yv[data.miny:data.maxy, :],
                             Aeq[i]**2, center=(Xeq[i], Yeq[i]), sig=width)
            fct[fct < 1.] = 0.   # below a std of 1, approximate to zero (parameter not optimized)
            P0eq[:, :, i] = fct

        return Leq, P0eq

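    # The earthquake file read by np.loadtxt above holds one row per event with
    # seven whitespace-separated columns: Xeq Yeq teq Rinf Aeq sx sy (column
    # roles inferred from the variable names; the numbers below are purely
    # illustrative, not real events):
    #
    #   120   340   2016.104   35.   4.0   0.09   0.09
    #   512    87   2017.532   60.   7.5   0.09   0.09
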
    def loadcheck_pastoutputs(self, data, tfct):
        ''' Check input file consistency when restarting and frame the time series update.

            * *data* : initiated data class (new interferograms)
            * *tfct* : initiated model class
        '''

        # Import things common to all pixels to gain time
        finstate = h5py.File(self.instate, 'r')
        statetime = finstate['tims'][:]    # contains as many dates as the state contains phases
        indxs = finstate['indx'][:].tolist()

        # Identify new dates with respect to former state
        newdate = [t for t in data.time if t not in statetime]

        # Check consistency
        assert (len(newdate) > 0), \
            "No supplementary date in file {} wrt existing {}".format(self.infile, self.instate)
        assert (statetime[0] <= data.time[0]), \
            "New data involves timesteps older than what was retained in the former state"

        # Set number of phases to keep for later update
        data.max_tsep = np.max([data.max_tsep, len(statetime)])
        toverl = len(statetime) - len(data.time) + len(newdate)   # number of past dates not in data to recover

        if self.isTraceOn():
            print('Data contains {} new time steps'.format(len(newdate)))
            print('There are {} estimates that will potentially be updated'.format(len(indxs) - toverl))

        # Identify parameters tied to dates older than dtmax with respect to the starting time
        if self.dtmax is not None:
            tfct.identify_outdated(self.dtmax)
            if tfct.Cstindex is None:
                self.dtmax = None

        return finstate, statetime, indxs, toverl

    def initCovariances(self, L):
        '''Create arrays for the initial state covariance matrix (P0)
        and the process noise covariance (Q).
        :L: Initial length of the state vector
            (number of model parameters + 1 (for the reference phase))
        '''

        P_par   = np.square(self.sig_a)
        m_err   = np.zeros(L)            # uncertainty on model parameters
        phi_err = 0.0                    # uncertainty on interferometric phases
        add_err = (self.sig_y)**2        # uncertainty on newly added phase

        return P_par, m_err, phi_err, add_err

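    # For illustration: np.square turns the a priori standard deviations sig_a
    # into variances, e.g. a hypothetical sig_a = [20., 0.5] gives
    # P_par = [400., 0.25]. launch() later spreads P_par on the diagonal of the
    # initial state covariance via P0 = P_par * np.eye(L+1).
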
    def initPlot(self, data, figdir):
        '''Draw quick plots to visualize input data:

            * data plot
            * baseline plot
            * spatial mask on pixels
        '''
        import kf.makeplot as mplt

        mplt.view_data(data.igram, range(np.shape(data.igram)[0]), figdir)
        mplt.view_mask(data.mask, figdir)
        mplt.plot_baselines(data.time, data.bperp, data.imoins, data.iplus, figdir)

    def isTraceOn(self):
        '''True only if verbose is activated and this is the first parallel worker'''
        return self.VERBOSE and self.rank == 0

    def launch(self):
        '''Combine procedures to prepare data and model, then launch
        the Kalman filter class on each pixel, row by row
        '''

        ## Setup
        # Start class dealing with data
        data = self.readData()

        # Add earthquakes to model
        if self.EQ == True:
            Leq, P0eq = self.earthquakeIntegration(data)

        # Start class dealing with the functional model
        if self.isTraceOn():
            print("-- Read and initialize model --")
        tfct = TimeFct(data.time, self.model, verbose=self.isTraceOn())
        tfct.check_model()

        L = len(self.sig_a)

        # Check right number of a priori errors on model parameters
        if tfct.L == L:
            # Add zero to a priori uncertainty as first phase is certain
            self.sig_a.extend([0])
        elif tfct.L == (L - 1):
            if self.sig_a[-1] == 0:
                L -= 1
            else:
                assert False, "sig_a has an unexpected extra value, reduce length by one"
        elif tfct.L < (L - 1):
            assert False, "Too many uncertainties specified (%d) in sig_a compared to model requirement (%d)" % (L, tfct.L)
        elif tfct.L > L:
            assert False, "Too few uncertainties specified (%d) in sig_a compared to model requirement (%d)" % (L, tfct.L)

        P_par, m_err, phi_err, add_err = self.initCovariances(L)

        if self.PLOT:
            self.initPlot(data, self.figdir)

        #---------------------------------------------------------------------------
        ## Kalman Filter
        kf_start = TIME.time()
        m0 = np.zeros(L)
        P0 = P_par * np.eye(L + 1)

        # bound t_sep to save memory
        data.max_tsep = np.min([data.max_tsep, 10])

        toverl = 0
        tshift = 1
        if self.UPDT == True:
            finstate, statetime, indxs, toverl = self.loadcheck_pastoutputs(data, tfct)
            tshift = len(indxs)

        if self.isTraceOn():
            print('-- Open H5 file for storage --')

        if os.path.exists(self.outdir + 'States.h5'):
            sufx = '{}'.format(random.randint(0, 9))
            if self.isTraceOn():
                print('WARNING: {}States.h5 already exists, create States{}.h5'.format(self.outdir, sufx))
        else:
            sufx = ''

        fstates, fphases, fupdate = infmt.initiatefileforKF(
                    os.path.join(self.outdir, 'States{}.h5'.format(sufx)),
                    os.path.join(self.outdir, 'Phases{}.h5'.format(sufx)),
                    L, data, self.model, (m_err, self.sig_i, self.sig_y),
                    updtfile=os.path.join(self.outdir, 'Updates{}.h5'.format(sufx)),
                    comm=self.comm, toverlap=toverl, tshift=tshift)

        if self.isTraceOn():
            print('-- START Kalman Filter --')

        # Loop on each line
        for j in range(data.Ny):
            if self.rank == 0:
                sys.stdout.write('\r {} / {}'.format(j, data.Ny))
                sys.stdout.flush()

            # Create the Kalman filters for all unmasked columns of this line
            cols = [jj for jj in range(data.Nx) if data.mask[j, jj] > 0.]
            kalmans = [Kalman(data, tfct, j=j, i=jj, verbose=self.VERBOSE) for jj in cols]

            # Run the filter on each of these columns
            for kal, jj in zip(kalmans, cols):
                if self.EQ == True:
                    diagP0 = np.diag(P0).copy()
                    diagP0[-(Leq + 1):-1] = P0eq[j, jj, :]
                    P0 = np.diag(diagP0)

                if self.UPDT == True:
                    kal.restart_from_file(finstate, statetime, indxs, self.dtmax)
                else:
                    kal.start_new(m0, P0)

                kal.kf(m_err, phi_err, add_err, plots=False, t_sep=data.max_tsep)

                # Store outputs
                kal.write_output(fstates, fphases, outupdate=fupdate)

        # Close files
        if self.isTraceOn():
            print('-- Close H5 files in {} --'.format(self.outdir))
        fstates.close()
        fphases.close()
        fupdate.close()
        if self.UPDT == True:
            finstate.close()
        if self.cohTh is not None:
            data.finmask.close()

        print('Time for Kalman filter : {}'.format(TIME.time() - kf_start))


if __name__ == '__main__':

    # First get the config file name from the command line arguments
    parser = argparse.ArgumentParser(description='InSAR Time series analysis with a Kalman Filter')
    parser.add_argument('-c', type=str, dest='config', default=None,
                        help='Specify INI config file containing at least sections INPUT, OUTPUT, MODEL SETUP, KALMAN FILTER SETUP')
    args = parser.parse_args()

    if not args.config:
        print('Please choose a config file')
        exit()

    config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    config.read(args.config)

    runner = RunKalmanFilter(config)
    runner.launch()
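
# Typical invocation (a sketch: 'kfts.py' and 'config.ini' are placeholder names,
# and the MPI launcher depends on your installation):
#
#   python kfts.py -c config.ini
#   mpirun -np 8 python kfts.py -c config.ini    # parallel run, grid split between workers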