Skip to content

Commit

Permalink
Sub-daily CCFs are now default
Browse files Browse the repository at this point in the history
I've modified the msnoise workflow to allow sub-daily mov_stacks. This means:
* keep_all is now default **Y**
* The stack method will read ALL the CCFs of corr_duration length and apply: 1/ a rolling mean (for now; this can be adapted later to apply other stack methods) over a given duration (e.g. "1d", where d=day in pandas vocabulary) and 2/ a resampling to a (possibly) different duration (e.g. "12h").
This results in the STACKS' netcdf files containing 2 CCFs per day, and all subsequent steps have been adapted to work with any number of indices present in the netcdf files.

There might still be some small issues with missing CCFs at the edges of days (or in the last window, etc.) — to be checked.

This introduces a significant backward incompatibility in the parameters, the code, and the file structure.
  • Loading branch information
ThomasLecocq committed Feb 29, 2024
1 parent d6df3ba commit e174246
Show file tree
Hide file tree
Showing 16 changed files with 160 additions and 104 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.txt
Expand Up @@ -2,10 +2,11 @@ master
======

Major:
* MSNoise can now work with subdaily CCFs natively, from CC to dv/v!
* added support for Location IDs: TODO + add documentation
* this CHANGES the structure of the database !! TODO should we provide an "upgrade" tool?
* changes to the schema: Station (and TODO Filter too)
* supprort for PostgreSQL databases. TODO: check if removing index_hints really affect performances (should not)
* support for PostgreSQL databases. TODO: check if removing index_hints really affect performances (should not)

Changes:
- msnoise
Expand Down
55 changes: 31 additions & 24 deletions msnoise/api.py
Expand Up @@ -293,11 +293,10 @@ def get_params(session):
params.all_components = np.unique(params.components_to_compute_single_station + \
params.components_to_compute)

if params.mov_stack.count(',') == 0:
params.mov_stack = [int(params.mov_stack), ]
if params.mov_stack.count(',') == 1:
params.mov_stack = [params.mov_stack, ]
else:
params.mov_stack = [int(mi) for mi in params.mov_stack.split(',')]

params.mov_stack = params.mov_stack
return params

# FILTERS PART
Expand Down Expand Up @@ -1133,7 +1132,7 @@ def export_allcorr(session, ccfid, data):

df = pd.DataFrame().from_dict(data).T
df.columns = get_t_axis(session)
df.to_hdf(os.path.join(path, date+'.h5'), 'data')
df.to_hdf(os.path.join(path, date+'.h5'), key='data')
del df
return

Expand All @@ -1149,7 +1148,7 @@ def export_allcorr2(session, ccfid, data):

df = pd.DataFrame().from_dict(data).T
df.columns = get_t_axis(session)
df.to_hdf(os.path.join(path, date+'.h5'), 'data')
df.to_hdf(os.path.join(path, date+'.h5'), key='data')
del df
return

Expand Down Expand Up @@ -1499,7 +1498,7 @@ def get_mwcs(session, station1, station2, filterid, components, date,
"""
TODO
"""
file = os.path.join('MWCS', "%02i" % filterid, "%03i_DAYS" % mov_stack,
file = os.path.join('MWCS', "%02i" % filterid, "%s_%s" % (mov_stack[0], mov_stack[1]),
components, "%s_%s" % (station1, station2),
'%s.txt' % date)
if os.path.isfile(file):
Expand Down Expand Up @@ -1537,7 +1536,10 @@ def get_results_all(session, station1, station2, filterid, components, dates,
station1, station2, components)
results = []
for date in dates:
fname = os.path.join(path, date.strftime('%Y-%m-%d.h5'))
if isinstance(date, str):
fname = os.path.join(path, date+".h5")
else:
fname = os.path.join(path, date.strftime('%Y-%m-%d.h5'))
if os.path.isfile(fname):
df = pd.read_hdf(fname, 'data', parse_dates=True)
df.index = pd.to_datetime(df.index)
Expand All @@ -1553,6 +1555,7 @@ def get_results_all(session, station1, station2, filterid, components, dates,
dr = xr.DataArray(result, coords=[times, taxis],
dims=["times", "taxis"]).dropna("times", how="all")
dr.name = "CCF"
dr = dr.sortby('times')
return dr.to_dataset()
else:
return pd.DataFrame()
Expand Down Expand Up @@ -1695,9 +1698,13 @@ def build_movstack_datelist(session):
elif begin == "1970-01-01": # TODO this fails when the DA is empty
start = session.query(DataAvailability).order_by(
DataAvailability.starttime).first().starttime.date()
end = datetime.datetime.strptime(end, '%Y-%m-%d').date()
else:
start = datetime.datetime.strptime(begin, '%Y-%m-%d').date()

if end == "2100-01-01":
end = session.query(DataAvailability).order_by(
DataAvailability.endtime.desc()).first().endtime.date()
else:
end = datetime.datetime.strptime(end, '%Y-%m-%d').date()
end = min(end, datetime.date.today())
datelist = pd.date_range(start, end).map(lambda x: x.date())
Expand Down Expand Up @@ -2186,7 +2193,7 @@ def get_dvv(session, filterid, components, dates,
print("No Data for %s m%i f%i" % (components, mov_stack, filterid))

alldf = pd.concat(alldf)
print(mov_stack, alldf.head())
# print(mov_stack, alldf.head())
if 'alldf' in locals():
errname = "E" + dttname
alldf.to_csv("tt.csv")
Expand Down Expand Up @@ -2215,7 +2222,7 @@ def get_dvv(session, filterid, components, dates,

def xr_save_ccf(station1, station2, components, filterid, mov_stack, taxis, new, overwrite=False):
path = os.path.join("STACKS2", "%02i" % filterid,
"%03i_DAYS" % mov_stack, "%s" % components)
"%s_%s" % (mov_stack[0], mov_stack[1]), "%s" % components)
fn = "%s_%s.nc" % (station1, station2)
fullpath = os.path.join(path, fn)
if overwrite:
Expand All @@ -2230,7 +2237,7 @@ def xr_save_ccf(station1, station2, components, filterid, mov_stack, taxis, new,

def xr_get_ccf(station1, station2, components, filterid, mov_stack, taxis):
path = os.path.join("STACKS2", "%02i" % filterid,
"%03i_DAYS" % mov_stack, "%s" % components)
"%s_%s" % (mov_stack[0], mov_stack[1]), "%s" % components)
fn = "%s_%s.nc" % (station1, station2)

fullpath = os.path.join(path, fn)
Expand Down Expand Up @@ -2270,12 +2277,12 @@ def xr_get_ref(station1, station2, components, filterid, taxis):

def xr_save_mwcs(station1, station2, components, filterid, mov_stack, taxis, dataframe):
fn = os.path.join("MWCS2", "%02i" % filterid,
"%03i_DAYS" % mov_stack,
"%s_%s" % (mov_stack[0], mov_stack[1]),
"%s" % components,
"%s_%s.nc" % (station1, station2))
if not os.path.isdir(os.path.split(fn)[0]):
os.makedirs(os.path.split(fn)[0])
d = dataframe.stack().stack()
d = dataframe.stack(future_stack=True).stack(future_stack=True)
d.index = d.index.set_names(["times", "keys", "taxis"])
d = d.reorder_levels(["times", "taxis", "keys"])
d.columns = ["MWCS"]
Expand All @@ -2289,7 +2296,7 @@ def xr_save_mwcs(station1, station2, components, filterid, mov_stack, taxis, dat

def xr_get_mwcs(station1, station2, components, filterid, mov_stack):
fn = os.path.join("MWCS2", "%02i" % filterid,
"%03i_DAYS" % mov_stack,
"%s_%s" % (mov_stack[0], mov_stack[1]),
"%s" % components,
"%s_%s.nc" % (station1, station2))
if not os.path.isfile(fn):
Expand All @@ -2315,12 +2322,12 @@ def xr_save_dtt(station1, station2, components, filterid, mov_stack, dataframe):
* opened using the given file path, and the stacked data is inserted or updated in the file. The resulting dataset is then saved and the file is closed.
"""
fn = os.path.join("DTT2", "%02i" % filterid,
"%03i_DAYS" % mov_stack,
"%s_%s" % (mov_stack[0], mov_stack[1]),
"%s" % components,
"%s_%s.nc" % (station1, station2))
if not os.path.isdir(os.path.split(fn)[0]):
os.makedirs(os.path.split(fn)[0])
d = dataframe.stack()
d = dataframe.stack(future_stack=True)
d.index = d.index.set_names(["times", "keys"])
d.columns = ["DTT"]
dr = xr_create_or_open(fn, taxis=[], name="DTT")
Expand All @@ -2343,7 +2350,7 @@ def xr_get_dtt(station1, station2, components, filterid, mov_stack):
* result.
"""
fn = os.path.join("DTT2", "%02i" % filterid,
"%03i_DAYS" % mov_stack,
"%s_%s" % (mov_stack[0], mov_stack[1]),
"%s" % components,
"%s_%s.nc" % (station1, station2))
if not os.path.isfile(fn):
Expand All @@ -2356,15 +2363,15 @@ def xr_get_dtt(station1, station2, components, filterid, mov_stack):

def xr_save_dvv(components, filterid, mov_stack, dataframe):
fn = os.path.join("DVV2", "%02i" % filterid,
"%03i_DAYS" % mov_stack,
"%s_%s" % (mov_stack[0], mov_stack[1]),
"%s.nc" % components)
if not os.path.isdir(os.path.split(fn)[0]):
os.makedirs(os.path.split(fn)[0])

if dataframe.columns.nlevels > 1:
d = dataframe.stack().stack()
d = dataframe.stack(future_stack=True).stack(future_stack=True)
else:
d = dataframe.stack()
d = dataframe.stack(future_stack=True)

level_names = ["times", "level1", "level0"]
d.index = d.index.set_names(level_names[:d.index.nlevels])
Expand All @@ -2383,7 +2390,7 @@ def xr_save_dvv(components, filterid, mov_stack, dataframe):

def xr_get_dvv(components, filterid, mov_stack):
fn = os.path.join("DVV2", "%02i" % filterid,
"%03i_DAYS" % mov_stack,
"%s_%s" % (mov_stack[0], mov_stack[1]),
"%s.nc" % components)
if not os.path.isfile(fn):
# logging.error("FILE DOES NOT EXIST: %s, skipping" % fn)
Expand All @@ -2404,7 +2411,7 @@ def wavg(group, dttname, errname):
"""
d = group[dttname]
group[errname][group[errname] == 0] = 1e-6
group.loc[group[errname] == 0,errname] = 1e-6
w = 1. / group[errname]
try:
wavg = (d * w).sum() / w.sum()
Expand Down Expand Up @@ -2434,7 +2441,7 @@ def wstd(group, dttname, errname):
Note: This method uses the `np` module from NumPy.
"""
d = group[dttname]
group[errname][group[errname] == 0] = 1e-6
group.loc[group[errname] == 0,errname] = 1e-6
w = 1. / group[errname]
wavg = (d * w).sum() / w.sum()
N = len(np.nonzero(w.values)[0])
Expand Down
6 changes: 3 additions & 3 deletions msnoise/default.csv
Expand Up @@ -10,7 +10,7 @@ enddate,2100-01-01,End Date to process: [2100-01-01]='No end',str,,,
analysis_duration,86400,Duration of the Analysis in seconds (this shouldn't be changed for now),float,,,
cc_sampling_rate,20,Sampling Rate for the CrossCorrelation (in Hz),float,,,
cc_normalisation,NO,"Normalisation method for individual `corr_duration`) CCFs, default is not normalized (NO), but can be normalized based on the power of the two traces (POW), or by the maximum (MAX) or absolute maximum (ABSMAX) of the CCF",str,NO/POW/MAX/ABSMAX,,
resampling_method,Lanczos,Resampling method,str,Lanczos/Decimate,,
resampling_method,Decimate,Resampling method,str,Lanczos/Decimate,,
preprocess_lowpass,8,Preprocessing Low-pass value (in Hz),float,,,
preprocess_highpass,0.01,Preprocessing High-pass value (in Hz),float,,,
preprocess_max_gap,10,Preprocessing maximum gap length that will be filled by interpolation (in seconds),float,,,
Expand All @@ -35,11 +35,11 @@ cc_type,CC,Cross-Correlation type,str,,,
components_to_compute_single_station,,"List (comma separated) of components within a single station. ZZ would be the autocorrelation of Z component, while ZE or ZN are the cross-components. Defaults to [], no single-station computations are done.",str,,,
cc_type_single_station_AC,CC,Auto-Correlation type,str,,,
cc_type_single_station_SC,CC,Cross-Correlation type for Cross-Components,str,,,
keep_all,N,Keep all cross-corr (length: ``corr_duration``),bool,Y/N,,
keep_all,Y,Keep all cross-corr (length: ``corr_duration``),bool,Y/N,,
keep_days,Y,Keep all daily cross-corr,bool,Y/N,,
ref_begin,1970-01-01,Beginning or REF stacks. Can be absolute (2012-01-01) or relative (-100) days,str,,,
ref_end,2100-01-01,End or REF stacks. Same as ``ref_begin``,str,,,
mov_stack,5,"Number of days to stack for the Moving-window stacks ([5]= [day-4:day]), can be a comma-separated list 1,2,5,10",str,,,
mov_stack,"(('1d','1d'))","A list of two parameters: the time to ""roll"" over (default 1 day) and the granularity (step) of the resulting stacked CCFs (default 1 day) to stack for the Moving-window stacks. This can be a list of tuples, e.g. (('1d','1d'),('2d','1d')) corresponds to the MSNoise 1.6 ""1,2"" before. Time deltas can be anything pandas can interpret (""d"", ""min"", ""sec"", etc).",eval,,,
export_format,MSEED,Export stacks in which format(s) ?,str,SAC/MSEED/BOTH,,
sac_format,doublets,Format for SAC stacks ?,str,doublets/clarke,,
dtt_lag,static,How is the lag window defined,str,dynamic/static,,
Expand Down
17 changes: 10 additions & 7 deletions msnoise/plots/ccftime.py
Expand Up @@ -45,12 +45,14 @@
from ..api import *


def main(sta1, sta2, filterid, components, mov_stack=1, ampli=5, seismic=False,
def main(sta1, sta2, filterid, components, mov_stackid=1, ampli=5, seismic=False,
show=False, outfile=None, envelope=False, refilter=None,
normalize=None, loglevel="INFO", **kwargs):
logger = get_logger('msnoise.cc_plot_ccftime', loglevel,
with_pid=True)
db = connect()
params = get_params(db)
mov_stack = params.mov_stack[mov_stackid-1]
maxlag = float(get_config(db, 'maxlag'))
samples = get_maxlag_samples(db)
cc_sampling_rate = float(get_config(db, 'cc_sampling_rate'))
Expand All @@ -76,8 +78,8 @@ def main(sta1, sta2, filterid, components, mov_stack=1, ampli=5, seismic=False,

pair = "%s:%s" % (sta1, sta2)

logger.info("Fetching CCF data for %s-%s-%i-%i" % (pair, components, filterid,
mov_stack))
logger.info("Fetching CCF data for %s-%s-%i-%s" % (pair, components, filterid,
str(mov_stack)))
try:
stack_total = xr_get_ccf(sta1, sta2, components, filterid, mov_stack, taxis)
except FileNotFoundError as fullpath:
Expand Down Expand Up @@ -118,9 +120,9 @@ def main(sta1, sta2, filterid, components, mov_stack=1, ampli=5, seismic=False,
plt.xlabel("Lag Time (s)")
plt.axhline(0, lw=0.5, c='k')
plt.grid()
title = '%s : %s, %s, Filter %d (%.2f - %.2f Hz), Stack %d' %\
title = '%s : %s, %s, Filter %d (%.2f - %.2f Hz), Stack %i (%s_%s)' %\
(sta1, sta2, components,
filterid, low, high, mov_stack)
filterid, low, high, mov_stackid, mov_stack[0], mov_stack[1])
if refilter:
title += ", Re-filtered (%.2f - %.2f Hz)" % (freqmin, freqmax)
plt.title(title)
Expand All @@ -138,10 +140,11 @@ def main(sta1, sta2, filterid, components, mov_stack=1, ampli=5, seismic=False,
if outfile:
if outfile.startswith("?"):
pair = pair.replace(':', '-')
outfile = outfile.replace('?', '%s-%s-f%i-m%i' % (pair,
outfile = outfile.replace('?', '%s-%s-f%i-m%s_%s' % (pair,
components,
filterid,
mov_stack))
mov_stack[0],
mov_stack[1]))
outfile = "ccftime " + outfile
logger.info("output to: %s" % outfile)
plt.savefig(outfile)
Expand Down
19 changes: 11 additions & 8 deletions msnoise/plots/dvv.py
Expand Up @@ -20,24 +20,26 @@
from ..api import *


def main(mov_stack=None, dttname="M", components='ZZ', filterid=1,
def main(mov_stackid=None, dttname="M", components='ZZ', filterid=1,
pairs=[], showALL=False, show=False, outfile=None, loglevel="INFO"):
logger = get_logger('msnoise.cc_dvv_plot_dvv', loglevel,
with_pid=True)
db = connect()
params = get_params(db)
start, end, datelist = build_movstack_datelist(db)

if mov_stack and mov_stack != 0:
if mov_stackid and mov_stackid != "":
mov_stack = params.mov_stack[mov_stackid - 1]
mov_stacks = [mov_stack, ]
print(mov_stack)
else:
mov_stacks = params.mov_stack

if components.count(","):
components = components.split(",")
else:
components = [components, ]

print(mov_stacks)
low = high = 0.0
filter = get_filters(db, ref=filterid)
low = float(filter.low)
Expand All @@ -52,7 +54,7 @@ def main(mov_stack=None, dttname="M", components='ZZ', filterid=1,
plt.sca(axes[i])
except:
plt.sca(axes)
plt.title('%i Days Moving Window' % mov_stack)
# plt.title('%s Moving Window' % mov_stack)
for comps in components:
try:
dvv = xr_get_dvv(comps, filterid, mov_stack)
Expand All @@ -69,9 +71,9 @@ def main(mov_stack=None, dttname="M", components='ZZ', filterid=1,
plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc=4,
ncol=2, borderaxespad=0.)
left, right = dvv.index[0], dvv.index[-1]
plt.title('1 Day')
plt.title("Stack %i (%s_%s)"% (mov_stackid or i+1, mov_stack[0], mov_stack[1]))
else:
plt.title('%i Days Moving Window' % mov_stack)
plt.title("Stack %i (%s_%s)"% (i+1, mov_stack[0], mov_stack[1]))
plt.xlim(left, right)

plt.grid(True)
Expand All @@ -84,9 +86,10 @@ def main(mov_stack=None, dttname="M", components='ZZ', filterid=1,
if outfile:
if outfile.startswith("?"):
if len(mov_stacks) == 1:
outfile = outfile.replace('?', '%s-f%i-m%i-M%s' % (components,
outfile = outfile.replace('?', '%s-f%i-m%s_%s-M%s' % (components,
filterid,
mov_stack,
mov_stack[0],
mov_stack[1],
dttname))
else:
outfile = outfile.replace('?', '%s-f%i-M%s' % (components,
Expand Down

0 comments on commit e174246

Please sign in to comment.