Nodemgr to start using sandesh logger
Nodemgr currently logs to stderr, which does not add a timestamp. It will now
use the sandesh logger, which automatically adds a timestamp to every log
message.

Change-Id: I7fd888b1c381b851359d5bfcd5052bd8cee93aac
Closes-Bug: 1466473
bansalnikhil committed Dec 1, 2015
1 parent 69dd345 commit 50e7af5
Showing 6 changed files with 127 additions and 93 deletions.
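
The core of the change is mechanical: every direct stderr write becomes a call
through the sandesh logger. A minimal sketch of that pattern, using the imports
and calls that appear in the diffs below and assuming sandesh_global has already
been initialized with init_generator (the helper name log_info is illustrative,
not part of the commit):

    # Before (old nodemgr code): raw stderr writes, which carry no timestamp
    #   sys.stderr.write('Sending UVE:' + str(node_status_uve))
    #
    # After: route the message through the sandesh logger, which timestamps it.
    from pysandesh.sandesh_base import *          # provides sandesh_global (as in the diff)
    from pysandesh.sandesh_logger import SandeshLogger
    from pysandesh.gen_py.sandesh.ttypes import SandeshLevel

    def log_info(sandesh, msg):
        # Map the sandesh severity to a Python logging level and emit the record.
        sandesh.logger().log(
            SandeshLogger.get_py_logger_level(SandeshLevel.SYS_INFO), msg)

    # e.g. log_info(sandesh_global, 'Sending UVE:' + str(node_status_uve))
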
19 changes: 9 additions & 10 deletions src/nodemgr/analytics_nodemgr/analytics_event_manager.py
@@ -36,20 +36,12 @@ def __init__(self, rule_file, discovery_server,
discovery_port, collector_addr):
EventManager.__init__(
self, rule_file, discovery_server,
discovery_port, collector_addr)
discovery_port, collector_addr, sandesh_global)
self.node_type = 'contrail-analytics'
self.module = Module.ANALYTICS_NODE_MGR
self.module_id = ModuleNames[self.module]
self.supervisor_serverurl = "unix:///tmp/supervisord_analytics.sock"
self.add_current_process()
# end __init__

def process(self):
if self.rule_file is '':
self.rule_file = "/etc/contrail/" + \
"supervisord_analytics_files/contrail-analytics.rules"
json_file = open(self.rule_file)
self.rules_data = json.load(json_file)
node_type = Module2NodeType[self.module]
node_type_name = NodeTypeNames[node_type]
_disc = self.get_discovery_client()
@@ -58,7 +50,14 @@ def process(self):
node_type_name, self.instance_id, self.collector_addr,
self.module_id, 8104, ['analytics'], _disc)
sandesh_global.set_logging_params(enable_local_log=True)
self.sandesh_global = sandesh_global
# end __init__

def process(self):
if self.rule_file is '':
self.rule_file = "/etc/contrail/" + \
"supervisord_analytics_files/contrail-analytics.rules"
json_file = open(self.rule_file)
self.rules_data = json.load(json_file)

def send_process_state_db(self, group_names):
self.send_process_state_db_base(
53 changes: 34 additions & 19 deletions src/nodemgr/common/event_manager.py
@@ -22,6 +22,8 @@
from nodemgr.common.process_stat import ProcessStat
from sandesh_common.vns.constants import INSTANCE_ID_DEFAULT
import discoveryclient.client as client
from pysandesh.sandesh_logger import *
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel


class EventManager(object):
@@ -35,7 +37,7 @@ class EventManager(object):
FAIL_STATUS_DISK_SPACE_NA = 0x10

def __init__(self, rule_file, discovery_server,
discovery_port, collector_addr):
discovery_port, collector_addr, sandesh_global):
self.stdin = sys.stdin
self.stdout = sys.stdout
self.stderr = sys.stderr
@@ -54,7 +56,7 @@ def __init__(self, rule_file, discovery_server,
self.discovery_port = discovery_port
self.collector_addr = collector_addr
self.listener_nodemgr = EventListenerProtocolNodeMgr()
self.sandesh_global = None
self.sandesh_global = sandesh_global

# Get all the current processes in the node
def get_current_process(self):
@@ -165,7 +167,9 @@ def send_process_state_db_base(self, group_names, ProcessInfo,
node_status.process_info = process_infos
node_status.all_core_file_list = self.all_core_file_list
node_status_uve = NodeStatusUVE(data=node_status)
sys.stderr.write('Sending UVE:' + str(node_status_uve))
msg = 'Sending UVE:' + str(node_status_uve)
self.sandesh_global.logger().log(SandeshLogger.get_py_logger_level(
SandeshLevel.SYS_INFO), msg)
node_status_uve.send()

def send_all_core_file(self):
@@ -293,7 +297,9 @@ def send_nodemgr_process_status_base(self, ProcessStateNames,
name=socket.gethostname(),
process_status=process_status_list)
node_status_uve = NodeStatusUVE(data=node_status)
sys.stderr.write('Sending UVE:' + str(node_status_uve))
msg = 'Sending UVE:' + str(node_status_uve)
self.sandesh_global.logger().log(SandeshLogger.get_py_logger_level(
SandeshLevel.SYS_INFO), msg)
node_status_uve.send()

def send_disk_usage_info_base(self, NodeStatusUVE, NodeStatus,
@@ -326,7 +332,9 @@ def send_disk_usage_info_base(self, NodeStatusUVE, NodeStatus,
node_status = NodeStatus(
name=socket.gethostname(), disk_usage_info=disk_usage_infos)
node_status_uve = NodeStatusUVE(data=node_status)
sys.stderr.write('Sending UVE:' + str(node_status_uve))
msg = 'Sending UVE:' + str(node_status_uve)
self.sandesh_global.logger().log(SandeshLogger.get_py_logger_level(
SandeshLevel.SYS_INFO), msg)
node_status_uve.send()
# end send_disk_usage_info

@@ -349,9 +357,9 @@ def get_failbits_nodespecific_desc(self, fail_status_bits):
return ""

def event_process_state(self, pheaders, headers):
self.stderr.write("process:" + pheaders['processname'] + "," +
"groupname:" + pheaders['groupname'] + "," +
"eventname:" + headers['eventname'] + '\n')
msg = ("process:" + pheaders['processname'] + "," + "groupname:" +
pheaders['groupname'] + "," + "eventname:" + headers['eventname'])
self.sandesh_global.logger().log(SandeshLogger.get_py_logger_level(SandeshLevel.SYS_DEBUG), msg)
pname = pheaders['processname']
if (pheaders['processname'] != pheaders['groupname']):
pname = pheaders['groupname'] + ":" + pheaders['processname']
@@ -360,32 +368,39 @@ def event_process_state(self, pheaders, headers):
if 'processname' in rules:
if ((rules['processname'] == pheaders['groupname']) and
(rules['process_state'] == headers['eventname'])):
self.stderr.write("got a hit with:" + str(rules) + '\n')
msg = "got a hit with:" + str(rules)
self.sandesh_global.logger().log(SandeshLogger.get_py_logger_level(
SandeshLevel.SYS_DEBUG), msg)
# do not make async calls
try:
ret_code = subprocess.call(
[rules['action']], shell=True,
stdout=self.stderr, stderr=self.stderr)
except Exception as e:
self.stderr.write(
'Failed to execute action: ' +
rules['action'] + ' with err ' + str(e) + '\n')
msg = ('Failed to execute action: ' + rules['action'] +
' with err ' + str(e))
self.sandesh_global.logger().log(SandeshLogger.
get_py_logger_level(SandeshLevel.SYS_ERR), msg)
else:
if ret_code:
self.stderr.write(
'Execution of action ' +
rules['action'] + ' returned err ' +
str(ret_code) + '\n')
msg = ('Execution of action ' + rules['action'] +
' returned err ' + str(ret_code))
self.sandesh_global.logger().log(SandeshLogger.
get_py_logger_level(SandeshLevel.SYS_ERR), msg)

def event_process_communication(self, pdata):
flag_and_value = pdata.partition(":")
self.stderr.write("Flag:" + flag_and_value[0] +
" Value:" + flag_and_value[2] + "\n")
msg = ("Flag:" + flag_and_value[0] +
" Value:" + flag_and_value[2])
self.sandesh_global.logger().log(SandeshLogger.get_py_logger_level
(SandeshLevel.SYS_DEBUG), msg)
for rules in self.rules_data['Rules']:
if 'flag_name' in rules:
if ((rules['flag_name'] == flag_and_value[0]) and
(rules['flag_value'].strip() == flag_and_value[2].strip())):
self.stderr.write("got a hit with:" + str(rules) + '\n')
msg = "got a hit with:" + str(rules)
self.sandesh_global.logger().log(SandeshLogger.
get_py_logger_level(SandeshLevel.SYS_DEBUG), msg)
cmd_and_args = ['/usr/bin/bash', '-c', rules['action']]
subprocess.Popen(cmd_and_args)

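The same three-line logging call is repeated at every former stderr call site in
event_manager.py above. The database node manager further down adds a small
msg_log helper for exactly this; an analogous helper on the common EventManager
(a sketch only, not part of this commit) would collapse each call site to one line:

    def msg_log(self, msg, level):
        # Translate the SandeshLevel severity to a Python logging level and
        # emit through the sandesh-provided logger, which adds the timestamp.
        self.sandesh_global.logger().log(
            SandeshLogger.get_py_logger_level(level), msg)

    # e.g. self.msg_log("got a hit with:" + str(rules), SandeshLevel.SYS_DEBUG)
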
24 changes: 12 additions & 12 deletions src/nodemgr/config_nodemgr/config_event_manager.py
@@ -42,14 +42,23 @@
class ConfigEventManager(EventManager):
def __init__(self, rule_file, discovery_server,
discovery_port, collector_addr):
EventManager.__init__(
self, rule_file, discovery_server,
discovery_port, collector_addr)
self.node_type = "contrail-config"
self.module = Module.CONFIG_NODE_MGR
self.module_id = ModuleNames[self.module]
self.supervisor_serverurl = "unix:///tmp/supervisord_config.sock"
self.add_current_process()
node_type = Module2NodeType[self.module]
node_type_name = NodeTypeNames[node_type]
self.sandesh_global = sandesh_global
EventManager.__init__(
self, rule_file, discovery_server,
discovery_port, collector_addr, sandesh_global)
_disc = self.get_discovery_client()
sandesh_global.init_generator(
self.module_id, socket.gethostname(),
node_type_name, self.instance_id, self.collector_addr,
self.module_id, 8100, ['cfgm_common.uve'], _disc)
sandesh_global.set_logging_params(enable_local_log=True)
# end __init__

def process(self):
@@ -58,15 +67,6 @@ def process(self):
"supervisord_config_files/contrail-config.rules"
json_file = open(self.rule_file)
self.rules_data = json.load(json_file)
node_type = Module2NodeType[self.module]
node_type_name = NodeTypeNames[node_type]
_disc = self.get_discovery_client()
sandesh_global.init_generator(
self.module_id, socket.gethostname(),
node_type_name, self.instance_id, self.collector_addr,
self.module_id, 8100, ['cfgm_common.uve'], _disc)
# sandesh_global.set_logging_params(enable_local_log=True)
self.sandesh_global = sandesh_global

def send_process_state_db(self, group_names):
self.send_process_state_db_base(
24 changes: 12 additions & 12 deletions src/nodemgr/control_nodemgr/control_event_manager.py
@@ -42,14 +42,23 @@
class ControlEventManager(EventManager):
def __init__(self, rule_file, discovery_server,
discovery_port, collector_addr):
EventManager.__init__(
self, rule_file, discovery_server,
discovery_port, collector_addr)
self.node_type = "contrail-control"
self.module = Module.CONTROL_NODE_MGR
self.module_id = ModuleNames[self.module]
self.supervisor_serverurl = "unix:///tmp/supervisord_control.sock"
self.add_current_process()
node_type = Module2NodeType[self.module]
node_type_name = NodeTypeNames[node_type]
self.sandesh_global = sandesh_global
EventManager.__init__(
self, rule_file, discovery_server,
discovery_port, collector_addr, sandesh_global)
_disc = self.get_discovery_client()
sandesh_global.init_generator(
self.module_id, socket.gethostname(),
node_type_name, self.instance_id, self.collector_addr,
self.module_id, 8101, ['control_node.control_node'], _disc)
sandesh_global.set_logging_params(enable_local_log=True)
# end __init__

def process(self):
Expand All @@ -58,15 +67,6 @@ def process(self):
"supervisord_control_files/contrail-control.rules"
json_file = open(self.rule_file)
self.rules_data = json.load(json_file)
node_type = Module2NodeType[self.module]
node_type_name = NodeTypeNames[node_type]
_disc = self.get_discovery_client()
sandesh_global.init_generator(
self.module_id, socket.gethostname(),
node_type_name, self.instance_id, self.collector_addr,
self.module_id, 8101, ['control_node.control_node'], _disc)
# sandesh_global.set_logging_params(enable_local_log=True)
self.sandesh_global = sandesh_global

def send_process_state_db(self, group_names):
self.send_process_state_db_base(
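Across the config, control and database managers the shape of the __init__ change
is the same: the generator setup that used to live in process() now runs in
__init__, so logging is configured before the first supervisor event is handled.
A condensed sketch of that order, taken from the diffs above (the port and the
sandesh package list differ per node type):

    EventManager.__init__(
        self, rule_file, discovery_server,
        discovery_port, collector_addr, sandesh_global)
    _disc = self.get_discovery_client()
    sandesh_global.init_generator(
        self.module_id, socket.gethostname(), node_type_name,
        self.instance_id, self.collector_addr, self.module_id,
        8101,                            # port differs per node manager (8100/8101/8103/8104 above)
        ['control_node.control_node'],   # as does the sandesh package list
        _disc)
    sandesh_global.set_logging_params(enable_local_log=True)
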
60 changes: 36 additions & 24 deletions src/nodemgr/database_nodemgr/database_event_manager.py
@@ -24,6 +24,8 @@
from supervisor import childutils

from pysandesh.sandesh_base import *
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from pysandesh.sandesh_logger import SandeshLogger
from pysandesh.sandesh_session import SandeshWriter
from pysandesh.gen_py.sandesh_trace.ttypes import SandeshTraceRequest
from sandesh_common.vns.ttypes import Module, NodeType
@@ -46,9 +48,6 @@ class DatabaseEventManager(EventManager):
def __init__(self, rule_file, discovery_server,
discovery_port, collector_addr,
hostip, minimum_diskgb, contrail_databases, cassandra_repair_interval):
EventManager.__init__(
self, rule_file, discovery_server,
discovery_port, collector_addr)
self.node_type = "contrail-database"
self.module = Module.DATABASE_NODE_MGR
self.module_id = ModuleNames[self.module]
@@ -58,6 +57,24 @@ def __init__(self, rule_file, discovery_server,
self.cassandra_repair_interval = cassandra_repair_interval
self.supervisor_serverurl = "unix:///tmp/supervisord_database.sock"
self.add_current_process()
node_type = Module2NodeType[self.module]
node_type_name = NodeTypeNames[node_type]
self.sandesh_global = sandesh_global
EventManager.__init__(
self, rule_file, discovery_server,
discovery_port, collector_addr, sandesh_global)
if self.rule_file is '':
self.rule_file = "/etc/contrail/" + \
"supervisord_database_files/contrail-database.rules"
json_file = open(self.rule_file)
self.rules_data = json.load(json_file)
_disc = self.get_discovery_client()
sandesh_global.init_generator(
self.module_id, socket.gethostname(), node_type_name,
self.instance_id, self.collector_addr, self.module_id, 8103,
['database.sandesh'], _disc)
sandesh_global.set_logging_params(enable_local_log=True)
# end __init__

def _get_cassandra_config_option(self, config):
@@ -71,37 +88,30 @@ def _get_cassandra_config_option(self, config):
yamlstream.close()
return cfg[config][0]

def process(self):
if self.rule_file is '':
self.rule_file = "/etc/contrail/" + \
"supervisord_database_files/contrail-database.rules"
json_file = open(self.rule_file)
self.rules_data = json.load(json_file)
node_type = Module2NodeType[self.module]
node_type_name = NodeTypeNames[node_type]
_disc = self.get_discovery_client()
sandesh_global.init_generator(
self.module_id, socket.gethostname(), node_type_name,
self.instance_id, self.collector_addr, self.module_id, 8103,
['database.sandesh'], _disc)
# sandesh_global.set_logging_params(enable_local_log=True)
self.sandesh_global = sandesh_global
def msg_log(self, msg, level):
self.sandesh_global.logger().log(SandeshLogger.get_py_logger_level(
level), msg)

def process(self):
try:
cassandra_data_dir = self._get_cassandra_config_option("data_file_directories")
analytics_dir = cassandra_data_dir + '/ContrailAnalytics'
if os.path.exists(analytics_dir):
self.stderr.write("analytics_dir is " + analytics_dir + "\n")
msg = "analytics_dir is " + analytics_dir
self.msg_log(msg, level=SandeshLevel.SYS_DEBUG)
popen_cmd = "set `df -Pk " + analytics_dir + " | grep % | awk '{s+=$3}END{print s}'` && echo $1"
self.stderr.write("popen_cmd is " + popen_cmd + "\n")
msg = "popen_cmd is " + popen_cmd
self.msg_log(msg, level=SandeshLevel.SYS_DEBUG)
(disk_space_used, error_value) = \
Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
popen_cmd = "set `df -Pk " + analytics_dir + " | grep % | awk '{s+=$4}END{print s}'` && echo $1"
self.stderr.write("popen_cmd is " + popen_cmd + "\n")
msg = "popen_cmd is " + popen_cmd
self.msg_log(msg, level=SandeshLevel.SYS_DEBUG)
(disk_space_available, error_value) = \
Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
popen_cmd = "set `du -skL " + analytics_dir + " | awk '{s+=$1}END{print s}'` && echo $1"
self.stderr.write("popen_cmd is " + popen_cmd + "\n")
msg = "popen_cmd is " + popen_cmd
self.msg_log(msg, level=SandeshLevel.SYS_DEBUG)
(analytics_db_size, error_value) = \
Popen(popen_cmd, shell=True, stdout=PIPE).communicate()
disk_space_total = int(disk_space_used) + int(disk_space_available)
@@ -116,7 +126,8 @@
else:
self.fail_status_bits |= self.FAIL_STATUS_DISK_SPACE_NA
except:
sys.stderr.write("Failed to get database usage" + "\n")
msg = "Failed to get database usage"
self.msg_log(msg, level=SandeshLevel.SYS_ERR)
self.fail_status_bits |= self.FAIL_STATUS_DISK_SPACE_NA

def send_process_state_db(self, group_names):
@@ -177,7 +188,8 @@ def database_periodic(self):
else:
self.fail_status_bits |= self.FAIL_STATUS_DISK_SPACE_NA
except:
sys.stderr.write("Failed to get database usage" + "\n")
msg = "Failed to get database usage"
self.msg_log(msg, level=SandeshLevel.SYS_ERR)
self.fail_status_bits |= self.FAIL_STATUS_DISK_SPACE_NA

cassandra_cli_cmd = "cassandra-cli --host " + self.hostip + \
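A short usage note on the msg_log helper added to DatabaseEventManager above:
call sites now state only the message and the SandeshLevel severity, and the
level translation lives in one place. The calls below mirror the ones in this
diff:

    # Debug-level trace of the shell pipeline about to run (as in process())
    self.msg_log("popen_cmd is " + popen_cmd, level=SandeshLevel.SYS_DEBUG)
    # Error-level record when the disk-usage query fails (as in the except branches)
    self.msg_log("Failed to get database usage", level=SandeshLevel.SYS_ERR)
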
