From 31dc69b39c54ccbd953b6d0b7651d61f00f3e63a Mon Sep 17 00:00:00 2001 From: Ignatious Johnson Christopher Date: Mon, 18 Jul 2016 14:56:40 +0000 Subject: [PATCH] Refactoring cassandra repair and cassandra status code so that it can be used in config node, when config node has seperate config db. Change-Id: I300e5b3a25e4a0521071e9235f53544c28e94421 Partial-Bug: 1578826 --- src/nodemgr/SConscript | 3 +- src/nodemgr/common/event_manager.py | 34 +++++++++++++------ .../config_nodemgr/config_event_manager.py | 22 ++++++++++-- src/nodemgr/database_nodemgr/common.py | 21 ++++++++++++ .../database_event_manager.py | 20 +++-------- src/nodemgr/main.py | 5 ++- .../vrouter_nodemgr/vrouter_event_manager.py | 4 +-- 7 files changed, 78 insertions(+), 31 deletions(-) create mode 100644 src/nodemgr/database_nodemgr/common.py diff --git a/src/nodemgr/SConscript b/src/nodemgr/SConscript index 8b59c600c03..bffccd0f6bd 100644 --- a/src/nodemgr/SConscript +++ b/src/nodemgr/SConscript @@ -65,7 +65,8 @@ for file in vrouter_sources: database_sources = [ '__init__.py', - 'database_nodemgr/database_event_manager.py' + 'database_nodemgr/database_event_manager.py', + 'database_nodemgr/common.py' ] database_sources_rules = [] for file in database_sources: diff --git a/src/nodemgr/common/event_manager.py b/src/nodemgr/common/event_manager.py index fcb0a805945..49e6f74c6de 100644 --- a/src/nodemgr/common/event_manager.py +++ b/src/nodemgr/common/event_manager.py @@ -16,6 +16,7 @@ from subprocess import Popen, PIPE import supervisor.xmlrpc import xmlrpclib +import platform from supervisor import childutils from nodemgr.common.event_listener_protocol_nodemgr import \ @@ -31,6 +32,17 @@ from pysandesh.sandesh_logger import * from pysandesh.gen_py.sandesh.ttypes import SandeshLevel + +def package_installed(pkg): + (pdist, _, _) = platform.dist() + if pdist == 'Ubuntu': + cmd = "dpkg -l " + pkg + else: + cmd = "rpm -q " + pkg + with open(os.devnull, "w") as fnull: + return (not subprocess.call(cmd.split(), stdout=fnull, stderr=fnull)) + + class EventManager(object): rules_data = [] group_names = [] @@ -520,7 +532,7 @@ def event_process_communication(self, pdata): cmd_and_args = ['/usr/bin/bash', '-c', rules['action']] subprocess.Popen(cmd_and_args) - def event_tick_60(self, prev_current_time): + def event_tick_60(self): self.tick_count += 1 # get disk usage info periodically disk_usage_info = self.get_disk_usage() @@ -553,29 +565,29 @@ def event_tick_60(self, prev_current_time): node_status_uve.send() current_time = int(time.time()) - if ((abs(current_time - prev_current_time)) > 300): + if ((abs(current_time - self.prev_current_time)) > 300): # update all process start_times with the updated time # Compute the elapsed time and subtract them from # current time to get updated values sys.stderr.write( "Time lapse detected " + - str(abs(current_time - prev_current_time)) + "\n") + str(abs(current_time - self.prev_current_time)) + "\n") for key in self.process_state_db: pstat = self.process_state_db[key] if pstat.start_time is not '': pstat.start_time = str( - (int(current_time - (prev_current_time - + (int(current_time - (self.prev_current_time - ((int)(pstat.start_time)) / 1000000))) * 1000000) if (pstat.process_state == 'PROCESS_STATE_STOPPED'): if pstat.stop_time is not '': pstat.stop_time = str( - int(current_time - (prev_current_time - + int(current_time - (self.prev_current_time - ((int)(pstat.stop_time)) / 1000000)) * 1000000) if (pstat.process_state == 'PROCESS_STATE_EXITED'): if pstat.exit_time is not '': pstat.exit_time = str( - int(current_time - (prev_current_time - + int(current_time - (self.prev_current_time - ((int)(pstat.exit_time)) / 1000000)) * 1000000) # update process state database @@ -592,11 +604,13 @@ def event_tick_60(self, prev_current_time): sys.stderr.write("Unable to write json") pass self.send_process_state_db(self.group_names) - prev_current_time = int(time.time()) - return prev_current_time + self.prev_current_time = int(time.time()) + + def do_periodic_events(self): + self.event_tick_60() def runforever(self, test=False): - prev_current_time = int(time.time()) + self.prev_current_time = int(time.time()) while 1: # we explicitly use self.stdin, self.stdout, and self.stderr # instead of sys.* so we can unit test this code @@ -612,5 +626,5 @@ def runforever(self, test=False): self.event_process_communication(pdata) # do periodic events if headers['eventname'].startswith("TICK_60"): - prev_current_time = self.event_tick_60(prev_current_time) + self.do_periodic_events() self.listener_nodemgr.ok(self.stdout) diff --git a/src/nodemgr/config_nodemgr/config_event_manager.py b/src/nodemgr/config_nodemgr/config_event_manager.py index c782ff16907..ba24f6e6979 100644 --- a/src/nodemgr/config_nodemgr/config_event_manager.py +++ b/src/nodemgr/config_nodemgr/config_event_manager.py @@ -16,7 +16,8 @@ import gevent import ConfigParser -from nodemgr.common.event_manager import EventManager +from nodemgr.common.event_manager import EventManager, package_installed +from nodemgr.database_nodemgr.common import CassandraManager from ConfigParser import NoOptionError @@ -39,11 +40,16 @@ class ConfigEventManager(EventManager): def __init__(self, rule_file, discovery_server, - discovery_port, collector_addr): + discovery_port, collector_addr, + cassandra_repair_interval, + cassandra_repair_logdir): self.node_type = "contrail-config" self.table = "ObjectConfigNode" self.module = Module.CONFIG_NODE_MGR self.module_id = ModuleNames[self.module] + self.cassandra_repair_interval = cassandra_repair_interval + self.cassandra_repair_logdir = cassandra_repair_logdir + self.cassandra_mgr = CassandraManager(cassandra_repair_logdir) self.supervisor_serverurl = "unix:///var/run/supervisord_config.sock" self.add_current_process() node_type = Module2NodeType[self.module] @@ -87,3 +93,15 @@ def get_node_third_party_process_list(self): def get_process_state(self, fail_status_bits): return self.get_process_state_base( fail_status_bits, ProcessStateNames, ProcessState) + + def do_periodic_events(self): + db = package_installed('contrail-opesntack-database') + config_db = package_installed('contrail-database-common') + if not db and config_db: + # Record cluster status and shut down cassandra if needed + self.cassandra_mgr.status() + self.event_tick_60() + if not db and config_db: + # Perform nodetool repair every cassandra_repair_interval hours + if self.tick_count % (60 * self.cassandra_repair_interval) == 0: + self.cassandra_mgr.repair() diff --git a/src/nodemgr/database_nodemgr/common.py b/src/nodemgr/database_nodemgr/common.py new file mode 100644 index 00000000000..f11e456295e --- /dev/null +++ b/src/nodemgr/database_nodemgr/common.py @@ -0,0 +1,21 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +import subprocess + + +class CassandraManager(object): + def __init__(self, cassandra_repair_logdir): + self.cassandra_repair_logdir = cassandra_repair_logdir + + def status(self): + subprocess.Popen(["contrail-cassandra-status", + "--log-file", "/var/log/cassandra/status.log", + "--debug"]) + + def repair(self): + logdir = self.cassandra_repair_logdir + "repair.log" + subprocess.Popen(["contrail-cassandra-repair", + "--log-file", logdir, + "--debug"]) diff --git a/src/nodemgr/database_nodemgr/database_event_manager.py b/src/nodemgr/database_nodemgr/database_event_manager.py index 533c36e8185..f63f9605ddd 100644 --- a/src/nodemgr/database_nodemgr/database_event_manager.py +++ b/src/nodemgr/database_nodemgr/database_event_manager.py @@ -18,6 +18,7 @@ import yaml from nodemgr.common.event_manager import EventManager +from nodemgr.database_nodemgr.common import CassandraManager from ConfigParser import NoOptionError @@ -59,6 +60,7 @@ def __init__(self, rule_file, discovery_server, self.contrail_databases = contrail_databases self.cassandra_repair_interval = cassandra_repair_interval self.cassandra_repair_logdir = cassandra_repair_logdir + self.cassandra_mgr = CassandraManager(cassandra_repair_logdir) self.supervisor_serverurl = "unix:///var/run/supervisord_database.sock" self.add_current_process() node_type = Module2NodeType[self.module] @@ -245,9 +247,7 @@ def database_periodic(self): # Send cassandra nodetool information self.send_database_status() # Record cluster status and shut down cassandra if needed - subprocess.Popen(["contrail-cassandra-status", - "--log-file", "/var/log/cassandra/status.log", - "--debug"]) + self.cassandra_mgr.status() # end database_periodic def send_database_status(self): @@ -309,15 +309,8 @@ def get_tp_status(self,tp_stats_output): return thread_pool_stats_list # end get_tp_status - def cassandra_repair(self): - logdir = self.cassandra_repair_logdir + "repair.log" - subprocess.Popen(["contrail-cassandra-repair", - "--log-file", logdir, - "--debug"]) - #end cassandra_repair - def runforever(self, test=False): - prev_current_time = int(time.time()) + self.prev_current_time = int(time.time()) while 1: # we explicitly use self.stdin, self.stdout, and self.stderr # instead of sys.* so we can unit test this code @@ -340,8 +333,5 @@ def runforever(self, test=False): # do periodic events if headers['eventname'].startswith("TICK_60"): self.database_periodic() - prev_current_time = self.event_tick_60(prev_current_time) - # Perform nodetool repair every cassandra_repair_interval hours - if self.tick_count % (60 * self.cassandra_repair_interval) == 0: - self.cassandra_repair() + self.event_tick_60() self.listener_nodemgr.ok(self.stdout) diff --git a/src/nodemgr/main.py b/src/nodemgr/main.py index 1f2e7479eaf..509583ce23f 100755 --- a/src/nodemgr/main.py +++ b/src/nodemgr/main.py @@ -164,9 +164,12 @@ def main(args_str=' '.join(sys.argv[1:])): rule_file, discovery_server, discovery_port, collector_addr) elif (node_type == 'contrail-config'): + cassandra_repair_interval = _args.cassandra_repair_interval + cassandra_repair_logdir = _args.cassandra_repair_logdir prog = ConfigEventManager( rule_file, discovery_server, - discovery_port, collector_addr) + discovery_port, collector_addr, + cassandra_repair_interval, cassandra_repair_logdir) elif (node_type == 'contrail-control'): prog = ControlEventManager( rule_file, discovery_server, diff --git a/src/nodemgr/vrouter_nodemgr/vrouter_event_manager.py b/src/nodemgr/vrouter_nodemgr/vrouter_event_manager.py index c4f58691b90..64af29eb252 100644 --- a/src/nodemgr/vrouter_nodemgr/vrouter_event_manager.py +++ b/src/nodemgr/vrouter_nodemgr/vrouter_event_manager.py @@ -111,7 +111,7 @@ def delete_process_handler(self, deleted_process): # end delete_process_handler def runforever(self, test=False): - prev_current_time = int(time.time()) + self.prev_current_time = int(time.time()) while 1: # we explicitly use self.stdin, self.stdout, and self.stderr # instead of sys.* so we can unit test this code @@ -137,7 +137,7 @@ def runforever(self, test=False): self.event_process_communication(pdata) # do periodic events if headers['eventname'].startswith("TICK_60"): - prev_current_time = self.event_tick_60(prev_current_time) + self.event_tick_60() # loadbalancer processing self.lb_stats.send_loadbalancer_stats()