Skip to content

Commit

Permalink
Merge "Run flow workqueues independnet of each-other"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jun 25, 2016
2 parents 83cd836 + b948ba3 commit 5930dda
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 37 deletions.
25 changes: 12 additions & 13 deletions src/vnsw/agent/cmn/agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ void Agent::SetAgentTaskPolicy() {
kTaskFlowEvent,
kTaskFlowKSync,
kTaskFlowUpdate,
kTaskFlowDelete,
kTaskFlowAudit,
"Agent::Services",
"Agent::StatsCollector",
Expand Down Expand Up @@ -160,22 +161,20 @@ void Agent::SetAgentTaskPolicy() {

const char *flow_table_exclude_list[] = {
"Agent::PktFlowResponder",
kTaskFlowKSync,
kTaskFlowUpdate,
AGENT_SHUTDOWN_TASKNAME,
AGENT_INIT_TASKNAME
};
SetTaskPolicyOne(kTaskFlowEvent, flow_table_exclude_list,
sizeof(flow_table_exclude_list) / sizeof(char *));

const char *flow_exclude_list[] = {
"Agent::PktFlowResponder",
kTaskFlowKSync,
AGENT_SHUTDOWN_TASKNAME,
AGENT_INIT_TASKNAME
};
SetTaskPolicyOne(kTaskFlowUpdate, flow_exclude_list,
sizeof(flow_exclude_list) / sizeof(char *));
SetTaskPolicyOne(kTaskFlowKSync, flow_table_exclude_list,
sizeof(flow_table_exclude_list) / sizeof(char *));

SetTaskPolicyOne(kTaskFlowUpdate, flow_table_exclude_list,
sizeof(flow_table_exclude_list) / sizeof(char *));

SetTaskPolicyOne(kTaskFlowDelete, flow_table_exclude_list,
sizeof(flow_table_exclude_list) / sizeof(char *));

const char *sandesh_exclude_list[] = {
"db::DBTable",
Expand Down Expand Up @@ -282,7 +281,7 @@ void Agent::SetAgentTaskPolicy() {
const char *flow_stats_manager_exclude_list[] = {
"Agent::StatsCollector",
kTaskFlowStatsCollector,
"Flow::Management",
kTaskFlowMgmt,
AGENT_SHUTDOWN_TASKNAME,
AGENT_INIT_TASKNAME
};
Expand Down Expand Up @@ -779,8 +778,8 @@ bool Agent::isVmwareVcenterMode() const {
void Agent::ConcurrencyCheck() {
if (test_mode_) {
CHECK_CONCURRENCY("db::DBTable", "Agent::KSync", AGENT_INIT_TASKNAME,
"Flow::Management", kTaskFlowUpdate,
kTaskFlowEvent, kTaskFlowKSync);
kTaskFlowMgmt, kTaskFlowUpdate,
kTaskFlowEvent, kTaskFlowDelete, kTaskFlowKSync);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/vnsw/agent/cmn/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,11 @@ extern void RouterIdDepInit(Agent *agent);

#define VROUTER_SERVER_PORT 20914

#define kTaskFlowUpdate "Agent::FlowUpdate"
#define kTaskFlowEvent "Agent::FlowEvent"
#define kTaskFlowKSync "Agent::FlowKSync"
#define kTaskFlowUpdate "Agent::FlowUpdate"
#define kTaskFlowDelete "Agent::FlowDelete"
#define kTaskFlowMgmt "Agent::FlowMgmt"
#define kTaskFlowAudit "KSync::FlowAudit"
#define kTaskFlowStatsCollector "Flow::StatsCollector"

Expand Down
3 changes: 2 additions & 1 deletion src/vnsw/agent/pkt/flow_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ void intrusive_ptr_release(FlowEntry *fe) {
int prev = fe->refcount_.fetch_and_decrement();
if (prev == 1) {
if (fe->on_tree()) {
if (flow_table->ConcurrencyCheck() == false) {
if (flow_table->ConcurrencyCheck(flow_table->flow_task_id())
== false) {
FlowEntryPtr ref(fe);
FlowProto *proto=flow_table->agent()->pkt()->get_flow_proto();
proto->ForceEnqueueFreeFlowReference(ref);
Expand Down
4 changes: 2 additions & 2 deletions src/vnsw/agent/pkt/flow_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ DeleteFlowEventQueue::DeleteFlowEventQueue(Agent *agent, FlowProto *proto,
uint16_t latency_limit,
uint32_t max_iterations) :
FlowEventQueueBase(proto, "Flow Delete Queue",
agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
agent->task_scheduler()->GetTaskId(kTaskFlowDelete),
table->table_index(), pool, latency_limit,
max_iterations),
flow_table_(table) {
Expand All @@ -275,7 +275,7 @@ KSyncFlowEventQueue::KSyncFlowEventQueue(Agent *agent, FlowProto *proto,
uint16_t latency_limit,
uint32_t max_iterations) :
FlowEventQueueBase(proto, "Flow KSync Queue",
agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
agent->task_scheduler()->GetTaskId(kTaskFlowKSync),
table->table_index(), pool, latency_limit,
max_iterations),
flow_table_(table) {
Expand Down
7 changes: 3 additions & 4 deletions src/vnsw/agent/pkt/flow_mgmt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "uve/flow_ace_stats_request.h"
#include "uve/agent_uve_stats.h"
#include "vrouter/flow_stats/flow_stats_collector.h"
const string FlowMgmtManager::kFlowMgmtTask = "Flow::Management";

/////////////////////////////////////////////////////////////////////////////
// FlowMgmtManager methods
Expand All @@ -27,12 +26,12 @@ FlowMgmtManager::FlowMgmtManager(Agent *agent, uint16_t table_index) :
vrf_flow_mgmt_tree_(this),
nh_flow_mgmt_tree_(this),
flow_mgmt_dbclient_(new FlowMgmtDbClient(agent, this)),
request_queue_(agent_->task_scheduler()->GetTaskId(kFlowMgmtTask), 0,
request_queue_(agent_->task_scheduler()->GetTaskId(kTaskFlowMgmt), 0,
boost::bind(&FlowMgmtManager::RequestHandler, this, _1)),
db_event_queue_(agent_->task_scheduler()->GetTaskId(kFlowMgmtTask), 0,
db_event_queue_(agent_->task_scheduler()->GetTaskId(kTaskFlowMgmt), 0,
boost::bind(&FlowMgmtManager::DBRequestHandler, this, _1),
db_event_queue_.kMaxSize, 1),
log_queue_(agent_->task_scheduler()->GetTaskId(kFlowMgmtTask), 1,
log_queue_(agent_->task_scheduler()->GetTaskId(kTaskFlowMgmt), 1,
boost::bind(&FlowMgmtManager::LogHandler, this, _1)) {
request_queue_.set_name("Flow management");
request_queue_.set_measure_busy_time(agent->MeasureQueueDelay());
Expand Down
1 change: 0 additions & 1 deletion src/vnsw/agent/pkt/flow_mgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,6 @@ class FlowEntryInfo {

class FlowMgmtManager {
public:
static const std::string kFlowMgmtTask;
typedef boost::shared_ptr<FlowMgmtRequest> FlowMgmtRequestPtr;
typedef WorkQueue<FlowMgmtRequestPtr> FlowMgmtQueue;

Expand Down
7 changes: 4 additions & 3 deletions src/vnsw/agent/pkt/flow_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) {

bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
// concurrency check to ensure all request are in right partitions
assert(table->ConcurrencyCheck() == true);
assert(table->ConcurrencyCheck(table->flow_task_id()) == true);

switch (req->event()) {
case FlowEvent::VROUTER_FLOW_MSG: {
Expand Down Expand Up @@ -476,7 +476,8 @@ bool FlowProto::FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table) {
FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(req);

// concurrency check to ensure all request are in right partitions
assert(table->ConcurrencyCheck() == true);
assert((table->ConcurrencyCheck(table->flow_ksync_task_id()) == true) ||
(table->ConcurrencyCheck(table->flow_task_id()) == true));

switch (req->event()) {
// Flow was waiting for an index. Index is available now. Retry acquiring
Expand Down Expand Up @@ -530,7 +531,7 @@ bool FlowProto::FlowDeleteHandler(FlowEvent *req, FlowTable *table) {
// flow-update-queue doenst happen table pointer. Skip concurrency check
// for flow-update-queue
if (table) {
assert(table->ConcurrencyCheck() == true);
assert(table->ConcurrencyCheck(table->flow_delete_task_id()) == true);
}

switch (req->event()) {
Expand Down
24 changes: 16 additions & 8 deletions src/vnsw/agent/pkt/flow_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ FlowTable::FlowTable(Agent *agent, uint16_t table_index) :
ksync_object_(NULL),
flow_entry_map_(),
free_list_(this),
flow_task_id_(0) {
flow_task_id_(0),
flow_update_task_id_(0),
flow_delete_task_id_(0),
flow_ksync_task_id_(0) {
}

FlowTable::~FlowTable() {
Expand All @@ -71,6 +74,9 @@ FlowTable::~FlowTable() {

void FlowTable::Init() {
flow_task_id_ = agent_->task_scheduler()->GetTaskId(kTaskFlowEvent);
flow_update_task_id_ = agent_->task_scheduler()->GetTaskId(kTaskFlowUpdate);
flow_delete_task_id_ = agent_->task_scheduler()->GetTaskId(kTaskFlowDelete);
flow_ksync_task_id_ = agent_->task_scheduler()->GetTaskId(kTaskFlowKSync);
FlowEntry::Init();
return;
}
Expand All @@ -83,15 +89,17 @@ void FlowTable::Shutdown() {

// Concurrency check to ensure all flow-table and free-list manipulations
// are done from FlowEvent task context only
bool FlowTable::ConcurrencyCheck() {
bool FlowTable::ConcurrencyCheck(int task_id) {
Task *current = Task::Running();
// test code invokes FlowTable API from main thread. The running task
// will be NULL in such cases
if (current == NULL) {
return true;
}
if (current->GetTaskId() != flow_task_id_)

if (current->GetTaskId() != task_id)
return false;

if (current->GetTaskInstance() != table_index_)
return false;
return true;
Expand All @@ -118,7 +126,7 @@ void FlowTable::GetMutexSeq(tbb::mutex &mutex1, tbb::mutex &mutex2,
}

FlowEntry *FlowTable::Find(const FlowKey &key) {
assert(ConcurrencyCheck() == true);
assert(ConcurrencyCheck(flow_task_id_) == true);
FlowEntryMap::iterator it;

it = flow_entry_map_.find(key);
Expand All @@ -138,7 +146,7 @@ void FlowTable::Copy(FlowEntry *lhs, FlowEntry *rhs, bool update) {
}

FlowEntry *FlowTable::Locate(FlowEntry *flow, uint64_t time) {
assert(ConcurrencyCheck() == true);
assert(ConcurrencyCheck(flow_task_id_) == true);
std::pair<FlowEntryMap::iterator, bool> ret;
ret = flow_entry_map_.insert(FlowEntryMapPair(flow->key(), flow));
if (ret.second == true) {
Expand Down Expand Up @@ -875,7 +883,7 @@ FlowEntryFreeList::~FlowEntryFreeList() {

// Allocate a chunk of FlowEntries
void FlowEntryFreeList::Grow() {
assert(table_->ConcurrencyCheck() == true);
assert(table_->ConcurrencyCheck(table_->flow_task_id()) == true);
grow_pending_ = false;
if (free_list_.size() >= kMinThreshold)
return;
Expand All @@ -888,7 +896,7 @@ void FlowEntryFreeList::Grow() {
}

FlowEntry *FlowEntryFreeList::Allocate(const FlowKey &key) {
assert(table_->ConcurrencyCheck() == true);
assert(table_->ConcurrencyCheck(table_->flow_task_id()) == true);
FlowEntry *flow = NULL;
if (free_list_.size() == 0) {
flow = new FlowEntry(table_);
Expand All @@ -910,7 +918,7 @@ FlowEntry *FlowEntryFreeList::Allocate(const FlowKey &key) {
}

void FlowEntryFreeList::Free(FlowEntry *flow) {
assert(table_->ConcurrencyCheck() == true);
assert(table_->ConcurrencyCheck(table_->flow_task_id()) == true);
total_free_++;
flow->Reset();
free_list_.push_back(*flow);
Expand Down
9 changes: 8 additions & 1 deletion src/vnsw/agent/pkt/flow_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ class FlowTable {

// Concurrency check to ensure all flow-table and free-list manipulations
// are done from FlowEvent task context only
bool ConcurrencyCheck();
bool ConcurrencyCheck(int task_id);
int flow_task_id() const { return flow_task_id_; }
int flow_update_task_id() const { return flow_update_task_id_; }
int flow_delete_task_id() const { return flow_delete_task_id_; }
int flow_ksync_task_id() const { return flow_ksync_task_id_; }
static void GetMutexSeq(tbb::mutex &mutex1, tbb::mutex &mutex2,
tbb::mutex **mutex_ptr_1, tbb::mutex **mutex_ptr_2);

Expand Down Expand Up @@ -290,6 +294,9 @@ class FlowTable {
FlowEntryFreeList free_list_;
tbb::mutex mutex_;
int flow_task_id_;
int flow_update_task_id_;
int flow_delete_task_id_;
int flow_ksync_task_id_;
DISALLOW_COPY_AND_ASSIGN(FlowTable);
};

Expand Down
3 changes: 0 additions & 3 deletions src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,6 @@ void KSyncFlowEntryFreeList::Grow() {
}

FlowTableKSyncEntry *KSyncFlowEntryFreeList::Allocate(const KSyncEntry *key) {
object_->flow_table()->ConcurrencyCheck();

const FlowTableKSyncEntry *flow_key =
static_cast<const FlowTableKSyncEntry *>(key);
FlowTableKSyncEntry *flow = NULL;
Expand All @@ -707,7 +705,6 @@ FlowTableKSyncEntry *KSyncFlowEntryFreeList::Allocate(const KSyncEntry *key) {
}

void KSyncFlowEntryFreeList::Free(FlowTableKSyncEntry *flow) {
object_->flow_table()->ConcurrencyCheck();
total_free_++;
flow->Reset();
free_list_.push_back(*flow);
Expand Down

0 comments on commit 5930dda

Please sign in to comment.