diff --git a/src/nodemgr/DatabaseNodemgr.py b/src/nodemgr/DatabaseNodemgr.py
deleted file mode 100644
index 95d8a7dece5..00000000000
--- a/src/nodemgr/DatabaseNodemgr.py
+++ /dev/null
@@ -1,177 +0,0 @@
-doc = " "
-
-from gevent import monkey; monkey.patch_all()
-import os
-import sys
-import socket
-import subprocess
-import json
-import time
-import datetime
-import platform
-import select
-import gevent
-import ConfigParser
-
-from nodemgr.EventManager import EventManager
-
-from ConfigParser import NoOptionError
-
-from supervisor import childutils
-
-from pysandesh.sandesh_base import *
-from pysandesh.sandesh_session import SandeshWriter
-from pysandesh.gen_py.sandesh_trace.ttypes import SandeshTraceRequest
-from sandesh_common.vns.ttypes import Module, NodeType
-from sandesh_common.vns.constants import ModuleNames, NodeTypeNames,\
-    Module2NodeType, INSTANCE_ID_DEFAULT, SERVICE_CONTRAIL_DATABASE
-from subprocess import Popen, PIPE
-from StringIO import StringIO
-
-from database.sandesh.database.ttypes import \
-    NodeStatusUVE, NodeStatus, DatabaseUsageStats,\
-    DatabaseUsageInfo, DatabaseUsage
-from database.sandesh.database.process_info.ttypes import \
-    ProcessStatus, ProcessState, ProcessInfo, DiskPartitionUsageStats
-from database.sandesh.database.process_info.constants import \
-    ProcessStateNames
-
-def usage():
-    print doc
-    sys.exit(255)
-
-class DatabaseEventManager(EventManager):
-    def __init__(self, rule_file, discovery_server, discovery_port, collector_addr):
-        EventManager.__init__(self, rule_file, discovery_server, discovery_port, collector_addr)
-        self.node_type = "contrail-database"
-        self.module = Module.DATABASE_NODE_MGR
-        self.module_id = ModuleNames[self.module]
-        self.supervisor_serverurl = "unix:///tmp/supervisord_database.sock"
-        self.add_current_process()
-    #end __init__
-
-    def process(self):
-        if self.rule_file is '':
-            self.rule_file = "/etc/contrail/supervisord_database_files/contrail-database.rules"
-        json_file = open(self.rule_file)
-        self.rules_data = json.load(json_file)
-        node_type = Module2NodeType[self.module]
-        node_type_name = NodeTypeNames[node_type]
-        config_file = '/etc/contrail/contrail-database-nodemgr.conf'
-        Config = self.read_config_data(config_file)
-        self.get_collector_list(Config)
-        _disc = self.get_discovery_client(Config)
-        sandesh_global.init_generator(self.module_id, socket.gethostname(),
-            node_type_name, self.instance_id, [],
-            self.module_id, 8103, ['database.sandesh'],_disc)
-        #sandesh_global.set_logging_params(enable_local_log=True)
-        self.sandesh_global = sandesh_global
-
-        try:
-            self.hostip = Config.get("DEFAULT", "hostip")
-        except:
-            self.hostip = '127.0.0.1'
-
-        (linux_dist, x, y) = platform.linux_distribution()
-        if (linux_dist == 'Ubuntu'):
-            (disk_space_used, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-' -f2 \`/ContrailAnalytics | grep %` && echo $3 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (disk_space_available, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics | grep %` && echo $4 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (analytics_db_size, error_value) = Popen("set `du -skL \`grep -A 1 'data_file_directories:' /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-        else:
-            (disk_space_used, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d'-' -f2 \`/ContrailAnalytics | grep %` && echo $3 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (disk_space_available, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics | grep %` && echo $4 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (analytics_db_size, error_value) = Popen("set `du -skL \`grep -A 1 'data_file_directories:' /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-        disk_space_total = int(disk_space_used) + int(disk_space_available)
-        try:
-            min_disk_opt = Config.get("DEFAULT", "minimum_diskGB")
-            min_disk = int(min_disk_opt)
-        except:
-            min_disk = 0
-        if (disk_space_total/(1024*1024) < min_disk):
-            cmd_str = "service " + SERVICE_CONTRAIL_DATABASE + " stop"
-            (ret_value, error_value) = Popen(cmd_str, shell=True, stdout=PIPE).communicate()
-            prog.fail_status_bits |= prog.FAIL_STATUS_DISK_SPACE
-
-    def send_process_state_db(self, group_names):
-        self.send_process_state_db_base(group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
-
-    def send_nodemgr_process_status(self):
-        self.send_nodemgr_process_status_base(ProcessStateNames, ProcessState, ProcessStatus, NodeStatus, NodeStatusUVE)
-
-    def get_process_state(self, fail_status_bits):
-        return self.get_process_state_base(fail_status_bits, ProcessStateNames, ProcessState)
-
-    def get_failbits_nodespecific_desc(self, fail_status_bits):
-        description = ""
-        if fail_status_bits & self.FAIL_STATUS_DISK_SPACE:
-            description += "Disk for analytics db is too low, cassandra stopped."
-        if fail_status_bits & self.FAIL_STATUS_SERVER_PORT:
-            if description != "":
-                description += " "
-            description += "Cassandra state detected DOWN."
-        return description
-
-    def database_periodic(self):
-        (linux_dist, x, y) = platform.linux_distribution()
-        if (linux_dist == 'Ubuntu'):
-            (disk_space_used, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-' -f2 \`/ContrailAnalytics | grep %` && echo $3 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (disk_space_available, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics | grep %` && echo $4 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (analytics_db_size, error_value) = Popen("set `du -skL \`grep -A 1 'data_file_directories:' /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-        else:
-            (disk_space_used, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d'-' -f2 \`/ContrailAnalytics | grep %` && echo $3 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (disk_space_available, error_value) = Popen("set `df -Pk \`grep -A 1 'data_file_directories:' /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics | grep %` && echo $4 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-            (analytics_db_size, error_value) = Popen("set `du -skL \`grep -A 1 'data_file_directories:' /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d'-' -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1", shell=True, stdout=PIPE).communicate()
-        db_stat = DatabaseUsageStats()
-        db_info = DatabaseUsageInfo()
-        try:
-            db_stat.disk_space_used_1k = int(disk_space_used)
-            db_stat.disk_space_available_1k = int(disk_space_available)
-            db_stat.analytics_db_size_1k = int(analytics_db_size)
-        except ValueError:
-            sys.stderr.write("Failed to get database usage" + "\n")
-        else:
-            db_info.name = socket.gethostname()
-            db_info.database_usage = [db_stat]
-            usage_stat = DatabaseUsage(data=db_info)
-            usage_stat.send()
-
-        cassandra_cli_cmd = "cassandra-cli --host " + self.hostip + " --batch < /dev/null | grep 'Connected to:'"
-        proc = Popen(cassandra_cli_cmd, shell=True, stdout=PIPE, stderr=PIPE)
-        (output, errout) = proc.communicate()
-        if proc.returncode != 0:
-            self.fail_status_bits |= self.FAIL_STATUS_SERVER_PORT
-        else:
-            self.fail_status_bits &= ~self.FAIL_STATUS_SERVER_PORT
-        self.send_nodemgr_process_status()
-
-    # end database_periodic
-
-    def send_disk_usage_info(self):
-        self.send_disk_usage_info_base(NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
-
-    def runforever(self, test=False):
-        prev_current_time = int(time.time())
-        while 1:
-            gevent.sleep(1)
-            # we explicitly use self.stdin, self.stdout, and self.stderr
-            # instead of sys.* so we can unit test this code
-            headers, payload = self.listener_nodemgr.wait(self.stdin, self.stdout)
-
-            #self.stderr.write("headers:\n" + str(headers) + '\n')
-            #self.stderr.write("payload:\n" + str(payload) + '\n')
-
-            pheaders, pdata = childutils.eventdata(payload+'\n')
-            #self.stderr.write("pheaders:\n" + str(pheaders)+'\n')
-            #self.stderr.write("pdata:\n" + str(pdata))
-
-            # check for process state change events
-            if headers['eventname'].startswith("PROCESS_STATE"):
-                self.event_process_state(pheaders, headers)
-            # check for flag value change events
-            if headers['eventname'].startswith("PROCESS_COMMUNICATION"):
-                self.event_process_communication(pdata)
-            # do periodic events
-            if headers['eventname'].startswith("TICK_60"):
-                self.database_periodic()
-                prev_current_time = self.event_tick_60(prev_current_time)
-            self.listener_nodemgr.ok(self.stdout)
diff --git a/src/nodemgr/SConscript b/src/nodemgr/SConscript
index f48629512b2..af35007dc8a 100644
--- a/src/nodemgr/SConscript
+++ b/src/nodemgr/SConscript
@@ -21,17 +21,17 @@ for file in setup_sources:
 
 local_sources = [
     '__init__.py',
-    'AnalyticsNodemgr.py',
-    'ControlNodemgr.py',
-    'ConfigNodemgr.py',
-    'VrouterNodemgr.py',
-    'DatabaseNodemgr.py',
-    'EventManager.py',
-    'VrouterProcessStat.py',
-    'ProcessStat.py',
-    'EventListenerProtocolNodeMgr.py',
-    'LoadbalancerStats.py',
-    'HaproxyStats.py',
+    'analytics_event_manager.py',
+    'control_event_manager.py',
+    'config_event_manager.py',
+    'vrouter_event_manager.py',
+    'database_event_manager.py',
+    'event_manager.py',
+    'vrouter_process_stat.py',
+    'process_stat.py',
+    'event_listener_protocol_nodemgr.py',
+    'load_balancer_stats.py',
+    'haproxy_stats.py',
     'main.py'
 ]
diff --git a/src/nodemgr/AnalyticsNodemgr.py b/src/nodemgr/analytics_event_manager.py
similarity index 61%
rename from src/nodemgr/AnalyticsNodemgr.py
rename to src/nodemgr/analytics_event_manager.py
index 23d66fd4797..67f08de2fdd 100644
--- a/src/nodemgr/AnalyticsNodemgr.py
+++ b/src/nodemgr/analytics_event_manager.py
@@ -1,6 +1,11 @@
+#
+# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
+#
+
 doc = " "
 
-from gevent import monkey; monkey.patch_all()
+from gevent import monkey
+monkey.patch_all()
 import os
 import sys
 import socket
@@ -12,7 +17,7 @@
 import gevent
 import ConfigParser
 
-from nodemgr.EventManager import EventManager
+from nodemgr.event_manager import EventManager
 
 from pysandesh.sandesh_base import *
 from sandesh_common.vns.ttypes import Module, NodeType
@@ -27,23 +32,29 @@ from analytics.process_info.constants import \
     ProcessStateNames
 
+
 def usage():
     print doc
     sys.exit(255)
 
+
 class AnalyticsEventManager(EventManager):
-    def __init__(self, rule_file, discovery_server, discovery_port, collector_addr):
-        EventManager.__init__(self, rule_file, discovery_server, discovery_port, collector_addr)
+    def __init__(self, rule_file, discovery_server,
+                 discovery_port, collector_addr):
+        EventManager.__init__(
+            self, rule_file, discovery_server,
+            discovery_port, collector_addr)
         self.node_type = 'contrail-analytics'
         self.module = Module.ANALYTICS_NODE_MGR
-        self.module_id = ModuleNames[self.module]
+        self.module_id = ModuleNames[self.module]
         self.supervisor_serverurl = "unix:///tmp/supervisord_analytics.sock"
         self.add_current_process()
-    #end __init__
+    # end __init__
 
     def process(self):
         if self.rule_file is '':
-            self.rule_file = "/etc/contrail/supervisord_analytics_files/contrail-analytics.rules"
+            self.rule_file = "/etc/contrail/" + \
+                "supervisord_analytics_files/contrail-analytics.rules"
         json_file = open(self.rule_file)
         self.rules_data = json.load(json_file)
         node_type = Module2NodeType[self.module]
@@ -52,20 +63,26 @@ def process(self):
         Config = self.read_config_data(config_file)
         self.get_collector_list(Config)
         _disc = self.get_discovery_client(Config)
-        sandesh_global.init_generator(self.module_id, socket.gethostname(),
+        sandesh_global.init_generator(
+            self.module_id, socket.gethostname(),
             node_type_name, self.instance_id, self.collector_addr,
             self.module_id, 8104, ['analytics'], _disc)
         sandesh_global.set_logging_params(enable_local_log=True)
         self.sandesh_global = sandesh_global
 
     def send_process_state_db(self, group_names):
-        self.send_process_state_db_base(group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
+        self.send_process_state_db_base(
+            group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
 
     def send_nodemgr_process_status(self):
-        self.send_nodemgr_process_status_base(ProcessStateNames, ProcessState, ProcessStatus, NodeStatus, NodeStatusUVE)
+        self.send_nodemgr_process_status_base(
+            ProcessStateNames, ProcessState, ProcessStatus,
+            NodeStatus, NodeStatusUVE)
 
     def get_process_state(self, fail_status_bits):
-        return self.get_process_state_base(fail_status_bits, ProcessStateNames, ProcessState)
+        return self.get_process_state_base(
+            fail_status_bits, ProcessStateNames, ProcessState)
 
     def send_disk_usage_info(self):
-        self.send_disk_usage_info_base(NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
+        self.send_disk_usage_info_base(
+            NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
diff --git a/src/nodemgr/ConfigNodemgr.py b/src/nodemgr/config_event_manager.py
similarity index 61%
rename from src/nodemgr/ConfigNodemgr.py
rename to src/nodemgr/config_event_manager.py
index 2c12bb2cace..1c3c13c5dc9 100644
--- a/src/nodemgr/ConfigNodemgr.py
+++ b/src/nodemgr/config_event_manager.py
@@ -1,6 +1,11 @@
+#
+# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
+#
+
 doc = " "
 
-from gevent import monkey; monkey.patch_all()
+from gevent import monkey
+monkey.patch_all()
 import os
 import sys
 import socket
@@ -13,7 +18,7 @@
 import gevent
 import ConfigParser
 
-from nodemgr.EventManager import EventManager
+from nodemgr.event_manager import EventManager
 
 from ConfigParser import NoOptionError
 
@@ -35,23 +40,29 @@ from cfgm_common.uve.cfgm_cpuinfo.process_info.constants import \
     ProcessStateNames
 
+
 def usage():
     print doc
     sys.exit(255)
 
+
 class ConfigEventManager(EventManager):
-    def __init__(self, rule_file, discovery_server, discovery_port, collector_addr):
-        EventManager.__init__(self, rule_file, discovery_server, discovery_port, collector_addr)
+    def __init__(self, rule_file, discovery_server,
+                 discovery_port, collector_addr):
+        EventManager.__init__(
+            self, rule_file, discovery_server,
+            discovery_port, collector_addr)
         self.node_type = "contrail-config"
         self.module = Module.CONFIG_NODE_MGR
-        self.module_id = ModuleNames[self.module]
+        self.module_id = ModuleNames[self.module]
         self.supervisor_serverurl = "unix:///tmp/supervisord_config.sock"
         self.add_current_process()
-    #end __init__
+    # end __init__
 
     def process(self):
         if self.rule_file is '':
-            self.rule_file = "/etc/contrail/supervisord_config_files/contrail-config.rules"
+            self.rule_file = "/etc/contrail/" + \
+                "supervisord_config_files/contrail-config.rules"
         json_file = open(self.rule_file)
         self.rules_data = json.load(json_file)
         node_type = Module2NodeType[self.module]
@@ -60,20 +71,26 @@ def process(self):
         Config = self.read_config_data(config_file)
         self.get_collector_list(Config)
         _disc = self.get_discovery_client(Config)
-        sandesh_global.init_generator(self.module_id, socket.gethostname(),
+        sandesh_global.init_generator(
+            self.module_id, socket.gethostname(),
             node_type_name, self.instance_id, self.collector_addr,
-            self.module_id, 8100, ['cfgm_common.uve'],_disc)
-        #sandesh_global.set_logging_params(enable_local_log=True)
+            self.module_id, 8100, ['cfgm_common.uve'], _disc)
+        # sandesh_global.set_logging_params(enable_local_log=True)
         self.sandesh_global = sandesh_global
 
     def send_process_state_db(self, group_names):
-        self.send_process_state_db_base(group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
+        self.send_process_state_db_base(
+            group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
 
     def send_nodemgr_process_status(self):
-        self.send_nodemgr_process_status_base(ProcessStateNames, ProcessState, ProcessStatus, NodeStatus, NodeStatusUVE)
+        self.send_nodemgr_process_status_base(
+            ProcessStateNames, ProcessState, ProcessStatus,
+            NodeStatus, NodeStatusUVE)
 
     def get_process_state(self, fail_status_bits):
-        return self.get_process_state_base(fail_status_bits, ProcessStateNames, ProcessState)
+        return self.get_process_state_base(
+            fail_status_bits, ProcessStateNames, ProcessState)
 
     def send_disk_usage_info(self):
-        self.send_disk_usage_info_base(NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
+        self.send_disk_usage_info_base(
+            NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
diff --git a/src/nodemgr/ControlNodemgr.py b/src/nodemgr/control_event_manager.py
similarity index 62%
rename from src/nodemgr/ControlNodemgr.py
rename to src/nodemgr/control_event_manager.py
index e251d052d14..9df85c84173 100644
--- a/src/nodemgr/ControlNodemgr.py
+++ b/src/nodemgr/control_event_manager.py
@@ -1,6 +1,11 @@
+#
+# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
+#
+
 doc = " "
 
-from gevent import monkey; monkey.patch_all()
+from gevent import monkey
+monkey.patch_all()
 import os
 import sys
 import socket
@@ -13,7 +18,7 @@
 import gevent
 import ConfigParser
 
-from nodemgr.EventManager import EventManager
+from nodemgr.event_manager import EventManager
 
 from ConfigParser import NoOptionError
 
@@ -35,23 +40,29 @@ from control_node.control_node.process_info.constants import \
     ProcessStateNames
 
+
 def usage():
     print doc
     sys.exit(255)
 
+
 class ControlEventManager(EventManager):
-    def __init__(self, rule_file, discovery_server, discovery_port, collector_addr):
-        EventManager.__init__(self, rule_file, discovery_server, discovery_port, collector_addr)
+    def __init__(self, rule_file, discovery_server,
+                 discovery_port, collector_addr):
+        EventManager.__init__(
+            self, rule_file, discovery_server,
+            discovery_port, collector_addr)
         self.node_type = "contrail-control"
         self.module = Module.CONTROL_NODE_MGR
-        self.module_id = ModuleNames[self.module]
+        self.module_id = ModuleNames[self.module]
         self.supervisor_serverurl = "unix:///tmp/supervisord_control.sock"
         self.add_current_process()
-    #end __init__
+    # end __init__
 
     def process(self):
         if self.rule_file == '':
-            self.rule_file = '/etc/contrail/supervisord_control_files/contrail-control.rules'
+            self.rule_file = "/etc/contrail/" + \
+                "supervisord_control_files/contrail-control.rules"
         json_file = open(self.rule_file)
         self.rules_data = json.load(json_file)
         node_type = Module2NodeType[self.module]
@@ -60,20 +71,26 @@ def process(self):
         Config = self.read_config_data(config_file)
         self.get_collector_list(Config)
         _disc = self.get_discovery_client(Config)
-        sandesh_global.init_generator(self.module_id, socket.gethostname(),
+        sandesh_global.init_generator(
+            self.module_id, socket.gethostname(),
             node_type_name, self.instance_id, self.collector_addr,
-            self.module_id, 8101, ['control_node.control_node'],_disc)
-        #sandesh_global.set_logging_params(enable_local_log=True)
+            self.module_id, 8101, ['control_node.control_node'], _disc)
+        # sandesh_global.set_logging_params(enable_local_log=True)
        self.sandesh_global = sandesh_global
 
     def send_process_state_db(self, group_names):
-        self.send_process_state_db_base(group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
+        self.send_process_state_db_base(
+            group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
 
     def send_nodemgr_process_status(self):
-        self.send_nodemgr_process_status_base(ProcessStateNames, ProcessState, ProcessStatus, NodeStatus, NodeStatusUVE)
+        self.send_nodemgr_process_status_base(
+            ProcessStateNames, ProcessState, ProcessStatus,
+            NodeStatus, NodeStatusUVE)
 
     def get_process_state(self, fail_status_bits):
-        return self.get_process_state_base(fail_status_bits, ProcessStateNames, ProcessState)
+        return self.get_process_state_base(
+            fail_status_bits, ProcessStateNames, ProcessState)
 
     def send_disk_usage_info(self):
-        self.send_disk_usage_info_base(NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
+        self.send_disk_usage_info_base(
+            NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
diff --git a/src/nodemgr/database_event_manager.py b/src/nodemgr/database_event_manager.py
new file mode 100644
index 00000000000..1ede5ab843e
--- /dev/null
+++ b/src/nodemgr/database_event_manager.py
@@ -0,0 +1,254 @@
+#
+# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
+#
+
+doc = " "
+
+from gevent import monkey
+monkey.patch_all()
+import os
+import sys
+import socket
+import subprocess
+import json
+import time
+import datetime
+import platform
+import select
+import gevent
+import ConfigParser
+
+from nodemgr.event_manager import EventManager
+
+from ConfigParser import NoOptionError
+
+from supervisor import childutils
+
+from pysandesh.sandesh_base import *
+from pysandesh.sandesh_session import SandeshWriter
+from pysandesh.gen_py.sandesh_trace.ttypes import SandeshTraceRequest
+from sandesh_common.vns.ttypes import Module, NodeType
+from sandesh_common.vns.constants import ModuleNames, NodeTypeNames,\
+    Module2NodeType, INSTANCE_ID_DEFAULT, SERVICE_CONTRAIL_DATABASE
+from subprocess import Popen, PIPE
+from StringIO import StringIO
+
+from database.sandesh.database.ttypes import \
+    NodeStatusUVE, NodeStatus, DatabaseUsageStats,\
+    DatabaseUsageInfo, DatabaseUsage
+from database.sandesh.database.process_info.ttypes import \
+    ProcessStatus, ProcessState, ProcessInfo, DiskPartitionUsageStats
+from database.sandesh.database.process_info.constants import \
+    ProcessStateNames
+
+
+def usage():
+    print doc
+    sys.exit(255)
+
+
+class DatabaseEventManager(EventManager):
+    def __init__(self, rule_file, discovery_server,
+                 discovery_port, collector_addr):
+        EventManager.__init__(
+            self, rule_file, discovery_server,
+            discovery_port, collector_addr)
+        self.node_type = "contrail-database"
+        self.module = Module.DATABASE_NODE_MGR
+        self.module_id = ModuleNames[self.module]
+        self.supervisor_serverurl = "unix:///tmp/supervisord_database.sock"
+        self.add_current_process()
+    # end __init__
+
+    def process(self):
+        if self.rule_file is '':
+            self.rule_file = "/etc/contrail/" + \
+                "supervisord_database_files/contrail-database.rules"
+        json_file = open(self.rule_file)
+        self.rules_data = json.load(json_file)
+        node_type = Module2NodeType[self.module]
+        node_type_name = NodeTypeNames[node_type]
+        config_file = '/etc/contrail/contrail-database-nodemgr.conf'
+        Config = self.read_config_data(config_file)
+        self.get_collector_list(Config)
+        _disc = self.get_discovery_client(Config)
+        sandesh_global.init_generator(
+            self.module_id, socket.gethostname(), node_type_name,
+            self.instance_id, [], self.module_id, 8103,
+            ['database.sandesh'], _disc)
+        # sandesh_global.set_logging_params(enable_local_log=True)
+        self.sandesh_global = sandesh_global
+
+        try:
+            self.hostip = Config.get("DEFAULT", "hostip")
+        except:
+            self.hostip = '127.0.0.1'
+
+        (linux_dist, x, y) = platform.linux_distribution()
+        if (linux_dist == 'Ubuntu'):
+            popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \
+                " /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-'" + \
+                " -f2 \`/ContrailAnalytics | grep %` && echo $3 |" + \
+                " cut -d'%' -f1"
+            (disk_space_used, error_value) = \
+                Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
+            popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \
+                " /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-'" + \
+                " -f2\`/ContrailAnalytics | grep %` && echo $4" + \
+                " | cut -d'%' -f1"
+            (disk_space_available, error_value) = \
+                Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
+            popen_cmd = "set `du -skL \`grep -A 1 'data_file_directories:'" + \
+                " /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-'" + \
+                " -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1"
+            (analytics_db_size, error_value) = \
+                Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
+        else:
+            popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \
+                " /etc/cassandra/conf/cassandra.yaml | grep '-' |" + \
+                " cut -d'-' -f2 \`/ContrailAnalytics | grep %` &&" + \
+                " echo $3 | cut -d'%' -f1"
+            (disk_space_used, error_value) = \
+                Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
+            popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \
+                " /etc/cassandra/conf/cassandra.yaml | grep '-' |" + \
+                " cut -d'-' -f2\`/ContrailAnalytics | grep %` && echo $4" + \
+                " | cut -d'%' -f1"
+            (disk_space_available, error_value) = \
+                Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
+            popen_cmd = "set `du -skL \`grep -A 1 'data_file_directories:'" + \
+                " /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d" + \
+                "'-' -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1"
+            (analytics_db_size, error_value) = \
+                Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
+        disk_space_total = int(disk_space_used) + int(disk_space_available)
+        try:
+            min_disk_opt = Config.get("DEFAULT", "minimum_diskGB")
+            min_disk = int(min_disk_opt)
+        except:
+            min_disk = 0
+        if (disk_space_total / (1024 * 1024) < min_disk):
+            cmd_str = "service " + SERVICE_CONTRAIL_DATABASE + " stop"
+            (ret_value, error_value) = Popen(
+                cmd_str, shell=True, stdout=PIPE).communicate()
+            prog.fail_status_bits |= prog.FAIL_STATUS_DISK_SPACE
+
+    def send_process_state_db(self, group_names):
+        self.send_process_state_db_base(
+            group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
+
+    def send_nodemgr_process_status(self):
+        self.send_nodemgr_process_status_base(
+            ProcessStateNames, ProcessState, ProcessStatus,
+            NodeStatus, NodeStatusUVE)
+
+    def get_process_state(self, fail_status_bits):
+        return self.get_process_state_base(
+            fail_status_bits, ProcessStateNames, ProcessState)
+
+    def get_failbits_nodespecific_desc(self, fail_status_bits):
+        description = ""
+        if fail_status_bits & self.FAIL_STATUS_DISK_SPACE:
+            description += "Disk for analytics db is too low," + \
+                " cassandra stopped."
+        if fail_status_bits & self.FAIL_STATUS_SERVER_PORT:
+            if description != "":
+                description += " "
+            description += "Cassandra state detected DOWN."
+ return description + + def database_periodic(self): + (linux_dist, x, y) = platform.linux_distribution() + if (linux_dist == 'Ubuntu'): + popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \ + " /etc/cassandra/cassandra.yaml | grep '-' | cut -d" + \ + "'-' -f2 \`/ContrailAnalytics | grep %` && echo $3 |" + \ + " cut -d'%' -f1" + (disk_space_used, error_value) = \ + Popen(popen_cmd, shell=True, stdout=PIPE).communicate() + popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \ + " /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-'" + \ + " -f2\`/ContrailAnalytics | grep %` && echo $4 |" + \ + " cut -d'%' -f1" + (disk_space_available, error_value) = \ + Popen(popen_cmd, shell=True, stdout=PIPE).communicate() + popen_cmd = "set `du -skL \`grep -A 1 'data_file_directories:'" + \ + " /etc/cassandra/cassandra.yaml | grep '-' | cut -d'-'" + \ + " -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1" + (analytics_db_size, error_value) = \ + Popen(popen_cmd, shell=True, stdout=PIPE).communicate() + else: + popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \ + " /etc/cassandra/conf/cassandra.yaml | grep '-' |" + \ + " cut -d'-' -f2 \`/ContrailAnalytics | grep %` && echo" + \ + " $3 | cut -d'%' -f1" + (disk_space_used, error_value) = \ + Popen(popen_cmd, shell=True, stdout=PIPE).communicate() + popen_cmd = "set `df -Pk \`grep -A 1 'data_file_directories:'" + \ + " /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d" + \ + "'-' -f2\`/ContrailAnalytics | grep %` && echo $4" + \ + " | cut -d'%' -f1" + (disk_space_available, error_value) = \ + Popen(popen_cmd, shell=True, stdout=PIPE).communicate() + popen_cmd = "set `du -skL \`grep -A 1 'data_file_directories:'" + \ + " /etc/cassandra/conf/cassandra.yaml | grep '-' | cut -d" + \ + "'-' -f2\`/ContrailAnalytics` && echo $1 | cut -d'%' -f1" + (analytics_db_size, error_value) = \ + Popen(popen_cmd, shell=True, stdout=PIPE).communicate() + db_stat = DatabaseUsageStats() + db_info = DatabaseUsageInfo() + try: + db_stat.disk_space_used_1k = int(disk_space_used) + db_stat.disk_space_available_1k = int(disk_space_available) + db_stat.analytics_db_size_1k = int(analytics_db_size) + except ValueError: + sys.stderr.write("Failed to get database usage" + "\n") + else: + db_info.name = socket.gethostname() + db_info.database_usage = [db_stat] + usage_stat = DatabaseUsage(data=db_info) + usage_stat.send() + + cassandra_cli_cmd = "cassandra-cli --host " + self.hostip + \ + " --batch < /dev/null | grep 'Connected to:'" + proc = Popen(cassandra_cli_cmd, shell=True, stdout=PIPE, stderr=PIPE) + (output, errout) = proc.communicate() + if proc.returncode != 0: + self.fail_status_bits |= self.FAIL_STATUS_SERVER_PORT + else: + self.fail_status_bits &= ~self.FAIL_STATUS_SERVER_PORT + self.send_nodemgr_process_status() + + # end database_periodic + + def send_disk_usage_info(self): + self.send_disk_usage_info_base( + NodeStatusUVE, NodeStatus, DiskPartitionUsageStats) + + def runforever(self, test=False): + prev_current_time = int(time.time()) + while 1: + gevent.sleep(1) + # we explicitly use self.stdin, self.stdout, and self.stderr + # instead of sys.* so we can unit test this code + headers, payload = self.listener_nodemgr.wait( + self.stdin, self.stdout) + + # self.stderr.write("headers:\n" + str(headers) + '\n') + # self.stderr.write("payload:\n" + str(payload) + '\n') + + pheaders, pdata = childutils.eventdata(payload + '\n') + # self.stderr.write("pheaders:\n" + str(pheaders)+'\n') + # self.stderr.write("pdata:\n" + 
str(pdata)) + + # check for process state change events + if headers['eventname'].startswith("PROCESS_STATE"): + self.event_process_state(pheaders, headers) + # check for flag value change events + if headers['eventname'].startswith("PROCESS_COMMUNICATION"): + self.event_process_communication(pdata) + # do periodic events + if headers['eventname'].startswith("TICK_60"): + self.database_periodic() + prev_current_time = self.event_tick_60(prev_current_time) + self.listener_nodemgr.ok(self.stdout) diff --git a/src/nodemgr/EventListenerProtocolNodeMgr.py b/src/nodemgr/event_listener_protocol_nodemgr.py similarity index 83% rename from src/nodemgr/EventListenerProtocolNodeMgr.py rename to src/nodemgr/event_listener_protocol_nodemgr.py index c9082b020aa..e53422e04b6 100644 --- a/src/nodemgr/EventListenerProtocolNodeMgr.py +++ b/src/nodemgr/event_listener_protocol_nodemgr.py @@ -1,14 +1,19 @@ +# +# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +# + import gevent import sys import select from supervisor import childutils + class EventListenerProtocolNodeMgr(childutils.EventListenerProtocol): def wait(self, stdin=sys.stdin, stdout=sys.stdout): self.ready(stdout) while 1: gevent.sleep(1) - if select.select([sys.stdin,],[],[], 1)[0]: + if select.select([sys.stdin, ], [], [], 1)[0]: line = stdin.readline() if line is not None: sys.stderr.write("wokeup and found a line\n") diff --git a/src/nodemgr/EventManager.py b/src/nodemgr/event_manager.py similarity index 62% rename from src/nodemgr/EventManager.py rename to src/nodemgr/event_manager.py index 425ad4c1b48..f0554e548bd 100644 --- a/src/nodemgr/EventManager.py +++ b/src/nodemgr/event_manager.py @@ -1,3 +1,7 @@ +# +# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +# + import gevent import json import ConfigParser @@ -12,19 +16,23 @@ import xmlrpclib from supervisor import childutils -from nodemgr.EventListenerProtocolNodeMgr import EventListenerProtocolNodeMgr -from nodemgr.ProcessStat import ProcessStat +from nodemgr.event_listener_protocol_nodemgr import \ + EventListenerProtocolNodeMgr +from nodemgr.process_stat import ProcessStat from sandesh_common.vns.constants import INSTANCE_ID_DEFAULT + class EventManager(object): rules_data = [] group_names = [] process_state_db = {} - FAIL_STATUS_DUMMY = 0x1 - FAIL_STATUS_DISK_SPACE = 0x2 + FAIL_STATUS_DUMMY = 0x1 + FAIL_STATUS_DISK_SPACE = 0x2 FAIL_STATUS_SERVER_PORT = 0x4 - FAIL_STATUS_NTP_SYNC = 0x8 - def __init__(self, rule_file, discovery_server, discovery_port, collector_addr): + FAIL_STATUS_NTP_SYNC = 0x8 + + def __init__(self, rule_file, discovery_server, + discovery_port, collector_addr): self.stdin = sys.stdin self.stdout = sys.stdout self.stderr = sys.stderr @@ -47,20 +55,23 @@ def __init__(self, rule_file, discovery_server, discovery_port, collector_addr): # Get all the current processes in the node def get_current_process(self): - proxy = xmlrpclib.ServerProxy('http://127.0.0.1', - transport=supervisor.xmlrpc.SupervisorTransport(None, None, serverurl=self.supervisor_serverurl)) + proxy = xmlrpclib.ServerProxy( + 'http://127.0.0.1', + transport=supervisor.xmlrpc.SupervisorTransport( + None, None, serverurl=self.supervisor_serverurl)) # Add all current processes to make sure nothing misses the radar process_state_db = {} for proc_info in proxy.supervisor.getAllProcessInfo(): if (proc_info['name'] != proc_info['group']): - proc_name = proc_info['group']+ ":" + proc_info['name'] + proc_name = proc_info['group'] + ":" + proc_info['name'] else: proc_name = 
proc_info['name'] process_stat_ent = self.get_process_stat_object(proc_name) - process_stat_ent.process_state = "PROCESS_STATE_" + proc_info['statename'] - if (process_stat_ent.process_state == + process_stat_ent.process_state = "PROCESS_STATE_" + \ + proc_info['statename'] + if (process_stat_ent.process_state == 'PROCESS_STATE_RUNNING'): - process_stat_ent.start_time = str(proc_info['start']*1000000) + process_stat_ent.start_time = str(proc_info['start'] * 1000000) process_stat_ent.start_count += 1 process_state_db[proc_name] = process_stat_ent return process_state_db @@ -82,20 +93,23 @@ def update_current_process(self): for deleted_process in deleted_process_set: self.delete_process_handler(deleted_process) for added_process in added_process_set: - self.add_process_handler(added_process, process_state_db[added_process]) + self.add_process_handler( + added_process, process_state_db[added_process]) # end update_current_process # process is deleted, send state & remove it from db def delete_process_handler(self, deleted_process): self.process_state_db[deleted_process].deleted = True - self.send_process_state_db([self.process_state_db[deleted_process].group]) + group_val = self.process_state_db[deleted_process].group + self.send_process_state_db([group_val]) del self.process_state_db[deleted_process] # end delete_process_handler # new process added, update db & send state def add_process_handler(self, added_process, process_info): self.process_state_db[added_process] = process_info - self.send_process_state_db([self.process_state_db[added_process].group]) + group_val = self.process_state_db[added_process].group + self.send_process_state_db([group_val]) # end add_process_handler def read_config_data(self, config_file): @@ -111,7 +125,7 @@ def get_discovery_server(self, Config): sys.stderr.write("ERROR: " + str(e) + '\n') except NoSectionError as e: sys.stderr.write("ERROR: " + str(e) + '\n') - #Hack becos of Configparser and the conf file format itself + # Hack becos of Configparser and the conf file format itself try: self.discovery_server[:self.discovery_server.index('#')].strip() except: @@ -124,9 +138,10 @@ def get_discovery_port(self, Config): sys.stderr.write("ERROR: " + str(e) + '\n') except NoSectionError as e: sys.stderr.write("ERROR: " + str(e) + '\n') - #Hack becos of Configparser and the conf file format itself + # Hack becos of Configparser and the conf file format itself try: - self.discovery_port = self.discovery_port[:self.discovery_port.index('#')].strip() + self.discovery_port = self.discovery_port[ + :self.discovery_port.index('#')].strip() except Exception: pass @@ -138,14 +153,16 @@ def get_discovery_client(self, Config): if self.discovery_server == socket.gethostname(): self.get_discovery_server(Config) self.get_discovery_port(Config) - _disc= client.DiscoveryClient(self.discovery_server, self.discovery_port, self.module_id) + _disc = client.DiscoveryClient( + self.discovery_server, self.discovery_port, self.module_id) return _disc def get_collector_list(self, Config): try: self.collector_addr = Config.get("COLLECTOR", "server_list") try: - self.collector_addr = self.collector_addr[:self.collector_addr.index('#')].strip() + self.collector_addr = self.collector_addr[ + :self.collector_addr.index('#')].strip() except: self.collector_addr.strip() except NoOptionError as e: @@ -163,7 +180,8 @@ def check_ntp_status(self): self.fail_status_bits &= ~self.FAIL_STATUS_NTP_SYNC self.send_nodemgr_process_status() - def send_process_state_db_base(self, group_names, ProcessInfo, 
NodeStatus, NodeStatusUVE): + def send_process_state_db_base(self, group_names, ProcessInfo, + NodeStatus, NodeStatusUVE): name = socket.gethostname() for group in group_names: process_infos = [] @@ -196,18 +214,22 @@ def send_process_state_db_base(self, group_names, ProcessInfo, NodeStatus, NodeS node_status.deleted = delete_status node_status.process_info = process_infos node_status.all_core_file_list = self.all_core_file_list - node_status_uve = NodeStatusUVE(data = node_status) + node_status_uve = NodeStatusUVE(data=node_status) sys.stderr.write('Sending UVE:' + str(node_status_uve)) node_status_uve.send() def send_all_core_file(self): stat_command_option = "stat --printf=%Y /var/crashes" - modified_time = Popen(stat_command_option.split(), stdout=PIPE).communicate() + modified_time = Popen( + stat_command_option.split(), + stdout=PIPE).communicate() if modified_time[0] == self.core_dir_modified_time: return self.core_dir_modified_time = modified_time[0] ls_command_option = "ls /var/crashes" - (corename, stderr) = Popen(ls_command_option.split(), stdout=PIPE).communicate() + (corename, stderr) = Popen( + ls_command_option.split(), + stdout=PIPE).communicate() self.all_core_file_list = corename.split('\n')[0:-1] self.send_process_state_db(self.group_names) @@ -228,52 +250,76 @@ def send_process_state(self, pname, pstate, pheaders): send_uve = False if (pstate == 'PROCESS_STATE_RUNNING'): proc_stat.start_count += 1 - proc_stat.start_time = str(int(time.time()*1000000)) + proc_stat.start_time = str(int(time.time() * 1000000)) send_uve = True if (pstate == 'PROCESS_STATE_STOPPED'): proc_stat.stop_count += 1 send_uve = True - proc_stat.stop_time = str(int(time.time()*1000000)) + proc_stat.stop_time = str(int(time.time() * 1000000)) proc_stat.last_exit_unexpected = False if (pstate == 'PROCESS_STATE_EXITED'): proc_stat.exit_count += 1 send_uve = True - proc_stat.exit_time = str(int(time.time()*1000000)) + proc_stat.exit_time = str(int(time.time() * 1000000)) if not(int(pheaders['expected'])): - self.stderr.write(pname + " with pid:" + pheaders['pid'] + " exited abnormally\n") + self.stderr.write( + pname + " with pid:" + pheaders['pid'] + + " exited abnormally\n") proc_stat.last_exit_unexpected = True # check for core file for this exit - find_command_option = "find /var/crashes -name core.[A-Za-z]*."+ pheaders['pid'] + "*" - self.stderr.write("find command option for cores:" + find_command_option + "\n") - (corename, stderr) = Popen(find_command_option.split(), stdout=PIPE).communicate() + find_command_option = \ + "find /var/crashes -name core.[A-Za-z]*." 
+ \ + pheaders['pid'] + "*" + self.stderr.write( + "find command option for cores:" + + find_command_option + "\n") + (corename, stderr) = Popen( + find_command_option.split(), + stdout=PIPE).communicate() self.stderr.write("core file: " + corename + "\n") if ((corename is not None) and (len(corename.rstrip()) >= 1)): - # before adding to the core file list make sure that we do not have too many cores - sys.stderr.write('core_file_list:'+str(proc_stat.core_file_list)+", self.max_cores:"+str(self.max_cores)+"\n") + # before adding to the core file list make + # sure that we do not have too many cores + sys.stderr.write( + 'core_file_list:' + str(proc_stat.core_file_list) + + ", self.max_cores:" + str(self.max_cores) + "\n") if (len(proc_stat.core_file_list) == self.max_cores): # get rid of old cores - sys.stderr.write('max # of cores reached:' + str(self.max_cores) + "\n") - core_files_to_be_deleted = proc_stat.core_file_list[self.max_old_cores:(self.max_cores - self.max_new_cores+1)] - sys.stderr.write('deleting core file list:' + str(core_files_to_be_deleted) + "\n") + sys.stderr.write( + 'max # of cores reached:' + + str(self.max_cores) + "\n") + val = self.max_cores - self.max_new_cores + 1 + core_files_to_be_deleted = \ + proc_stat.core_file_list[self.max_old_cores:(val)] + sys.stderr.write( + 'deleting core file list:' + + str(core_files_to_be_deleted) + "\n") for core_file in core_files_to_be_deleted: - sys.stderr.write('deleting core file:' + core_file + "\n") + sys.stderr.write( + 'deleting core file:' + core_file + "\n") try: os.remove(core_file) except: pass # now delete the list as well - del proc_stat.core_file_list[self.max_old_cores:(self.max_cores - self.max_new_cores+1)] + val = self.max_cores - self.max_new_cores + 1 + del proc_stat.core_file_list[self.max_old_cores:(val)] # now add the new core to the core file list proc_stat.core_file_list.append(corename.rstrip()) - sys.stderr.write("# of cores for " + pname + ":" + str(len(proc_stat.core_file_list)) + "\n") + sys.stderr.write( + "# of cores for " + pname + ":" + + str(len(proc_stat.core_file_list)) + "\n") # update process state database self.process_state_db[pname] = proc_stat - f = open('/var/log/contrail/process_state' + self.node_type + ".json", 'w') - f.write(json.dumps(self.process_state_db, default=lambda obj: obj.__dict__)) + f = open('/var/log/contrail/process_state' + + self.node_type + ".json", 'w') + f.write(json.dumps( + self.process_state_db, + default=lambda obj: obj.__dict__)) if not(send_uve): return @@ -281,24 +327,30 @@ def send_process_state(self, pname, pstate, pheaders): if (send_uve): self.send_process_state_db([proc_stat.group]) - def send_nodemgr_process_status_base(self, ProcessStateNames, ProcessState, ProcessStatus, NodeStatus, NodeStatusUVE): + def send_nodemgr_process_status_base(self, ProcessStateNames, + ProcessState, ProcessStatus, + NodeStatus, NodeStatusUVE): if (self.prev_fail_status_bits != self.fail_status_bits): self.prev_fail_status_bits = self.fail_status_bits fail_status_bits = self.fail_status_bits state, description = self.get_process_state(fail_status_bits) - process_status = ProcessStatus(module_id = self.module_id, instance_id = self.instance_id, state = state, - description = description) + process_status = ProcessStatus( + module_id=self.module_id, instance_id=self.instance_id, + state=state, description=description) process_status_list = [] process_status_list.append(process_status) - node_status = NodeStatus(name = socket.gethostname(), - process_status = 
process_status_list) - node_status_uve = NodeStatusUVE(data = node_status) + node_status = NodeStatus( + name=socket.gethostname(), + process_status=process_status_list) + node_status_uve = NodeStatusUVE(data=node_status) sys.stderr.write('Sending UVE:' + str(node_status_uve)) node_status_uve.send() - def send_disk_usage_info_base(self, NodeStatusUVE, NodeStatus, DiskPartitionUsageStats): - partition = subprocess.Popen("df -T -t ext2 -t ext3 -t ext4 -t xfs", - shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + def send_disk_usage_info_base(self, NodeStatusUVE, NodeStatus, + DiskPartitionUsageStats): + partition = subprocess.Popen( + "df -T -t ext2 -t ext3 -t ext4 -t xfs", + shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) disk_usage_infos = [] for line in partition.stdout: if 'Filesystem' in line: @@ -311,22 +363,25 @@ def send_disk_usage_info_base(self, NodeStatusUVE, NodeStatus, DiskPartitionUsag try: disk_usage_stat.partition_type = str(partition_type) disk_usage_stat.partition_name = str(partition_name) - disk_usage_stat.partition_space_used_1k = int(partition_space_used_1k) - disk_usage_stat.partition_space_available_1k = int(partition_space_available_1k) + disk_usage_stat.partition_space_used_1k = \ + int(partition_space_used_1k) + disk_usage_stat.partition_space_available_1k = \ + int(partition_space_available_1k) except ValueError: sys.stderr.write("Failed to get local disk space usage" + "\n") else: disk_usage_infos.append(disk_usage_stat) # send node UVE - node_status = NodeStatus(name = socket.gethostname(), - disk_usage_info = disk_usage_infos) - node_status_uve = NodeStatusUVE(data = node_status) + node_status = NodeStatus( + name=socket.gethostname(), disk_usage_info=disk_usage_infos) + node_status_uve = NodeStatusUVE(data=node_status) sys.stderr.write('Sending UVE:' + str(node_status_uve)) node_status_uve.send() # end send_disk_usage_info - def get_process_state_base(self, fail_status_bits, ProcessStateNames, ProcessState): + def get_process_state_base(self, fail_status_bits, + ProcessStateNames, ProcessState): if fail_status_bits: state = ProcessStateNames[ProcessState.NON_FUNCTIONAL] description = self.get_failbits_nodespecific_desc(fail_status_bits) @@ -344,38 +399,45 @@ def get_failbits_nodespecific_desc(self, fail_status_bits): return "" def event_process_state(self, pheaders, headers): - self.stderr.write("process:" + pheaders['processname'] + "," + "groupname:" + pheaders['groupname'] + "," + "eventname:" + headers['eventname'] + '\n') + self.stderr.write("process:" + pheaders['processname'] + "," + + "groupname:" + pheaders['groupname'] + "," + + "eventname:" + headers['eventname'] + '\n') pname = pheaders['processname'] if (pheaders['processname'] != pheaders['groupname']): - pname = pheaders['groupname'] + ":" + pheaders['processname'] + pname = pheaders['groupname'] + ":" + pheaders['processname'] self.send_process_state(pname, headers['eventname'], pheaders) for rules in self.rules_data['Rules']: if 'processname' in rules: - if ((rules['processname'] == pheaders['groupname']) and (rules['process_state'] == headers['eventname'])): - self.stderr.write("got a hit with:" + str(rules) + '\n') - # do not make async calls - try: - ret_code = subprocess.call([rules['action']], - shell=True, stdout=self.stderr, - stderr=self.stderr) - except Exception as e: - self.stderr.write('Failed to execute action: ' \ - + rules['action'] + ' with err ' + str(e) + '\n') - else: - if ret_code: - self.stderr.write('Execution of action ' + \ - rules['action'] + ' 
returned err ' + \ - str(ret_code) + '\n') + if ((rules['processname'] == pheaders['groupname']) and + (rules['process_state'] == headers['eventname'])): + self.stderr.write("got a hit with:" + str(rules) + '\n') + # do not make async calls + try: + ret_code = subprocess.call( + [rules['action']], shell=True, + stdout=self.stderr, stderr=self.stderr) + except Exception as e: + self.stderr.write( + 'Failed to execute action: ' + + rules['action'] + ' with err ' + str(e) + '\n') + else: + if ret_code: + self.stderr.write( + 'Execution of action ' + + rules['action'] + ' returned err ' + + str(ret_code) + '\n') def event_process_communication(self, pdata): flag_and_value = pdata.partition(":") - self.stderr.write("Flag:" + flag_and_value[0] + " Value:" + flag_and_value[2] + "\n") + self.stderr.write("Flag:" + flag_and_value[0] + + " Value:" + flag_and_value[2] + "\n") for rules in self.rules_data['Rules']: if 'flag_name' in rules: - if ((rules['flag_name'] == flag_and_value[0]) and (rules['flag_value'].strip() == flag_and_value[2].strip())): - self.stderr.write("got a hit with:" + str(rules) + '\n') - cmd_and_args = ['/usr/bin/bash', '-c' , rules['action']] - subprocess.Popen(cmd_and_args) + if ((rules['flag_name'] == flag_and_value[0]) and + (rules['flag_value'].strip() == flag_and_value[2].strip())): + self.stderr.write("got a hit with:" + str(rules) + '\n') + cmd_and_args = ['/usr/bin/bash', '-c', rules['action']] + subprocess.Popen(cmd_and_args) def event_tick_60(self, prev_current_time): self.tick_count += 1 @@ -383,30 +445,47 @@ def event_tick_60(self, prev_current_time): self.send_all_core_file() # send disk usage info periodically self.send_disk_usage_info() - # typical ntp sync time is about 5 min - first time, we scan only after 10 min + # typical ntp sync time is about 5 min - first time, + # we scan only after 10 min if self.tick_count >= 10: self.check_ntp_status() current_time = int(time.time()) if ((abs(current_time - prev_current_time)) > 300): - #update all process start_times with the updated time - #Compute the elapsed time and subtract them from current time to get updated values - sys.stderr.write("Time lapse detected " + str(abs(current_time - prev_current_time)) + "\n") + # update all process start_times with the updated time + # Compute the elapsed time and subtract them from + # current time to get updated values + sys.stderr.write( + "Time lapse detected " + + str(abs(current_time - prev_current_time)) + "\n") for key in self.process_state_db: pstat = self.process_state_db[key] if pstat.start_time is not '': - pstat.start_time = str((int(current_time - (prev_current_time-((int)(pstat.start_time))/1000000)))*1000000) + pstat.start_time = str( + (int(current_time - (prev_current_time - + ((int)(pstat.start_time)) / 1000000))) * 1000000) if (pstat.process_state == 'PROCESS_STATE_STOPPED'): if pstat.stop_time is not '': - pstat.stop_time = str(int(current_time - (prev_current_time-((int)(pstat.stop_time))/1000000))*1000000) + pstat.stop_time = str( + int(current_time - (prev_current_time - + ((int)(pstat.stop_time)) / 1000000)) * + 1000000) if (pstat.process_state == 'PROCESS_STATE_EXITED'): if pstat.exit_time is not '': - pstat.exit_time = str(int(current_time - (prev_current_time-((int)(pstat.exit_time))/1000000))*1000000) + pstat.exit_time = str( + int(current_time - (prev_current_time - + ((int)(pstat.exit_time)) / 1000000)) * + 1000000) # update process state database self.process_state_db[key] = pstat try: - f = open('/var/log/contrail/process_state' + 
self.node_type + ".json", 'w') - f.write(json.dumps(self.process_state_db, default=lambda obj: obj.__dict__)) + json_file = '/var/log/contrail/process_state' + \ + self.node_type + ".json" + f = open(json_file, 'w') + f.write( + json.dumps( + self.process_state_db, + default=lambda obj: obj.__dict__)) except: sys.stderr.write("Unable to write json") pass @@ -420,8 +499,9 @@ def runforever(self, test=False): gevent.sleep(1) # we explicitly use self.stdin, self.stdout, and self.stderr # instead of sys.* so we can unit test this code - headers, payload = self.listener_nodemgr.wait(self.stdin, self.stdout) - pheaders, pdata = childutils.eventdata(payload+'\n') + headers, payload = self.listener_nodemgr.wait( + self.stdin, self.stdout) + pheaders, pdata = childutils.eventdata(payload + '\n') # check for process state change events if headers['eventname'].startswith("PROCESS_STATE"): diff --git a/src/nodemgr/HaproxyStats.py b/src/nodemgr/haproxy_stats.py similarity index 100% rename from src/nodemgr/HaproxyStats.py rename to src/nodemgr/haproxy_stats.py diff --git a/src/nodemgr/LoadbalancerStats.py b/src/nodemgr/load_balancer_stats.py similarity index 97% rename from src/nodemgr/LoadbalancerStats.py rename to src/nodemgr/load_balancer_stats.py index c63b61c2acb..386fd5c32e5 100644 --- a/src/nodemgr/LoadbalancerStats.py +++ b/src/nodemgr/load_balancer_stats.py @@ -1,6 +1,6 @@ import os -from HaproxyStats import HaproxyStats +from haproxy_stats import HaproxyStats from vrouter.loadbalancer.ttypes import \ UveLoadbalancerTrace, UveLoadbalancer, UveLoadbalancerStats diff --git a/src/nodemgr/main.py b/src/nodemgr/main.py index 2701e5cb047..75365a570a9 100755 --- a/src/nodemgr/main.py +++ b/src/nodemgr/main.py @@ -1,37 +1,45 @@ +# +# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. 
+# + #!/usr/bin/python -from gevent import monkey; monkey.patch_all() +from gevent import monkey +monkey.patch_all() import os import sys import argparse import socket import gevent -from nodemgr.AnalyticsNodemgr import AnalyticsEventManager -from nodemgr.ControlNodemgr import ControlEventManager -from nodemgr.ConfigNodemgr import ConfigEventManager -from nodemgr.VrouterNodemgr import VrouterEventManager -from nodemgr.DatabaseNodemgr import DatabaseEventManager +from nodemgr.analytics_event_manager import AnalyticsEventManager +from nodemgr.control_event_manager import ControlEventManager +from nodemgr.config_event_manager import ConfigEventManager +from nodemgr.vrouter_event_manager import VrouterEventManager +from nodemgr.database_event_manager import DatabaseEventManager + def main(argv=sys.argv): -# Parse Arguments - parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + # Parse Arguments + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("--rules", - default = '', - help = 'Rules file to use for processing events') + default='', + help='Rules file to use for processing events') parser.add_argument("--nodetype", - default = 'contrail-analytics', - help = 'Type of node which nodemgr is managing') + default='contrail-analytics', + help='Type of node which nodemgr is managing') parser.add_argument("--discovery_server", - default = socket.gethostname(), - help = 'IP address of Discovery Server') + default=socket.gethostname(), + help='IP address of Discovery Server') parser.add_argument("--collectors", - default = '', - help = 'Collector addresses in format ip1:port1 ip2:port2') + default='', + help='Collector addresses in format' + + 'ip1:port1 ip2:port2') parser.add_argument("--discovery_port", - type = int, - default = 5998, - help = 'Port of Discovery Server') + type=int, + default=5998, + help='Port of Discovery Server') try: _args = parser.parse_args() except: @@ -47,7 +55,7 @@ def main(argv=sys.argv): else: collector_addr = _args.collectors.split() sys.stderr.write("Collector address: " + str(collector_addr) + "\n") - #done parsing arguments + # done parsing arguments if not 'SUPERVISOR_SERVER_URL' in os.environ: sys.stderr.write('Node manager must be run as a supervisor event ' @@ -56,17 +64,28 @@ def main(argv=sys.argv): return prog = None if (node_type == 'contrail-analytics'): - prog = AnalyticsEventManager(rule_file, discovery_server, discovery_port, collector_addr) + prog = AnalyticsEventManager( + rule_file, discovery_server, + discovery_port, collector_addr) elif (node_type == 'contrail-config'): - prog = ConfigEventManager(rule_file, discovery_server, discovery_port, collector_addr) + prog = ConfigEventManager( + rule_file, discovery_server, + discovery_port, collector_addr) elif (node_type == 'contrail-control'): - prog = ControlEventManager(rule_file, discovery_server, discovery_port, collector_addr) + prog = ControlEventManager( + rule_file, discovery_server, + discovery_port, collector_addr) elif (node_type == 'contrail-vrouter'): - prog = VrouterEventManager(rule_file, discovery_server, discovery_port, collector_addr) + prog = VrouterEventManager( + rule_file, discovery_server, + discovery_port, collector_addr) elif (node_type == 'contrail-database'): - prog = DatabaseEventManager(rule_file, discovery_server, discovery_port, collector_addr) + prog = DatabaseEventManager( + rule_file, discovery_server, + discovery_port, collector_addr) else: - return + 
sys.stderr.write("Node type" + str(node_type) + "is incorrect" + "\n") + return prog.process() prog.send_nodemgr_process_status() prog.send_process_state_db(prog.group_names) diff --git a/src/nodemgr/ProcessStat.py b/src/nodemgr/process_stat.py similarity index 86% rename from src/nodemgr/ProcessStat.py rename to src/nodemgr/process_stat.py index 631f58bf615..e651bfd06e3 100644 --- a/src/nodemgr/ProcessStat.py +++ b/src/nodemgr/process_stat.py @@ -1,5 +1,10 @@ +# +# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +# + import socket + class ProcessStat(object): def __init__(self, pname): self.start_count = 0 diff --git a/src/nodemgr/setup.py b/src/nodemgr/setup.py index a7403af26fd..4f4b927eb4e 100644 --- a/src/nodemgr/setup.py +++ b/src/nodemgr/setup.py @@ -11,8 +11,8 @@ package_data={'': ['*.html', '*.css', '*.xml']}, zip_safe=False, long_description="Nodemgr Implementation", - entry_points = { - 'console_scripts' : [ + entry_points={ + 'console_scripts': [ 'contrail-nodemgr = nodemgr.main:main', ], }, diff --git a/src/nodemgr/VrouterNodemgr.py b/src/nodemgr/vrouter_event_manager.py similarity index 60% rename from src/nodemgr/VrouterNodemgr.py rename to src/nodemgr/vrouter_event_manager.py index db73d2de74b..63fe172cf1b 100644 --- a/src/nodemgr/VrouterNodemgr.py +++ b/src/nodemgr/vrouter_event_manager.py @@ -1,6 +1,11 @@ +# +# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +# + doc = " " -from gevent import monkey; monkey.patch_all() +from gevent import monkey +monkey.patch_all() import os import sys import socket @@ -13,8 +18,8 @@ import gevent import ConfigParser -from nodemgr.EventManager import EventManager -from nodemgr.VrouterProcessStat import VrouterProcessStat +from nodemgr.event_manager import EventManager +from nodemgr.vrouter_process_stat import VrouterProcessStat from ConfigParser import NoOptionError @@ -36,38 +41,47 @@ from vrouter.vrouter.process_info.constants import \ ProcessStateNames -import LoadbalancerStats as lb_stats +import load_balancer_stats as lb_stats def usage(): print doc sys.exit(255) + class VrouterEventManager(EventManager): - def __init__(self, rule_file, discovery_server, discovery_port, collector_addr): - EventManager.__init__(self, rule_file, discovery_server, discovery_port, collector_addr) + def __init__(self, rule_file, discovery_server, + discovery_port, collector_addr): + EventManager.__init__(self, rule_file, discovery_server, + discovery_port, collector_addr) self.node_type = "contrail-vrouter" self.module = Module.COMPUTE_NODE_MGR - self.module_id = ModuleNames[self.module] + self.module_id = ModuleNames[self.module] self.supervisor_serverurl = "unix:///tmp/supervisord_vrouter.sock" self.add_current_process() os_nova_comp = VrouterProcessStat('openstack-nova-compute') - (os_nova_comp_state, error_value) = Popen("openstack-status | grep openstack-nova-compute | cut -d ':' -f2", shell=True, stdout=PIPE).communicate() + popen_cmd = "openstack-status | grep openstack-nova-compute" + \ + " | cut -d ':' -f2" + (os_nova_comp_state, error_value) = \ + Popen(popen_cmd, shell=True, stdout=PIPE).communicate() os_nova_comp.process_state = os_nova_comp_state.strip() if (os_nova_comp.process_state == 'active'): os_nova_comp.process_state = 'PROCESS_STATE_RUNNING' - os_nova_comp.start_time = str(int(time.time()*1000000)) + os_nova_comp.start_time = str(int(time.time() * 1000000)) os_nova_comp.start_count += 1 if (os_nova_comp.process_state == 'dead'): os_nova_comp.process_state = 'PROCESS_STATE_FATAL' - 
diff --git a/src/nodemgr/VrouterNodemgr.py b/src/nodemgr/vrouter_event_manager.py
similarity index 60%
rename from src/nodemgr/VrouterNodemgr.py
rename to src/nodemgr/vrouter_event_manager.py
index db73d2de74b..63fe172cf1b 100644
--- a/src/nodemgr/VrouterNodemgr.py
+++ b/src/nodemgr/vrouter_event_manager.py
@@ -1,6 +1,11 @@
+#
+# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
+#
+
 doc = " "
 
-from gevent import monkey; monkey.patch_all()
+from gevent import monkey
+monkey.patch_all()
 import os
 import sys
 import socket
@@ -13,8 +18,8 @@
 import gevent
 import ConfigParser
 
-from nodemgr.EventManager import EventManager
-from nodemgr.VrouterProcessStat import VrouterProcessStat
+from nodemgr.event_manager import EventManager
+from nodemgr.vrouter_process_stat import VrouterProcessStat
 
 from ConfigParser import NoOptionError
 
@@ -36,38 +41,47 @@ from vrouter.vrouter.process_info.constants import \
     ProcessStateNames
 
-import LoadbalancerStats as lb_stats
+import load_balancer_stats as lb_stats
 
 def usage():
     print doc
     sys.exit(255)
 
+
 class VrouterEventManager(EventManager):
-    def __init__(self, rule_file, discovery_server, discovery_port, collector_addr):
-        EventManager.__init__(self, rule_file, discovery_server, discovery_port, collector_addr)
+    def __init__(self, rule_file, discovery_server,
+                 discovery_port, collector_addr):
+        EventManager.__init__(self, rule_file, discovery_server,
+                              discovery_port, collector_addr)
         self.node_type = "contrail-vrouter"
         self.module = Module.COMPUTE_NODE_MGR
-        self.module_id = ModuleNames[self.module] 
+        self.module_id = ModuleNames[self.module]
         self.supervisor_serverurl = "unix:///tmp/supervisord_vrouter.sock"
         self.add_current_process()
         os_nova_comp = VrouterProcessStat('openstack-nova-compute')
-        (os_nova_comp_state, error_value) = Popen("openstack-status | grep openstack-nova-compute | cut -d ':' -f2", shell=True, stdout=PIPE).communicate()
+        popen_cmd = "openstack-status | grep openstack-nova-compute" + \
+            " | cut -d ':' -f2"
+        (os_nova_comp_state, error_value) = \
+            Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
         os_nova_comp.process_state = os_nova_comp_state.strip()
         if (os_nova_comp.process_state == 'active'):
             os_nova_comp.process_state = 'PROCESS_STATE_RUNNING'
-            os_nova_comp.start_time = str(int(time.time()*1000000))
+            os_nova_comp.start_time = str(int(time.time() * 1000000))
             os_nova_comp.start_count += 1
         if (os_nova_comp.process_state == 'dead'):
            os_nova_comp.process_state = 'PROCESS_STATE_FATAL'
-        sys.stderr.write('Openstack Nova Compute status:' + os_nova_comp.process_state + "\n")
+        sys.stderr.write('Openstack Nova Compute status:' +
+                         os_nova_comp.process_state + "\n")
         self.process_state_db['openstack-nova-compute'] = os_nova_comp
-    #end __init__
+    # end __init__
 
     def process(self):
         if self.rule_file is '':
-            self.rule_file = "/etc/contrail/supervisord_vrouter_files/contrail-vrouter.rules"
+            self.rule_file = \
+                "/etc/contrail/supervisord_vrouter_files/" + \
+                "contrail-vrouter.rules"
         json_file = open(self.rule_file)
         self.rules_data = json.load(json_file)
         node_type = Module2NodeType[self.module]
@@ -76,32 +90,40 @@ def process(self):
         Config = self.read_config_data(config_file)
         self.get_collector_list(Config)
         _disc = self.get_discovery_client(Config)
-        sandesh_global.init_generator(self.module_id, socket.gethostname(),
+        sandesh_global.init_generator(
+            self.module_id, socket.gethostname(),
             node_type_name, self.instance_id, self.collector_addr,
-            self.module_id, 8102, ['vrouter.vrouter'],_disc)
-        #sandesh_global.set_logging_params(enable_local_log=True)
+            self.module_id, 8102, ['vrouter.vrouter'], _disc)
+        # sandesh_global.set_logging_params(enable_local_log=True)
         self.sandesh_global = sandesh_global
 
     def send_process_state_db(self, group_names):
-        self.send_process_state_db_base(group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
+        self.send_process_state_db_base(
+            group_names, ProcessInfo, NodeStatus, NodeStatusUVE)
 
     def send_nodemgr_process_status(self):
-        self.send_nodemgr_process_status_base(ProcessStateNames, ProcessState, ProcessStatus, NodeStatus, NodeStatusUVE)
+        self.send_nodemgr_process_status_base(
+            ProcessStateNames, ProcessState, ProcessStatus,
+            NodeStatus, NodeStatusUVE)
 
     def get_process_state(self, fail_status_bits):
-        return self.get_process_state_base(fail_status_bits, ProcessStateNames, ProcessState)
+        return self.get_process_state_base(
+            fail_status_bits, ProcessStateNames, ProcessState)
 
     def send_disk_usage_info(self):
-        self.send_disk_usage_info_base(NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
+        self.send_disk_usage_info_base(
+            NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)
 
     def get_process_stat_object(self, pname):
         return VrouterProcessStat(pname)
 
-    # overridden delete_process_handler - ignore delete in case of openstack-nova-compute
+    # overridden delete_process_handler -
+    # ignore delete in case of openstack-nova-compute
     def delete_process_handler(self, deleted_process):
         if deleted_process == 'openstack-nova-compute':
             return
-        super(VrouterEventManager, self).delete_process_handler(deleted_process)
+        super(VrouterEventManager,
+              self).delete_process_handler(deleted_process)
     # end delete_process_handler
 
     def runforever(self, test=False):
@@ -110,14 +132,15 @@ def runforever(self, test=False):
             gevent.sleep(1)
             # we explicitly use self.stdin, self.stdout, and self.stderr
             # instead of sys.* so we can unit test this code
-            headers, payload = self.listener_nodemgr.wait(self.stdin, self.stdout)
+            headers, payload = \
+                self.listener_nodemgr.wait(self.stdin, self.stdout)
 
-            #self.stderr.write("headers:\n" + str(headers) + '\n')
-            #self.stderr.write("payload:\n" + str(payload) + '\n')
+            # self.stderr.write("headers:\n" + str(headers) + '\n')
+            # self.stderr.write("payload:\n" + str(payload) + '\n')
 
-            pheaders, pdata = childutils.eventdata(payload+'\n')
-            #self.stderr.write("pheaders:\n" + str(pheaders)+'\n')
-            #self.stderr.write("pdata:\n" + str(pdata))
+            pheaders, pdata = childutils.eventdata(payload + '\n')
+            # self.stderr.write("pheaders:\n" + str(pheaders)+'\n')
+            # self.stderr.write("pdata:\n" + str(pdata))
 
             # check for process state change events
             if headers['eventname'].startswith("PROCESS_STATE"):
@@ -132,7 +155,10 @@ def runforever(self, test=False):
             # do periodic events
             if headers['eventname'].startswith("TICK_60"):
                 os_nova_comp = self.process_state_db['openstack-nova-compute']
-                (os_nova_comp_state, error_value) = Popen("openstack-status | grep openstack-nova-compute | cut -d ':' -f2", shell=True, stdout=PIPE).communicate()
+                popen_cmd = "openstack-status | " + \
+                    "grep openstack-nova-compute | cut -d ':' -f2"
+                (os_nova_comp_state, error_value) = \
+                    Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
                 if (os_nova_comp_state.strip() == 'active'):
                     os_nova_comp_state = 'PROCESS_STATE_RUNNING'
                 if (os_nova_comp_state.strip() == 'dead'):
@@ -141,20 +167,26 @@ def runforever(self, test=False):
                     os_nova_comp_state = 'PROCESS_STATE_STOPPED'
                 if (os_nova_comp.process_state != os_nova_comp_state):
                     os_nova_comp.process_state = os_nova_comp_state.strip()
-                    sys.stderr.write('Openstack Nova Compute status changed to:' + os_nova_comp.process_state + "\n")
+                    msg = 'Openstack Nova Compute status changed to:'
+                    sys.stderr.write(msg + os_nova_comp.process_state + "\n")
                     if (os_nova_comp.process_state == 'PROCESS_STATE_RUNNING'):
-                        os_nova_comp.start_time = str(int(time.time()*1000000))
+                        os_nova_comp.start_time = \
+                            str(int(time.time() * 1000000))
                         os_nova_comp.start_count += 1
                     if (os_nova_comp.process_state == 'PROCESS_STATE_FATAL'):
-                        os_nova_comp.exit_time = str(int(time.time()*1000000))
+                        os_nova_comp.exit_time = \
+                            str(int(time.time() * 1000000))
                         os_nova_comp.exit_count += 1
                     if (os_nova_comp.process_state == 'PROCESS_STATE_STOPPED'):
-                        os_nova_comp.stop_time = str(int(time.time()*1000000))
+                        os_nova_comp.stop_time = \
+                            str(int(time.time() * 1000000))
                         os_nova_comp.stop_count += 1
-                    self.process_state_db['openstack-nova-compute'] = os_nova_comp
+                    self.process_state_db['openstack-nova-compute'] = \
+                        os_nova_comp
                     self.send_process_state_db('vrouter_group')
                 else:
-                    sys.stderr.write('Openstack Nova Compute status unchanged at:' + os_nova_comp.process_state + "\n")
+                    msg = 'Openstack Nova Compute status unchanged at:'
+                    sys.stderr.write(msg + os_nova_comp.process_state + "\n")
                     self.process_state_db['openstack-nova-compute'] = os_nova_comp
 
                 prev_current_time = self.event_tick_60(prev_current_time)
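runforever() above follows supervisor's event-listener protocol: wait() announces READY and blocks until supervisord delivers an event on stdin, the body dispatches on the eventname header (PROCESS_STATE and TICK_60 here), and the listener acknowledges before waiting again. A stripped-down listener under the same protocol, assuming only the supervisor package; the event handling below is a placeholder, not the nodemgr logic:

    import sys
    from supervisor import childutils

    def listen(stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr):
        while True:
            # wait() writes READY to stdout, then blocks until
            # supervisord delivers an event on stdin
            headers, payload = childutils.listener.wait(stdin, stdout)
            if headers['eventname'].startswith('PROCESS_STATE'):
                pheaders, pdata = childutils.eventdata(payload + '\n')
                stderr.write('process event: ' + str(pheaders) + '\n')
            # acknowledge so supervisord will send the next event
            childutils.listener.ok(stdout)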
diff --git a/src/nodemgr/VrouterProcessStat.py b/src/nodemgr/vrouter_process_stat.py
similarity index 56%
rename from src/nodemgr/VrouterProcessStat.py
rename to src/nodemgr/vrouter_process_stat.py
index cdcb366a220..0879e21d87b 100644
--- a/src/nodemgr/VrouterProcessStat.py
+++ b/src/nodemgr/vrouter_process_stat.py
@@ -1,10 +1,15 @@
+#
+# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
+#
+
 import os
 from StringIO import StringIO
 import ConfigParser
 import sys
 import socket
-from nodemgr.ProcessStat import ProcessStat
+from nodemgr.process_stat import ProcessStat
+
 
 class VrouterProcessStat(ProcessStat):
     def __init__(self, pname):
@@ -12,43 +17,54 @@ def __init__(self, pname):
         (self.group, self.name) = self.get_vrouter_process_info(pname)
 
     def get_vrouter_process_info(self, proc_name):
-        for root, dirs, files in os.walk("/etc/contrail/supervisord_vrouter_files"):
+        vrouter_file = "/etc/contrail/supervisord_vrouter_files"
+        for root, dirs, files in os.walk(vrouter_file):
             for file in files:
                 if file.endswith(".ini"):
-                    filename = '/etc/contrail/supervisord_vrouter_files/' + file
-                    data = StringIO('\n'.join(line.strip() for line in open(filename)))
+                    filename = \
+                        '/etc/contrail/supervisord_vrouter_files/' + file
+                    data = StringIO('\n'.join(line.strip()
                                    for line in open(filename)))
                     Config = ConfigParser.SafeConfigParser()
                     Config.readfp(data)
                     sections = Config.sections()
                     if not sections[0]:
-                        sys.stderr.write("Section not present in the ini file : " + filename + "\n")
+                        msg = "Section not present in the ini file : "
+                        sys.stderr.write(msg + filename + "\n")
                         continue
                     name = sections[0].split(':')
                     if len(name) < 2:
-                        sys.stderr.write("Incorrect section name in the ini file : " + filename + "\n")
+                        msg = "Incorrect section name in the ini file : "
+                        sys.stderr.write(msg + filename + "\n")
                         continue
                    if name[1] == proc_name:
                         command = Config.get(sections[0], "command")
                         if not command:
-                            sys.stderr.write("Command not present in the ini file : " + filename + "\n")
+                            msg = "Command not present in the ini file : "
+                            sys.stderr.write(msg + filename + "\n")
                             continue
                         args = command.split()
                         if (args[0] == '/usr/bin/contrail-tor-agent'):
                             try:
                                 index = args.index('--config_file')
-                                agent_name = self.get_vrouter_tor_agent_name(args[index + 1])
+                                args_val = args[index + 1]
+                                agent_name = \
+                                    self.get_vrouter_tor_agent_name(args_val)
                                 return (proc_name, agent_name)
                             except Exception, err:
-                                sys.stderr.write("Tor Agent command does not have config file : " + command + "\n")
+                                msg = "Tor Agent command does " + \
+                                    "not have config file : "
+                                sys.stderr.write(msg + command + "\n")
         return ('vrouter_group', socket.gethostname())
-    #end get_vrouter_process_info
+    # end get_vrouter_process_info
 
     # Read agent_name from vrouter-tor-agent conf file
     def get_vrouter_tor_agent_name(self, conf_file):
         tor_agent_name = None
         if conf_file:
             try:
-                data = StringIO('\n'.join(line.strip() for line in open(conf_file)))
+                data = StringIO('\n'.join(line.strip()
                                for line in open(conf_file)))
                 Config = ConfigParser.SafeConfigParser()
                 Config.readfp(data)
             except Exception, err:
@@ -57,4 +73,4 @@ def get_vrouter_tor_agent_name(self, conf_file):
             return tor_agent_name
         tor_agent_name = Config.get("DEFAULT", "agent_name")
         return tor_agent_name
-    #end get_vrouter_tor_agent_name
+    # end get_vrouter_tor_agent_name
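get_vrouter_process_info() above boils down to scanning supervisord ini files for a [program:&lt;name&gt;] section that matches the managed process and reading its command option. The core lookup as a standalone Python 2 sketch; the function name and single-file signature are illustrative, while the real code walks /etc/contrail/supervisord_vrouter_files:

    import ConfigParser
    from StringIO import StringIO

    def find_program_command(ini_text, proc_name):
        # supervisord program sections are named [program:<proc_name>]
        config = ConfigParser.SafeConfigParser()
        config.readfp(StringIO(ini_text))
        for section in config.sections():
            name = section.split(':')
            if len(name) == 2 and name[1] == proc_name:
                return config.get(section, 'command')
        return None

    # Example (hypothetical path):
    # find_program_command(open(path).read(), 'contrail-vrouter-agent')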