diff --git a/dp-core/vr_flow.c b/dp-core/vr_flow.c
index 1fcc1e2a3..4ffdd4e59 100644
--- a/dp-core/vr_flow.c
+++ b/dp-core/vr_flow.c
@@ -53,15 +53,18 @@ int hashrnd_inited = 0;
 static void vr_flush_entry(struct vrouter *, struct vr_flow_entry *,
         struct vr_flow_md *, struct vr_forwarding_md *);
-static void vr_flush_flow_queue(struct vrouter *, struct vr_flow_entry *,
+static void __vr_flow_flush_hold_queue(struct vrouter *, struct vr_flow_entry *,
         struct vr_forwarding_md *, struct vr_flow_queue *);
 static void vr_flow_set_forwarding_md(struct vrouter *, struct vr_flow_entry *,
         unsigned int, struct vr_forwarding_md *);
+static int
+__vr_flow_schedule_transition(struct vrouter *, struct vr_flow_entry *,
+        unsigned int, unsigned short);
 
 struct vr_flow_entry *vr_find_flow(struct vrouter *, struct vr_flow *,
         uint8_t, unsigned int *);
 unsigned int vr_trap_flow(struct vrouter *, struct vr_flow_entry *,
-        struct vr_packet *, unsigned int);
+        struct vr_packet *, unsigned int, struct vr_flow_stats *);
 
 void get_random_bytes(void *buf, int nbytes);
 
@@ -199,10 +202,9 @@ vr_init_flow_entry(struct vr_flow_entry *fe)
 
 static void
-vr_reset_flow_entry(struct vrouter *router, struct vr_flow_entry *fe,
+vr_reset_active_flow_entry(struct vrouter *router, struct vr_flow_entry *fe,
         unsigned int index)
 {
-    memset(&fe->fe_stats, 0, sizeof(fe->fe_stats));
     if (fe->fe_hold_list) {
         vr_printf("vrouter: Potential memory leak @ %s:%d\n",
                 __FILE__, __LINE__);
@@ -217,9 +219,22 @@ vr_reset_flow_entry(struct vrouter *router, struct vr_flow_entry *fe,
     fe->fe_src_nh_index = NH_DISCARD_ID;
     fe->fe_rflow = -1;
     fe->fe_action = VR_FLOW_ACTION_DROP;
-    fe->fe_flags = 0;
     fe->fe_udp_src_port = 0;
     fe->fe_tcp_flags = 0;
+    fe->fe_flags &=
+        (VR_FLOW_FLAG_ACTIVE | VR_FLOW_FLAG_EVICT_CANDIDATE |
+         VR_FLOW_FLAG_NEW_FLOW);
+
+    return;
+}
+
+static void
+vr_reset_flow_entry(struct vrouter *router, struct vr_flow_entry *fe,
+        unsigned int index)
+{
+    vr_reset_active_flow_entry(router, fe, index);
+    memset(&fe->fe_stats, 0, sizeof(fe->fe_stats));
+    fe->fe_flags = 0;
 
     return;
 }
 
@@ -229,7 +244,8 @@ static inline bool
 vr_set_flow_active(struct vr_flow_entry *fe)
 {
     return __sync_bool_compare_and_swap(&fe->fe_flags,
-            fe->fe_flags & ~VR_FLOW_FLAG_ACTIVE, VR_FLOW_FLAG_ACTIVE);
+            fe->fe_flags & ~VR_FLOW_FLAG_ACTIVE,
+            VR_FLOW_FLAG_ACTIVE | VR_FLOW_FLAG_NEW_FLOW);
 }
 
 static inline struct vr_flow_entry *
@@ -296,92 +312,222 @@ vr_get_flow_entry(struct vrouter *router, int index)
     return (struct vr_flow_entry *)vr_btable_get(table, index);
 }
 
+static inline void
+vr_flow_stop_modify(struct vrouter *router, struct vr_flow_entry *fe)
+{
+    if (!fe)
+        return;
+
+    (void)__sync_and_and_fetch(&fe->fe_flags, ~VR_FLOW_FLAG_MODIFIED);
+    return;
+}
+
+static inline bool
+vr_flow_start_modify(struct vrouter *router, struct vr_flow_entry *fe)
+{
+    unsigned short flags;
+
+    flags = fe->fe_flags;
+    if (!(flags & (VR_FLOW_FLAG_MODIFIED | VR_FLOW_FLAG_EVICTED))) {
+        if (__sync_bool_compare_and_swap(&fe->fe_flags, flags,
+                    flags | VR_FLOW_FLAG_MODIFIED)) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 /* Non-static due to RCU callback pointer comparison in vRouter/DPDK */
 void
-vr_flow_queue_free(struct vrouter *router, void *arg)
+vr_flow_flush_hold_queue(struct vrouter *router, struct vr_flow_entry *fe,
+        struct vr_flow_queue *vfq)
 {
     struct vr_forwarding_md fmd;
+
+    if (vfq) {
+        vr_init_forwarding_md(&fmd);
+        vr_flow_set_forwarding_md(router, fe, vfq->vfq_index, &fmd);
+        __vr_flow_flush_hold_queue(router, fe, &fmd, vfq);
+    }
+
+    return;
+}
+
+static void
+vr_flow_evict_flow(struct vrouter *router, struct vr_flow_entry *fe,
+        unsigned int index)
+{
+    unsigned short flags;
+
+    if (!fe)
+        return;
+
+    if ((fe->fe_flags & VR_FLOW_FLAG_ACTIVE) &&
+            (fe->fe_flags & VR_FLOW_FLAG_EVICT_CANDIDATE)) {
+        flags = fe->fe_flags | VR_FLOW_FLAG_ACTIVE |
+            VR_FLOW_FLAG_EVICT_CANDIDATE;
+        (void)__sync_bool_compare_and_swap(&fe->fe_flags, flags,
+                (flags ^ VR_FLOW_FLAG_EVICT_CANDIDATE) |
+                VR_FLOW_FLAG_EVICTED);
+        vr_flow_stop_modify(router, fe);
+    }
+
+    return;
+}
+
+void
+vr_flow_defer_cb(struct vrouter *router, void *arg)
+{
     struct vr_defer_data *defer;
-    struct vr_flow_entry *fe;
+    struct vr_flow_entry *fe, *rfe;
     struct vr_flow_queue *vfq;
+    struct vr_flow_defer_data *vfdd;
 
     defer = (struct vr_defer_data *)arg;
     if (!defer)
         return;
 
-    vr_init_forwarding_md(&fmd);
+    vfdd = (struct vr_flow_defer_data *)defer->vdd_data;
+    if (!vfdd)
+        return;
+    fe = vfdd->vfdd_fe;
 
-    vfq = (struct vr_flow_queue *)defer->vdd_data;
-    fe = vr_get_flow_entry(router, vfq->vfq_index);
-    if (fe) {
-        vr_flow_set_forwarding_md(router, fe, vfq->vfq_index, &fmd);
-        vr_flush_flow_queue(router, fe, &fmd, vfq);
+    vfq = (struct vr_flow_queue *)vfdd->vfdd_flow_queue;
+    if (vfq) {
+        vr_flow_flush_hold_queue(router, fe, vfq);
+        vr_free(vfq, VR_FLOW_QUEUE_OBJECT);
+        vfdd->vfdd_flow_queue = NULL;
+    }
+
+    if (vfdd->vfdd_delete) {
+        vr_reset_flow_entry(router, fe, vfdd->vfdd_fe_index);
+    } else {
+        rfe = vr_get_flow_entry(router, fe->fe_rflow);
+        vr_flow_evict_flow(router, fe, vfdd->vfdd_fe_index);
+        if (rfe)
+            vr_flow_evict_flow(router, rfe, fe->fe_rflow);
     }
-    vr_free(vfq, VR_FLOW_QUEUE_OBJECT);
+
+    vr_free(vfdd, VR_FLOW_DEFER_DATA_OBJECT);
+
     return;
 }
 
 static void
-vr_flow_queue_free_defer(struct vr_flow_md *flmd, struct vr_flow_queue *vfq)
+vr_flow_reset_evict(struct vrouter *router, struct vr_flow_entry *fe)
 {
+    unsigned short flags;
+
+    if (!fe)
+        return;
+
+    flags = fe->fe_flags;
+    if (flags & VR_FLOW_FLAG_EVICT_CANDIDATE) {
+        (void)__sync_bool_compare_and_swap(&fe->fe_flags, flags,
+                (flags ^ VR_FLOW_FLAG_EVICT_CANDIDATE));
+    }
+
+    vr_flow_stop_modify(router, fe);
+
+    return;
+}
+
+static void
+vr_flow_defer(struct vr_flow_md *flmd, struct vr_flow_entry *fe)
+{
+    struct vr_flow_entry *rfe;
     struct vr_defer_data *vdd = flmd->flmd_defer_data;
+    struct vr_flow_defer_data *vfdd;
+
+    if (!vdd || !vdd->vdd_data) {
+        if (fe->fe_rflow) {
+            rfe = vr_get_flow_entry(flmd->flmd_router, fe->fe_rflow);
+            vr_flow_reset_evict(flmd->flmd_router, rfe);
+        }
+        vr_flow_reset_evict(flmd->flmd_router, fe);
+
+        if (!(flmd->flmd_flags & VR_FLOW_FLAG_ACTIVE)) {
+            vr_reset_flow_entry(flmd->flmd_router, fe, flmd->flmd_index);
+        }
 
-    if (!vdd) {
-        vr_free(vfq, VR_FLOW_QUEUE_OBJECT);
         return;
     }
 
-    vdd->vdd_data = (void *)vfq;
-    vr_defer(flmd->flmd_router, vr_flow_queue_free, (void *)vdd);
+    vfdd = (struct vr_flow_defer_data *)vdd->vdd_data;
+    vfdd->vfdd_fe = fe;
+
+    vr_defer(flmd->flmd_router, vr_flow_defer_cb, (void *)vdd);
     flmd->flmd_defer_data = NULL;
 
     return;
 }
 
 static struct vr_flow_entry *
-vr_find_free_entry(struct vrouter *router, struct vr_flow *key, uint8_t type,
-        bool need_hold, unsigned int *fe_index)
+vr_flow_table_get_free_entry(struct vrouter *router, struct vr_btable *table,
+        unsigned int start, unsigned int entries, unsigned int *free_index)
 {
-    unsigned int i, index, hash;
-    struct vr_flow_entry *tmp_fe, *fe = NULL;
-
-    *fe_index = 0;
+    bool alloced = false, swapped = false;
+    unsigned short flags;
+    unsigned int i, j, table_size;
 
-    hash = vr_hash(key, key->flow_key_len, 0);
+    struct vr_flow_entry *fe;
 
-    index = (hash % vr_flow_entries) & ~(VR_FLOW_ENTRIES_PER_BUCKET - 1);
-    for (i = 0; i < VR_FLOW_ENTRIES_PER_BUCKET; i++) {
-        tmp_fe = vr_flow_table_entry_get(router, index);
-        if (tmp_fe && !(tmp_fe->fe_flags & VR_FLOW_FLAG_ACTIVE)) {
-            if (vr_set_flow_active(tmp_fe)) {
-                vr_init_flow_entry(tmp_fe);
-                fe = tmp_fe;
-                break;
+    table_size = vr_btable_entries(table);
+    for (i = start, j = 0; j < entries;
+            (i = ((i + 1) % table_size)), j++) {
+        fe = vr_btable_get(table, i);
+        if (fe) {
+            flags = fe->fe_flags;
+            if (!(flags & VR_FLOW_FLAG_ACTIVE)) {
+                if (vr_set_flow_active(fe)) {
+                    alloced = true;
+                    vr_init_flow_entry(fe);
+                    break;
+                }
+            } else if (flags & VR_FLOW_FLAG_EVICTED) {
+                swapped = __sync_bool_compare_and_swap(&fe->fe_flags, flags,
+                        ((flags & ~VR_FLOW_FLAG_EVICTED) |
+                         VR_FLOW_FLAG_NEW_FLOW));
+                if (swapped) {
+                    alloced = true;
+                    vr_reset_active_flow_entry(router, fe, i);
+                    break;
+                }
             }
         }
-        index++;
     }
 
+    if (alloced) {
+        *free_index = i;
+        return fe;
+    }
+
+    return NULL;
+}
+
+static struct vr_flow_entry *
+vr_flow_get_free_entry(struct vrouter *router, struct vr_flow *key, uint8_t type,
+        bool need_hold, unsigned int *fe_index)
+{
+    unsigned int index, hash;
+    struct vr_flow_entry *fe = NULL;
+
+    hash = vr_hash(key, key->flow_key_len, 0);
+
+    index = (hash % vr_flow_entries) & ~(VR_FLOW_ENTRIES_PER_BUCKET - 1);
+    fe = vr_flow_table_get_free_entry(router, router->vr_flow_table, index,
+            VR_FLOW_ENTRIES_PER_BUCKET, fe_index);
     if (!fe) {
         index = hash % vr_oflow_entries;
-        for (i = 0; i < vr_oflow_entries; i++) {
-            tmp_fe = vr_oflow_table_entry_get(router, index);
-            if (tmp_fe && !(tmp_fe->fe_flags & VR_FLOW_FLAG_ACTIVE)) {
-                if (vr_set_flow_active(tmp_fe)) {
-                    vr_init_flow_entry(tmp_fe);
-                    fe = tmp_fe;
-                    break;
-                }
-            }
-            index = (index + 1) % vr_oflow_entries;
-        }
-
+        fe = vr_flow_table_get_free_entry(router, router->vr_oflow_table, index,
+                vr_oflow_entries, fe_index);
         if (fe)
             *fe_index += vr_flow_entries;
     }
 
     if (fe) {
-        *fe_index += index;
         if (need_hold) {
             fe->fe_hold_list = vr_zalloc(sizeof(struct vr_flow_queue),
                     VR_FLOW_QUEUE_OBJECT);
@@ -426,8 +572,10 @@ vr_flow_table_lookup(struct vr_flow *key, uint16_t type,
                 (flow_e->fe_flags & VR_FLOW_FLAG_ACTIVE) &&
                 (flow_e->fe_type == type)) {
             if (!memcmp(&flow_e->fe_key, key, key->flow_key_len)) {
-                *fe_index = (hash + i) % table_size;
-                return flow_e;
+                if (!(flow_e->fe_flags & VR_FLOW_FLAG_EVICTED)) {
+                    *fe_index = (hash + i) % table_size;
+                    return flow_e;
+                }
             }
         }
     }
@@ -497,7 +645,7 @@ vr_flow_fill_pnode(struct vr_packet_node *pnode, struct vr_packet *pkt,
 static int
 vr_enqueue_flow(struct vrouter *router, struct vr_flow_entry *fe,
         struct vr_packet *pkt, unsigned int index,
-        struct vr_forwarding_md *fmd)
+        struct vr_flow_stats *stats, struct vr_forwarding_md *fmd)
 {
     unsigned int i;
     unsigned short drop_reason = 0;
@@ -518,7 +666,7 @@ vr_enqueue_flow(struct vrouter *router, struct vr_flow_entry *fe,
     pnode = &vfq->vfq_pnodes[i];
     vr_flow_fill_pnode(pnode, pkt, fmd);
     if (!i)
-        vr_trap_flow(router, fe, pkt, index);
+        vr_trap_flow(router, fe, pkt, index, stats);
 
     return 0;
 
 drop:
@@ -555,6 +703,74 @@ vr_flow_set_forwarding_md(struct vrouter *router, struct vr_flow_entry *fe,
     return;
 }
 
+static bool
+__vr_flow_mark_evict(struct vrouter *router, struct vr_flow_entry *fe)
+{
+    unsigned short flags;
+
+    flags = fe->fe_flags;
+    if (flags & VR_FLOW_FLAG_ACTIVE) {
+        if (vr_flow_start_modify(router, fe)) {
+            flags = __sync_fetch_and_or(&fe->fe_flags,
+                    VR_FLOW_FLAG_EVICT_CANDIDATE);
+            if (flags & VR_FLOW_FLAG_EVICT_CANDIDATE) {
+                goto unset_modified;
+            } else {
+                return true;
+            }
+        }
+    }
+
+    return false;
+
+unset_modified:
+    vr_flow_stop_modify(router, fe);
+    return false;
+}
+
+static void
+vr_flow_mark_evict(struct vrouter *router, struct vr_flow_entry *fe,
+        unsigned int index)
+{
+    bool evict_forward_flow = true;
+
+    struct vr_flow_entry *rfe = NULL;
+
+    if (fe->fe_rflow >= 0) {
+        rfe = vr_get_flow_entry(router, fe->fe_rflow);
+        if (rfe && (rfe->fe_rflow == index)) {
+            if (rfe->fe_tcp_flags & VR_FLOW_TCP_DEAD) {
+                evict_forward_flow = __vr_flow_mark_evict(router, rfe);
+            } else {
+                evict_forward_flow = false;
+            }
+        } else {
+            rfe = NULL;
+        }
+    }
+
+    if (evict_forward_flow) {
+        if (__vr_flow_mark_evict(router, fe)) {
+            if (__vr_flow_schedule_transition(router, fe,
+                        index, fe->fe_flags) < 0) {
+                if (rfe)
+                    vr_flow_reset_evict(router, rfe);
+                vr_flow_reset_evict(router, fe);
+            }
+        } else {
+            goto unset_modified;
+        }
+    }
+
+    return;
+
+unset_modified:
+    if (rfe)
+        vr_flow_stop_modify(router, rfe);
+
+    return;
+}
+
 static flow_result_t
 vr_flow_action(struct vrouter *router, struct vr_flow_entry *fe,
         unsigned int index, struct vr_packet *pkt,
@@ -642,13 +858,17 @@ vr_flow_action(struct vrouter *router, struct vr_flow_entry *fe,
         break;
     }
 
+    if (fe->fe_tcp_flags & VR_FLOW_TCP_DEAD)
+        vr_flow_mark_evict(router, fe, index);
+
     return result;
 }
 
 unsigned int
 vr_trap_flow(struct vrouter *router, struct vr_flow_entry *fe,
-        struct vr_packet *pkt, unsigned int index)
+        struct vr_packet *pkt, unsigned int index,
+        struct vr_flow_stats *stats)
 {
     unsigned int trap_reason;
     struct vr_packet *npkt;
@@ -662,14 +882,32 @@ vr_trap_flow(struct vrouter *router, struct vr_flow_entry *fe,
 
     switch (fe->fe_flags & VR_FLOW_FLAG_TRAP_MASK) {
     default:
-        trap_reason = AGENT_TRAP_FLOW_MISS;
+        /*
+         * the agent needs a way to distinguish new flows from existing
+         * ones. existing flows can be reused (evicted), or the action of
+         * such flows can become hold. if an existing flow is reused and a
+         * packet is trapped, the agent will not re-evaluate the flow.
+         * hence, the agent has to be told that this is a new flow, which
+         * we indicate by the trap reason.
+         */
+        if (fe->fe_flags & VR_FLOW_FLAG_NEW_FLOW) {
+            trap_reason = AGENT_TRAP_FLOW_MISS;
+            fe->fe_flags ^= VR_FLOW_FLAG_NEW_FLOW;
+        } else {
+            trap_reason = AGENT_TRAP_FLOW_ACTION_HOLD;
+        }
+
         ta.vfta_index = index;
         if ((fe->fe_type == VP_TYPE_IP) || (fe->fe_type == VP_TYPE_IP6))
             ta.vfta_nh_index = fe->fe_key.flow_nh_id;
+        if (stats) {
+            ta.vfta_stats = *stats;
+        } else {
+            ta.vfta_stats = fe->fe_stats;
+        }
+
         break;
     }
-
     return vr_trap(npkt, fe->fe_vrf, trap_reason, &ta);
 }
 
@@ -679,6 +917,13 @@ vr_do_flow_action(struct vrouter *router, struct vr_flow_entry *fe,
         struct vr_forwarding_md *fmd)
 {
     uint32_t new_stats;
+    struct vr_flow_stats stats, *stats_p = NULL;
+
+    if (fe->fe_flags & VR_FLOW_FLAG_NEW_FLOW) {
+        memcpy(&stats, &fe->fe_stats, sizeof(fe->fe_stats));
+        memset(&fe->fe_stats, 0, sizeof(fe->fe_stats));
+        stats_p = &stats;
+    }
 
     new_stats = __sync_add_and_fetch(&fe->fe_stats.flow_bytes, pkt_len(pkt));
     if (new_stats < pkt_len(pkt))
         fe->fe_stats.flow_bytes_oflow++;
@@ -689,7 +934,7 @@
         fe->fe_stats.flow_packets_oflow++;
 
     if (fe->fe_action == VR_FLOW_ACTION_HOLD) {
-        vr_enqueue_flow(router, fe, pkt, index, fmd);
+        vr_enqueue_flow(router, fe, pkt, index, stats_p, fmd);
         return FLOW_HELD;
     }
 
@@ -753,25 +998,14 @@ static void
 vr_flow_init_close(struct vrouter *router, struct vr_flow_entry *flow_e,
         struct vr_packet *pkt, struct vr_forwarding_md *fmd)
 {
-    unsigned int flow_index;
-    unsigned int head_room = sizeof(struct agent_hdr) + sizeof(struct vr_eth);
-
-    struct vr_packet *pkt_c;
-
-    pkt_c = vr_pclone(pkt);
-    if (!pkt_c)
-        return;
+    struct vr_flow_entry *rfe;
 
-    vr_preset(pkt_c);
-    if (vr_pcow(pkt_c, head_room)) {
-        vr_pfree(pkt_c, VP_DROP_PCOW_FAIL);
-        return;
+    (void)__sync_fetch_and_or(&flow_e->fe_tcp_flags, VR_FLOW_TCP_DEAD);
+    rfe = vr_get_flow_entry(router, flow_e->fe_rflow);
+    if (rfe) {
+        (void)__sync_fetch_and_or(&rfe->fe_tcp_flags, VR_FLOW_TCP_DEAD);
     }
 
-    flow_index = fmd->fmd_flow_index;
-    vr_trap(pkt_c, fmd->fmd_dvrf, AGENT_TRAP_SESSION_CLOSE,
-            (void *)&flow_index);
-
     return;
 }
 
@@ -945,7 +1179,7 @@ vr_flow_lookup(struct vrouter *router, struct vr_flow *key,
         return FLOW_CONSUMED;
     }
 
-    flow_e = vr_find_free_entry(router, key, pkt->vp_type,
+    flow_e = vr_flow_get_free_entry(router, key, pkt->vp_type,
             true, &fe_index);
     if (!flow_e) {
         vr_pfree(pkt, VP_DROP_FLOW_TABLE_FULL);
@@ -957,6 +1191,9 @@ vr_flow_lookup(struct vrouter *router, struct vr_flow *key,
         vr_flow_entry_set_hold(router, flow_e);
     }
 
+    if (flow_e->fe_flags & VR_FLOW_FLAG_EVICT_CANDIDATE)
+        return FLOW_DROP;
+
     vr_flow_set_forwarding_md(router, flow_e, fe_index, fmd);
     vr_flow_tcp_digest(router, flow_e, pkt, fmd);
 
@@ -1072,7 +1309,7 @@ vr_flow_flush_pnode(struct vrouter *router, struct vr_packet_node *pnode,
 }
 
 static void
-vr_flush_flow_queue(struct vrouter *router, struct vr_flow_entry *fe,
+__vr_flow_flush_hold_queue(struct vrouter *router, struct vr_flow_entry *fe,
         struct vr_forwarding_md *fmd, struct vr_flow_queue *vfq)
 {
     unsigned int i;
@@ -1090,28 +1327,56 @@ static void
 vr_flush_entry(struct vrouter *router, struct vr_flow_entry *fe,
         struct vr_flow_md *flmd, struct vr_forwarding_md *fmd)
 {
-    struct vr_flow_queue *vfq;
+    bool swapped;
 
-    if (fe->fe_action == VR_FLOW_ACTION_HOLD)
-        return;
+    struct vr_flow_queue *vfq;
+    struct vr_defer_data *vdd = flmd->flmd_defer_data;
+    struct vr_flow_defer_data *vfdd;
 
     vfq = fe->fe_hold_list;
-    if (!vfq)
-        return;
-    fe->fe_hold_list = NULL;
+    if (vfq) {
+        if (fe->fe_action == VR_FLOW_ACTION_HOLD)
+            return;
+
+        swapped =
+            __sync_bool_compare_and_swap(&fe->fe_hold_list, vfq, NULL);
+        if (swapped) {
+            __vr_flow_flush_hold_queue(router, fe, fmd, vfq);
+            if (!vdd || !vdd->vdd_data)
+                goto free_flush_queue;
 
-    vr_flush_flow_queue(router, fe, fmd, vfq);
-    vr_flow_queue_free_defer(flmd, vfq);
+            vfdd = (struct vr_flow_defer_data *)vdd->vdd_data;
+            vfdd->vfdd_flow_queue = vfq;
+        }
+    }
 
     return;
+
+free_flush_queue:
+    if (vfq)
+        vr_free(vfq, VR_FLOW_QUEUE_OBJECT);
+
+    return;
 }
 
 static void
-vr_flow_flush(void *arg)
+__vr_flow_work(struct vrouter *router, struct vr_flow_entry *fe,
+        struct vr_flow_md *flmd)
+{
+    struct vr_forwarding_md fmd;
+
+    vr_init_forwarding_md(&fmd);
+    vr_flow_set_forwarding_md(router, fe, flmd->flmd_index, &fmd);
+    vr_flush_entry(router, fe, flmd, &fmd);
+
+    vr_flow_defer(flmd, fe);
+
+    return;
+}
+
+
+static void
+vr_flow_work(void *arg)
 {
     struct vrouter *router;
     struct vr_flow_entry *fe;
-    struct vr_forwarding_md fmd;
 
     struct vr_flow_md *flmd = (struct vr_flow_md *)arg;
 
@@ -1123,17 +1388,14 @@ vr_flow_flush(void *arg)
     if (!fe)
         goto exit_flush;
 
-    vr_init_forwarding_md(&fmd);
-    vr_flow_set_forwarding_md(router, fe, flmd->flmd_index, &fmd);
-
-    vr_flush_entry(router, fe, flmd, &fmd);
-
-    if (!(flmd->flmd_flags & VR_FLOW_FLAG_ACTIVE)) {
-        vr_reset_flow_entry(router, fe, flmd->flmd_index);
-    }
+    __vr_flow_work(router, fe, flmd);
 
 exit_flush:
     if (flmd->flmd_defer_data) {
+        if (flmd->flmd_defer_data->vdd_data) {
+            vr_free(flmd->flmd_defer_data->vdd_data,
+                    VR_FLOW_DEFER_DATA_OBJECT);
+        }
         vr_put_defer_data(flmd->flmd_defer_data);
         flmd->flmd_defer_data = NULL;
     }
@@ -1211,7 +1473,7 @@ vr_add_flow(unsigned int rid, struct vr_flow *key, uint8_t type,
         /* a race between agent and dp. allow agent to handle this error */
         return NULL;
     } else {
-        flow_e = vr_find_free_entry(router, key, type,
+        flow_e = vr_flow_get_free_entry(router, key, type,
                 need_hold_queue, fe_index);
     }
 
@@ -1259,9 +1521,10 @@ vr_add_flow_req(vr_flow_req *req, unsigned int *fe_index)
  * agent), in which case we should be checking only the request
  */
 static int
-vr_flow_req_is_invalid(struct vrouter *router, vr_flow_req *req,
+vr_flow_set_req_is_invalid(struct vrouter *router, vr_flow_req *req,
         struct vr_flow_entry *fe)
 {
+    int error = 0;
     struct vr_flow_entry *rfe;
 
     if (fe) {
@@ -1272,34 +1535,44 @@ vr_flow_req_is_invalid(struct vrouter *router, vr_flow_req *req,
             (unsigned short)req->fr_flow_dport != fe->fe_key.flow_dport||
             (unsigned short)req->fr_flow_nh_id != fe->fe_key.flow_nh_id ||
             (unsigned char)req->fr_flow_proto != fe->fe_key.flow_proto) {
-            return -EBADF;
+            error = -EBADF;
+            goto invalid_req;
             }
         }
     }
 
     if (req->fr_flags & VR_FLOW_FLAG_VRFT) {
-        if ((unsigned short)req->fr_flow_dvrf >= router->vr_max_vrfs)
-            return -EINVAL;
+        if ((unsigned short)req->fr_flow_dvrf >= router->vr_max_vrfs) {
+            error = -EINVAL;
+            goto invalid_req;
+        }
     }
 
     if (req->fr_flags & VR_FLOW_FLAG_MIRROR) {
         if (((unsigned int)req->fr_mir_id >= router->vr_max_mirror_indices) &&
-            (unsigned int)req->fr_sec_mir_id >= router->vr_max_mirror_indices)
-            return -EINVAL;
+            (unsigned int)req->fr_sec_mir_id >= router->vr_max_mirror_indices) {
+            error = -EINVAL;
+            goto invalid_req;
+        }
     }
 
     if (req->fr_flags & VR_RFLOW_VALID) {
         rfe = vr_get_flow_entry(router, req->fr_rindex);
-        if (!rfe)
-            return -EINVAL;
+        if (!rfe) {
+            error = -EINVAL;
+            goto invalid_req;
+        }
     }
 
     return 0;
+
+invalid_req:
+    return error;
 }
 
 static int
-vr_flow_schedule_transition(struct vrouter *router, vr_flow_req *req,
-        struct vr_flow_entry *fe)
+__vr_flow_schedule_transition(struct vrouter *router, struct vr_flow_entry *fe,
+        unsigned int index, unsigned short flags)
 {
     struct vr_flow_md *flmd;
     struct vr_defer_data *defer = NULL;
 
@@ -1310,21 +1583,32 @@ vr_flow_schedule_transition(struct vrouter *router, vr_flow_req *req,
         return -ENOMEM;
 
     flmd->flmd_router = router;
-    flmd->flmd_index = req->fr_index;
-    flmd->flmd_flags = req->fr_flags;
-    if (fe->fe_hold_list) {
+    flmd->flmd_index = index;
+    flmd->flmd_flags = flags;
+    if (fe->fe_hold_list || (flags & VR_FLOW_FLAG_EVICT_CANDIDATE)) {
         defer = vr_get_defer_data(sizeof(*defer));
-        if (!defer) {
-            vr_free(flmd, VR_FLOW_METADATA_OBJECT);
-            return -ENOMEM;
+        if (defer) {
+            defer->vdd_data = (void *)vr_zalloc(sizeof(struct vr_flow_defer_data),
+                    VR_FLOW_DEFER_DATA_OBJECT);
+            if (defer->vdd_data &&
+                    !(flmd->flmd_flags & VR_FLOW_FLAG_ACTIVE)) {
+                ((struct vr_flow_defer_data *)defer->vdd_data)->vfdd_delete =
+                    true;
+            }
         }
     }
     flmd->flmd_defer_data = defer;
 
-    vr_schedule_work(vr_get_cpu(), vr_flow_flush, (void *)flmd);
+    vr_schedule_work(vr_get_cpu(), vr_flow_work, (void *)flmd);
     return 0;
 }
 
+static int
+vr_flow_schedule_transition(struct vrouter *router, vr_flow_req *req,
+        struct vr_flow_entry *fe)
+{
+    return __vr_flow_schedule_transition(router, fe, req->fr_index,
+            req->fr_flags);
+}
+
 static int
 vr_flow_delete(struct vrouter *router, vr_flow_req *req,
         struct vr_flow_entry *fe)
@@ -1378,7 +1662,7 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req)
 {
     int ret;
     unsigned int fe_index;
-    bool new_flow = false;
+    bool new_flow = false, modified = false;
 
     struct vr_flow_entry *fe = NULL, *rfe = NULL;
     struct vr_flow_table_info *infop = router->vr_flow_table_info;
 
@@ -1388,14 +1672,21 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req)
         return -EINVAL;
 
     fe = vr_get_flow_entry(router, req->fr_index);
+    if (fe) {
+        if (!(modified = vr_flow_start_modify(router, fe)))
+            return -EBUSY;
+    }
 
-    if ((ret = vr_flow_req_is_invalid(router, req, fe)))
-        return ret;
+    if ((ret = vr_flow_set_req_is_invalid(router, req, fe)))
+        goto exit_set;
 
-    if (fe && (fe->fe_action == VR_FLOW_ACTION_HOLD) &&
+    if (fe) {
+        if ((fe->fe_action == VR_FLOW_ACTION_HOLD) &&
             ((req->fr_action != fe->fe_action) ||
-            !(req->fr_flags & VR_FLOW_FLAG_ACTIVE)))
-        __sync_fetch_and_add(&infop->vfti_action_count, 1);
+            !(req->fr_flags & VR_FLOW_FLAG_ACTIVE))) {
+            __sync_fetch_and_add(&infop->vfti_action_count, 1);
+        }
+    }
 
     /*
     * for delete, absence of the requested flow entry is caustic. so
    * handle that case first
    */
@@ -1424,8 +1715,10 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req)
             if (!fe->fe_hold_list) {
                 fe->fe_hold_list = vr_zalloc(sizeof(struct vr_flow_queue),
                         VR_FLOW_QUEUE_OBJECT);
-                if (!fe->fe_hold_list)
-                    return -ENOMEM;
+                if (!fe->fe_hold_list) {
+                    ret = -ENOMEM;
+                    goto exit_set;
+                }
             }
         }
     }
@@ -1470,7 +1763,7 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req)
     if (fe->fe_action == VR_FLOW_ACTION_DROP)
         fe->fe_drop_reason = (uint8_t)req->fr_drop_reason;
 
-    fe->fe_flags = req->fr_flags;
+    fe->fe_flags = VR_FLOW_FLAG_MASK(req->fr_flags);
     if (new_flow && (fe->fe_flags & VR_RFLOW_VALID)) {
         rfe = vr_get_flow_entry(router, fe->fe_rflow);
         if (rfe) {
@@ -1483,7 +1776,14 @@ vr_flow_set(struct vrouter *router, vr_flow_req *req)
 
     vr_flow_udp_src_port(router, fe);
 
-    return vr_flow_schedule_transition(router, req, fe);
+    ret = vr_flow_schedule_transition(router, req, fe);
+
+exit_set:
+    if (modified && fe) {
+        vr_flow_stop_modify(router, fe);
+    }
+
+    return ret;
 }
 
 static void
diff --git a/dp-core/vr_interface.c b/dp-core/vr_interface.c
index 647f99c05..0ed36f5d9 100644
--- a/dp-core/vr_interface.c
+++ b/dp-core/vr_interface.c
@@ -377,6 +377,7 @@ agent_trap_may_truncate(int trap_reason)
     case AGENT_TRAP_NEXTHOP:
     case AGENT_TRAP_RESOLVE:
     case AGENT_TRAP_FLOW_MISS:
+    case AGENT_TRAP_FLOW_ACTION_HOLD:
     case AGENT_TRAP_ECMP_RESOLVE:
     case AGENT_TRAP_HANDLE_DF:
     case AGENT_TRAP_ZERO_TTL:
@@ -473,16 +474,20 @@ agent_send(struct vr_interface *vif, struct vr_packet *pkt,
 
     switch (params->trap_reason) {
     case AGENT_TRAP_FLOW_MISS:
+    case AGENT_TRAP_FLOW_ACTION_HOLD:
         if (params->trap_param) {
             fta = (struct vr_flow_trap_arg *)(params->trap_param);
             hdr->hdr_cmd_param = htonl(fta->vfta_index);
             hdr->hdr_cmd_param_1 = htonl(fta->vfta_nh_index);
+            hdr->hdr_cmd_param_2 = htonl(fta->vfta_stats.flow_bytes);
+            hdr->hdr_cmd_param_3 = htonl(fta->vfta_stats.flow_packets);
+            hdr->hdr_cmd_param_4 = htonl((fta->vfta_stats.flow_bytes_oflow |
+                        (fta->vfta_stats.flow_packets_oflow << 16)));
         }
         break;
 
     case AGENT_TRAP_ECMP_RESOLVE:
     case AGENT_TRAP_SOURCE_MISMATCH:
-    case AGENT_TRAP_SESSION_CLOSE:
         if (params->trap_param)
             hdr->hdr_cmd_param = htonl(*(unsigned int *)(params->trap_param));
         break;
diff --git a/dpdk/vr_dpdk_host.c b/dpdk/vr_dpdk_host.c
index 685533d81..113a30327 100644
--- a/dpdk/vr_dpdk_host.c
+++ b/dpdk/vr_dpdk_host.c
@@ -41,7 +41,7 @@ static bool vr_host_inited = false;
 extern void vr_malloc_stats(unsigned int, unsigned int);
 extern void vr_free_stats(unsigned int);
 /* RCU callback */
-extern void vr_flow_queue_free(struct vrouter *router, void *arg);
+extern void vr_flow_defer_cb(struct vrouter *router, void *arg);
 
 static void *
@@ -517,25 +517,29 @@ dpdk_rcu_cb(struct rcu_head *rh)
     struct vr_flow_queue *vfq;
     struct vr_packet_node *pnode;
 
+
     cb_data = CONTAINER_OF(rcd_rcu, struct vr_dpdk_rcu_cb_data, rh);
     /* check if we need to pass the callback to packet lcore */
-    if (cb_data->rcd_user_cb == vr_flow_queue_free
-        && cb_data->rcd_user_data) {
+    if ((cb_data->rcd_user_cb == vr_flow_defer_cb) &&
+        cb_data->rcd_user_data) {
         defer = (struct vr_defer_data *)cb_data->rcd_user_data;
-        vfq = (struct vr_flow_queue *)defer->vdd_data;
-        for (i = 0; i < VR_MAX_FLOW_QUEUE_ENTRIES; i++) {
-            pnode = &vfq->vfq_pnodes[i];
-            if (pnode->pl_packet) {
-                RTE_LOG(DEBUG, VROUTER, "%s: lcore %u passing RCU callback to lcore %u\n",
-                    __func__, rte_lcore_id(), VR_DPDK_PACKET_LCORE_ID);
-                vr_dpdk_lcore_cmd_post(VR_DPDK_PACKET_LCORE_ID,
-                    VR_DPDK_LCORE_RCU_CMD, (uintptr_t)rh);
-                return;
+        vfq = ((struct vr_flow_defer_data *)defer->vdd_data)->vfdd_flow_queue;
+        if (vfq) {
+            for (i = 0; i < VR_MAX_FLOW_QUEUE_ENTRIES; i++) {
+                pnode = &vfq->vfq_pnodes[i];
+                if (pnode->pl_packet) {
+                    RTE_LOG(DEBUG, VROUTER, "%s: lcore %u passing RCU callback "
+                        "to lcore %u\n", __func__, rte_lcore_id(),
+                        VR_DPDK_PACKET_LCORE_ID);
+                    vr_dpdk_lcore_cmd_post(VR_DPDK_PACKET_LCORE_ID,
+                        VR_DPDK_LCORE_RCU_CMD, (uintptr_t)rh);
+                    return;
+                }
             }
+            RTE_LOG(DEBUG, VROUTER, "%s: lcore %u just calling RCU callback\n",
+                __func__, rte_lcore_id());
         }
-        RTE_LOG(DEBUG, VROUTER, "%s: lcore %u just calling RCU callback\n",
-            __func__, rte_lcore_id());
     }
     /* no need to send any packets, so just call the callback */
     cb_data->rcd_user_cb(cb_data->rcd_router, cb_data->rcd_user_data);
diff --git a/include/vr_defs.h b/include/vr_defs.h
index e3ba8b816..61467f4f8 100644
--- a/include/vr_defs.h
+++ b/include/vr_defs.h
@@ -34,7 +34,7 @@
 #define AGENT_TRAP_ZERO_TTL         12
 #define AGENT_TRAP_ICMP_ERROR       13
 #define AGENT_TRAP_TOR_CONTROL_PKT  14
-#define AGENT_TRAP_SESSION_CLOSE    15
+#define AGENT_TRAP_FLOW_ACTION_HOLD 15
 #define MAX_AGENT_HDR_COMMANDS      16
 
 enum rt_type{
@@ -60,6 +60,9 @@ struct agent_hdr {
     unsigned short hdr_cmd;
     unsigned int hdr_cmd_param;
     unsigned int hdr_cmd_param_1;
+    unsigned int hdr_cmd_param_2;
+    unsigned int hdr_cmd_param_3;
+    unsigned int hdr_cmd_param_4;
 } __attribute__((packed));
 
 #define CMD_PARAM_PACKET_CTRL 0x1
diff --git a/include/vr_flow.h b/include/vr_flow.h
index 34b5eb246..34b3307d4 100644
--- a/include/vr_flow.h
+++ b/include/vr_flow.h
@@ -21,12 +21,18 @@ typedef enum {
     FLOW_CONSUMED,
 } flow_result_t;
 
-#define VR_FLOW_FLAG_ACTIVE         0x1
-#define VR_RFLOW_VALID              0x1000
-#define VR_FLOW_FLAG_MIRROR         0x2000
-#define VR_FLOW_FLAG_VRFT           0x4000
-#define VR_FLOW_FLAG_LINK_LOCAL     0x8000
-
+#define VR_FLOW_FLAG_ACTIVE             0x0001
+#define VR_FLOW_FLAG_MODIFIED           0x0100
+#define VR_FLOW_FLAG_NEW_FLOW           0x0200
+#define VR_FLOW_FLAG_EVICT_CANDIDATE    0x0400
+#define VR_FLOW_FLAG_EVICTED            0x0800
+#define VR_RFLOW_VALID                  0x1000
+#define VR_FLOW_FLAG_MIRROR             0x2000
+#define VR_FLOW_FLAG_VRFT               0x4000
+#define VR_FLOW_FLAG_LINK_LOCAL         0x8000
+
+#define VR_FLOW_FLAG_MASK(flag)     ((flag) & ~(VR_FLOW_FLAG_EVICT_CANDIDATE |\
+            VR_FLOW_FLAG_EVICTED | VR_FLOW_FLAG_NEW_FLOW))
 /* rest of the flags are action specific */
 
 /* for NAT */
@@ -69,6 +75,13 @@ typedef enum {
                                 : AF_INET)
 
 struct vr_forwarding_md;
+struct vr_flow_defer_data {
+    struct vr_flow_queue *vfdd_flow_queue;
+    struct vr_flow_entry *vfdd_fe;
+    unsigned int vfdd_fe_index;
+    bool vfdd_delete;
+};
+
 struct vr_common_flow{
     unsigned char ip_family;
     unsigned char ip_proto;
@@ -215,6 +228,46 @@ struct vr_flow_queue {
     struct vr_packet_node vfq_pnodes[VR_MAX_FLOW_QUEUE_ENTRIES];
 };
 
+/*
+ * Flow eviction:
+ * 1. Requirement
+ * --------------
+ *
+ * Inactive TCP flows (flows that have already seen the closure cycle - FIN/ACK
+ * or the RESET flags) should additionally be considered free flow entries, so
+ * that vRouter does not have to wait for the agent's aging cycle to
+ * accommodate new flows under severe occupancy, and can provide better
+ * service.
+ *
+ * 2. Problems in datapath-initiated flow closure
+ * ----------------------------------------------
+ *
+ * . Simultaneous discovery of the same flow entry by two different CPUs
+ * . Simultaneous closure of an entry by both the agent and the datapath
+ * . Handling of packets held in the flow entry when the entry moves from
+ *   hold to closed state
+ *
+ * 3. Implementation
+ * -----------------
+ *
+ * 3.1 Marking
+ * -----------
+ *
+ * Once the TCP state machine determines that a flow can be closed, it updates
+ * the tcp flags with a new flag VR_FLOW_TCP_DEAD, since determining whether a
+ * tcp flow has seen its end with only the existing TCP flags is a bit more
+ * involved. The last packet, before exiting the module, marks the flow as an
+ * eviction candidate (VR_FLOW_FLAG_EVICT_CANDIDATE).
+ *
+ * 3.2 Allocation/Eviction
+ * -----------------------
+ *
+ * Once the last packet exits the flow module, work is scheduled to mark the
+ * flow as inactive. This work schedules an RCU callback to mark the entry as
+ * inactive (the same path that is used when the agent deletes a flow). While
+ * deleting the entry, the evicted flow is also marked as evicted
+ * (VR_FLOW_FLAG_EVICTED).
+ *
+ */
 #define VR_FLOW_TCP_FIN             0x0001
 #define VR_FLOW_TCP_HALF_CLOSE      0x0002
 #define VR_FLOW_TCP_FIN_R           0x0004
@@ -223,12 +276,14 @@ struct vr_flow_queue {
 #define VR_FLOW_TCP_ESTABLISHED     0x0020
 #define VR_FLOW_TCP_ESTABLISHED_R   0x0040
 #define VR_FLOW_TCP_RST             0x0080
+#define VR_FLOW_TCP_DEAD            0x8000
 
 /* align to 8 byte boundary */
 #define VR_FLOW_KEY_PAD ((8 - (sizeof(struct vr_flow) % 8)) % 8)
 
 struct vr_dummy_flow_entry {
     struct vr_flow fe_key;
+    uint8_t fe_key_packing;
     uint16_t fe_tcp_flags;
     unsigned int fe_tcp_seq;
     struct vr_flow_queue *fe_hold_list;
@@ -252,6 +307,7 @@ struct vr_dummy_flow_entry {
 /* do not change. any field positions as it might lead to incompatibility */
 struct vr_flow_entry {
     struct vr_flow fe_key;
+    uint8_t fe_key_packing;
     uint16_t fe_tcp_flags;
     unsigned int fe_tcp_seq;
     struct vr_flow_queue *fe_hold_list;
@@ -305,6 +361,7 @@ struct vr_flow_md {
 
 struct vr_flow_trap_arg {
     unsigned int vfta_index;
     unsigned int vfta_nh_index;
+    struct vr_flow_stats vfta_stats;
 };
 
 struct vr_packet;
diff --git a/include/vrouter.h b/include/vrouter.h
index 90b4f787b..33ab07f48 100644
--- a/include/vrouter.h
+++ b/include/vrouter.h
@@ -55,6 +55,7 @@ enum vr_malloc_objects_t {
     VR_FLOW_LINK_LOCAL_OBJECT,
     VR_FLOW_METADATA_OBJECT,
     VR_FLOW_TABLE_INFO_OBJECT,
+    VR_FLOW_DEFER_DATA_OBJECT,
     VR_FRAGMENT_OBJECT,
     VR_FRAGMENT_QUEUE_OBJECT,
     VR_FRAGMENT_QUEUE_ELEMENT_OBJECT,
diff --git a/utils/dropstats.c b/utils/dropstats.c
index b9de1bb0b..cc1c99a92 100644
--- a/utils/dropstats.c
+++ b/utils/dropstats.c
@@ -22,6 +22,7 @@
 #include "vr_os.h"
 #include "vr_types.h"
 #include "vr_nexthop.h"
+#include "ini_parser.h"
 #include "nl_util.h"
diff --git a/utils/flow.c b/utils/flow.c
index 3469ebad1..51268d5ec 100644
--- a/utils/flow.c
+++ b/utils/flow.c
@@ -137,7 +137,8 @@ dump_legend(void)
     printf("L=Link Local Port)\n");
     printf("    Other:K(nh)=Key_Nexthop, S(nh)=RPF_Nexthop\n");
 
-    printf("    TCP(r=reverse):S=SYN, F=FIN, R=RST, C=HalfClose, E=Established\n");
+    printf("    Flags:E=Evicted, Ec=Evict Candidate, N=New Flow, M=Modified\n");
+    printf("    TCP(r=reverse):S=SYN, F=FIN, R=RST, C=HalfClose, E=Established, D=Dead\n");
     printf("\n");
 
     return;
@@ -284,6 +285,17 @@ dump_table(struct flow_table *ft)
                 printf("(%u)", fe->fe_drop_reason);
             }
 
+            printf(", ");
+            printf("Flags:");
+            if (fe->fe_flags & VR_FLOW_FLAG_EVICTED)
+                printf("E");
+            if (fe->fe_flags & VR_FLOW_FLAG_EVICT_CANDIDATE)
+                printf("Ec");
+            if (fe->fe_flags & VR_FLOW_FLAG_NEW_FLOW)
+                printf("N");
+            if (fe->fe_flags & VR_FLOW_FLAG_MODIFIED)
+                printf("M");
+
             printf(", ");
             if (fe->fe_key.flow4_proto == VR_IP_PROTO_TCP) {
                 printf("TCP:");
@@ -305,6 +317,8 @@ dump_table(struct flow_table *ft)
                     printf("R");
                 if (fe->fe_tcp_flags & VR_FLOW_TCP_HALF_CLOSE)
                     printf("C");
+                if (fe->fe_tcp_flags & VR_FLOW_TCP_DEAD)
+                    printf("D");
 
                 printf(", ");
             }
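
--
The sketches below are illustrations written for this review; they are not
part of the patch.

The eviction path serializes CPUs purely through atomic transitions on
fe_flags: vr_flow_start_modify() claims an entry, __vr_flow_mark_evict()
elects a single winner via fetch-and-or, and vr_flow_evict_flow() later
completes the CANDIDATE -> EVICTED transition from the RCU callback. A
minimal single-threaded sketch of that state machine (flag values copied
from the vr_flow.h hunk; the driver in main() is hypothetical and stands in
for the concurrent packet-path CPUs):

    #include <stdio.h>
    #include <stdbool.h>

    #define VR_FLOW_FLAG_ACTIVE             0x0001
    #define VR_FLOW_FLAG_MODIFIED           0x0100
    #define VR_FLOW_FLAG_EVICT_CANDIDATE    0x0400
    #define VR_FLOW_FLAG_EVICTED            0x0800

    /* mirrors vr_flow_start_modify(): claim the entry only if nobody else
     * is modifying it and it has not already been evicted */
    static bool
    start_modify(unsigned short *flags)
    {
        unsigned short f = *flags;

        if (!(f & (VR_FLOW_FLAG_MODIFIED | VR_FLOW_FLAG_EVICTED)))
            return __sync_bool_compare_and_swap(flags, f,
                    f | VR_FLOW_FLAG_MODIFIED);

        return false;
    }

    /* mirrors __vr_flow_mark_evict(): only the CPU whose fetch-and-or
     * actually turned on EVICT_CANDIDATE wins the right to schedule the
     * eviction */
    static bool
    mark_evict(unsigned short *flags)
    {
        unsigned short old;

        if (!(*flags & VR_FLOW_FLAG_ACTIVE) || !start_modify(flags))
            return false;

        old = __sync_fetch_and_or(flags, VR_FLOW_FLAG_EVICT_CANDIDATE);
        if (old & VR_FLOW_FLAG_EVICT_CANDIDATE) {
            /* lost the race: drop only our MODIFIED claim */
            (void)__sync_and_and_fetch(flags, ~VR_FLOW_FLAG_MODIFIED);
            return false;
        }

        return true;
    }

    /* mirrors vr_flow_evict_flow(), which runs later from the RCU
     * callback: CANDIDATE becomes EVICTED and MODIFIED is released */
    static void
    evict(unsigned short *flags)
    {
        unsigned short f;

        if ((*flags & VR_FLOW_FLAG_ACTIVE) &&
                (*flags & VR_FLOW_FLAG_EVICT_CANDIDATE)) {
            f = *flags | VR_FLOW_FLAG_ACTIVE | VR_FLOW_FLAG_EVICT_CANDIDATE;
            (void)__sync_bool_compare_and_swap(flags, f,
                    (f ^ VR_FLOW_FLAG_EVICT_CANDIDATE) | VR_FLOW_FLAG_EVICTED);
            (void)__sync_and_and_fetch(flags, ~VR_FLOW_FLAG_MODIFIED);
        }
    }

    int
    main(void)
    {
        unsigned short flags = VR_FLOW_FLAG_ACTIVE;

        printf("first mark: %d (flags 0x%x)\n", mark_evict(&flags),
                (unsigned)flags);
        printf("second mark: %d (already a candidate)\n", mark_evict(&flags));
        evict(&flags);
        printf("final flags: 0x%x (ACTIVE | EVICTED)\n", (unsigned)flags);
        return 0;
    }

An entry an agent is writing (MODIFIED) or one already EVICTED can never
become a candidate, and a losing CPU backs out without disturbing the
winner's claim, which is what makes simultaneous discovery by two CPUs safe.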
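agent_send() now packs the hold stats into the three new agent_hdr words:
hdr_cmd_param_2 carries flow_bytes, hdr_cmd_param_3 carries flow_packets, and
hdr_cmd_param_4 carries both 16-bit overflow counters (bytes_oflow in the low
half, packets_oflow shifted into the high half). A sketch of the matching
receive-side unpacking; the agent is not part of this diff, so struct
flow_hold_stats and unpack_hold_stats() are hypothetical names, but the
packing follows the htonl() lines in the vr_interface.c hunk:

    #include <stdint.h>
    #include <stdio.h>
    #include <arpa/inet.h>

    /* hypothetical receive-side mirror of the counters in vr_flow_stats */
    struct flow_hold_stats {
        uint32_t flow_bytes;
        uint32_t flow_packets;
        uint16_t flow_bytes_oflow;
        uint16_t flow_packets_oflow;
    };

    static void
    unpack_hold_stats(uint32_t param_2, uint32_t param_3, uint32_t param_4,
            struct flow_hold_stats *stats)
    {
        uint32_t oflow = ntohl(param_4);

        stats->flow_bytes = ntohl(param_2);
        stats->flow_packets = ntohl(param_3);
        stats->flow_bytes_oflow = oflow & 0xFFFF;   /* low 16 bits */
        stats->flow_packets_oflow = oflow >> 16;    /* high 16 bits */
    }

    int
    main(void)
    {
        struct flow_hold_stats stats;

        /* wire values as agent_send() would have produced them */
        unpack_hold_stats(htonl(1500), htonl(10), htonl(2 | (3 << 16)),
                &stats);
        printf("bytes %u packets %u oflow %u/%u\n", stats.flow_bytes,
                stats.flow_packets, stats.flow_bytes_oflow,
                stats.flow_packets_oflow);
        return 0;
    }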
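vr_flow_set() now writes VR_FLOW_FLAG_MASK(req->fr_flags) instead of the raw
request flags, so an agent request cannot set or clear the datapath-owned
EVICT_CANDIDATE/EVICTED/NEW_FLOW bits. A quick check of the macro's behavior
(definition and flag values copied from the vr_flow.h hunk):

    #include <assert.h>

    #define VR_FLOW_FLAG_ACTIVE             0x0001
    #define VR_FLOW_FLAG_NEW_FLOW           0x0200
    #define VR_FLOW_FLAG_EVICT_CANDIDATE    0x0400
    #define VR_FLOW_FLAG_EVICTED            0x0800
    #define VR_RFLOW_VALID                  0x1000

    #define VR_FLOW_FLAG_MASK(flag)     ((flag) & ~(VR_FLOW_FLAG_EVICT_CANDIDATE |\
                VR_FLOW_FLAG_EVICTED | VR_FLOW_FLAG_NEW_FLOW))

    int
    main(void)
    {
        /* a request that (accidentally or otherwise) carries eviction bits */
        unsigned short req_flags = VR_FLOW_FLAG_ACTIVE | VR_RFLOW_VALID |
            VR_FLOW_FLAG_EVICTED | VR_FLOW_FLAG_NEW_FLOW;

        /* only the agent-settable bits survive the mask */
        assert(VR_FLOW_FLAG_MASK(req_flags) ==
                (VR_FLOW_FLAG_ACTIVE | VR_RFLOW_VALID));
        return 0;
    }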