Skip to content

Commit

Permalink
Support state-compression for flow delete events
Browse files Browse the repository at this point in the history
Move the PendingAction state-compression logic to FlowEvent handler
module.
Rename REVALUATE_FLOW to RECOPMUTE_FLOW
Change FLOW_MESSAGE event to have FlowEntry as member
Add UT for state-compression of delete events

Conflicts:
	src/vnsw/agent/pkt/flow_proto.h
	src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc

Change-Id: I5480c02c27385e1fa8c63820da1a09b24b7898bb
Closes-Bug: #1578660
Partial-Bug: #1568126
Partial-Bug: #1572270
Partial-Bug: #1572471
  • Loading branch information
praveenkv committed May 14, 2016
1 parent 89a2b24 commit 951cc98
Show file tree
Hide file tree
Showing 15 changed files with 472 additions and 219 deletions.
83 changes: 78 additions & 5 deletions src/vnsw/agent/pkt/flow_entry.cc
Expand Up @@ -2033,18 +2033,91 @@ FlowPendingAction::~FlowPendingAction() {
}

void FlowPendingAction::Reset() {
delete_ = false;
recompute_ = false;
recompute_dbentry_ = false;
revaluate_ = false;
}

void FlowPendingAction::SetRevaluate(bool val) {
revaluate_ = val;
bool FlowPendingAction::SetDelete() {
if (delete_)
return false;

delete_ = true;
return true;
}

void FlowPendingAction::ResetDelete() {
delete_ = false;
recompute_ = false;
recompute_dbentry_ = false;
revaluate_ = false;
}

bool FlowPendingAction::CanDelete() {
return delete_;
}

void FlowPendingAction::SetRecompute(bool val) {
recompute_ = val;
revaluate_ = val;
bool FlowPendingAction::SetRecompute() {
if (delete_ || recompute_)
return false;

recompute_ = true;
return true;
}

void FlowPendingAction::ResetRecompute() {
recompute_ = false;
recompute_dbentry_ = false;
revaluate_ = false;
}

bool FlowPendingAction::CanRecompute() {
if (delete_)
return false;

return recompute_;
}

bool FlowPendingAction::SetRecomputeDBEntry() {
if (delete_ || recompute_ || recompute_dbentry_)
return false;

recompute_dbentry_ = true;
return true;
}

void FlowPendingAction::ResetRecomputeDBEntry() {
recompute_dbentry_ = false;
revaluate_ = false;
}

bool FlowPendingAction::CanRecomputeDBEntry() {
if (delete_ || recompute_)
return false;

return recompute_dbentry_;
}

bool FlowPendingAction::SetRevaluate() {
if (delete_ || recompute_ || recompute_dbentry_ || revaluate_)
return false;

revaluate_ = true;
return true;
}

void FlowPendingAction::ResetRevaluate() {
revaluate_ = false;
}

bool FlowPendingAction::CanRevaluate() {
if (delete_ || recompute_ || recompute_dbentry_)
return false;

return revaluate_;
}

/////////////////////////////////////////////////////////////////////////////
// Introspect routines
/////////////////////////////////////////////////////////////////////////////
Expand Down
64 changes: 56 additions & 8 deletions src/vnsw/agent/pkt/flow_entry.h
Expand Up @@ -312,25 +312,73 @@ struct FlowEventLog {
uint8_t vrouter_gen_id_;
};

// Structure to track pending actions on flow. This is used to state-compress
// multiple actions on a flow
// There are 4 actions supported,
// Flow recomputation goes thru 2 stages of processing,
//
// - recompute_dbentry_ : In this stage, flow is enqueued to flow-update-queue
// as a result of db-entry add/delete/change.
// - recompute_ : In this stage, flow is enqueued to flow-event-queue
// for recomputation of flow
// - delete_ : Specifies that delete action is pending on flow.
// - recompute_ : Specifies that flow is enqueued into flow-event-queue
// for recomputation.
//
// The actions have a priority, the higher priorty action overrides lower
// priority actions. The priority in decreasing order is,
// - delete_
// - recompute_
// - recompute_dbentry_
// - revaluate_
//
// The flags are also used for state-compression of objects. The state
// compression is acheived with,
//
// - Before Event Enqueue :
// Before enqueuing an event, the FlowEvent module checks if the
// corresponding action or higher priority action is pending. If so, the
// event is ignored.
// Note, if the lower priority event is pending, the higher priority event
// is still enqueued. The lower priority event is ignored later as given below
//
// - On Event dequeue :
// After dequeuing an event, FlowEvent module checks if a higher priority
// event is pending. If so, the current event is ignored.
//
// - Post Event processing:
// Once the event is processed, the corresponding action is cleared for both
// forward and reverse flows. Clearing an action also clears lower priority
// actions
class FlowPendingAction {
public:
FlowPendingAction();
~FlowPendingAction();

void Reset();

void SetRevaluate(bool val);
bool revaluate() const { return revaluate_; }
bool CanDelete();
bool SetDelete();
void ResetDelete();

void SetRecompute(bool val);
bool recompute() const { return recompute_; }
bool CanRecompute();
bool SetRecompute();
void ResetRecompute();

bool CanRecomputeDBEntry();
bool SetRecomputeDBEntry();
void ResetRecomputeDBEntry();

bool CanRevaluate();
bool SetRevaluate();
void ResetRevaluate();
private:
// Flow pending revaluation due to change in interface, vn, acl and nh
bool revaluate_;
// delete pending
bool delete_;
// Flow pending complete recompute
bool recompute_;
// Flow pending recompute-dbentry
bool recompute_dbentry_;
// Flow pending revaluation due to change in interface, vn, acl and nh
bool revaluate_;
};

class FlowEntry {
Expand Down
149 changes: 143 additions & 6 deletions src/vnsw/agent/pkt/flow_event.cc
Expand Up @@ -24,7 +24,7 @@ FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto,
FlowTokenPool *pool,
uint16_t latency_limit) :
flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
latency_limit_(latency_limit) {
events_processed_(0), latency_limit_(latency_limit) {
queue_ = new Queue(task_id, task_instance,
boost::bind(&FlowEventQueueBase::Handler, this, _1));
char buff[100];
Expand All @@ -49,6 +49,10 @@ void FlowEventQueueBase::Shutdown() {
}

void FlowEventQueueBase::Enqueue(FlowEvent *event) {
if (CanEnqueue(event) == false) {
delete event;
return;
}
queue_->Enqueue(event);
}

Expand All @@ -72,11 +76,11 @@ void FlowEventQueueBase::TaskExit(bool done) {
struct rusage r;
getrusage(RUSAGE_THREAD, &r);

uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 100;
user += (r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec);
uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 1000;
user += ((r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec) / 1000);

uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 100;
sys += (r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec);
uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 1000;
sys += ((r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec) / 1000);

LOG(ERROR, queue_->Description()
<< " Time exceeded " << ((t - task_start_) / 1000)
Expand All @@ -87,8 +91,141 @@ void FlowEventQueueBase::TaskExit(bool done) {
}

bool FlowEventQueueBase::Handler(FlowEvent *event) {
std::auto_ptr<FlowEvent> event_ptr(event);
count_++;
return HandleEvent(event);
if (CanProcess(event) == false) {
ProcessDone(event, false);
return true;
}

HandleEvent(event);

ProcessDone(event, true);
return true;
}

bool FlowEventQueueBase::CanEnqueue(FlowEvent *event) {
FlowEntry *flow = event->flow();
bool ret = true;
switch (event->event()) {

case FlowEvent::DELETE_DBENTRY:
case FlowEvent::DELETE_FLOW: {
tbb::mutex::scoped_lock mutext(flow->mutex());
ret = flow->GetPendingAction()->SetDelete();
break;
}

// lock already token for the flow
case FlowEvent::FLOW_MESSAGE: {
ret = flow->GetPendingAction()->SetRecompute();
break;
}

case FlowEvent::RECOMPUTE_FLOW: {
tbb::mutex::scoped_lock mutext(flow->mutex());
ret = flow->GetPendingAction()->SetRecomputeDBEntry();
break;
}

case FlowEvent::REVALUATE_DBENTRY: {
tbb::mutex::scoped_lock mutext(flow->mutex());
ret = flow->GetPendingAction()->SetRevaluate();
break;
}

default:
break;
}

return ret;
}

bool FlowEventQueueBase::CanProcess(FlowEvent *event) {
FlowEntry *flow = event->flow();
bool ret = true;
switch (event->event()) {

case FlowEvent::DELETE_DBENTRY:
case FlowEvent::DELETE_FLOW: {
tbb::mutex::scoped_lock mutext(flow->mutex());
events_processed_++;
ret = flow->GetPendingAction()->CanDelete();
break;
}

case FlowEvent::FLOW_MESSAGE: {
tbb::mutex::scoped_lock mutext(flow->mutex());
events_processed_++;
ret = flow->GetPendingAction()->CanRecompute();
break;
}

case FlowEvent::RECOMPUTE_FLOW: {
tbb::mutex::scoped_lock mutext(flow->mutex());
events_processed_++;
ret = flow->GetPendingAction()->CanRecomputeDBEntry();
break;
}

case FlowEvent::REVALUATE_DBENTRY: {
events_processed_++;
tbb::mutex::scoped_lock mutext(flow->mutex());
ret = flow->GetPendingAction()->CanRevaluate();
break;
}

default:
break;
}

return ret;
}

void FlowEventQueueBase::ProcessDone(FlowEvent *event, bool update_rev_flow) {
FlowEntry *flow = event->flow();
FlowEntry *rflow = NULL;
if (flow && update_rev_flow)
rflow = flow->reverse_flow_entry();

switch (event->event()) {

case FlowEvent::DELETE_DBENTRY:
case FlowEvent::DELETE_FLOW: {
FLOW_LOCK(flow, rflow);
flow->GetPendingAction()->ResetDelete();
if (rflow)
rflow->GetPendingAction()->ResetDelete();
break;
}

case FlowEvent::FLOW_MESSAGE: {
FLOW_LOCK(flow, rflow);
flow->GetPendingAction()->ResetRecompute();
if (rflow)
rflow->GetPendingAction()->ResetRecompute();
break;
}

case FlowEvent::RECOMPUTE_FLOW: {
tbb::mutex::scoped_lock mutext(flow->mutex());
flow->GetPendingAction()->ResetRecomputeDBEntry();
break;
}

case FlowEvent::REVALUATE_DBENTRY: {
FLOW_LOCK(flow, rflow);
flow->GetPendingAction()->ResetRevaluate();
if (rflow)
rflow->GetPendingAction()->ResetRevaluate();
break;
}

default:
break;
}

return;
}

FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto,
Expand Down

0 comments on commit 951cc98

Please sign in to comment.