Skip to content

Commit

Permalink
Stability fixes for vrouter-agent flow handling.
Browse files Browse the repository at this point in the history
Multiple issues seen as concurrency check failure, index management
state machine issues, asserts, lifetime reference dependency issue
are addressed in this.

Committing on behalf of Praveen.

Change-Id: Id77dd8c8e5d4f54508b76229d1bf54e3db0d834a
closes-bug: 1550617
  • Loading branch information
haripk committed Feb 27, 2016
1 parent 5c37ff3 commit 39a5856
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 37 deletions.
7 changes: 6 additions & 1 deletion src/vnsw/agent/pkt/flow_entry.cc
Expand Up @@ -440,7 +440,12 @@ void intrusive_ptr_release(FlowEntry *fe) {
int prev = fe->refcount_.fetch_and_decrement();
if (prev == 1) {
if (fe->on_tree()) {
flow_table->ConcurrencyCheck();
if (flow_table->ConcurrencyCheck() == false) {
FlowEntryPtr ref(fe);
FlowProto *proto=flow_table->agent()->pkt()->get_flow_proto();
proto->ForceEnqueueFreeFlowReference(ref);
return;
}
FlowTable::FlowEntryMap::iterator it =
flow_table->flow_entry_map_.find(fe->key());
assert(it != flow_table->flow_entry_map_.end());
Expand Down
7 changes: 6 additions & 1 deletion src/vnsw/agent/pkt/flow_mgmt.cc
Expand Up @@ -1536,8 +1536,13 @@ bool VrfFlowMgmtEntry::CanDelete() const {

VrfFlowMgmtEntry::Data::Data(VrfFlowMgmtEntry *vrf_mgmt_entry,
const VrfEntry *vrf, AgentRouteTable *table) :
deleted_(false), table_ref_(this, table->deleter()),
deleted_(false), table_ref_(this, NULL),
vrf_mgmt_entry_(vrf_mgmt_entry), vrf_(vrf) {
if (vrf->IsDeleted() == false) {
table_ref_.Reset(table->deleter());
} else {
deleted_ = true;
}
}

VrfFlowMgmtEntry::Data::~Data() {
Expand Down
9 changes: 8 additions & 1 deletion src/vnsw/agent/pkt/flow_proto.cc
Expand Up @@ -265,7 +265,7 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) {
bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
std::auto_ptr<FlowEvent> req_ptr(req);
if (table) {
table->ConcurrencyCheck();
assert(table->ConcurrencyCheck() == true);
}

switch (req->event()) {
Expand Down Expand Up @@ -427,6 +427,13 @@ void FlowProto::EnqueueFreeFlowReference(FlowEntryPtr &flow) {
}
}

void FlowProto::ForceEnqueueFreeFlowReference(FlowEntryPtr &flow) {
FlowEvent *event = new FlowEvent(FlowEvent::FREE_FLOW_REF,
flow.get());
flow.reset();
EnqueueFlowEvent(event);
}

bool FlowProto::EnqueueReentrant(boost::shared_ptr<PktInfo> msg,
uint8_t table_index) {
EnqueueFlowEvent(new FlowEvent(FlowEvent::REENTRANT,
Expand Down
1 change: 1 addition & 0 deletions src/vnsw/agent/pkt/flow_proto.h
Expand Up @@ -64,6 +64,7 @@ class FlowProto : public Proto {

void EnqueueEvent(FlowEvent *event, FlowTable *table);
void EnqueueFlowEvent(FlowEvent *event);
void ForceEnqueueFreeFlowReference(FlowEntryPtr &flow);
void DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow,
uint32_t table_index);
void EvictFlowRequest(FlowEntryPtr &flow, uint32_t flow_handle);
Expand Down
19 changes: 11 additions & 8 deletions src/vnsw/agent/pkt/flow_table.cc
Expand Up @@ -90,15 +90,18 @@ void FlowTable::Shutdown() {

// Concurrency check to ensure all flow-table and free-list manipulations
// are done from FlowEvent task context only
void FlowTable::ConcurrencyCheck() {
bool FlowTable::ConcurrencyCheck() {
Task *current = Task::Running();
// test code invokes FlowTable API from main thread. The running task
// will be NULL in such cases
if (current == NULL) {
return;
return true;
}
assert(current->GetTaskId() == flow_task_id_);
assert(current->GetTaskInstance() == table_index_);
if (current->GetTaskId() != flow_task_id_)
return false;
if (current->GetTaskId() != flow_task_id_)
return false;
return true;
}

/////////////////////////////////////////////////////////////////////////////
Expand All @@ -122,7 +125,7 @@ void FlowTable::GetMutexSeq(tbb::mutex &mutex1, tbb::mutex &mutex2,
}

FlowEntry *FlowTable::Find(const FlowKey &key) {
ConcurrencyCheck();
assert(ConcurrencyCheck() == true);
FlowEntryMap::iterator it;

it = flow_entry_map_.find(key);
Expand All @@ -140,7 +143,7 @@ void FlowTable::Copy(FlowEntry *lhs, FlowEntry *rhs, bool update) {
}

FlowEntry *FlowTable::Locate(FlowEntry *flow, uint64_t time) {
ConcurrencyCheck();
assert(ConcurrencyCheck() == true);
std::pair<FlowEntryMap::iterator, bool> ret;
ret = flow_entry_map_.insert(FlowEntryMapPair(flow->key(), flow));
if (ret.second == true) {
Expand Down Expand Up @@ -1082,7 +1085,7 @@ void FlowEntryFreeList::Grow() {
}

FlowEntry *FlowEntryFreeList::Allocate(const FlowKey &key) {
table_->ConcurrencyCheck();
assert(table_->ConcurrencyCheck() == true);
FlowEntry *flow = NULL;
if (free_list_.size() == 0) {
flow = new FlowEntry(table_);
Expand All @@ -1104,7 +1107,7 @@ FlowEntry *FlowEntryFreeList::Allocate(const FlowKey &key) {
}

void FlowEntryFreeList::Free(FlowEntry *flow) {
table_->ConcurrencyCheck();
assert(table_->ConcurrencyCheck() == true);
total_free_++;
flow->Reset();
free_list_.push_back(*flow);
Expand Down
2 changes: 1 addition & 1 deletion src/vnsw/agent/pkt/flow_table.h
Expand Up @@ -236,7 +236,7 @@ class FlowTable {

// Concurrency check to ensure all flow-table and free-list manipulations
// are done from FlowEvent task context only
void ConcurrencyCheck();
bool ConcurrencyCheck();

friend class FlowStatsCollector;
friend class PktSandeshFlow;
Expand Down
7 changes: 5 additions & 2 deletions src/vnsw/agent/pkt/pkt_flow_info.cc
Expand Up @@ -1596,8 +1596,11 @@ void PktFlowInfo::UpdateEvictedFlowStats(const PktInfo *pkt) {
void PktFlowInfo::Add(const PktInfo *pkt, PktControlInfo *in,
PktControlInfo *out) {
bool update = false;
if (pkt->agent_hdr.cmd == AgentHdr::TRAP_FLOW_MISS) {
UpdateEvictedFlowStats(pkt);
if (pkt->type != PktType::MESSAGE &&
pkt->agent_hdr.cmd == AgentHdr::TRAP_FLOW_MISS) {
if (pkt->agent_hdr.cmd_param != FlowEntry::kInvalidFlowHandle) {
UpdateEvictedFlowStats(pkt);
}
}

if ((pkt->type == PktType::MESSAGE &&
Expand Down
113 changes: 90 additions & 23 deletions src/vnsw/agent/vrouter/ksync/ksync_flow_index_manager.cc
Expand Up @@ -13,7 +13,8 @@
KSyncFlowIndexEntry::KSyncFlowIndexEntry() :
state_(INIT), index_(FlowEntry::kInvalidFlowHandle), ksync_entry_(NULL),
index_owner_(NULL), evicted_(false), skip_delete_(false), evict_count_(0),
delete_in_progress_(false), event_log_index_(0), event_logs_(NULL) {
delete_in_progress_(false), locked_(false), event_log_index_(0),
event_logs_(NULL) {
}

KSyncFlowIndexEntry::~KSyncFlowIndexEntry() {
Expand Down Expand Up @@ -380,7 +381,12 @@ void KSyncFlowIndexEntry::IndexUnassignedSm(KSyncFlowIndexManager *manager,

case KSYNC_FREE:
if (flow->flow_handle() == FlowEntry::kInvalidFlowHandle) {
KSyncAddChange(manager, flow);
if (flow->deleted() == false) {
// FIXME : Dont add if deleted_ state
KSyncAddChange(manager, flow);
} else {
state_ = INIT;
}
}
break;

Expand Down Expand Up @@ -503,7 +509,9 @@ void KSyncFlowIndexEntry::IndexChangeSm(KSyncFlowIndexManager *manager,
uint32_t index) {
switch (event) {
case ADD:
assert(0);
// FIXME : We are seeing cases of double eviction, that is two-add from
// vrouter with different indexes. Ignore the event
//assert(0);
break;

case CHANGE:
Expand Down Expand Up @@ -678,18 +686,96 @@ void KSyncFlowIndexManager::InitDone(uint32_t count) {
//////////////////////////////////////////////////////////////////////////////
// KSyncFlowIndexManager State Machine APIs
//////////////////////////////////////////////////////////////////////////////

//////////////////////////////////////////////////////////////////////////////
// When a flow is added, it can potentially evict flow from another flow-table
// This can result in concurrency issues w.r.t index manager states. The
// concurrency is avoided by taking lock on the index used by the flows
//
// Take lock on flow_handle and the index owned by the flow. This handles
// concurrency even if flow being evicted is from other table
//////////////////////////////////////////////////////////////////////////////
void KSyncFlowIndexManager::GetIndexMutexSeq(FlowEntry *flow,
uint32_t index,
tbb::mutex &tmp_mutex1,
tbb::mutex &tmp_mutex2,
tbb::mutex &tmp_mutex3,
tbb::mutex **mutex_ptr_1,
tbb::mutex **mutex_ptr_2,
tbb::mutex **mutex_ptr_3) {
uint32_t index_array[3];
index_array[0] = flow->flow_handle();
index_array[1] = flow->ksync_index_entry()->index();
index_array[2] = index;
std::sort(index_array, index_array+3);
if (index_array[0] == index_array[1])
index_array[0] = FlowEntry::kInvalidFlowHandle;
if (index_array[1] == index_array[2])
index_array[1] = FlowEntry::kInvalidFlowHandle;

if (index_array[0] == FlowEntry::kInvalidFlowHandle)
*mutex_ptr_1 = &tmp_mutex1;
else
*mutex_ptr_1 = &index_list_[index_array[0]].mutex_;

if (index_array[1] == FlowEntry::kInvalidFlowHandle)
*mutex_ptr_2 = &tmp_mutex2;
else
*mutex_ptr_2 = &index_list_[index_array[1]].mutex_;

if (index_array[2] == FlowEntry::kInvalidFlowHandle)
*mutex_ptr_3 = &tmp_mutex3;
else
*mutex_ptr_3 = &index_list_[index_array[2]].mutex_;
}

#define FLOW_INDEX_LOCK(flow, index) \
tbb::mutex tmp1, tmp2, tmp3, *ptr1, *ptr2, *ptr3; \
GetIndexMutexSeq(flow, index, tmp1, tmp2, tmp3, &ptr1, &ptr2, &ptr3);\
tbb::mutex::scoped_lock lock1(*ptr1); \
tbb::mutex::scoped_lock lock2(*ptr2); \
tbb::mutex::scoped_lock lock3(*ptr3);

void KSyncFlowIndexManager::HandleEvent(FlowEntry *flow,
KSyncFlowIndexEntry::Event event,
uint32_t index) {
FLOW_INDEX_LOCK(flow, index);
flow->ksync_index_entry()->set_locked(true);
flow->ksync_index_entry()->HandleEvent(this, flow, event, index);
flow->ksync_index_entry()->set_locked(false);
}

void KSyncFlowIndexManager::HandleEvent(FlowEntry *flow,
KSyncFlowIndexEntry::Event event) {
flow->ksync_index_entry()->HandleEvent(this, flow, event,
HandleEvent(flow, event, FlowEntry::kInvalidFlowHandle);
}

void KSyncFlowIndexManager::ReleaseUnlocked(FlowEntry *flow) {
uint32_t index = flow->ksync_index_entry()->index_;
if (index != FlowEntry::kInvalidFlowHandle) {
if (flow->ksync_index_entry()->evicted_ == false) {
EvictIndexUnlocked(flow, index, true);
}
}
flow->ksync_index_entry()->index_ = FlowEntry::kInvalidFlowHandle;
flow->ksync_index_entry()->ksync_entry_ = NULL;
flow->ksync_index_entry()->delete_in_progress_ = false;
flow->ksync_index_entry()->evicted_ = false;
flow->ksync_index_entry()->skip_delete_ = false;
flow->ksync_index_entry()->HandleEvent(this, flow,
KSyncFlowIndexEntry::KSYNC_FREE,
FlowEntry::kInvalidFlowHandle);
}

void KSyncFlowIndexManager::Release(FlowEntry *flow) {
if (flow->ksync_index_entry()->locked()) {
ReleaseUnlocked(flow);
} else {
FLOW_INDEX_LOCK(flow, FlowEntry::kInvalidFlowHandle);
ReleaseUnlocked(flow);
}
}

void KSyncFlowIndexManager::Add(FlowEntry *flow) {
HandleEvent(flow, KSyncFlowIndexEntry::ADD);
}
Expand Down Expand Up @@ -721,7 +807,6 @@ void KSyncFlowIndexManager::AcquireIndex(FlowEntry *flow, uint32_t index) {
// Sanity check
assert(index != FlowEntry::kInvalidFlowHandle);

tbb::mutex::scoped_lock lock(index_list_[index].mutex_);
FlowEntryPtr owner = index_list_[index].owner_.get();
if (owner.get() != NULL) {
EvictIndexUnlocked(owner.get(), index, true);
Expand Down Expand Up @@ -749,29 +834,11 @@ void KSyncFlowIndexManager::EvictIndexUnlocked(FlowEntry *flow,

void KSyncFlowIndexManager::EvictIndex(FlowEntry *flow, uint32_t index,
bool skip_del) {
tbb::mutex::scoped_lock lock(index_list_[index].mutex_);
EvictIndexUnlocked(flow, index, skip_del);
return;
}

void KSyncFlowIndexManager::Release(FlowEntry *flow) {
uint32_t index = flow->ksync_index_entry()->index_;
if (index != FlowEntry::kInvalidFlowHandle) {
tbb::mutex::scoped_lock lock(index_list_[index].mutex_);
if (flow->ksync_index_entry()->evicted_ == false) {
EvictIndexUnlocked(flow, index, true);
}
}
flow->ksync_index_entry()->index_ = FlowEntry::kInvalidFlowHandle;
flow->ksync_index_entry()->ksync_entry_ = NULL;
flow->ksync_index_entry()->delete_in_progress_ = false;
flow->ksync_index_entry()->evicted_ = false;
flow->ksync_index_entry()->skip_delete_ = false;
HandleEvent(flow, KSyncFlowIndexEntry::KSYNC_FREE);
}

FlowEntryPtr KSyncFlowIndexManager::FindByIndex(uint32_t idx) {
tbb::mutex::scoped_lock lock(index_list_[idx].mutex_);
if (index_list_[idx].owner_.get() != NULL)
return index_list_[idx].owner_;
return FlowEntryPtr(NULL);
Expand Down
11 changes: 11 additions & 0 deletions src/vnsw/agent/vrouter/ksync/ksync_flow_index_manager.h
Expand Up @@ -114,6 +114,8 @@ class KSyncFlowIndexEntry {
bool evicted() const { return evicted_; }
bool skip_delete() const { return skip_delete_; }
uint32_t evict_count() const { return evict_count_; }
bool locked() const { return locked_; }
void set_locked(bool val) { assert(locked_ != val); locked_ = val; }

void HandleEvent(KSyncFlowIndexManager *manager, FlowEntry *flow,
Event event, uint32_t index);
Expand Down Expand Up @@ -160,6 +162,7 @@ class KSyncFlowIndexEntry {
uint32_t evict_count_;
// Delete initiated for the flow
bool delete_in_progress_;
bool locked_;
int event_log_index_;
EventLog *event_logs_;
};
Expand All @@ -183,6 +186,7 @@ class KSyncFlowIndexManager {
virtual ~KSyncFlowIndexManager();
void InitDone(uint32_t count);

void ReleaseUnlocked(FlowEntry *flow);
void Release(FlowEntry *flow);
FlowEntryPtr FindByIndex(uint32_t idx);

Expand All @@ -197,6 +201,13 @@ class KSyncFlowIndexManager {
void EvictIndex(FlowEntry *flow, uint32_t index, bool skip_del);
uint16_t sm_log_count() const { return sm_log_count_; }
private:
void GetIndexMutexSeq(FlowEntry *flow, uint32_t index,
tbb::mutex &tmp_mutex1,
tbb::mutex &tmp_mutex2,
tbb::mutex &tmp_mutex3,
tbb::mutex **mutex_ptr_1,
tbb::mutex **mutex_ptr_2,
tbb::mutex **mutex_ptr_3);
void HandleEvent(FlowEntry *flow, KSyncFlowIndexEntry::Event event);
void HandleEvent(FlowEntry *flow, KSyncFlowIndexEntry::Event event,
uint32_t index);
Expand Down

0 comments on commit 39a5856

Please sign in to comment.