Skip to content

Commit

Permalink
Merge "Refactoring cassandra repair and cassandra status code so that…
Browse files Browse the repository at this point in the history
… it can be used in config node, when config node has separate config db." into R3.1
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jul 28, 2016
2 parents 622ae7b + 31dc69b commit 125297c
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 31 deletions.
3 changes: 2 additions & 1 deletion src/nodemgr/SConscript
Expand Up @@ -65,7 +65,8 @@ for file in vrouter_sources:

database_sources = [
'__init__.py',
'database_nodemgr/database_event_manager.py'
'database_nodemgr/database_event_manager.py',
'database_nodemgr/common.py'
]
database_sources_rules = []
for file in database_sources:
Expand Down
34 changes: 24 additions & 10 deletions src/nodemgr/common/event_manager.py
Expand Up @@ -16,6 +16,7 @@
from subprocess import Popen, PIPE
import supervisor.xmlrpc
import xmlrpclib
import platform

from supervisor import childutils
from nodemgr.common.event_listener_protocol_nodemgr import \
Expand All @@ -31,6 +32,17 @@
from pysandesh.sandesh_logger import *
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel


def package_installed(pkg):
    """Return True when the OS package `pkg` is currently installed.

    On Ubuntu (as reported by platform.dist()) the dpkg database is
    queried; on every other distribution `rpm -q` is used.

    :param pkg: name of the package to look up
    :returns: True if the package manager reports the package as installed,
        False otherwise
    """
    (pdist, _, _) = platform.dist()
    with open(os.devnull, "w") as fnull:
        if pdist == 'Ubuntu':
            # `dpkg -l <pkg>` also exits 0 for packages that were removed
            # but still have config files on disk (state "rc"), which would
            # be reported as installed.  Ask dpkg-query for the status
            # field and look at the actual package state instead.
            query = subprocess.Popen(
                ["dpkg-query", "-W", "-f=${Status}\\n", pkg],
                stdout=subprocess.PIPE, stderr=fnull)
            (output, _) = query.communicate()
            if query.returncode != 0:
                # dpkg does not know the package at all
                return False
            # One status line per matching package instance, formatted as
            # "<want> <error-flag> <state>"; the last word is the state.
            return any(line.split()[-1:] == [b'installed']
                       for line in output.splitlines())
        # Pass the arguments as a list so the package name is never
        # re-split on whitespace.
        return subprocess.call(["rpm", "-q", pkg],
                               stdout=fnull, stderr=fnull) == 0


class EventManager(object):
rules_data = []
group_names = []
Expand Down Expand Up @@ -518,7 +530,7 @@ def event_process_communication(self, pdata):
cmd_and_args = ['/usr/bin/bash', '-c', rules['action']]
subprocess.Popen(cmd_and_args)

def event_tick_60(self, prev_current_time):
def event_tick_60(self):
self.tick_count += 1
# get disk usage info periodically
disk_usage_info = self.get_disk_usage()
Expand Down Expand Up @@ -551,29 +563,29 @@ def event_tick_60(self, prev_current_time):
node_status_uve.send()

current_time = int(time.time())
if ((abs(current_time - prev_current_time)) > 300):
if ((abs(current_time - self.prev_current_time)) > 300):
# update all process start_times with the updated time
# Compute the elapsed time and subtract them from
# current time to get updated values
sys.stderr.write(
"Time lapse detected " +
str(abs(current_time - prev_current_time)) + "\n")
str(abs(current_time - self.prev_current_time)) + "\n")
for key in self.process_state_db:
pstat = self.process_state_db[key]
if pstat.start_time is not '':
pstat.start_time = str(
(int(current_time - (prev_current_time -
(int(current_time - (self.prev_current_time -
((int)(pstat.start_time)) / 1000000))) * 1000000)
if (pstat.process_state == 'PROCESS_STATE_STOPPED'):
if pstat.stop_time is not '':
pstat.stop_time = str(
int(current_time - (prev_current_time -
int(current_time - (self.prev_current_time -
((int)(pstat.stop_time)) / 1000000)) *
1000000)
if (pstat.process_state == 'PROCESS_STATE_EXITED'):
if pstat.exit_time is not '':
pstat.exit_time = str(
int(current_time - (prev_current_time -
int(current_time - (self.prev_current_time -
((int)(pstat.exit_time)) / 1000000)) *
1000000)
# update process state database
Expand All @@ -590,11 +602,13 @@ def event_tick_60(self, prev_current_time):
sys.stderr.write("Unable to write json")
pass
self.send_process_state_db(self.group_names)
prev_current_time = int(time.time())
return prev_current_time
self.prev_current_time = int(time.time())

def do_periodic_events(self):
    """Run the periodic work for this node manager.

    The base implementation only performs the regular tick processing;
    subclasses may override this to add node-specific periodic tasks.
    """
    self.event_tick_60()

def runforever(self, test=False):
prev_current_time = int(time.time())
self.prev_current_time = int(time.time())
while 1:
# we explicitly use self.stdin, self.stdout, and self.stderr
# instead of sys.* so we can unit test this code
Expand All @@ -610,5 +624,5 @@ def runforever(self, test=False):
self.event_process_communication(pdata)
# do periodic events
if headers['eventname'].startswith("TICK_60"):
prev_current_time = self.event_tick_60(prev_current_time)
self.do_periodic_events()
self.listener_nodemgr.ok(self.stdout)
22 changes: 20 additions & 2 deletions src/nodemgr/config_nodemgr/config_event_manager.py
Expand Up @@ -16,7 +16,8 @@
import gevent
import ConfigParser

from nodemgr.common.event_manager import EventManager
from nodemgr.common.event_manager import EventManager, package_installed
from nodemgr.database_nodemgr.common import CassandraManager

from ConfigParser import NoOptionError

Expand All @@ -39,11 +40,16 @@

class ConfigEventManager(EventManager):
def __init__(self, rule_file, discovery_server,
discovery_port, collector_addr):
discovery_port, collector_addr,
cassandra_repair_interval,
cassandra_repair_logdir):
self.node_type = "contrail-config"
self.table = "ObjectConfigNode"
self.module = Module.CONFIG_NODE_MGR
self.module_id = ModuleNames[self.module]
self.cassandra_repair_interval = cassandra_repair_interval
self.cassandra_repair_logdir = cassandra_repair_logdir
self.cassandra_mgr = CassandraManager(cassandra_repair_logdir)
self.supervisor_serverurl = "unix:///var/run/supervisord_config.sock"
self.add_current_process()
node_type = Module2NodeType[self.module]
Expand Down Expand Up @@ -87,3 +93,15 @@ def get_node_third_party_process_list(self):
def get_process_state(self, fail_status_bits):
return self.get_process_state_base(
fail_status_bits, ProcessStateNames, ProcessState)

def do_periodic_events(self):
    """Run the periodic work for the config node manager.

    Besides the regular tick processing, cassandra status and repair are
    driven from here when 'contrail-database-common' is installed on this
    node but 'contrail-openstack-database' is not.
    """
    # The package name used to be misspelled ('contrail-opesntack-database'),
    # so this lookup could never match the real package.
    db = package_installed('contrail-openstack-database')
    config_db = package_installed('contrail-database-common')
    manage_cassandra = config_db and not db
    if manage_cassandra:
        # Record cluster status and shut down cassandra if needed
        self.cassandra_mgr.status()
    # event_tick_60() advances self.tick_count, so it must run before the
    # repair interval check below.
    self.event_tick_60()
    if manage_cassandra:
        # Perform nodetool repair every cassandra_repair_interval hours
        if self.tick_count % (60 * self.cassandra_repair_interval) == 0:
            self.cassandra_mgr.repair()
21 changes: 21 additions & 0 deletions src/nodemgr/database_nodemgr/common.py
@@ -0,0 +1,21 @@
#
# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
#

import subprocess


class CassandraManager(object):
    """Launch the Contrail cassandra status and repair commands.

    Both commands are started with subprocess.Popen and are not waited
    on, so the calls return as soon as the child process is spawned.
    """

    def __init__(self, cassandra_repair_logdir):
        """Store the directory in which the repair log file is written.

        :param cassandra_repair_logdir: directory for "repair.log", with
            or without a trailing path separator
        """
        self.cassandra_repair_logdir = cassandra_repair_logdir

    def _repair_log_path(self):
        """Return the full path of the repair log file."""
        import os
        # os.path.join inserts the separator only when it is missing, so a
        # log directory configured without a trailing "/" no longer yields
        # a path such as "/var/log/cassandrarepair.log".
        return os.path.join(self.cassandra_repair_logdir, "repair.log")

    def status(self):
        """Start contrail-cassandra-status, logging to a fixed log file."""
        subprocess.Popen(["contrail-cassandra-status",
                          "--log-file", "/var/log/cassandra/status.log",
                          "--debug"])

    def repair(self):
        """Start contrail-cassandra-repair, logging to the repair log."""
        subprocess.Popen(["contrail-cassandra-repair",
                          "--log-file", self._repair_log_path(),
                          "--debug"])
20 changes: 5 additions & 15 deletions src/nodemgr/database_nodemgr/database_event_manager.py
Expand Up @@ -18,6 +18,7 @@
import yaml

from nodemgr.common.event_manager import EventManager
from nodemgr.database_nodemgr.common import CassandraManager

from ConfigParser import NoOptionError

Expand Down Expand Up @@ -59,6 +60,7 @@ def __init__(self, rule_file, discovery_server,
self.contrail_databases = contrail_databases
self.cassandra_repair_interval = cassandra_repair_interval
self.cassandra_repair_logdir = cassandra_repair_logdir
self.cassandra_mgr = CassandraManager(cassandra_repair_logdir)
self.supervisor_serverurl = "unix:///var/run/supervisord_database.sock"
self.add_current_process()
node_type = Module2NodeType[self.module]
Expand Down Expand Up @@ -245,9 +247,7 @@ def database_periodic(self):
# Send cassandra nodetool information
self.send_database_status()
# Record cluster status and shut down cassandra if needed
subprocess.Popen(["contrail-cassandra-status",
"--log-file", "/var/log/cassandra/status.log",
"--debug"])
self.cassandra_mgr.status()
# end database_periodic

def send_database_status(self):
Expand Down Expand Up @@ -309,15 +309,8 @@ def get_tp_status(self,tp_stats_output):
return thread_pool_stats_list
# end get_tp_status

def cassandra_repair(self):
logdir = self.cassandra_repair_logdir + "repair.log"
subprocess.Popen(["contrail-cassandra-repair",
"--log-file", logdir,
"--debug"])
#end cassandra_repair

def runforever(self, test=False):
prev_current_time = int(time.time())
self.prev_current_time = int(time.time())
while 1:
# we explicitly use self.stdin, self.stdout, and self.stderr
# instead of sys.* so we can unit test this code
Expand All @@ -340,8 +333,5 @@ def runforever(self, test=False):
# do periodic events
if headers['eventname'].startswith("TICK_60"):
self.database_periodic()
prev_current_time = self.event_tick_60(prev_current_time)
# Perform nodetool repair every cassandra_repair_interval hours
if self.tick_count % (60 * self.cassandra_repair_interval) == 0:
self.cassandra_repair()
self.event_tick_60()
self.listener_nodemgr.ok(self.stdout)
5 changes: 4 additions & 1 deletion src/nodemgr/main.py
Expand Up @@ -164,9 +164,12 @@ def main(args_str=' '.join(sys.argv[1:])):
rule_file, discovery_server,
discovery_port, collector_addr)
elif (node_type == 'contrail-config'):
cassandra_repair_interval = _args.cassandra_repair_interval
cassandra_repair_logdir = _args.cassandra_repair_logdir
prog = ConfigEventManager(
rule_file, discovery_server,
discovery_port, collector_addr)
discovery_port, collector_addr,
cassandra_repair_interval, cassandra_repair_logdir)
elif (node_type == 'contrail-control'):
prog = ControlEventManager(
rule_file, discovery_server,
Expand Down
4 changes: 2 additions & 2 deletions src/nodemgr/vrouter_nodemgr/vrouter_event_manager.py
Expand Up @@ -112,7 +112,7 @@ def delete_process_handler(self, deleted_process):
# end delete_process_handler

def runforever(self, test=False):
prev_current_time = int(time.time())
self.prev_current_time = int(time.time())
while 1:
# we explicitly use self.stdin, self.stdout, and self.stderr
# instead of sys.* so we can unit test this code
Expand All @@ -138,7 +138,7 @@ def runforever(self, test=False):
self.event_process_communication(pdata)
# do periodic events
if headers['eventname'].startswith("TICK_60"):
prev_current_time = self.event_tick_60(prev_current_time)
self.event_tick_60()

# loadbalancer processing
self.lb_stats.send_loadbalancer_stats()
Expand Down

0 comments on commit 125297c

Please sign in to comment.