Skip to content

Commit

Permalink
Merge "1. Add contrail-cassandra-status script to check nodetool status and determine whether the database node is up or down. 2. Add contrail-cassandra-repair script to run nodetool repair on the config keyspaces. 3. Modify contrail-status to handle services only started via init.d Partial-Bug: #1484297" into R2.22-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Aug 29, 2015
2 parents 345b918 + 53e25a9 commit 508c70d
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/analytics/database/SConscript
Expand Up @@ -39,3 +39,5 @@ DatabaseEnv.Alias("database:node_mgr", sdist_gen)
DatabaseEnv.Alias("src/analytics/database:database", sdist_gen)

# Export the generated sdist package for consumers of this environment.
DatabaseEnv['DATABASE_PKG'] = sdist_gen

# Descend into utils/ so the cassandra status/repair helper scripts are built
# and installed; AnalyticsEnv is re-exported for the child SConscript.
DatabaseEnv.SConscript('utils/SConscript', exports='AnalyticsEnv', duplicate = 0)
16 changes: 16 additions & 0 deletions src/analytics/database/utils/SConscript
@@ -0,0 +1,16 @@
# -*- mode: python; -*-

#
# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
#

Import('AnalyticsEnv')
DatabaseEnv = AnalyticsEnv.Clone()

# Install the cassandra maintenance helper scripts into the examples area.
for script in ('contrail-cassandra-status.py',
               'contrail-cassandra-repair.py'):
    installed = DatabaseEnv.Install(DatabaseEnv['INSTALL_EXAMPLE'], script)
    DatabaseEnv.Alias('install', installed)
116 changes: 116 additions & 0 deletions src/analytics/database/utils/contrail-cassandra-repair.py
@@ -0,0 +1,116 @@
#!/usr/bin/env python
#
# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
#

import logging
import subprocess
import sys
import platform
import os
import argparse
from sandesh_common.vns.constants import RepairNeededKeyspaces

def repair(options):
    """Run 'nodetool repair -pr' for every config keyspace that needs it.

    A per-keyspace marker file under /var/log/cassandra signals that a
    repair is already in progress; keyspaces with a live marker are
    skipped.  Repair output is appended to a per-keyspace log file.

    :param options: parsed arguments providing .nodetool and .host
    :returns: 0 on success, 1 if any keyspace repair was already running,
              2 if any repair command failed
    """
    returncode = 0
    for keyspace in RepairNeededKeyspaces:
        keyspace_repair_logfile = "/var/log/cassandra/repair-" + keyspace + ".log"
        keyspace_repair_running = "/var/log/cassandra/repair-" + keyspace + "-running"
        if os.path.exists(keyspace_repair_running):
            logging.debug("REPAIR for {keyspace} is still running".format(
                keyspace=keyspace))
            returncode = 1
            continue
        # Create the marker file to signal a repair in progress.  BUGFIX:
        # remove it even if running the repair raises, so an unexpected
        # exception cannot leave a stale marker that permanently blocks
        # future repairs of this keyspace.
        marker = open(keyspace_repair_running, "w")
        try:
            # Run repair for the keyspace
            cmd = [options.nodetool, "-h", options.host, "repair", "-pr",
                   keyspace]
            with open(keyspace_repair_logfile, "a") as repair_file:
                success = run_command(cmd, repair_file, repair_file)
            if not success:
                returncode = 2
        finally:
            marker.close()
            os.remove(keyspace_repair_running)
    return returncode
#end repair
#end repair

def run_command(command, stdout, stderr):
    """Run *command*, sending its output to the given streams.

    :param command: list holding the program and all of its arguments
    :param stdout: stream receiving the child's standard output
    :param stderr: stream receiving the child's standard error
    :returns: True when the command exits with status 0, False otherwise
    """
    printable = " ".join(command)
    logging.debug("run_command: " + printable)
    try:
        subprocess.check_call(command, stdout=stdout, stderr=stderr)
    except (subprocess.CalledProcessError, OSError) as err:
        # Non-zero exit status or failure to launch: log and report failure.
        logging.error("FAILED: {cmd}".format(cmd=printable))
        logging.error(str(err))
        return False
    return True
#end run_command

def setup_logging(option_group):
"""Sets up logging in a syslog format by log level
:param option_group: options as returned by the OptionParser
"""
stderr_log_format = "%(levelname) -10s %(asctime)s %(funcName) -20s line:%(lineno) -5d: %(message)s"
file_log_format = "%(asctime)s - %(levelname)s - %(message)s"
logger = logging.getLogger()
if option_group.debug:
logger.setLevel(level=logging.DEBUG)
elif option_group.verbose:
logger.setLevel(level=logging.INFO)
else:
logger.setLevel(level=logging.WARNING)

handlers = []
if option_group.syslog:
handlers.append(logging.SyslogHandler(facility=option_group.syslog))
# Use standard format here because timestamp and level will be added by syslogd.
if option_group.logfile:
handlers.append(logging.FileHandler(option_group.logfile))
handlers[0].setFormatter(logging.Formatter(file_log_format))
if not handlers:
handlers.append(logging.StreamHandler())
handlers[0].setFormatter(logging.Formatter(stderr_log_format))
for handler in handlers:
logger.addHandler(handler)
#end setup_logging

def main():
    """Parse command-line options, configure logging and run the repairs."""
    parser = argparse.ArgumentParser(
        # print script description with -h/--help
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-H", "--host", dest="host", default=platform.node(),
                        metavar="HOST", help="Hostname to check status")
    parser.add_argument("-n", "--nodetool", dest="nodetool",
                        default="nodetool", metavar="NODETOOL",
                        help="Path to nodetool")
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action='store_true', default=False,
                        help="Verbose output")
    parser.add_argument("-d", "--debug", dest="debug", action='store_true',
                        default=False, help="Debugging output")
    parser.add_argument("--syslog", dest="syslog", metavar="FACILITY",
                        help="Send log messages to the syslog")
    parser.add_argument("--log-file", dest="logfile", metavar="FILENAME",
                        help="Send log messages to a file")
    options = parser.parse_args()

    setup_logging(options)
    # Exit with the repair status so callers (cron, init scripts) can react.
    exit(repair(options))
#end main

if __name__ == "__main__":
    main()
244 changes: 244 additions & 0 deletions src/analytics/database/utils/contrail-cassandra-status.py
@@ -0,0 +1,244 @@
#!/usr/bin/env python
#
# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
#

import logging
import subprocess
import sys
import platform
import argparse
import os
import time

# Parses nodetool status output and returns a dict
# containing the nodes and their status
def parse_nodetool_status_output(output):
    """ Following is sample nodetool status output:
    Datacenter: datacenter1
    =======================
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving
    -- Address Load Tokens Owns Host ID Rack
    UN 10.84.27.27 28.06 GB 256 30.3% 2905fcf3-b702-4a62-9eb9-9f2396f17665 rack1
    UN 10.84.27.8 28.41 GB 256 32.2% 46999810-e412-41a5-8d50-fe43c5933945 rack1
    UN 10.84.27.9 29.61 GB 256 37.5% 205d5521-1ccc-40b1-98fb-fb2256d776de rack1

    :param output: raw stdout of "nodetool status"
    :returns: dict mapping Host ID -> status string (e.g. "UN", "DN");
              {} when the output cannot be parsed
    """
    # Locate the column-header line; node rows start right after it.
    olines = output.splitlines()
    start = None
    for index, line in enumerate(olines):
        line_info = line.split()
        if (len(line_info) >= 3 and line_info[1] == "Address" and
                line_info[2] == "Load"):
            start = index + 1
            break
    if start is None:
        # BUGFIX: previously only a completely empty output was reported as
        # a parse failure; non-empty output with no header line silently
        # produced {} without logging anything.
        logging.error("FAILED to parse: {output}".format(output=output))
        return {}
    nodes = olines[start:]
    # Host ID is column 6 when the Load value is present ("28.41 GB" splits
    # into two tokens, 8 total) and column 5 when Load is unknown ("?" is a
    # single token, 7 total):
    #   UN 10.84.27.8 28.41 GB 256 32.2% 46999810-e412-41a5-8d50-fe43c5933945 rack1
    #   DN 10.84.23.59 ? 256 30.3% 315f045a-ea54-42f7-9c05-72c8ac4b34b6 rack1
    nodes_status = {}
    for node in nodes:
        node_info = node.split()
        node_info_len = len(node_info)
        if node_info_len == 8:
            node_id = node_info[6]
        elif node_info_len == 7:
            node_id = node_info[5]
        else:
            logging.error("FAILED to parse: {line}".format(line=node))
            return {}
        # Node status (e.g. "UN"/"DN") is column 0
        nodes_status[node_id] = node_info[0]
    return nodes_status
#end parse_nodetool_status_output

# Determine the number of UP nodes and verify that they are
# greater than or equal to RF/2 + 1 for QUORUM reads/writes
# to succeed. If RF is not passed, assumption is that RF is
# equal to number of nodes.
def is_cluster_partitioned(options):
    """Return True when fewer than quorum (RF/2 + 1) nodes are UP.

    Runs "nodetool status", counts nodes whose status contains 'U', and
    compares against the quorum threshold.  Any failure to run or parse
    nodetool output is treated as partitioned (fail-safe).

    :param options: parsed options (.nodetool, .host, .replication_factor)
    """
    cmd = [options.nodetool, "-h", options.host, "status"]
    success, cmd, stdout, stderr = run_command(*cmd)
    if not success or not stdout:
        logging.error("FAILED: {cmd}".format(cmd=cmd))
        logging.error(stderr)
        return True
    nodes_status = parse_nodetool_status_output(stdout)
    if options.replication_factor:
        num_nodes = options.replication_factor
    else:
        # No RF supplied: assume every keyspace is replicated to all nodes.
        num_nodes = len(nodes_status)
    nodes_up_status = dict((node_id, node_status) for
                           node_id, node_status in nodes_status.items()
                           if 'U' in node_status)
    num_up_nodes = len(nodes_up_status)
    # BUGFIX: use floor division -- under python3 "/" is float division and
    # would mis-compute the quorum threshold (e.g. 3 up of 5 nodes would be
    # reported partitioned). "//" is identical for ints under python2.
    return num_up_nodes < (num_nodes // 2) + 1
#end is_cluster_partitioned

def get_cassandra_secs_since_up(options):
    """Return seconds elapsed since the status-up file was last touched.

    Returns 0 when no status-up file is configured or it does not exist.
    NOTE(review): reads st_atime -- assumes the marker's access time is
    refreshed whenever the node is seen up (touch updates both times).
    """
    if not (options.status_up_file and os.path.exists(options.status_up_file)):
        return 0
    last_up_secs = int(os.stat(options.status_up_file).st_atime)
    return int(time.time()) - last_up_secs
#end get_cassandra_secs_since_up

def update_status(options):
    """Touch the status-up file when this node is UP and quorum holds.

    :returns: 0 on success, 1 if the node ID cannot be read, 2 if the
              node's own status cannot be read, 3 if touching the
              status-up file fails
    """
    # Ask nodetool info for this node's ID (third field of the "ID" line);
    # run_command joins the list and executes it through the shell.
    info_cmd = [options.nodetool, "-h", options.host, "info",
                "|", "grep", "ID", "|", "awk '{print $3}'"]
    success, cmd, stdout, stderr = run_command(*info_cmd)
    if not success or not stdout:
        logging.error("FAILED: {cmd}".format(cmd=cmd))
        logging.error(stderr)
        return 1
    node_id = stdout.strip()
    # Look up our own row in "nodetool status" to read the status column.
    status_cmd = [options.nodetool, "-h", options.host, "status",
                  "|", "grep", node_id, "|", "awk '{print $1}'"]
    success, cmd, stdout, stderr = run_command(*status_cmd)
    if not success or not stdout:
        logging.error("FAILED: {cmd}".format(cmd=cmd))
        logging.error(stderr)
        return 2
    self_status = stdout.strip()
    # Refresh the up-marker only when we are UP and the cluster still has
    # quorum reachable.
    partitioned = is_cluster_partitioned(options)
    if 'U' in self_status and not partitioned and options.status_up_file:
        success, cmd, _, stderr = run_command("touch", options.status_up_file)
        if not success:
            logging.error("FAILED: {cmd}".format(cmd=cmd))
            logging.error(stderr)
            return 3
    if options.debug:
        logging.debug("STATUS: {status}, PARTITIONED: {partitioned}".format(
            status=self_status, partitioned=partitioned))
    return 0
#end update_status

def verify_up_status(options):
    """Stop contrail-database when the node has been down too long.

    If the status-up marker has not been refreshed for at least
    max_allowed_down_seconds, run "service contrail-database stop".

    :returns: 0 normally, 4 if the service stop command fails
    """
    secs_since_up = get_cassandra_secs_since_up(options)
    if secs_since_up >= options.max_allowed_down_seconds:
        success, cmd, _, stderr = run_command("service", "contrail-database",
                                              "stop")
        if not success:
            logging.error("FAILED: {cmd}".format(cmd=cmd))
            logging.error(stderr)
            return 4
    if options.debug:
        logging.debug("SECS SINCE UP: {secs}".format(secs=secs_since_up))
    return 0
#end verify_up_status

def status(options):
    """Record this node's up status, then enforce the maximum allowed
    down time. Returns verify_up_status()'s exit code."""
    update_status(options)
    return verify_up_status(options)
#end status

def run_command(*command):
    """Execute a shell command and capture its output.

    The arguments are joined into one string and run through the shell,
    since callers build pipelines with "|", grep and awk.

    :param command: the command to be run and all of the arguments
    :returns: (success_boolean, command_string, stdout, stderr)
    """
    cmd = " ".join(command)
    logging.debug("run_command: " + cmd)
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out, err = proc.communicate()
    return (proc.returncode == 0, cmd, out, err)
#end run_command

def setup_logging(option_group):
"""Sets up logging in a syslog format by log level
:param option_group: options as returned by the OptionParser
"""
stderr_log_format = "%(levelname) -10s %(asctime)s %(funcName) -20s line:%(lineno) -5d: %(message)s"
file_log_format = "%(asctime)s - %(levelname)s - %(message)s"
logger = logging.getLogger()
if option_group.debug:
logger.setLevel(level=logging.DEBUG)
elif option_group.verbose:
logger.setLevel(level=logging.INFO)
else:
logger.setLevel(level=logging.WARNING)

handlers = []
if option_group.syslog:
handlers.append(logging.SyslogHandler(facility=option_group.syslog))
# Use standard format here because timestamp and level will be added by syslogd.
if option_group.logfile:
handlers.append(logging.FileHandler(option_group.logfile))
handlers[0].setFormatter(logging.Formatter(file_log_format))
if not handlers:
handlers.append(logging.StreamHandler())
handlers[0].setFormatter(logging.Formatter(stderr_log_format))
for handler in handlers:
logger.addHandler(handler)
#end setup_logging

def main():
    """Parse command-line options, configure logging and check status."""
    parser = argparse.ArgumentParser(
        # print script description with -h/--help
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-H", "--host", dest="host", default=platform.node(),
                        metavar="HOST",
                        help="Hostname to check status")
    parser.add_argument("-n", "--nodetool", dest="nodetool",
                        default="nodetool", metavar="NODETOOL",
                        help="Path to nodetool")
    parser.add_argument("--replication-factor", dest="replication_factor",
                        metavar="NUM", type=int,
                        help="Maximum replication factor of any keyspace")
    parser.add_argument("--max-allowed-down-seconds",
                        dest="max_allowed_down_seconds", metavar="SECONDS",
                        type=int, default=int(864000*0.9),
                        help="Maximum seconds allowed for cassandra status to"
                             " not be UP before stopping cassandra")
    parser.add_argument("--status-up-file", dest="status_up_file",
                        metavar="FILENAME",
                        default="/var/log/cassandra/status-up",
                        help="Record up status to file")
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action='store_true', default=False,
                        help="Verbose output")
    parser.add_argument("-d", "--debug", dest="debug", action='store_true',
                        default=False, help="Debugging output")
    parser.add_argument("--syslog", dest="syslog", metavar="FACILITY",
                        help="Send log messages to the syslog")
    parser.add_argument("--log-file", dest="logfile", metavar="FILENAME",
                        help="Send log messages to a file")
    options = parser.parse_args()

    setup_logging(options)
    # Exit with the status-check result so monitoring callers can react.
    exit(status(options))
#end main

if __name__ == "__main__":
    main()

0 comments on commit 508c70d

Please sign in to comment.