From 507fda3d5deb22c6549d4fd253624bea44534b73 Mon Sep 17 00:00:00 2001 From: "Anand H. Krishnan" Date: Thu, 20 Aug 2015 14:40:13 +0530 Subject: [PATCH] Flow eviction by datapath based on TCP states Inactive TCP flows (flows that have already seen the closure cycle - FIN/ACK or the RESET flags) should additionally be considered as a free flow entry so that vRouter does not have to wait for agent to accommodate new flows. This logic will provide better service under severe occupancy. This modification also removes the previous logic of trapping packets to agent when datapath detects closure of a TCP stream. Change-Id: I1009b10f990ea2bf904ac0daec59378d1da07acd Partial-BUG: #1362701 --- dp-core/vr_flow.c | 542 ++++++++++++++++++++++++++++++++--------- dp-core/vr_interface.c | 7 +- dpdk/vr_dpdk_host.c | 32 +-- include/vr_defs.h | 5 +- include/vr_flow.h | 69 +++++- include/vrouter.h | 1 + utils/dropstats.c | 1 + utils/flow.c | 16 +- 8 files changed, 529 insertions(+), 144 deletions(-) diff --git a/dp-core/vr_flow.c b/dp-core/vr_flow.c index 1fcc1e2a3..4ffdd4e59 100644 --- a/dp-core/vr_flow.c +++ b/dp-core/vr_flow.c @@ -53,15 +53,18 @@ int hashrnd_inited = 0; static void vr_flush_entry(struct vrouter *, struct vr_flow_entry *, struct vr_flow_md *, struct vr_forwarding_md *); -static void vr_flush_flow_queue(struct vrouter *, struct vr_flow_entry *, +static void __vr_flow_flush_hold_queue(struct vrouter *, struct vr_flow_entry *, struct vr_forwarding_md *, struct vr_flow_queue *); static void vr_flow_set_forwarding_md(struct vrouter *, struct vr_flow_entry *, unsigned int, struct vr_forwarding_md *); +static int +__vr_flow_schedule_transition(struct vrouter *, struct vr_flow_entry *, + unsigned int, unsigned short); struct vr_flow_entry *vr_find_flow(struct vrouter *, struct vr_flow *, uint8_t, unsigned int *); unsigned int vr_trap_flow(struct vrouter *, struct vr_flow_entry *, - struct vr_packet *, unsigned int); + struct vr_packet *, unsigned int, struct vr_flow_stats *); void get_random_bytes(void *buf, int nbytes); @@ -199,10 +202,9 @@ vr_init_flow_entry(struct vr_flow_entry *fe) static void -vr_reset_flow_entry(struct vrouter *router, struct vr_flow_entry *fe, +vr_reset_active_flow_entry(struct vrouter *router, struct vr_flow_entry *fe, unsigned int index) { - memset(&fe->fe_stats, 0, sizeof(fe->fe_stats)); if (fe->fe_hold_list) { vr_printf("vrouter: Potential memory leak @ %s:%d\n", __FILE__, __LINE__); @@ -217,9 +219,22 @@ vr_reset_flow_entry(struct vrouter *router, struct vr_flow_entry *fe, fe->fe_src_nh_index = NH_DISCARD_ID; fe->fe_rflow = -1; fe->fe_action = VR_FLOW_ACTION_DROP; - fe->fe_flags = 0; fe->fe_udp_src_port = 0; fe->fe_tcp_flags = 0; + fe->fe_flags &= + (VR_FLOW_FLAG_ACTIVE | VR_FLOW_FLAG_EVICT_CANDIDATE | + VR_FLOW_FLAG_NEW_FLOW); + + return; +} + +static void +vr_reset_flow_entry(struct vrouter *router, struct vr_flow_entry *fe, + unsigned int index) +{ + vr_reset_active_flow_entry(router, fe, index); + memset(&fe->fe_stats, 0, sizeof(fe->fe_stats)); + fe->fe_flags = 0; return; } @@ -229,7 +244,8 @@ static inline bool vr_set_flow_active(struct vr_flow_entry *fe) { return __sync_bool_compare_and_swap(&fe->fe_flags, - fe->fe_flags & ~VR_FLOW_FLAG_ACTIVE, VR_FLOW_FLAG_ACTIVE); + fe->fe_flags & ~VR_FLOW_FLAG_ACTIVE, + VR_FLOW_FLAG_ACTIVE | VR_FLOW_FLAG_NEW_FLOW); } static inline struct vr_flow_entry * @@ -296,92 +312,222 @@ vr_get_flow_entry(struct vrouter *router, int index) return (struct vr_flow_entry *)vr_btable_get(table, index); } +static inline void 
+vr_flow_stop_modify(struct vrouter *router, struct vr_flow_entry *fe) +{ + if (!fe) + return; + + (void)__sync_and_and_fetch(&fe->fe_flags, ~VR_FLOW_FLAG_MODIFIED); + return; +} + +static inline bool +vr_flow_start_modify(struct vrouter *router, struct vr_flow_entry *fe) +{ + unsigned short flags; + + flags = fe->fe_flags; + if (!(flags & (VR_FLOW_FLAG_MODIFIED | VR_FLOW_FLAG_EVICTED))) { + if (__sync_bool_compare_and_swap(&fe->fe_flags, flags, + flags | VR_FLOW_FLAG_MODIFIED)) { + return true; + } + } + + return false; +} + + /* Non-static due to RCU callback pointer comparison in vRouter/DPDK */ void -vr_flow_queue_free(struct vrouter *router, void *arg) +vr_flow_flush_hold_queue(struct vrouter *router, struct vr_flow_entry *fe, + struct vr_flow_queue *vfq) { struct vr_forwarding_md fmd; + + if (vfq) { + vr_init_forwarding_md(&fmd); + vr_flow_set_forwarding_md(router, fe, vfq->vfq_index, &fmd); + __vr_flow_flush_hold_queue(router, fe, &fmd, vfq); + } + + return; +} + +static void +vr_flow_evict_flow(struct vrouter *router, struct vr_flow_entry *fe, + unsigned int index) +{ + unsigned short flags; + + if (!fe) + return; + + if ((fe->fe_flags & VR_FLOW_FLAG_ACTIVE) && + (fe->fe_flags & VR_FLOW_FLAG_EVICT_CANDIDATE)) { + flags = fe->fe_flags | VR_FLOW_FLAG_ACTIVE | + VR_FLOW_FLAG_EVICT_CANDIDATE; + (void)__sync_bool_compare_and_swap(&fe->fe_flags, flags, + (flags ^ VR_FLOW_FLAG_EVICT_CANDIDATE) | + VR_FLOW_FLAG_EVICTED); + vr_flow_stop_modify(router, fe); + } + + return; +} + +void +vr_flow_defer_cb(struct vrouter *router, void *arg) +{ struct vr_defer_data *defer; - struct vr_flow_entry *fe; + struct vr_flow_entry *fe, *rfe; struct vr_flow_queue *vfq; + struct vr_flow_defer_data *vfdd; defer = (struct vr_defer_data *)arg; if (!defer) return; - vr_init_forwarding_md(&fmd); + vfdd = (struct vr_flow_defer_data *)defer->vdd_data; + if (!vfdd) + return; + fe = vfdd->vfdd_fe; - vfq = (struct vr_flow_queue *)defer->vdd_data; - fe = vr_get_flow_entry(router, vfq->vfq_index); - if (fe) { - vr_flow_set_forwarding_md(router, fe, vfq->vfq_index, &fmd); - vr_flush_flow_queue(router, fe, &fmd, vfq); + vfq = (struct vr_flow_queue *)vfdd->vfdd_flow_queue; + if (vfq) { + vr_flow_flush_hold_queue(router, fe, vfq); + vr_free(vfq, VR_FLOW_QUEUE_OBJECT); + vfdd->vfdd_flow_queue = NULL; + } + + if (vfdd->vfdd_delete) { + vr_reset_flow_entry(router, fe, vfdd->vfdd_fe_index); + } else { + rfe = vr_get_flow_entry(router, fe->fe_rflow); + vr_flow_evict_flow(router, fe, vfdd->vfdd_fe_index); + if (rfe) + vr_flow_evict_flow(router, rfe, fe->fe_rflow); } - vr_free(vfq, VR_FLOW_QUEUE_OBJECT); + + vr_free(vfdd, VR_FLOW_DEFER_DATA_OBJECT); + return; } static void -vr_flow_queue_free_defer(struct vr_flow_md *flmd, struct vr_flow_queue *vfq) +vr_flow_reset_evict(struct vrouter *router, struct vr_flow_entry *fe) { + unsigned short flags; + + if (!fe) + return; + + flags = fe->fe_flags; + if (flags & VR_FLOW_FLAG_EVICT_CANDIDATE) { + (void)__sync_bool_compare_and_swap(&fe->fe_flags, flags, + (flags ^ VR_FLOW_FLAG_EVICT_CANDIDATE)); + } + + vr_flow_stop_modify(router, fe); + + return; +} + +static void +vr_flow_defer(struct vr_flow_md *flmd, struct vr_flow_entry *fe) +{ + struct vr_flow_entry *rfe; struct vr_defer_data *vdd = flmd->flmd_defer_data; + struct vr_flow_defer_data *vfdd; + + if (!vdd || !vdd->vdd_data) { + if (fe->fe_rflow) { + rfe = vr_get_flow_entry(flmd->flmd_router, fe->fe_rflow); + vr_flow_reset_evict(flmd->flmd_router, rfe); + } + vr_flow_reset_evict(flmd->flmd_router, fe); + + if (!(flmd->flmd_flags & 
VR_FLOW_FLAG_ACTIVE)) { + vr_reset_flow_entry(flmd->flmd_router, fe, flmd->flmd_index); + } - if (!vdd) { - vr_free(vfq, VR_FLOW_QUEUE_OBJECT); return; } - vdd->vdd_data = (void *)vfq; - vr_defer(flmd->flmd_router, vr_flow_queue_free, (void *)vdd); + vfdd = (struct vr_flow_defer_data *)vdd->vdd_data; + vfdd->vfdd_fe = fe; + + vr_defer(flmd->flmd_router, vr_flow_defer_cb, (void *)vdd); flmd->flmd_defer_data = NULL; return; } static struct vr_flow_entry * -vr_find_free_entry(struct vrouter *router, struct vr_flow *key, uint8_t type, - bool need_hold, unsigned int *fe_index) +vr_flow_table_get_free_entry(struct vrouter *router, struct vr_btable *table, + unsigned int start, unsigned int entries, unsigned int *free_index) { - unsigned int i, index, hash; - struct vr_flow_entry *tmp_fe, *fe = NULL; - - *fe_index = 0; + bool alloced = false, swapped = false; + unsigned short flags; + unsigned int i, j, table_size; - hash = vr_hash(key, key->flow_key_len, 0); + struct vr_flow_entry *fe; - index = (hash % vr_flow_entries) & ~(VR_FLOW_ENTRIES_PER_BUCKET - 1); - for (i = 0; i < VR_FLOW_ENTRIES_PER_BUCKET; i++) { - tmp_fe = vr_flow_table_entry_get(router, index); - if (tmp_fe && !(tmp_fe->fe_flags & VR_FLOW_FLAG_ACTIVE)) { - if (vr_set_flow_active(tmp_fe)) { - vr_init_flow_entry(tmp_fe); - fe = tmp_fe; - break; + table_size = vr_btable_entries(table); + for (i = start, j = 0; j < entries; + (i = ((i + 1) % table_size)), j++) { + fe = vr_btable_get(table, i); + if (fe) { + flags = fe->fe_flags; + if (!(flags & VR_FLOW_FLAG_ACTIVE)) { + if (vr_set_flow_active(fe)) { + alloced = true; + vr_init_flow_entry(fe); + break; + } + } else if (flags & VR_FLOW_FLAG_EVICTED) { + swapped = __sync_bool_compare_and_swap(&fe->fe_flags, flags, + ((flags & ~VR_FLOW_FLAG_EVICTED) | + VR_FLOW_FLAG_NEW_FLOW)); + if (swapped) { + alloced = true; + vr_reset_active_flow_entry(router, fe, i); + break; + } } } - index++; } + if (alloced) { + *free_index = i; + return fe; + } + + return NULL; +} + +static struct vr_flow_entry * +vr_flow_get_free_entry(struct vrouter *router, struct vr_flow *key, uint8_t type, + bool need_hold, unsigned int *fe_index) +{ + unsigned int index, hash; + struct vr_flow_entry *fe = NULL; + + hash = vr_hash(key, key->flow_key_len, 0); + + index = (hash % vr_flow_entries) & ~(VR_FLOW_ENTRIES_PER_BUCKET - 1); + fe = vr_flow_table_get_free_entry(router, router->vr_flow_table, index, + VR_FLOW_ENTRIES_PER_BUCKET, fe_index); if (!fe) { index = hash % vr_oflow_entries; - for (i = 0; i < vr_oflow_entries; i++) { - tmp_fe = vr_oflow_table_entry_get(router, index); - if (tmp_fe && !(tmp_fe->fe_flags & VR_FLOW_FLAG_ACTIVE)) { - if (vr_set_flow_active(tmp_fe)) { - vr_init_flow_entry(tmp_fe); - fe = tmp_fe; - break; - } - } - index = (index + 1) % vr_oflow_entries; - } - + fe = vr_flow_table_get_free_entry(router, router->vr_oflow_table, index, + vr_oflow_entries, fe_index); if (fe) *fe_index += vr_flow_entries; } if (fe) { - *fe_index += index; if (need_hold) { fe->fe_hold_list = vr_zalloc(sizeof(struct vr_flow_queue), VR_FLOW_QUEUE_OBJECT); @@ -426,8 +572,10 @@ vr_flow_table_lookup(struct vr_flow *key, uint16_t type, (flow_e->fe_flags & VR_FLOW_FLAG_ACTIVE) && (flow_e->fe_type == type)) { if (!memcmp(&flow_e->fe_key, key, key->flow_key_len)) { - *fe_index = (hash + i) % table_size; - return flow_e; + if (!(flow_e->fe_flags & VR_FLOW_FLAG_EVICTED)) { + *fe_index = (hash + i) % table_size; + return flow_e; + } } } } @@ -497,7 +645,7 @@ vr_flow_fill_pnode(struct vr_packet_node *pnode, struct vr_packet *pkt, static 
int
vr_enqueue_flow(struct vrouter *router, struct vr_flow_entry *fe,
        struct vr_packet *pkt, unsigned int index,
-        struct vr_forwarding_md *fmd)
+        struct vr_flow_stats *stats, struct vr_forwarding_md *fmd)
 {
     unsigned int i;
     unsigned short drop_reason = 0;
@@ -518,7 +666,7 @@ vr_enqueue_flow(struct vrouter *router, struct vr_flow_entry *fe,
         pnode = &vfq->vfq_pnodes[i];
         vr_flow_fill_pnode(pnode, pkt, fmd);
         if (!i)
-            vr_trap_flow(router, fe, pkt, index);
+            vr_trap_flow(router, fe, pkt, index, stats);
 
     return 0;
 
drop:
@@ -555,6 +703,74 @@ vr_flow_set_forwarding_md(struct vrouter *router, struct vr_flow_entry *fe,
     return;
 }
 
+static bool
+__vr_flow_mark_evict(struct vrouter *router, struct vr_flow_entry *fe)
+{
+    unsigned short flags;
+
+    flags = fe->fe_flags;
+    if (flags & VR_FLOW_FLAG_ACTIVE) {
+        if (vr_flow_start_modify(router, fe)) {
+            flags = __sync_fetch_and_or(&fe->fe_flags,
+                    VR_FLOW_FLAG_EVICT_CANDIDATE);
+            if (flags & VR_FLOW_FLAG_EVICT_CANDIDATE) {
+                goto unset_modified;
+            } else {
+                return true;
+            }
+        }
+    }
+
+    return false;
+
+unset_modified:
+    vr_flow_stop_modify(router, fe);
+    return false;
+}
+
+static void
+vr_flow_mark_evict(struct vrouter *router, struct vr_flow_entry *fe,
+        unsigned int index)
+{
+    bool evict_forward_flow = true;
+
+    struct vr_flow_entry *rfe = NULL;
+
+    if (fe->fe_rflow >= 0) {
+        rfe = vr_get_flow_entry(router, fe->fe_rflow);
+        if (rfe && (rfe->fe_rflow == index)) {
+            if (rfe->fe_tcp_flags & VR_FLOW_TCP_DEAD) {
+                evict_forward_flow = __vr_flow_mark_evict(router, rfe);
+            } else {
+                evict_forward_flow = false;
+            }
+        } else {
+            rfe = NULL;
+        }
+    }
+
+    if (evict_forward_flow) {
+        if (__vr_flow_mark_evict(router, fe)) {
+            if (__vr_flow_schedule_transition(router, fe,
+                        index, fe->fe_flags) < 0) {
+                if (rfe)
+                    vr_flow_reset_evict(router, rfe);
+                vr_flow_reset_evict(router, fe);
+            }
+        } else {
+            goto unset_modified;
+        }
+    }
+
+    return;
+
+unset_modified:
+    if (rfe)
+        vr_flow_stop_modify(router, rfe);
+
+    return;
+}
+
 static flow_result_t
 vr_flow_action(struct vrouter *router, struct vr_flow_entry *fe,
         unsigned int index, struct vr_packet *pkt,
@@ -642,13 +858,17 @@ vr_flow_action(struct vrouter *router, struct vr_flow_entry *fe,
         break;
     }
 
+    if (fe->fe_tcp_flags & VR_FLOW_TCP_DEAD)
+        vr_flow_mark_evict(router, fe, index);
+
     return result;
 }
 
 unsigned int
 vr_trap_flow(struct vrouter *router, struct vr_flow_entry *fe,
-        struct vr_packet *pkt, unsigned int index)
+        struct vr_packet *pkt, unsigned int index,
+        struct vr_flow_stats *stats)
 {
     unsigned int trap_reason;
     struct vr_packet *npkt;
@@ -662,14 +882,32 @@ vr_trap_flow(struct vrouter *router, struct vr_flow_entry *fe,
 
     switch (fe->fe_flags & VR_FLOW_FLAG_TRAP_MASK) {
     default:
-        trap_reason = AGENT_TRAP_FLOW_MISS;
+        /*
+         * agent needs a way to distinguish new flows from existing flows.
+         * existing flows can be reused (evicted), or the action of such flows
+         * can change to hold. If an existing flow is reused and a packet is
+         * trapped, agent will not re-evaluate the flow. Hence, agent has to
+         * be told that this is a new flow, which we indicate by the trap
+         * reason. 
+ */ + if (fe->fe_flags & VR_FLOW_FLAG_NEW_FLOW) { + trap_reason = AGENT_TRAP_FLOW_MISS; + fe->fe_flags ^= VR_FLOW_FLAG_NEW_FLOW; + } else { + trap_reason = AGENT_TRAP_FLOW_ACTION_HOLD; + } + ta.vfta_index = index; if ((fe->fe_type == VP_TYPE_IP) || (fe->fe_type == VP_TYPE_IP6)) ta.vfta_nh_index = fe->fe_key.flow_nh_id; + if (stats) { + ta.vfta_stats = *stats; + } else { + ta.vfta_stats = fe->fe_stats; + } + break; } - return vr_trap(npkt, fe->fe_vrf, trap_reason, &ta); } @@ -679,6 +917,13 @@ vr_do_flow_action(struct vrouter *router, struct vr_flow_entry *fe, struct vr_forwarding_md *fmd) { uint32_t new_stats; + struct vr_flow_stats stats, *stats_p = NULL; + + if (fe->fe_flags & VR_FLOW_FLAG_NEW_FLOW) { + memcpy(&stats, &fe->fe_stats, sizeof(fe->fe_stats)); + memset(&fe->fe_stats, 0, sizeof(fe->fe_stats)); + stats_p = &stats; + } new_stats = __sync_add_and_fetch(&fe->fe_stats.flow_bytes, pkt_len(pkt)); if (new_stats < pkt_len(pkt)) @@ -689,7 +934,7 @@ vr_do_flow_action(struct vrouter *router, struct vr_flow_entry *fe, fe->fe_stats.flow_packets_oflow++; if (fe->fe_action == VR_FLOW_ACTION_HOLD) { - vr_enqueue_flow(router, fe, pkt, index, fmd); + vr_enqueue_flow(router, fe, pkt, index, stats_p, fmd); return FLOW_HELD; } @@ -753,25 +998,14 @@ static void vr_flow_init_close(struct vrouter *router, struct vr_flow_entry *flow_e, struct vr_packet *pkt, struct vr_forwarding_md *fmd) { - unsigned int flow_index; - unsigned int head_room = sizeof(struct agent_hdr) + sizeof(struct vr_eth); - - struct vr_packet *pkt_c; - - pkt_c = vr_pclone(pkt); - if (!pkt_c) - return; + struct vr_flow_entry *rfe; - vr_preset(pkt_c); - if (vr_pcow(pkt_c, head_room)) { - vr_pfree(pkt_c, VP_DROP_PCOW_FAIL); - return; + (void)__sync_fetch_and_or(&flow_e->fe_tcp_flags, VR_FLOW_TCP_DEAD); + rfe = vr_get_flow_entry(router, flow_e->fe_rflow); + if (rfe) { + (void)__sync_fetch_and_or(&rfe->fe_tcp_flags, VR_FLOW_TCP_DEAD); } - flow_index = fmd->fmd_flow_index; - vr_trap(pkt_c, fmd->fmd_dvrf, AGENT_TRAP_SESSION_CLOSE, - (void *)&flow_index); - return; } @@ -945,7 +1179,7 @@ vr_flow_lookup(struct vrouter *router, struct vr_flow *key, return FLOW_CONSUMED; } - flow_e = vr_find_free_entry(router, key, pkt->vp_type, + flow_e = vr_flow_get_free_entry(router, key, pkt->vp_type, true, &fe_index); if (!flow_e) { vr_pfree(pkt, VP_DROP_FLOW_TABLE_FULL); @@ -957,6 +1191,9 @@ vr_flow_lookup(struct vrouter *router, struct vr_flow *key, vr_flow_entry_set_hold(router, flow_e); } + if (flow_e->fe_flags & VR_FLOW_FLAG_EVICT_CANDIDATE) + return FLOW_DROP; + vr_flow_set_forwarding_md(router, flow_e, fe_index, fmd); vr_flow_tcp_digest(router, flow_e, pkt, fmd); @@ -1072,7 +1309,7 @@ vr_flow_flush_pnode(struct vrouter *router, struct vr_packet_node *pnode, } static void -vr_flush_flow_queue(struct vrouter *router, struct vr_flow_entry *fe, +__vr_flow_flush_hold_queue(struct vrouter *router, struct vr_flow_entry *fe, struct vr_forwarding_md *fmd, struct vr_flow_queue *vfq) { unsigned int i; @@ -1090,28 +1327,56 @@ static void vr_flush_entry(struct vrouter *router, struct vr_flow_entry *fe, struct vr_flow_md *flmd, struct vr_forwarding_md *fmd) { - struct vr_flow_queue *vfq; + bool swapped; - if (fe->fe_action == VR_FLOW_ACTION_HOLD) - return; + struct vr_flow_queue *vfq; + struct vr_defer_data *vdd = flmd->flmd_defer_data; + struct vr_flow_defer_data *vfdd; vfq = fe->fe_hold_list; - if (!vfq) - return; - fe->fe_hold_list = NULL; + if (vfq) { + if (fe->fe_action == VR_FLOW_ACTION_HOLD) + return; + + swapped = 
__sync_bool_compare_and_swap(&fe->fe_hold_list, vfq, NULL); + if (swapped) { + __vr_flow_flush_hold_queue(router, fe, fmd, vfq); + if (!vdd || !vdd->vdd_data) + goto free_flush_queue; - vr_flush_flow_queue(router, fe, fmd, vfq); - vr_flow_queue_free_defer(flmd, vfq); + vfdd = (struct vr_flow_defer_data *)vdd->vdd_data; + vfdd->vfdd_flow_queue = vfq; + } + } return; + +free_flush_queue: + if (vfq) + vr_free(vfq, VR_FLOW_QUEUE_OBJECT); + return; } static void -vr_flow_flush(void *arg) +__vr_flow_work(struct vrouter *router, struct vr_flow_entry *fe, + struct vr_flow_md *flmd) +{ + struct vr_forwarding_md fmd; + + vr_init_forwarding_md(&fmd); + vr_flow_set_forwarding_md(router, fe, flmd->flmd_index, &fmd); + vr_flush_entry(router, fe, flmd, &fmd); + + vr_flow_defer(flmd, fe); + return; +} + + +static void +vr_flow_work(void *arg) { struct vrouter *router; struct vr_flow_entry *fe; - struct vr_forwarding_md fmd; struct vr_flow_md *flmd = (struct vr_flow_md *)arg; @@ -1123,17 +1388,14 @@ vr_flow_flush(void *arg) if (!fe) goto exit_flush; - vr_init_forwarding_md(&fmd); - vr_flow_set_forwarding_md(router, fe, flmd->flmd_index, &fmd); - - vr_flush_entry(router, fe, flmd, &fmd); - - if (!(flmd->flmd_flags & VR_FLOW_FLAG_ACTIVE)) { - vr_reset_flow_entry(router, fe, flmd->flmd_index); - } + __vr_flow_work(router, fe, flmd); exit_flush: if (flmd->flmd_defer_data) { + if (flmd->flmd_defer_data->vdd_data) { + vr_free(flmd->flmd_defer_data->vdd_data, + VR_FLOW_DEFER_DATA_OBJECT); + } vr_put_defer_data(flmd->flmd_defer_data); flmd->flmd_defer_data = NULL; } @@ -1211,7 +1473,7 @@ vr_add_flow(unsigned int rid, struct vr_flow *key, uint8_t type, /* a race between agent and dp. allow agent to handle this error */ return NULL; } else { - flow_e = vr_find_free_entry(router, key, type, + flow_e = vr_flow_get_free_entry(router, key, type, need_hold_queue, fe_index); } @@ -1259,9 +1521,10 @@ vr_add_flow_req(vr_flow_req *req, unsigned int *fe_index) * agent), in which case we should be checking only the request */ static int -vr_flow_req_is_invalid(struct vrouter *router, vr_flow_req *req, +vr_flow_set_req_is_invalid(struct vrouter *router, vr_flow_req *req, struct vr_flow_entry *fe) { + int error = 0; struct vr_flow_entry *rfe; if (fe) { @@ -1272,34 +1535,44 @@ vr_flow_req_is_invalid(struct vrouter *router, vr_flow_req *req, (unsigned short)req->fr_flow_dport != fe->fe_key.flow_dport|| (unsigned short)req->fr_flow_nh_id != fe->fe_key.flow_nh_id || (unsigned char)req->fr_flow_proto != fe->fe_key.flow_proto) { - return -EBADF; + error = -EBADF; + goto invalid_req; } } } if (req->fr_flags & VR_FLOW_FLAG_VRFT) { - if ((unsigned short)req->fr_flow_dvrf >= router->vr_max_vrfs) - return -EINVAL; + if ((unsigned short)req->fr_flow_dvrf >= router->vr_max_vrfs) { + error = -EINVAL; + goto invalid_req; + } } if (req->fr_flags & VR_FLOW_FLAG_MIRROR) { if (((unsigned int)req->fr_mir_id >= router->vr_max_mirror_indices) && - (unsigned int)req->fr_sec_mir_id >= router->vr_max_mirror_indices) - return -EINVAL; + (unsigned int)req->fr_sec_mir_id >= router->vr_max_mirror_indices) { + error = -EINVAL; + goto invalid_req; + } } if (req->fr_flags & VR_RFLOW_VALID) { rfe = vr_get_flow_entry(router, req->fr_rindex); - if (!rfe) - return -EINVAL; + if (!rfe) { + error = -EINVAL; + goto invalid_req; + } } return 0; + +invalid_req: + return error; } static int -vr_flow_schedule_transition(struct vrouter *router, vr_flow_req *req, - struct vr_flow_entry *fe) +__vr_flow_schedule_transition(struct vrouter *router, struct vr_flow_entry *fe, + 
unsigned int index, unsigned short flags) { struct vr_flow_md *flmd; struct vr_defer_data *defer = NULL; @@ -1310,21 +1583,32 @@ vr_flow_schedule_transition(struct vrouter *router, vr_flow_req *req, return -ENOMEM; flmd->flmd_router = router; - flmd->flmd_index = req->fr_index; - flmd->flmd_flags = req->fr_flags; - if (fe->fe_hold_list) { + flmd->flmd_index = index; + flmd->flmd_flags = flags; + if (fe->fe_hold_list || (flags & VR_FLOW_FLAG_EVICT_CANDIDATE)) { defer = vr_get_defer_data(sizeof(*defer)); - if (!defer) { - vr_free(flmd, VR_FLOW_METADATA_OBJECT); - return -ENOMEM; + if (defer) { + defer->vdd_data = (void *)vr_zalloc(sizeof(struct vr_flow_defer_data), + VR_FLOW_DEFER_DATA_OBJECT); + if (!(flmd->flmd_flags & VR_FLOW_FLAG_ACTIVE)) { + ((struct vr_flow_defer_data *)defer->vdd_data)->vfdd_delete = + true; + } } } flmd->flmd_defer_data = defer; - vr_schedule_work(vr_get_cpu(), vr_flow_flush, (void *)flmd); + vr_schedule_work(vr_get_cpu(), vr_flow_work, (void *)flmd); return 0; } +static int +vr_flow_schedule_transition(struct vrouter *router, vr_flow_req *req, + struct vr_flow_entry *fe) +{ + return __vr_flow_schedule_transition(router, fe, req->fr_index, req->fr_flags); +} + static int vr_flow_delete(struct vrouter *router, vr_flow_req *req, struct vr_flow_entry *fe) @@ -1378,7 +1662,7 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req) { int ret; unsigned int fe_index; - bool new_flow = false; + bool new_flow = false, modified = false; struct vr_flow_entry *fe = NULL, *rfe = NULL; struct vr_flow_table_info *infop = router->vr_flow_table_info; @@ -1388,14 +1672,21 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req) return -EINVAL; fe = vr_get_flow_entry(router, req->fr_index); + if (fe) { + if (!(modified = vr_flow_start_modify(router, fe))) + return -EBUSY; + } - if ((ret = vr_flow_req_is_invalid(router, req, fe))) - return ret; + if ((ret = vr_flow_set_req_is_invalid(router, req, fe))) + goto exit_set; - if (fe && (fe->fe_action == VR_FLOW_ACTION_HOLD) && + if (fe) { + if ((fe->fe_action == VR_FLOW_ACTION_HOLD) && ((req->fr_action != fe->fe_action) || - !(req->fr_flags & VR_FLOW_FLAG_ACTIVE))) - __sync_fetch_and_add(&infop->vfti_action_count, 1); + !(req->fr_flags & VR_FLOW_FLAG_ACTIVE))) { + __sync_fetch_and_add(&infop->vfti_action_count, 1); + } + } /* * for delete, absence of the requested flow entry is caustic. 
so * handle that case first @@ -1424,8 +1715,10 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req) if (!fe->fe_hold_list) { fe->fe_hold_list = vr_zalloc(sizeof(struct vr_flow_queue), VR_FLOW_QUEUE_OBJECT); - if (!fe->fe_hold_list) - return -ENOMEM; + if (!fe->fe_hold_list) { + ret = -ENOMEM; + goto exit_set; + } } } } @@ -1470,7 +1763,7 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req) if (fe->fe_action == VR_FLOW_ACTION_DROP) fe->fe_drop_reason = (uint8_t)req->fr_drop_reason; - fe->fe_flags = req->fr_flags; + fe->fe_flags = VR_FLOW_FLAG_MASK(req->fr_flags); if (new_flow && (fe->fe_flags & VR_RFLOW_VALID)) { rfe = vr_get_flow_entry(router, fe->fe_rflow); if (rfe) { @@ -1483,7 +1776,14 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req) vr_flow_udp_src_port(router, fe); - return vr_flow_schedule_transition(router, req, fe); + ret = vr_flow_schedule_transition(router, req, fe); + +exit_set: + if (modified && fe) { + vr_flow_stop_modify(router, fe); + } + + return ret; } static void diff --git a/dp-core/vr_interface.c b/dp-core/vr_interface.c index 647f99c05..0ed36f5d9 100644 --- a/dp-core/vr_interface.c +++ b/dp-core/vr_interface.c @@ -377,6 +377,7 @@ agent_trap_may_truncate(int trap_reason) case AGENT_TRAP_NEXTHOP: case AGENT_TRAP_RESOLVE: case AGENT_TRAP_FLOW_MISS: + case AGENT_TRAP_FLOW_ACTION_HOLD: case AGENT_TRAP_ECMP_RESOLVE: case AGENT_TRAP_HANDLE_DF: case AGENT_TRAP_ZERO_TTL: @@ -473,16 +474,20 @@ agent_send(struct vr_interface *vif, struct vr_packet *pkt, switch (params->trap_reason) { case AGENT_TRAP_FLOW_MISS: + case AGENT_TRAP_FLOW_ACTION_HOLD: if (params->trap_param) { fta = (struct vr_flow_trap_arg *)(params->trap_param); hdr->hdr_cmd_param = htonl(fta->vfta_index); hdr->hdr_cmd_param_1 = htonl(fta->vfta_nh_index); + hdr->hdr_cmd_param_2 = htonl(fta->vfta_stats.flow_bytes); + hdr->hdr_cmd_param_3 = htonl(fta->vfta_stats.flow_packets); + hdr->hdr_cmd_param_4 = htonl((fta->vfta_stats.flow_bytes_oflow | + (fta->vfta_stats.flow_packets_oflow << 16))); } break; case AGENT_TRAP_ECMP_RESOLVE: case AGENT_TRAP_SOURCE_MISMATCH: - case AGENT_TRAP_SESSION_CLOSE: if (params->trap_param) hdr->hdr_cmd_param = htonl(*(unsigned int *)(params->trap_param)); break; diff --git a/dpdk/vr_dpdk_host.c b/dpdk/vr_dpdk_host.c index 685533d81..113a30327 100644 --- a/dpdk/vr_dpdk_host.c +++ b/dpdk/vr_dpdk_host.c @@ -41,7 +41,7 @@ static bool vr_host_inited = false; extern void vr_malloc_stats(unsigned int, unsigned int); extern void vr_free_stats(unsigned int); /* RCU callback */ -extern void vr_flow_queue_free(struct vrouter *router, void *arg); +extern void vr_flow_defer_cb(struct vrouter *router, void *arg); static void * @@ -517,25 +517,29 @@ dpdk_rcu_cb(struct rcu_head *rh) struct vr_flow_queue *vfq; struct vr_packet_node *pnode; + cb_data = CONTAINER_OF(rcd_rcu, struct vr_dpdk_rcu_cb_data, rh); /* check if we need to pass the callback to packet lcore */ - if (cb_data->rcd_user_cb == vr_flow_queue_free - && cb_data->rcd_user_data) { + if ((cb_data->rcd_user_cb == vr_flow_defer_cb) && + cb_data->rcd_user_data) { defer = (struct vr_defer_data *)cb_data->rcd_user_data; - vfq = (struct vr_flow_queue *)defer->vdd_data; - for (i = 0; i < VR_MAX_FLOW_QUEUE_ENTRIES; i++) { - pnode = &vfq->vfq_pnodes[i]; - if (pnode->pl_packet) { - RTE_LOG(DEBUG, VROUTER, "%s: lcore %u passing RCU callback to lcore %u\n", - __func__, rte_lcore_id(), VR_DPDK_PACKET_LCORE_ID); - vr_dpdk_lcore_cmd_post(VR_DPDK_PACKET_LCORE_ID, - VR_DPDK_LCORE_RCU_CMD, (uintptr_t)rh); - return; + vfq = ((struct 
vr_flow_defer_data *)defer->vdd_data)->vfdd_flow_queue;
+        if (vfq) {
+            for (i = 0; i < VR_MAX_FLOW_QUEUE_ENTRIES; i++) {
+                pnode = &vfq->vfq_pnodes[i];
+                if (pnode->pl_packet) {
+                    RTE_LOG(DEBUG, VROUTER, "%s: lcore %u passing RCU callback "
+                        "to lcore %u\n", __func__, rte_lcore_id(),
+                        VR_DPDK_PACKET_LCORE_ID);
+                    vr_dpdk_lcore_cmd_post(VR_DPDK_PACKET_LCORE_ID,
+                        VR_DPDK_LCORE_RCU_CMD, (uintptr_t)rh);
+                    return;
+                }
             }
+            RTE_LOG(DEBUG, VROUTER, "%s: lcore %u passing RCU callback to lcore %u\n",
+                __func__, rte_lcore_id(), VR_DPDK_PACKET_LCORE_ID);
         }
-        RTE_LOG(DEBUG, VROUTER, "%s: lcore %u just calling RCU callback\n",
-            __func__, rte_lcore_id());
     }
     /* no need to send any packets, so just call the callback */
     cb_data->rcd_user_cb(cb_data->rcd_router, cb_data->rcd_user_data);
diff --git a/include/vr_defs.h b/include/vr_defs.h
index e3ba8b816..61467f4f8 100644
--- a/include/vr_defs.h
+++ b/include/vr_defs.h
@@ -34,7 +34,7 @@
 #define AGENT_TRAP_ZERO_TTL 12
 #define AGENT_TRAP_ICMP_ERROR 13
 #define AGENT_TRAP_TOR_CONTROL_PKT 14
-#define AGENT_TRAP_SESSION_CLOSE 15
+#define AGENT_TRAP_FLOW_ACTION_HOLD 15
 #define MAX_AGENT_HDR_COMMANDS 16
 
 enum rt_type{
@@ -60,6 +60,9 @@ struct agent_hdr {
     unsigned short hdr_cmd;
     unsigned int hdr_cmd_param;
     unsigned int hdr_cmd_param_1;
+    unsigned int hdr_cmd_param_2;
+    unsigned int hdr_cmd_param_3;
+    unsigned int hdr_cmd_param_4;
 } __attribute__((packed));
 
 #define CMD_PARAM_PACKET_CTRL 0x1
diff --git a/include/vr_flow.h b/include/vr_flow.h
index 34b5eb246..34b3307d4 100644
--- a/include/vr_flow.h
+++ b/include/vr_flow.h
@@ -21,12 +21,18 @@ typedef enum {
     FLOW_CONSUMED,
 } flow_result_t;
 
-#define VR_FLOW_FLAG_ACTIVE 0x1
-#define VR_RFLOW_VALID 0x1000
-#define VR_FLOW_FLAG_MIRROR 0x2000
-#define VR_FLOW_FLAG_VRFT 0x4000
-#define VR_FLOW_FLAG_LINK_LOCAL 0x8000
-
+#define VR_FLOW_FLAG_ACTIVE 0x0001
+#define VR_FLOW_FLAG_MODIFIED 0x0100
+#define VR_FLOW_FLAG_NEW_FLOW 0x0200
+#define VR_FLOW_FLAG_EVICT_CANDIDATE 0x0400
+#define VR_FLOW_FLAG_EVICTED 0x0800
+#define VR_RFLOW_VALID 0x1000
+#define VR_FLOW_FLAG_MIRROR 0x2000
+#define VR_FLOW_FLAG_VRFT 0x4000
+#define VR_FLOW_FLAG_LINK_LOCAL 0x8000
+
+#define VR_FLOW_FLAG_MASK(flag) ((flag) & ~(VR_FLOW_FLAG_EVICT_CANDIDATE |\
+            VR_FLOW_FLAG_EVICTED | VR_FLOW_FLAG_NEW_FLOW))
 
 /* rest of the flags are action specific */
 /* for NAT */
@@ -69,6 +75,13 @@ typedef enum {
                                       : AF_INET)
 
 struct vr_forwarding_md;
+struct vr_flow_defer_data {
+    struct vr_flow_queue *vfdd_flow_queue;
+    struct vr_flow_entry *vfdd_fe;
+    unsigned int vfdd_fe_index;
+    bool vfdd_delete;
+};
+
 struct vr_common_flow{
     unsigned char ip_family;
     unsigned char ip_proto;
@@ -215,6 +228,46 @@ struct vr_flow_queue {
     struct vr_packet_node vfq_pnodes[VR_MAX_FLOW_QUEUE_ENTRIES];
 };
 
+/*
+ * Flow eviction:
+ * 1. Requirement
+ * --------------
+ *
+ * Inactive TCP flows (flows that have already seen the closure cycle - FIN/ACK
+ * or the RESET flags) should additionally be considered free flow entries so
+ * that vRouter does not have to wait for agent's aging cycle to accommodate
+ * new flows; this provides better service under severe occupancy.
+ *
+ * 2. Problems in datapath-initiated flow closure
+ * ----------------------------------------------
+ *
+ * . Simultaneous discovery of the same flow entry by two different CPUs
+ * . Simultaneous closure of an entry by both agent and datapath
+ * . Handling of packets held in the flow entry when the entry moves from hold
+ *   to closed state
+ *
+ * 3. 
Implementation
+ * -----------------
+ *
+ * 3.1 Marking
+ * -----------
+ *
+ * Once the TCP state machine determines that a flow can be closed, it updates
+ * the tcp flags with a new flag VR_FLOW_TCP_DEAD, since determining whether a
+ * tcp flow has seen its end with only the existing TCP flags is a bit more
+ * involved. The last packet, before exiting the module, marks the flow as an
+ * eviction candidate (VR_FLOW_FLAG_EVICT_CANDIDATE).
+ *
+ * 3.2 Allocation/Eviction
+ * -----------------------
+ *
+ * Once the last packet exits the flow module, a work item is scheduled to mark
+ * the flow as inactive. This work item schedules an RCU callback to mark the
+ * entry as inactive (the same path that flow deletion from agent takes). While
+ * deleting the entry, the evicted flow is also marked as evicted
+ * (VR_FLOW_FLAG_EVICTED).
+ *
+ */
 #define VR_FLOW_TCP_FIN 0x0001
 #define VR_FLOW_TCP_HALF_CLOSE 0x0002
 #define VR_FLOW_TCP_FIN_R 0x0004
@@ -223,12 +276,14 @@ struct vr_flow_queue {
 #define VR_FLOW_TCP_ESTABLISHED 0x0020
 #define VR_FLOW_TCP_ESTABLISHED_R 0x0040
 #define VR_FLOW_TCP_RST 0x0080
+#define VR_FLOW_TCP_DEAD 0x8000
 
 /* align to 8 byte boundary */
 #define VR_FLOW_KEY_PAD ((8 - (sizeof(struct vr_flow) % 8)) % 8)
 
 struct vr_dummy_flow_entry {
     struct vr_flow fe_key;
+    uint8_t fe_key_packing;
     uint16_t fe_tcp_flags;
     unsigned int fe_tcp_seq;
     struct vr_flow_queue *fe_hold_list;
@@ -252,6 +307,7 @@ struct vr_dummy_flow_entry {
 /* do not change. any field positions as it might lead to incompatibility */
 struct vr_flow_entry {
     struct vr_flow fe_key;
+    uint8_t fe_key_packing;
     uint16_t fe_tcp_flags;
     unsigned int fe_tcp_seq;
     struct vr_flow_queue *fe_hold_list;
@@ -305,6 +361,7 @@ struct vr_flow_md {
 struct vr_flow_trap_arg {
     unsigned int vfta_index;
     unsigned int vfta_nh_index;
+    struct vr_flow_stats vfta_stats;
 };
 
 struct vr_packet;
diff --git a/include/vrouter.h b/include/vrouter.h
index 90b4f787b..33ab07f48 100644
--- a/include/vrouter.h
+++ b/include/vrouter.h
@@ -55,6 +55,7 @@ enum vr_malloc_objects_t {
     VR_FLOW_LINK_LOCAL_OBJECT,
     VR_FLOW_METADATA_OBJECT,
     VR_FLOW_TABLE_INFO_OBJECT,
+    VR_FLOW_DEFER_DATA_OBJECT,
     VR_FRAGMENT_OBJECT,
     VR_FRAGMENT_QUEUE_OBJECT,
     VR_FRAGMENT_QUEUE_ELEMENT_OBJECT,
diff --git a/utils/dropstats.c b/utils/dropstats.c
index b9de1bb0b..cc1c99a92 100644
--- a/utils/dropstats.c
+++ b/utils/dropstats.c
@@ -22,6 +22,7 @@
 #include "vr_os.h"
 #include "vr_types.h"
 #include "vr_nexthop.h"
+#include "ini_parser.h"
 #include "nl_util.h"
 #include "ini_parser.h"
 
diff --git a/utils/flow.c b/utils/flow.c
index 3469ebad1..51268d5ec 100644
--- a/utils/flow.c
+++ b/utils/flow.c
@@ -137,7 +137,8 @@ dump_legend(void)
     printf("L=Link Local Port)\n");
     printf(" Other:K(nh)=Key_Nexthop, S(nh)=RPF_Nexthop\n");
 
-    printf(" TCP(r=reverse):S=SYN, F=FIN, R=RST, C=HalfClose, E=Established\n");
+    printf(" Flags:E=Evicted, Ec=Evict Candidate, N=New Flow, M=Modified\n");
+    printf("TCP(r=reverse):S=SYN, F=FIN, R=RST, C=HalfClose, E=Established, D=Dead\n");
 
     printf("\n");
 
     return;
@@ -284,6 +285,17 @@ dump_table(struct flow_table *ft)
                 printf("(%u)", fe->fe_drop_reason);
             }
 
+            printf(", ");
+            printf("Flags:");
+            if (fe->fe_flags & VR_FLOW_FLAG_EVICTED)
+                printf("E");
+            if (fe->fe_flags & VR_FLOW_FLAG_EVICT_CANDIDATE)
+                printf("Ec");
+            if (fe->fe_flags & VR_FLOW_FLAG_NEW_FLOW)
+                printf("N");
+            if (fe->fe_flags & VR_FLOW_FLAG_MODIFIED)
+                printf("M");
+
             printf(", ");
             if (fe->fe_key.flow4_proto == VR_IP_PROTO_TCP) {
                 printf("TCP:");
@@ -305,6 +317,8 @@ dump_table(struct flow_table *ft)
                 printf("R");
             if 
(fe->fe_tcp_flags & VR_FLOW_TCP_HALF_CLOSE) printf("C"); + if (fe->fe_tcp_flags & VR_FLOW_TCP_DEAD) + printf("D"); printf(", "); }
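
For reviewers who want to exercise the locking scheme outside a full vRouter
build: the sketch below reproduces the flag-transition protocol introduced by
this patch - vr_flow_start_modify()'s MODIFIED guard and
__vr_flow_mark_evict()'s EVICT_CANDIDATE marking - as a standalone program,
using the same GCC __sync builtins and flag values as the patch. It is a
minimal sketch, not part of the patch; the demo_* names are illustrative only.

/* build: gcc -Wall demo_evict.c -o demo_evict */
#include <stdbool.h>
#include <stdio.h>

#define VR_FLOW_FLAG_ACTIVE          0x0001
#define VR_FLOW_FLAG_MODIFIED        0x0100
#define VR_FLOW_FLAG_EVICT_CANDIDATE 0x0400
#define VR_FLOW_FLAG_EVICTED         0x0800

struct demo_flow {
    unsigned short fe_flags;
};

/* take the MODIFIED "lock"; fails if another context is already
 * modifying the entry or the entry was evicted underneath us */
static bool
demo_start_modify(struct demo_flow *fe)
{
    unsigned short flags = fe->fe_flags;

    if (flags & (VR_FLOW_FLAG_MODIFIED | VR_FLOW_FLAG_EVICTED))
        return false;

    return __sync_bool_compare_and_swap(&fe->fe_flags, flags,
            flags | VR_FLOW_FLAG_MODIFIED);
}

static void
demo_stop_modify(struct demo_flow *fe)
{
    (void)__sync_and_and_fetch(&fe->fe_flags, ~VR_FLOW_FLAG_MODIFIED);
}

/* mark an active entry as an eviction candidate; returns true only
 * for the context that won the race, as in __vr_flow_mark_evict() */
static bool
demo_mark_evict(struct demo_flow *fe)
{
    unsigned short flags;

    if (!(fe->fe_flags & VR_FLOW_FLAG_ACTIVE))
        return false;

    if (!demo_start_modify(fe))
        return false;

    flags = __sync_fetch_and_or(&fe->fe_flags, VR_FLOW_FLAG_EVICT_CANDIDATE);
    if (flags & VR_FLOW_FLAG_EVICT_CANDIDATE) {
        /* somebody else marked it first; release the guard */
        demo_stop_modify(fe);
        return false;
    }

    return true;
}

int
main(void)
{
    struct demo_flow fe = { .fe_flags = VR_FLOW_FLAG_ACTIVE };

    printf("first mark: %d\n", (int)demo_mark_evict(&fe));  /* 1: won */
    printf("second mark: %d\n", (int)demo_mark_evict(&fe)); /* 0: MODIFIED held */
    printf("flags: 0x%04x\n", fe.fe_flags);
    return 0;
}

The property this buys: exactly one context wins the EVICT_CANDIDATE
transition, and every loser backs off and releases MODIFIED, which is what
lets agent writes (vr_flow_set()) and datapath eviction coexist without a
per-entry lock.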
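
Similarly, the new hdr_cmd_param_2..hdr_cmd_param_4 fields carry the held
flow's stats to agent in network byte order, with both overflow counters
packed into param_4. Below is a minimal sketch of that packing and a matching
agent-side unpacking; the vr_flow_stats field widths are assumed here (the
struct definition is not part of this diff), and the demo_* names are
hypothetical.

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

/* assumed layout; mirrors the fields agent_send() reads */
struct demo_flow_stats {
    uint32_t flow_bytes;
    uint32_t flow_packets;
    uint16_t flow_bytes_oflow;
    uint16_t flow_packets_oflow;
};

/* mirrors the packing in agent_send(): low 16 bits carry the byte
 * overflow counter, the upper bits the packet overflow counter */
static uint32_t
demo_pack_oflow(const struct demo_flow_stats *s)
{
    return htonl(s->flow_bytes_oflow | (s->flow_packets_oflow << 16));
}

static void
demo_unpack(uint32_t p2, uint32_t p3, uint32_t p4,
        struct demo_flow_stats *s)
{
    uint32_t oflow = ntohl(p4);

    s->flow_bytes = ntohl(p2);
    s->flow_packets = ntohl(p3);
    s->flow_bytes_oflow = oflow & 0xffff;
    s->flow_packets_oflow = oflow >> 16;
}

int
main(void)
{
    struct demo_flow_stats in = { 4096, 12, 1, 2 }, out;

    demo_unpack(htonl(in.flow_bytes), htonl(in.flow_packets),
            demo_pack_oflow(&in), &out);
    printf("bytes=%u packets=%u boflow=%u poflow=%u\n",
            out.flow_bytes, out.flow_packets,
            out.flow_bytes_oflow, out.flow_packets_oflow);
    return 0;
}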