Skip to content

Commit

Permalink
Refactoring cassandra repair and cassandra status code
Browse files Browse the repository at this point in the history
so that it can be used in config node, when config node has
separate config db.

Change-Id: I300e5b3a25e4a0521071e9235f53544c28e94421
Partial-Bug: 1578826
  • Loading branch information
cijohnson committed Jul 27, 2016
1 parent 4d5bdd4 commit 31dc69b
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 31 deletions.
3 changes: 2 additions & 1 deletion src/nodemgr/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ for file in vrouter_sources:

database_sources = [
'__init__.py',
'database_nodemgr/database_event_manager.py'
'database_nodemgr/database_event_manager.py',
'database_nodemgr/common.py'
]
database_sources_rules = []
for file in database_sources:
Expand Down
34 changes: 24 additions & 10 deletions src/nodemgr/common/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from subprocess import Popen, PIPE
import supervisor.xmlrpc
import xmlrpclib
import platform

from supervisor import childutils
from nodemgr.common.event_listener_protocol_nodemgr import \
Expand All @@ -31,6 +32,17 @@
from pysandesh.sandesh_logger import *
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel


def package_installed(pkg):
    """Return True if the system package manager reports *pkg* as installed.

    When ``platform.dist()`` identifies the distribution as 'Ubuntu' the
    check is ``dpkg -l <pkg>``; on every other distribution it is
    ``rpm -q <pkg>``. The package counts as installed when that command
    exits with status 0. All output of the command is discarded.

    :param pkg: name of the package to look up.
    :returns: True if the query command exits with 0, False otherwise.
    """
    # NOTE: platform.dist() was removed in Python 3.8; this module targets
    # Python 2.
    pdist, _, _ = platform.dist()
    # Build argv directly instead of splitting a concatenated string, so the
    # package name is always handed over as exactly one argument.
    if pdist == 'Ubuntu':
        cmd = ["dpkg", "-l", pkg]
    else:
        cmd = ["rpm", "-q", pkg]
    with open(os.devnull, "w") as fnull:
        return subprocess.call(cmd, stdout=fnull, stderr=fnull) == 0


class EventManager(object):
rules_data = []
group_names = []
Expand Down Expand Up @@ -520,7 +532,7 @@ def event_process_communication(self, pdata):
cmd_and_args = ['/usr/bin/bash', '-c', rules['action']]
subprocess.Popen(cmd_and_args)

def event_tick_60(self, prev_current_time):
def event_tick_60(self):
self.tick_count += 1
# get disk usage info periodically
disk_usage_info = self.get_disk_usage()
Expand Down Expand Up @@ -553,29 +565,29 @@ def event_tick_60(self, prev_current_time):
node_status_uve.send()

current_time = int(time.time())
if ((abs(current_time - prev_current_time)) > 300):
if ((abs(current_time - self.prev_current_time)) > 300):
# update all process start_times with the updated time
# Compute the elapsed time and subtract them from
# current time to get updated values
sys.stderr.write(
"Time lapse detected " +
str(abs(current_time - prev_current_time)) + "\n")
str(abs(current_time - self.prev_current_time)) + "\n")
for key in self.process_state_db:
pstat = self.process_state_db[key]
if pstat.start_time is not '':
pstat.start_time = str(
(int(current_time - (prev_current_time -
(int(current_time - (self.prev_current_time -
((int)(pstat.start_time)) / 1000000))) * 1000000)
if (pstat.process_state == 'PROCESS_STATE_STOPPED'):
if pstat.stop_time is not '':
pstat.stop_time = str(
int(current_time - (prev_current_time -
int(current_time - (self.prev_current_time -
((int)(pstat.stop_time)) / 1000000)) *
1000000)
if (pstat.process_state == 'PROCESS_STATE_EXITED'):
if pstat.exit_time is not '':
pstat.exit_time = str(
int(current_time - (prev_current_time -
int(current_time - (self.prev_current_time -
((int)(pstat.exit_time)) / 1000000)) *
1000000)
# update process state database
Expand All @@ -592,11 +604,13 @@ def event_tick_60(self, prev_current_time):
sys.stderr.write("Unable to write json")
pass
self.send_process_state_db(self.group_names)
prev_current_time = int(time.time())
return prev_current_time
self.prev_current_time = int(time.time())

def do_periodic_events(self):
    """Hook run by runforever() for every TICK_60 event.

    The base implementation only performs the common 60-second tick
    handling; subclasses may override it to add node-specific periodic
    work around that call.
    """
    self.event_tick_60()

def runforever(self, test=False):
prev_current_time = int(time.time())
self.prev_current_time = int(time.time())
while 1:
# we explicitly use self.stdin, self.stdout, and self.stderr
# instead of sys.* so we can unit test this code
Expand All @@ -612,5 +626,5 @@ def runforever(self, test=False):
self.event_process_communication(pdata)
# do periodic events
if headers['eventname'].startswith("TICK_60"):
prev_current_time = self.event_tick_60(prev_current_time)
self.do_periodic_events()
self.listener_nodemgr.ok(self.stdout)
22 changes: 20 additions & 2 deletions src/nodemgr/config_nodemgr/config_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import gevent
import ConfigParser

from nodemgr.common.event_manager import EventManager
from nodemgr.common.event_manager import EventManager, package_installed
from nodemgr.database_nodemgr.common import CassandraManager

from ConfigParser import NoOptionError

Expand All @@ -39,11 +40,16 @@

class ConfigEventManager(EventManager):
def __init__(self, rule_file, discovery_server,
discovery_port, collector_addr):
discovery_port, collector_addr,
cassandra_repair_interval,
cassandra_repair_logdir):
self.node_type = "contrail-config"
self.table = "ObjectConfigNode"
self.module = Module.CONFIG_NODE_MGR
self.module_id = ModuleNames[self.module]
self.cassandra_repair_interval = cassandra_repair_interval
self.cassandra_repair_logdir = cassandra_repair_logdir
self.cassandra_mgr = CassandraManager(cassandra_repair_logdir)
self.supervisor_serverurl = "unix:///var/run/supervisord_config.sock"
self.add_current_process()
node_type = Module2NodeType[self.module]
Expand Down Expand Up @@ -87,3 +93,15 @@ def get_node_third_party_process_list(self):
def get_process_state(self, fail_status_bits):
    """Return the process state computed by get_process_state_base().

    Delegates to the base helper, supplying ProcessStateNames and
    ProcessState alongside the given fail_status_bits.
    """
    state = self.get_process_state_base(
        fail_status_bits, ProcessStateNames, ProcessState)
    return state

def do_periodic_events(self):
    """Run the TICK_60 work for the config node, plus Cassandra upkeep.

    Cassandra status/repair is driven from here only when the
    'contrail-database-common' package is installed and the
    'contrail-openstack-database' package is not (presumably a config
    node that hosts its own config database -- verify against packaging).
    """
    # The package name was previously misspelled, so this lookup could
    # never succeed and the guard below degenerated to `config_db` alone.
    db = package_installed('contrail-openstack-database')
    config_db = package_installed('contrail-database-common')
    manage_cassandra = config_db and not db
    if manage_cassandra:
        # Record cluster status and shut down cassandra if needed
        self.cassandra_mgr.status()
    # Common tick handling; this is what advances self.tick_count.
    self.event_tick_60()
    if manage_cassandra:
        # Perform nodetool repair every cassandra_repair_interval hours
        # (one tick per minute, hence the factor of 60).
        if self.tick_count % (60 * self.cassandra_repair_interval) == 0:
            self.cassandra_mgr.repair()
21 changes: 21 additions & 0 deletions src/nodemgr/database_nodemgr/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
#

import os
import subprocess


class CassandraManager(object):
    """Launch the Contrail Cassandra status and repair helper commands.

    Both helpers are started with subprocess.Popen and are not waited on,
    so the calling event loop is never blocked by them.
    """

    def __init__(self, cassandra_repair_logdir):
        """Remember the directory that receives the repair log.

        :param cassandra_repair_logdir: directory in which ``repair.log``
            is written; a trailing slash is optional.
        """
        self.cassandra_repair_logdir = cassandra_repair_logdir

    def status(self):
        """Start ``contrail-cassandra-status`` in the background."""
        subprocess.Popen(["contrail-cassandra-status",
                          "--log-file", "/var/log/cassandra/status.log",
                          "--debug"])

    def _repair_log_file(self):
        """Return the full path of the repair log file."""
        # os.path.join inserts the separator only when it is missing, so
        # "/var/log/x" and "/var/log/x/" both give "/var/log/x/repair.log"
        # (plain concatenation produced "/var/log/xrepair.log").
        return os.path.join(self.cassandra_repair_logdir, "repair.log")

    def repair(self):
        """Start ``contrail-cassandra-repair`` in the background."""
        subprocess.Popen(["contrail-cassandra-repair",
                          "--log-file", self._repair_log_file(),
                          "--debug"])
20 changes: 5 additions & 15 deletions src/nodemgr/database_nodemgr/database_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import yaml

from nodemgr.common.event_manager import EventManager
from nodemgr.database_nodemgr.common import CassandraManager

from ConfigParser import NoOptionError

Expand Down Expand Up @@ -59,6 +60,7 @@ def __init__(self, rule_file, discovery_server,
self.contrail_databases = contrail_databases
self.cassandra_repair_interval = cassandra_repair_interval
self.cassandra_repair_logdir = cassandra_repair_logdir
self.cassandra_mgr = CassandraManager(cassandra_repair_logdir)
self.supervisor_serverurl = "unix:///var/run/supervisord_database.sock"
self.add_current_process()
node_type = Module2NodeType[self.module]
Expand Down Expand Up @@ -245,9 +247,7 @@ def database_periodic(self):
# Send cassandra nodetool information
self.send_database_status()
# Record cluster status and shut down cassandra if needed
subprocess.Popen(["contrail-cassandra-status",
"--log-file", "/var/log/cassandra/status.log",
"--debug"])
self.cassandra_mgr.status()
# end database_periodic

def send_database_status(self):
Expand Down Expand Up @@ -309,15 +309,8 @@ def get_tp_status(self,tp_stats_output):
return thread_pool_stats_list
# end get_tp_status

def cassandra_repair(self):
logdir = self.cassandra_repair_logdir + "repair.log"
subprocess.Popen(["contrail-cassandra-repair",
"--log-file", logdir,
"--debug"])
#end cassandra_repair

def runforever(self, test=False):
prev_current_time = int(time.time())
self.prev_current_time = int(time.time())
while 1:
# we explicitly use self.stdin, self.stdout, and self.stderr
# instead of sys.* so we can unit test this code
Expand All @@ -340,8 +333,5 @@ def runforever(self, test=False):
# do periodic events
if headers['eventname'].startswith("TICK_60"):
self.database_periodic()
prev_current_time = self.event_tick_60(prev_current_time)
# Perform nodetool repair every cassandra_repair_interval hours
if self.tick_count % (60 * self.cassandra_repair_interval) == 0:
self.cassandra_repair()
self.event_tick_60()
self.listener_nodemgr.ok(self.stdout)
5 changes: 4 additions & 1 deletion src/nodemgr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,12 @@ def main(args_str=' '.join(sys.argv[1:])):
rule_file, discovery_server,
discovery_port, collector_addr)
elif (node_type == 'contrail-config'):
cassandra_repair_interval = _args.cassandra_repair_interval
cassandra_repair_logdir = _args.cassandra_repair_logdir
prog = ConfigEventManager(
rule_file, discovery_server,
discovery_port, collector_addr)
discovery_port, collector_addr,
cassandra_repair_interval, cassandra_repair_logdir)
elif (node_type == 'contrail-control'):
prog = ControlEventManager(
rule_file, discovery_server,
Expand Down
4 changes: 2 additions & 2 deletions src/nodemgr/vrouter_nodemgr/vrouter_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def delete_process_handler(self, deleted_process):
# end delete_process_handler

def runforever(self, test=False):
prev_current_time = int(time.time())
self.prev_current_time = int(time.time())
while 1:
# we explicitly use self.stdin, self.stdout, and self.stderr
# instead of sys.* so we can unit test this code
Expand All @@ -137,7 +137,7 @@ def runforever(self, test=False):
self.event_process_communication(pdata)
# do periodic events
if headers['eventname'].startswith("TICK_60"):
prev_current_time = self.event_tick_60(prev_current_time)
self.event_tick_60()

# loadbalancer processing
self.lb_stats.send_loadbalancer_stats()
Expand Down

0 comments on commit 31dc69b

Please sign in to comment.