Bugfix scan_archive and new_jobs
* Refactored the new_jobs procedure: a more robust way of defining the jobs to do
* Bugfix in new_jobs: for some reason, dates were not compared properly in database_tools
* Because of the previous point, a small part of the data_availability plot had to be rewritten
* Added a get_tech method to database_tools to check which type of DB is used
* Thanks to the previous point, scan_archive now forces the use of only 1 process when the DB is SQLite
* Fixed some minor PEP8 and style-related errors
ThomasLecocq committed Apr 28, 2014
1 parent 31f5fa5 commit 02e622e
Showing 5 changed files with 84 additions and 77 deletions.
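
Before the diffs, a quick sketch of the main behavioural change: the new get_tech() helper reads the configured database type, and scan_archive uses it to fall back to a single process when the database is SQLite (which only accepts one connection at a time). This is a minimal illustration using the names introduced in the diffs below, not extra code from the commit:

    # Minimal sketch of the new SQLite guard; get_tech() and nthreads are
    # the names used in the s01scan_archive.py diff below.
    from database_tools import get_tech

    nthreads = 4                 # e.g. the value passed with -t/--threads
    if get_tech() == 1:          # tech == 1 means db.ini points to SQLite
        # SQLite only supports one connection at a time, so scan_archive
        # falls back to a single worker process.
        nthreads = 1
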
26 changes: 16 additions & 10 deletions database_tools.py
@@ -12,26 +12,30 @@
import logging
from msnoise_table_def import *

def get_tech():
tech, hostname, database, username, password = read_database_inifile()
return tech

def connect():
tech, hostname, database, username, password = read_database_inifile()
if tech == 1:
engine = create_engine('sqlite:///%s'%hostname, echo=False)
engine = create_engine('sqlite:///%s' % hostname, echo=False)
else:
engine = create_engine('mysql://%s:%s@%s/%s'%(username, password,
hostname, database),
echo=False, poolclass=NullPool)
engine = create_engine('mysql://%s:%s@%s/%s' % (username, password,
hostname, database),
echo=False, poolclass=NullPool)
Session = sessionmaker(bind=engine)
return Session()


def create_database_inifile(tech, hostname ,database, username, password):
f = open('db.ini','w')
cPickle.dump([tech, hostname, database, username, password],f)
def create_database_inifile(tech, hostname, database, username, password):
f = open('db.ini', 'w')
cPickle.dump([tech, hostname, database, username, password], f)
f.close()


def read_database_inifile():
f = open('db.ini','r')
f = open('db.ini', 'r')
tech, hostname, database, username, password = cPickle.load(f)
f.close()
return [tech, hostname, database, username, password]
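
As the two helpers above show, db.ini is simply a cPickle dump of [tech, hostname, database, username, password], with tech == 1 meaning SQLite (in which case connect() uses hostname as the SQLite file path). A hypothetical example of writing such a file:

    # Hypothetical example: write a db.ini describing an SQLite database.
    # With tech == 1, connect() uses 'hostname' as the SQLite file path and
    # ignores the remaining fields.
    from database_tools import create_database_inifile

    create_database_inifile(1, 'msnoise.sqlite', '', '', '')
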
@@ -149,7 +153,8 @@ def update_data_availability(session, net, sta, comp, path, file, starttime, end
toreturn = True
else:
modified = False
for item in ['net', 'sta', 'comp', 'path', 'starttime', 'data_duration', 'gaps_duration', 'samplerate']:
for item in ['net', 'sta', 'comp', 'path', 'starttime', 'endtime',
'data_duration', 'gaps_duration', 'samplerate']:
if eval("data.%s != %s" % (item, item)):
modified = True
break
@@ -159,6 +164,7 @@ def update_data_availability(session, net, sta, comp, path, file, starttime, end
data.comp = comp
data.path = path
data.starttime = starttime
data.endtime = endtime
data.data_duration = data_duration
data.gaps_duration = gaps_duration
data.samplerate = samplerate
@@ -176,7 +182,7 @@ def get_data_availability(session, net=None, sta=None, comp=None, starttime=None
if not starttime:
data = session.query(DataAvailability).filter(DataAvailability.net == net).filter(DataAvailability.sta == sta).filter(DataAvailability.comp == comp).all()
if not net:
data = session.query(DataAvailability).filter(func.DATE(DataAvailability.starttime) <= endtime).filter(func.DATE(DataAvailability.endtime) >= starttime).all()
data = session.query(DataAvailability).filter(DataAvailability.starttime <= endtime).filter(DataAvailability.endtime >= starttime).all()
else:
data = session.query(DataAvailability).filter(DataAvailability.net == net).filter(DataAvailability.sta == sta).filter(func.DATE(DataAvailability.starttime) <= starttime.date()).filter(func.DATE(DataAvailability.endtime) >= endtime.date()).all()
return data
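
A note on the bugfix visible in the last hunk: when neither net nor sta is given, get_data_availability() now compares the starttime/endtime columns directly instead of wrapping them in func.DATE(). The commit message only says dates "were not properly compared"; one plausible explanation (an assumption, not stated in the commit) is that DATE() behaves differently on SQLite's text-stored datetimes than on MySQL. The rewritten query, split over several lines for readability only:

    # The rewritten query from the hunk above, reformatted only; behaviour
    # is unchanged. 'session', 'starttime' and 'endtime' are the arguments
    # of get_data_availability().
    data = (session.query(DataAvailability)
            .filter(DataAvailability.starttime <= endtime)
            .filter(DataAvailability.endtime >= starttime)
            .all())
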
39 changes: 21 additions & 18 deletions plot_dataavailability.py
@@ -5,44 +5,48 @@
import matplotlib.dates
import numpy as np
import matplotlib.gridspec as gridspec
import datetime

db = connect()
start, end, datelist = build_movstack_datelist(db)
print dir(start)
dates = []
stations = []
for day in datelist:
# print day
data = get_data_availability(db,starttime=day, endtime=day)
daystart = datetime.datetime.combine(day, datetime.time(0, 0, 0))
dayend = datetime.datetime.combine(day, datetime.time(23, 59, 59))
data = get_data_availability(db, starttime=daystart, endtime=dayend)
for di in data:
stations.append("%s.%s"%(di.net,di.sta))
stations.append("%s.%s" % (di.net, di.sta))
dates.append(di.starttime)

data = pd.DataFrame({"stations":stations},index=dates)
data = pd.DataFrame({"stations": stations}, index=dates)
data = data.groupby('stations')

llen = (end-start).days +1
llen = (end-start).days + 1
ngroups = len(data.groups.keys())
matrix = np.zeros((ngroups,llen))
start = datetime.datetime.combine(start, datetime.time(0,0,0))
matrix = np.zeros((ngroups, llen))
start = datetime.datetime.combine(start, datetime.time(0, 0, 0))

for i, group in enumerate(sorted(data.groups.keys())):
print group
new = True
new = True
for di in data.groups[group]:
if new:
print group, di
new= False
dt= (di-start).days
matrix[i,dt] = 1
new = False
dt = (di-start).days
matrix[i, dt] = 1

gs = gridspec.GridSpec(2, 1,height_ratios=[4, 1])
gs = gridspec.GridSpec(2, 1, height_ratios=[4, 1])

plt.figure(figsize=(11.8,8.4))
plt.figure(figsize=(11.8, 8.4))
ax = plt.subplot(gs[0])
plt.imshow(matrix,interpolation="none",aspect='auto',cmap='bwr',vmin=-1,vmax=1,extent=(date2num(start),date2num(end),0,ngroups),origin='lower')
plt.imshow(matrix, interpolation="none", aspect='auto', cmap='bwr',
vmin=-1, vmax=1, extent=(date2num(start), date2num(end),
0, ngroups),
origin='lower')

plt.yticks(np.arange(ngroups)+0.5,sorted(data.groups.keys()))
plt.yticks(np.arange(ngroups)+0.5, sorted(data.groups.keys()))
ax.xaxis.set_major_locator(
matplotlib.dates.MonthLocator())

@@ -54,10 +58,9 @@


ax = plt.subplot(gs[1])
plt.plot(datelist,np.sum(matrix,axis=0))
plt.plot(datelist, np.sum(matrix, axis=0))
plt.ylabel('N stations')
plt.gcf().autofmt_xdate()
plt.grid()

plt.show()

50 changes: 34 additions & 16 deletions s01scan_archive.py
@@ -1,4 +1,4 @@
"""One advantage of MSNoise is its ability to be used as an automated
"""One advantage of MSNoise is its ability to be used as an automated
monitoring tool. To run every night on the data acquired during the
previous day, MSNoise needs to check the data archive for new or modified
files. Those files could have been acquired during the last day, but be data of
@@ -17,24 +17,24 @@
Special case: first run
~~~~~~~~~~~~~~~~~~~~~~~~
This script is the same as for the routine, but one has to pass the init argument:
This script is the same as for the routine, but one has to pass the init
argument:
.. code-block:: sh
python s01scan_archive.py --init -t 2
This will scan the data_archive folder the configured stations and will insert
all files found in the data_availability table in the database. As usual, calling the
script with a --help argument will show its usage.
all files found in the data_availability table in the database. As usual,
calling the script with a --help argument will show its usage.
"""
from obspy.core import read
import glob
import sys
import os
import datetime
import time
import logging
import threading
import logging.handlers
from subprocess import Popen, PIPE
from multiprocessing import Process
import multiprocessing
@@ -43,10 +43,13 @@
from database_tools import *
from data_structures import data_structure

def worker(files, folder,startdate, enddate):

def worker(files, folder, startdate, enddate):
# logger = logging.getLogger('worker')

db = connect()
for file in files:
file = os.path.join(folder,file)
file = os.path.join(folder, file)
try:
r0 = time.time()
name = os.path.split(file)[1]
@@ -83,7 +86,9 @@ def worker(files, folder,startdate, enddate):
r1 = time.time()

result = update_data_availability(
db, net, sta, comp, path, name, starttime.datetime, endtime.datetime, data_duration, gaps_duration, data[0].stats.sampling_rate)
db, net, sta, comp, path, name, starttime.datetime,
endtime.datetime, data_duration, gaps_duration,
data[0].stats.sampling_rate)

r2 = time.time()
if result:
@@ -110,8 +115,14 @@ def worker(files, folder,startdate, enddate):
parser.add_argument('-t', '--threads',
help='Number of parellel threads to use [default:1]', default=1, type=int)
args = parser.parse_args()



# rootLogger = logging.getLogger('')
# rootLogger.setLevel(logging.DEBUG)
# socketHandler = logging.handlers.SocketHandler('localhost',
# logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# rootLogger.addHandler(socketHandler)
# global logger
# logger = logging.getLogger('main')

multiprocessing.log_to_stderr()
global logger
@@ -123,20 +134,25 @@ def worker(files, folder,startdate, enddate):

logger.info('*** Starting: Scan Archive ***')
db = connect()

init = False
mtime = -2

if args.init:
logger.info("Initializing (should be run only once)")
mtime = "-20000"
init = True
else:
mtime = "%s" % mtime

nthreads = 1
if args.threads:
nthreads = args.threads
if get_tech() == 1:
logger.info("You can not work on %i threads because SQLite only\
supports 1 connection at a time" % nthreads)
nthreads = 1

logger.info("Will work on %i threads" % nthreads)

if os.name == "nt":
@@ -176,10 +192,12 @@ def worker(files, folder,startdate, enddate):

if len(stdout) != 0:
files = sorted(stdout.split('\n'))

else:
files = []

if '' in files:
files.remove('')

if len(files) != 0:
logger.info('Started: %s'%folder)
client = Process(target=worker, args=([files,folder,startdate,enddate]))
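
For orientation (the surrounding loop is mostly outside the visible diff): the script builds a file list per folder and starts a multiprocessing.Process running the worker() shown above for each one, with the -t/--threads option, now clamped to 1 for SQLite, bounding how many run at once. A hypothetical sketch of such a dispatch loop, where folders_to_scan stands in for the per-folder listings the script builds:

    # Hypothetical dispatch loop (the real one is largely outside this diff).
    # folders_to_scan is a stand-in for the per-folder file lists; worker(),
    # nthreads, startdate and enddate are the names used above.
    import time
    from multiprocessing import Process, active_children

    for folder, files in folders_to_scan:
        # Wait for a free slot before starting another worker process.
        while len(active_children()) >= nthreads:
            time.sleep(0.1)
        client = Process(target=worker, args=(files, folder, startdate, enddate))
        client.start()

    # Let the remaining workers finish.
    while len(active_children()) > 0:
        time.sleep(0.1)
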
37 changes: 10 additions & 27 deletions s02new_jobs.py
@@ -20,26 +20,23 @@
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filemode='w')

console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)



logging.info('*** Starting: New Jobs ***')

db = connect()

if get_config(db, name="autocorr") in ['Y', 'y', '1', 1]:
AUTOCORR = True
else:
AUTOCORR = False

stations_to_analyse = [sta.sta for sta in get_stations(db, all=False)]
nfs = get_new_files(db)

days = {}
old_day = 0
old_pair = ""
@@ -50,21 +47,14 @@
# logging.debug('%s.%s will be MASTER for %s-%s'% (nf.net, nf.sta, nf.starttime, nf.endtime))
if nf.sta in stations_to_analyse:
day = "%s" % (nf.starttime.date())
if day != old_day:
day_pairs = np.unique(day_pairs)
for pair in day_pairs:
logging.debug('New Job for: %s - %s' % (day, pair))
# add_job(db, day, pair,type='CC',flag='T',commit=False)
jobs.append([day, pair, 'CC', 'T'])
day_pairs = []
old_day = day

available_stations = []
for station in get_data_availability(db, starttime=nf.starttime, endtime=nf.endtime):
if station.sta in stations_to_analyse:
if '%s.%s' % (station.net, station.sta) not in available_stations:
available_stations.append(
'%s.%s' % (station.net, station.sta))
# logging.debug('Will process %s.%s vs %s.%s'% (nf.net, nf.sta, station.net, station.sta))

stations = np.array([])
pairs = []
@@ -76,29 +66,22 @@
else:
if i == 0:
pairs = np.array(':'.join(sorted([nS, aS])))
# stations = np.array([nS, aS])
i += 1
else:
pairs = np.vstack((pairs, ':'.join(sorted([nS, aS]))))
# stations = np.append(stations, np.array([nS,aS]))

pairs = np.unique(pairs)
for pair in pairs:
day_pairs.append(pair)

if day != old_day and day_pairs != []:
day_pairs = np.unique(day_pairs)
for pair in day_pairs:
logging.debug('New Job for: %s - %s' % (day, pair))
# add_job(db, day, pair,type='CC',flag='T')
jobs.append([day, pair, 'CC', 'T'])
daypair = "%s=%s" % (day, pair)
if daypair not in jobs:
jobs.append(daypair)

count = len(jobs)
logging.debug("Found %i new jobs to do" % count)
alljobs = []
for job in jobs:
day, pair, type, flag = job
job = update_job(db, day, pair, type, flag, commit=False, returnjob=True)
day, pair = job.split("=")
job = update_job(db, day, pair, type='CC', flag='T', commit=False, returnjob=True)
alljobs.append(job)
if i % 100 == 0:
logging.debug("Committing 100 jobs")
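
The refactor above boils down to representing each job as a unique "day=pair" key and touching the database only once per key. A condensed sketch of that pattern, simplified from the diff (the per-100-jobs commits and the pair-building logic are omitted):

    # Condensed sketch of the new job definition: collect unique "day=pair"
    # keys, then create/update each job once. candidate_day_pairs is a
    # hypothetical stand-in for the (day, pair) combinations derived from
    # the new files.
    jobs = []
    for day, pair in candidate_day_pairs:
        daypair = "%s=%s" % (day, pair)
        if daypair not in jobs:          # keep each day/pair only once
            jobs.append(daypair)

    for daypair in jobs:
        day, pair = daypair.split("=")
        update_job(db, day, pair, type='CC', flag='T',
                   commit=False, returnjob=True)
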
9 changes: 3 additions & 6 deletions s03compute_cc.py
@@ -17,13 +17,13 @@
trace is padded with zeros. If longer, it is cut to match the start/end of the
day.
Each 1-day long trace is then low-passed (at ``preprocess_lowpass`` Hz),
high-passed (at ``preprocess_highpass`` Hz), then decimated/downsampled.
Decimation/Downsampling are configurable (``resampling_method``) and users are
advised testing both. One advantage of Downsampling over Decimation is that
it is able to downsample the data by any factor, not only integer factors.
.. warning::
For an unknown reason, the PAZ-correction has disappeared from the
current sqlvolution on GitHub: CHECK!
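
The preprocessing this docstring describes (low-pass, high-pass, then decimate or downsample each 1-day trace) can be pictured with plain ObsPy calls. This is only an illustration of the idea, not the script's actual code; the 8.0 Hz and 0.01 Hz corners stand in for the configured preprocess_lowpass / preprocess_highpass values:

    # Illustration only (not the script's code): the preprocessing chain
    # described in the docstring above, with placeholder corner frequencies.
    from obspy.core import read

    st = read("some_one_day_file.mseed")   # hypothetical 1-day long trace
    st.filter("lowpass", freq=8.0, corners=4, zerophase=True)
    st.filter("highpass", freq=0.01, corners=4, zerophase=True)
    # Decimation only handles integer factors; resampling handles any rate.
    st.decimate(factor=5, no_filter=True)
    # st.resample(20.0)                    # downsampling alternative
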
@@ -65,15 +65,12 @@

import numpy as np
from obspy.core import read, utcdatetime, Stream
from obspy.signal import cosTaper
from obspy.signal.filter import lowpass, highpass
from scikits.samplerate import resample
import time
import calendar
import datetime
import sys
import os
import scipy.fftpack
from database_tools import *
from myCorr import myCorr
from whiten import whiten
@@ -402,7 +399,7 @@
trame2h[i], Nfft, dt, low, high, plot=False)
else:
# logging.debug("Station no %d, pas de pretraitement car rms < %f ou NaN"% (i, rms_threshold))
trames2hWb[i] = np.zeros(Nfft)
trames2hWb[i] = np.zeros(int(Nfft))

corr = myCorr(trames2hWb, np.ceil(maxlag / dt), plot=False)

