Merge "We need to reflect kafka-related failures in vizd. An element …
Browse files Browse the repository at this point in the history
…is added to ConnectionState for this. We make the Kafka connection down if there are no successful publishes in a 30s period. These kinds of failures will be detected: - All Kafka brokers are down - Kafka topic are misconfigured, and all publish operations are failing" into R3.0
Zuul authored and opencontrail-ci-admin committed Mar 21, 2016
2 parents 4996ad0 + 684401a commit 434500e
Showing 4 changed files with 59 additions and 19 deletions.
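
Before the per-file diffs, the mechanism in one picture: the collector counts successful Kafka delivery callbacks, and a periodic timer checks whether a full 30 s window has elapsed with none, in which case the KAFKA_PUB connection is reported DOWN. A minimal, self-contained sketch of that pattern (illustrative names only, not the vizd code, which follows below):

#include <cstdint>

// Sketch of the activity-window check described in the commit message.
class PublishWatchdog {
public:
    explicit PublishWatchdog(uint64_t window_ms)
        : window_ms_(window_ms), elapsed_ms_(0), successes_(0), up_(false) {}

    // Invoked from the Kafka delivery-report callback on each successful publish.
    void OnDeliverySuccess() { ++successes_; }

    // Invoked from a periodic timer; delta_ms is time since the previous tick.
    // Returns true while the connection is considered up.
    bool OnTick(uint64_t delta_ms) {
        elapsed_ms_ += delta_ms;
        if (elapsed_ms_ >= window_ms_) {
            up_ = (successes_ != 0);  // zero publishes in a full window => down
            successes_ = 0;
            elapsed_ms_ = 0;
        }
        return up_;
    }

private:
    const uint64_t window_ms_;  // e.g. 30000 for the 30s period
    uint64_t elapsed_ms_;
    uint64_t successes_;
    bool up_;
};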
66 changes: 50 additions & 16 deletions src/analytics/OpServerProxy.cc
@@ -110,7 +110,7 @@ class OpServerProxy::OpServerImpl {
RAC_DOWN = 2
};

-    static const int kActivityCheckPeriod_ = 60;
+    static const int kActivityCheckPeriod_ms_ = 30000;

const unsigned int partitions_;

@@ -132,7 +132,7 @@ class OpServerProxy::OpServerImpl {
const string& gen,
const string& value) {
if (k_event_cb.disableKafka) {
-        LOG(ERROR, "Kafka ignoring KafkaPub");
+        LOG(INFO, "Kafka ignoring KafkaPub");
return;
}
char* gn = new char[gen.length()+1];
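
Not shown in this hunk is the k_dr_cb object whose count field KafkaTimer() consults below. A plausible shape for it, using the librdkafka C++ delivery-report interface (the class and member names are guesses; only the RdKafka API names are real):

#include <stdint.h>
#include <librdkafka/rdkafkacpp.h>

// Guessed shape of k_dr_cb: librdkafka calls dr_cb() once per produced
// message from producer->poll(); only successful deliveries bump count,
// so dead brokers or a bad topic eventually leave count at zero.
class KafkaDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    uint64_t count;
    KafkaDeliveryReportCb() : count(0) {}
    virtual void dr_cb(RdKafka::Message &message) {
        if (message.err() == RdKafka::ERR_NO_ERROR)
            count++;
        // Failed deliveries are not counted; KafkaTimer() treats a zero
        // count over a full activity window as a DOWN connection.
    }
};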
@@ -460,23 +460,50 @@ class OpServerProxy::OpServerImpl {
return from_ops_conn_;
}
    bool KafkaTimer() {
-       if (kafka_count_++ == kActivityCheckPeriod_) kafka_count_ = 0;
-       if (kafka_count_ == 0) {
-           if (k_event_cb.disableKafka) {
-               LOG(ERROR, "Kafka Restart");
-               StopKafka();
-               assert(StartKafka());
-               k_event_cb.disableKafka = false;
-               if (collector_ && redis_up_)
-                   LOG(ERROR, "Kafka Restarting Redis");
-                   collector_->RedisUpdate(true);
-           }
-       }
+
+       {
+           uint64_t new_tick_time = UTCTimestampUsec() / 1000;
+           // We track how long it has been since the timer was last called,
+           // because the execution time of this function is highly variable;
+           // StartKafka can take several seconds.
+           kafka_elapsed_ms_ += (new_tick_time - kafka_tick_ms_);
+           kafka_tick_ms_ = new_tick_time;
+       }
+
+       // Connection status is periodically updated based on Kafka publish
+       // activity; update it more often during startup or during failures.
+       if ((((kafka_tick_ms_ - kafka_start_ms_) < kActivityCheckPeriod_ms_) &&
+            (kafka_elapsed_ms_ >= kActivityCheckPeriod_ms_/3)) ||
+           (k_event_cb.disableKafka &&
+            (kafka_elapsed_ms_ >= kActivityCheckPeriod_ms_/3)) ||
+           (kafka_elapsed_ms_ > kActivityCheckPeriod_ms_)) {
+
+           kafka_elapsed_ms_ = 0;
+
+           if (k_dr_cb.count == 0) {
+               LOG(ERROR, "No Kafka Callbacks");
+               ConnectionState::GetInstance()->Update(ConnectionType::KAFKA_PUB,
+                   brokers_, ConnectionStatus::DOWN, process::Endpoint(),
+                   std::string());
+           } else {
+               ConnectionState::GetInstance()->Update(ConnectionType::KAFKA_PUB,
+                   brokers_, ConnectionStatus::UP, process::Endpoint(),
+                   std::string());
+               LOG(INFO, "Got Kafka Callbacks " << k_dr_cb.count);
+           }
+           k_dr_cb.count = 0;
+
+           if (k_event_cb.disableKafka) {
+               LOG(ERROR, "Kafka Restart");
+               StopKafka();
+               assert(StartKafka());
+               k_event_cb.disableKafka = false;
+               if (collector_ && redis_up_) {
+                   LOG(ERROR, "Kafka Restarting Redis");
+                   collector_->RedisUpdate(true);
+               }
+           }
+       }

if (producer_) {
producer_->poll(0);
}
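
Two things are worth noting in the hunk above. First, the old restart block's unbraced if (collector_ && redis_up_) guarded only the LOG line, so RedisUpdate(true) ran unconditionally; the rewritten block adds braces. Second, the three-way condition yields two cadences: with kActivityCheckPeriod_ms_ = 30000, status is re-evaluated once kafka_elapsed_ms_ reaches a third of the period (10 s) during the first 30 s after start or while Kafka is disabled, and only after a full 30 s otherwise. Accumulating wall-clock time instead of counting ticks (the old kafka_count_ approach) stays accurate even when a tick is delayed, e.g. while StartKafka blocks. The same rule extracted as a pure helper, a hypothetical refactor for illustration only:

#include <cstdint>

// Equivalent to the condition in KafkaTimer() above; not part of the patch.
bool ShouldUpdateStatus(uint64_t tick_ms, uint64_t start_ms,
                        uint64_t elapsed_ms, bool kafka_disabled,
                        uint64_t period_ms = 30000) {
    const bool starting_up = (tick_ms - start_ms) < period_ms;
    const bool fast_due = elapsed_ms >= period_ms / 3;  // roughly every 10 s
    const bool slow_due = elapsed_ms > period_ms;       // roughly every 30 s
    return ((starting_up || kafka_disabled) && fast_due) || slow_due;
}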
@@ -507,6 +534,9 @@ class OpServerProxy::OpServerImpl {
brokers_(brokers),
topicpre_(topic),
redis_up_(false),
+        kafka_elapsed_ms_(0),
+        kafka_start_ms_(UTCTimestampUsec()/1000),
+        kafka_tick_ms_(0),
kafka_timer_(TimerManager::CreateTimer(*evm->io_service(),
"Kafka Timer",
TaskScheduler::GetInstance()->GetTaskId(
@@ -533,6 +563,8 @@ class OpServerProxy::OpServerImpl {
kafka_timer_->Start(1000,
boost::bind(&OpServerImpl::KafkaTimer, this), NULL);
if (brokers.empty()) return;
+        ConnectionState::GetInstance()->Update(ConnectionType::KAFKA_PUB,
+            brokers_, ConnectionStatus::INIT, process::Endpoint(), std::string());
assert(StartKafka());
}

@@ -604,7 +636,9 @@ class OpServerProxy::OpServerImpl {
std::string brokers_;
std::string topicpre_;
bool redis_up_;
-    uint16_t kafka_count_;
+    uint64_t kafka_elapsed_ms_;
+    const uint64_t kafka_start_ms_;
+    uint64_t kafka_tick_ms_;
Timer *kafka_timer_;
};

5 changes: 3 additions & 2 deletions src/analytics/main.cc
@@ -304,12 +304,13 @@ int main(int argc, char *argv[])
// 3. Redis To
// 4. Discovery Collector Publish
// 5. Database global
-    // 6. Database protobuf if enabled
+    // 6. Kafka Pub
+    // 7. Database protobuf if enabled
ConnectionStateManager<NodeStatusUVE, NodeStatus>::
GetInstance()->Init(*a_evm->io_service(),
hostname, module_id, instance_id,
boost::bind(&GetProcessStateCb, _1, _2, _3,
-            protobuf_server_enabled ? 6 : 5));
+            protobuf_server_enabled ? 7 : 6));

LOG(INFO, "COLLECTOR analytics_data_ttl: " << options.analytics_data_ttl());
LOG(INFO, "COLLECTOR analytics_flow_ttl: " << options.analytics_flow_ttl());
4 changes: 3 additions & 1 deletion src/base/sandesh/process_info.sandesh
@@ -23,6 +23,7 @@ enum ConnectionType {
TOR,
REDIS_UVE,
UVEPARTITIONS,
+    KAFKA_PUB,
}

const map<ConnectionType, string> ConnectionTypeNames = {
@@ -37,7 +38,8 @@ const map<ConnectionType, string> ConnectionTypeNames = {
ConnectionType.APISERVER : "ApiServer",
ConnectionType.TOR : "ToR",
ConnectionType.REDIS_UVE: "Redis-UVE",
-    ConnectionType.UVEPARTITIONS : "UvePartitions"
+    ConnectionType.UVEPARTITIONS : "UvePartitions",
+    ConnectionType.KAFKA_PUB : "KafkaPub"
}
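
Sandesh, like Thrift, turns this IDL into both a C++ enum and a constant name map, which is why the new value must be appended in two places; appending at the end also leaves the existing enum values unchanged on the wire. A self-contained mimic of the generated bindings and a lookup helper (the generated names are assumptions, not verified against the generated headers):

#include <map>
#include <string>

namespace ConnectionType {
    enum type { COLLECTOR, DATABASE, APISERVER, TOR, REDIS_UVE,
                UVEPARTITIONS, KAFKA_PUB };
}

static const std::map<ConnectionType::type, std::string> kConnectionTypeNames = {
    { ConnectionType::UVEPARTITIONS, "UvePartitions" },
    { ConnectionType::KAFKA_PUB,     "KafkaPub" },
    // ... remaining entries elided ...
};

// Resolve the display name used in UVEs, e.g. KAFKA_PUB -> "KafkaPub".
std::string ConnectionTypeName(ConnectionType::type t) {
    std::map<ConnectionType::type, std::string>::const_iterator it =
        kConnectionTypeNames.find(t);
    return it == kConnectionTypeNames.end() ? "Unknown" : it->second;
}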

enum ConnectionStatus {
3 changes: 3 additions & 0 deletions src/opserver/alarmgen.py
@@ -17,6 +17,9 @@
import copy
import traceback
import signal
+import logging
+logging.getLogger('kafka').addHandler(logging.StreamHandler())
+logging.getLogger('kafka').setLevel(logging.WARNING)
try:
from collections import OrderedDict
except ImportError:
Expand Down
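
The alarmgen change is defensive logging configuration: with no handler attached, the kafka-python client's logger in a Python 2 process emits "No handlers could be found for logger 'kafka'" on its first log call, and at default verbosity its INFO/DEBUG chatter would flood alarmgen's output. A StreamHandler plus a WARNING threshold keeps only genuine Kafka problems visible.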
