/
vrouter_event_manager.py
141 lines (115 loc) · 5.17 KB
/
vrouter_event_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#
# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
#
from gevent import monkey
monkey.patch_all()
import os
import sys
import socket
import subprocess
import json
import time
import datetime
import platform
import select
import gevent
import ConfigParser
from nodemgr.common.event_manager import EventManager
from nodemgr.vrouter_nodemgr.vrouter_process_stat import VrouterProcessStat
from ConfigParser import NoOptionError
from supervisor import childutils
from pysandesh.sandesh_base import *
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from pysandesh.sandesh_session import SandeshWriter
from pysandesh.sandesh_logger import SandeshLogger
from pysandesh.gen_py.sandesh_trace.ttypes import SandeshTraceRequest
from sandesh_common.vns.ttypes import Module, NodeType
from sandesh_common.vns.constants import ModuleNames, NodeTypeNames,\
Module2NodeType, INSTANCE_ID_DEFAULT
from subprocess import Popen, PIPE
from StringIO import StringIO
from vrouter.vrouter.ttypes import \
NodeStatusUVE, NodeStatus
from vrouter.vrouter.process_info.ttypes import \
ProcessStatus, ProcessState, ProcessInfo, DiskPartitionUsageStats
from vrouter.vrouter.process_info.constants import \
ProcessStateNames
from loadbalancer_stats import LoadbalancerStats
class VrouterEventManager(EventManager):
    """Supervisor event listener for the vrouter node manager.

    Specialises the generic EventManager with the vrouter-specific
    sandesh types (NodeStatus, NodeStatusUVE, ProcessInfo, ...) and adds
    load balancer statistics reporting on the periodic tick.
    """

    # Rules file used by process() when the caller supplied none.
    DEFAULT_RULE_FILE = ("/etc/contrail/supervisord_vrouter_files/"
                         "contrail-vrouter.rules")

    def __init__(self, rule_file, discovery_server,
                 discovery_port, collector_addr):
        """Initialise the base manager and the sandesh generator.

        All four arguments are forwarded unchanged to
        EventManager.__init__.
        """
        self.module = Module.COMPUTE_NODE_MGR
        self.module_id = ModuleNames[self.module]
        node_type = Module2NodeType[self.module]
        node_type_name = NodeTypeNames[node_type]
        # sandesh_global is provided by the star import from
        # pysandesh.sandesh_base.
        self.sandesh_global = sandesh_global
        EventManager.__init__(self, rule_file, discovery_server,
                              discovery_port, collector_addr, sandesh_global)
        self.node_type = "contrail-vrouter"
        _disc = self.get_discovery_client()
        # NOTE(review): self.instance_id and self.collector_addr are not
        # assigned in this class; presumably EventManager.__init__ sets
        # them -- confirm against the base class.
        sandesh_global.init_generator(
            self.module_id, socket.gethostname(),
            node_type_name, self.instance_id, self.collector_addr,
            self.module_id, 8102, ['vrouter.vrouter'], _disc)
        sandesh_global.set_logging_params(enable_local_log=True)
        self.supervisor_serverurl = "unix:///tmp/supervisord_vrouter.sock"
        self.add_current_process()
        self.lb_stats = LoadbalancerStats()
    # end __init__

    def msg_log(self, msg, level):
        """Log msg through the sandesh logger at the given sandesh level."""
        py_level = SandeshLogger.get_py_logger_level(level)
        self.sandesh_global.logger().log(py_level, msg)

    def process(self):
        """Load the JSON rules file into self.rules_data.

        Falls back to DEFAULT_RULE_FILE when no rule file was configured
        (empty string or None). Errors from open() and json.load()
        propagate to the caller.
        """
        # Truthiness check instead of "is ''": identity comparison with a
        # string literal depends on interpreter interning.
        if not self.rule_file:
            self.rule_file = self.DEFAULT_RULE_FILE
        # Context manager so the file is closed even if parsing fails.
        with open(self.rule_file) as json_file:
            self.rules_data = json.load(json_file)

    def send_process_state_db(self, group_names):
        """Delegate to the base implementation with the vrouter types."""
        self.send_process_state_db_base(
            group_names, ProcessInfo, NodeStatus, NodeStatusUVE)

    def send_nodemgr_process_status(self):
        """Delegate to the base implementation with the vrouter types."""
        self.send_nodemgr_process_status_base(
            ProcessStateNames, ProcessState, ProcessStatus,
            NodeStatus, NodeStatusUVE)

    def get_process_state(self, fail_status_bits):
        """Return the base class's result for fail_status_bits."""
        return self.get_process_state_base(
            fail_status_bits, ProcessStateNames, ProcessState)

    def send_disk_usage_info(self):
        """Delegate to the base implementation with the vrouter types."""
        self.send_disk_usage_info_base(
            NodeStatusUVE, NodeStatus, DiskPartitionUsageStats)

    def get_process_stat_object(self, pname):
        """Return a new VrouterProcessStat for the process named pname."""
        return VrouterProcessStat(pname)

    # overridden delete_process_handler -
    def delete_process_handler(self, deleted_process):
        """Forward to the base class handler; no vrouter-specific work."""
        super(VrouterEventManager,
              self).delete_process_handler(deleted_process)
    # end delete_process_handler

    def runforever(self, test=False):
        """Run the event loop; this method does not return.

        Each iteration waits for one event on self.stdin, dispatches it
        according to its event name, then acknowledges it on self.stdout.
        The test argument is accepted for interface compatibility and is
        not used here.
        """
        prev_current_time = int(time.time())
        while 1:
            # we explicitly use self.stdin, self.stdout, and self.stderr
            # instead of sys.* so we can unit test this code
            headers, payload = \
                self.listener_nodemgr.wait(self.stdin, self.stdout)
            pheaders, pdata = childutils.eventdata(payload + '\n')
            eventname = headers['eventname']

            # check for process state change events
            if eventname.startswith("PROCESS_STATE"):
                self.event_process_state(pheaders, headers)
                # check for addition / deletion of processes in the node.
                # Tor Agent process can get added / deleted based on need.
                self.update_current_process()

            # check for flag value change events
            if eventname.startswith("PROCESS_COMMUNICATION"):
                self.event_process_communication(pdata)

            # do periodic events
            if eventname.startswith("TICK_60"):
                prev_current_time = self.event_tick_60(prev_current_time)
                # loadbalancer processing
                self.lb_stats.send_loadbalancer_stats()

            self.listener_nodemgr.ok(self.stdout)