From 4f7b242354bcef1a3b63b86be0c6abac7db6961e Mon Sep 17 00:00:00 2001 From: Sundaresan Rajangam Date: Tue, 8 Sep 2015 08:54:17 -0700 Subject: [PATCH] Handle list of flow samples in Flow sandesh Presently, vrouter-agent sends a single flow sample in the flow sandesh message. This patch adds support for handling list of flow samples in flow sandesh in Collector. Collector will continue to accept/process single flow sample in the flow sandesh for backward compatibility. Change-Id: I0e1db167d70265006222938717c1c8518f491a8c Closes-Bug: #1493436 (cherry picked from commit be3fd49cb6aa636cb5b9538a7eb81e864ece8a7c) --- src/analytics/db_handler.cc | 38 ++- src/analytics/db_handler.h | 2 + src/analytics/test/db_handler_test.cc | 445 +++++++++++++++----------- 3 files changed, 299 insertions(+), 186 deletions(-) diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index f5cc90ddf93..ceb340f13a7 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -1205,14 +1205,14 @@ bool FlowDataIpv4ObjectWalker::for_each(pugi::xml_node& node) { } /* - * process the flow message and insert into appropriate tables + * process the flow sample and insert into the appropriate tables */ -bool DbHandler::FlowTableInsert(const pugi::xml_node &parent, - const SandeshHeader& header) { +bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample, + const SandeshHeader& header) { // Traverse and populate the flow entry values FlowValueArray flow_entry_values; FlowDataIpv4ObjectWalker flow_msg_walker(flow_entry_values); - pugi::xml_node &mnode = const_cast(parent); + pugi::xml_node &mnode = const_cast(flow_sample); if (!mnode.traverse(flow_msg_walker)) { VIZD_ASSERT(0); } @@ -1227,25 +1227,27 @@ bool DbHandler::FlowTableInsert(const pugi::xml_node &parent, flow_entry_values[FlowRecordFields::FLOWREC_TEARDOWN_TIME]); if (setup_time.which() != GenDb::DB_VALUE_BLANK && teardown_time.which() != GenDb::DB_VALUE_BLANK) { - flow_entry_values[FlowRecordFields::FLOWREC_SHORT_FLOW] = static_cast(1); + flow_entry_values[FlowRecordFields::FLOWREC_SHORT_FLOW] = + static_cast(1); } else { - flow_entry_values[FlowRecordFields::FLOWREC_SHORT_FLOW] = static_cast(0); + flow_entry_values[FlowRecordFields::FLOWREC_SHORT_FLOW] = + static_cast(0); } // Calculate T1 and T2 values from timestamp uint64_t timestamp(header.get_Timestamp()); uint32_t T2(timestamp >> g_viz_constants.RowTimeInBits); uint32_t T1(timestamp & g_viz_constants.RowTimeInMask); - // Parittion no + // Partition no uint8_t partition_no = 0; // Populate Flow Record Table if (!PopulateFlowRecordTable(flow_entry_values, dbif_.get(), ttl_map_)) { DB_LOG(ERROR, "Populating FlowRecordTable FAILED"); } - // Populate Flow Index Tables only if FLOWREC_DIFF_BYTES and GenDb::DbDataValue &diff_bytes( flow_entry_values[FlowRecordFields::FLOWREC_DIFF_BYTES]); GenDb::DbDataValue &diff_packets( flow_entry_values[FlowRecordFields::FLOWREC_DIFF_PACKETS]); + // Populate Flow Index Tables only if FLOWREC_DIFF_BYTES and // FLOWREC_DIFF_PACKETS are present if (diff_bytes.which() != GenDb::DB_VALUE_BLANK && diff_packets.which() != GenDb::DB_VALUE_BLANK) { @@ -1257,6 +1259,26 @@ bool DbHandler::FlowTableInsert(const pugi::xml_node &parent, return true; } +/* + * process the flow sandesh message + */ +bool DbHandler::FlowTableInsert(const pugi::xml_node &parent, + const SandeshHeader& header) { + pugi::xml_node flowdata(parent.child("flowdata")); + // Flow sandesh message may contain a list of flow samples or + // a single flow sample + if (strcmp(flowdata.attribute("type").value(), "list") == 0) { + pugi::xml_node flow_list = flowdata.child("list"); + for (pugi::xml_node fsample = flow_list.first_child(); fsample; + fsample = fsample.next_sibling()) { + FlowSampleAdd(fsample, header); + } + } else { + FlowSampleAdd(flowdata.first_child(), header); + } + return true; +} + bool DbHandler::UnderlayFlowSampleInsert(const UFlowData& flow_data, uint64_t timestamp) { const std::vector& flow = flow_data.get_flow(); diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h index 07326de739d..641c49d2d83 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -163,6 +163,8 @@ class DbHandler { const std::pair& stag, uint32_t t1, const boost::uuids::uuid& unm, const std::string& jsonline, int ttl); + bool FlowSampleAdd(const pugi::xml_node& flowdata, + const SandeshHeader& header); int GetTtl(TtlType type) { return GetTtlFromMap(ttl_map_, type); } diff --git a/src/analytics/test/db_handler_test.cc b/src/analytics/test/db_handler_test.cc index b887f0abb55..77a3fd958f6 100644 --- a/src/analytics/test/db_handler_test.cc +++ b/src/analytics/test/db_handler_test.cc @@ -17,6 +17,7 @@ #include "../db_handler.h" #include "cdb_if_mock.h" #include "../vizd_table_desc.h" +#include "sandesh/common/flow_types.h" using ::testing::Return; using ::testing::Field; @@ -492,192 +493,280 @@ TEST_F(DbHandlerTest, FlowTableInsertTest) { hdr.set_Module("VizdTest"); hdr.set_Source("127.0.0.1"); std::string messagetype(""); - std::string xmlmessage = "555788e0-513c-4351-8711-3fc481cf2eb40default-domain:demo:vn1-1062731011default-domain:demo:vn0-106273126765201-2459004430130-664a-4b89-9287-39d71f35120758745ee7-d616-4e59-b8f7-96f896487c9f0000"; - - SandeshXMLMessageTest *msg = dynamic_cast( - builder_->Create( - reinterpret_cast(xmlmessage.c_str()), - xmlmessage.size())); - msg->SetHeader(hdr); - - std::string flowu_str = "555788e0-513c-4351-8711-3fc481cf2eb4"; - boost::uuids::uuid flowu = StringToUuid(flowu_str); - - { - GenDb::DbDataValueVec rowkey; - rowkey.push_back(flowu); - - EXPECT_CALL(*dbif_mock(), - Db_AddColumnProxy( - Pointee( - AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.FLOW_TABLE), - Field(&GenDb::ColList::rowkey_, rowkey))))) - .Times(1) - .WillOnce(Return(true)); - } - - GenDb::DbDataValueVec ocolvalue; - ocolvalue.push_back((uint64_t)0); //bytes - ocolvalue.push_back((uint64_t)0); //pkts - ocolvalue.push_back((uint8_t)0); //dir - ocolvalue.push_back(flowu); //flowuuid - ocolvalue.push_back(hdr.get_Source()); //vrouter - ocolvalue.push_back("default-domain:demo:vn1"); //svn - ocolvalue.push_back("default-domain:demo:vn0"); //dvn - ocolvalue.push_back((uint32_t)-1062731011); //sip - ocolvalue.push_back((uint32_t)-1062731267); //dip - ocolvalue.push_back((uint8_t)6); //prot - ocolvalue.push_back((uint16_t)5201); //sport - ocolvalue.push_back((uint16_t)-24590); //dport - ocolvalue.push_back(""); //json - - int ttl = ttl_map.find(DbHandler::FLOWDATA_TTL)->second; - - { - GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); - colname->reserve(4); - colname->push_back("default-domain:demo:vn1"); - colname->push_back((uint32_t)-1062731011); - colname->push_back((uint32_t)(hdr.get_Timestamp() & g_viz_constants.RowTimeInMask)); - colname->push_back(flowu); - GenDb::DbDataValueVec *colvalue(new GenDb::DbDataValueVec(ocolvalue)); - boost::ptr_vector expected_vector = - boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue, ttl)); - - GenDb::DbDataValueVec rowkey; - rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); - uint8_t partition_no = 0; - rowkey.push_back(partition_no); - rowkey.push_back((uint8_t)0); //direction - - EXPECT_CALL(*dbif_mock(), - Db_AddColumnProxy( - Pointee( - AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.FLOW_TABLE_SVN_SIP), - Field(&GenDb::ColList::rowkey_, rowkey), - Field(&GenDb::ColList::columns_, - expected_vector))))) - .Times(1) - .WillOnce(Return(true)); - } - { - GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); - colname->reserve(4); - colname->push_back("default-domain:demo:vn0"); - colname->push_back((uint32_t)-1062731267); - colname->push_back((uint32_t)(hdr.get_Timestamp() & g_viz_constants.RowTimeInMask)); - colname->push_back(flowu); - GenDb::DbDataValueVec *colvalue(new GenDb::DbDataValueVec(ocolvalue)); - boost::ptr_vector expected_vector = - boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue, ttl)); - - GenDb::DbDataValueVec rowkey; - rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); - uint8_t partition_no = 0; - rowkey.push_back(partition_no); - rowkey.push_back((uint8_t)0); //direction - - EXPECT_CALL(*dbif_mock(), - Db_AddColumnProxy( - Pointee( - AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.FLOW_TABLE_DVN_DIP), - Field(&GenDb::ColList::rowkey_, rowkey), - Field(&GenDb::ColList::columns_, - expected_vector))))) - .Times(1) - .WillOnce(Return(true)); - } - { - GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); - colname->reserve(4); - colname->push_back((uint8_t)6); - colname->push_back((uint16_t)5201); - colname->push_back((uint32_t)(hdr.get_Timestamp() & g_viz_constants.RowTimeInMask)); - colname->push_back(flowu); - GenDb::DbDataValueVec *colvalue(new GenDb::DbDataValueVec(ocolvalue)); - boost::ptr_vector expected_vector = - boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue, ttl)); - - GenDb::DbDataValueVec rowkey; - rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); - uint8_t partition_no = 0; - rowkey.push_back(partition_no); - rowkey.push_back((uint8_t)0); //direction + std::vector > > flow_msgs; + + // Flow sandesh with single flow sample + { + std::string xmlmessage = "555788e0-513c-4351-8711-3fc481cf2eb40default-domain:demo:vn1-1062731011default-domain:demo:vn0-106273126765201-2459004430130-664a-4b89-9287-39d71f35120758745ee7-d616-4e59-b8f7-96f896487c9f0000"; + + std::vector flowdata_list; + FlowDataIpv4 flow_data1; + flow_data1.set_flowuuid("555788e0-513c-4351-8711-3fc481cf2eb4"); + flow_data1.set_direction_ing(0); + flow_data1.set_sourcevn("default-domain:demo:vn1"); + flow_data1.set_sourceip(-1062731011); + flow_data1.set_destvn("default-domain:demo:vn0"); + flow_data1.set_destip(-1062731267); + flow_data1.set_protocol(6); + flow_data1.set_sport(5201); + flow_data1.set_dport(-24590); + flow_data1.set_diff_bytes(0); + flow_data1.set_diff_packets(0); + flowdata_list.push_back(flow_data1); + flow_msgs.push_back(std::make_pair(xmlmessage, flowdata_list)); + } - EXPECT_CALL(*dbif_mock(), - Db_AddColumnProxy( - Pointee( - AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.FLOW_TABLE_PROT_SP), - Field(&GenDb::ColList::rowkey_, rowkey), - Field(&GenDb::ColList::columns_, - expected_vector))))) - .Times(1) - .WillOnce(Return(true)); - } + // Flow sandesh with list of flow samples + { + std::string xmlmessage = "511788e0-513f-4351-8711-3fc481cf2efa1default-domain:contrail:vn0168430081default-domain:contrail:vn1-10627312631712345808704430130-6641-1b89-9287-39d71f35120658745ee6-d616-4e59-b8f7-96e896587c9f102442561525538ef-513f-435f-871f-3fc482cf2ebf0default-domain:demo:vn0-1062731011default-domain:contrail:vn0168430082611221808604430130-664a-4b89-3456-39d71f35120758745ee3-d613-4e53-b8f3-96f896487c9351225122"; + + std::vector flowdata_list; + FlowDataIpv4 flow_data1; + flow_data1.set_flowuuid("511788e0-513f-4351-8711-3fc481cf2efa"); + flow_data1.set_direction_ing(1); + flow_data1.set_sourcevn("default-domain:contrail:vn0"); + flow_data1.set_sourceip(168430081); + flow_data1.set_destvn("default-domain:contrail:vn1"); + flow_data1.set_destip(-1062731263); + flow_data1.set_protocol(17); + flow_data1.set_sport(12345); + flow_data1.set_dport(8087); + flow_data1.set_diff_bytes(256); + flow_data1.set_diff_packets(1); + flowdata_list.push_back(flow_data1); + + FlowDataIpv4 flow_data2; + flow_data2.set_flowuuid("525538ef-513f-435f-871f-3fc482cf2ebf"); + flow_data2.set_direction_ing(0); + flow_data2.set_sourcevn("default-domain:demo:vn0"); + flow_data2.set_sourceip(-1062731011); + flow_data2.set_destvn("default-domain:contrail:vn0"); + flow_data2.set_destip(168430082); + flow_data2.set_protocol(6); + flow_data2.set_sport(11221); + flow_data2.set_dport(8086); + flow_data2.set_diff_bytes(512); + flow_data2.set_diff_packets(2); + flowdata_list.push_back(flow_data2); + flow_msgs.push_back(std::make_pair(xmlmessage, flowdata_list)); + } - { - GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); - colname->reserve(4); - colname->push_back((uint8_t)6); - colname->push_back((uint16_t)-24590); - colname->push_back((uint32_t)(hdr.get_Timestamp() & g_viz_constants.RowTimeInMask)); - colname->push_back(flowu); - GenDb::DbDataValueVec *colvalue(new GenDb::DbDataValueVec(ocolvalue)); - boost::ptr_vector expected_vector = - boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue, ttl)); + std::vector > >:: + const_iterator fit; + for (fit = flow_msgs.begin(); fit != flow_msgs.end(); fit++) { + std::auto_ptr msg( + dynamic_cast( + builder_->Create(reinterpret_cast( + fit->first.c_str()), fit->first.size()))); + msg->SetHeader(hdr); + std::vector::const_iterator dit; + for (dit = fit->second.begin(); dit != fit->second.end(); dit++) { + boost::uuids::uuid flowu = StringToUuid(dit->get_flowuuid()); + + // set expectations for FLOW_TABLE + { + GenDb::DbDataValueVec rowkey; + rowkey.push_back(flowu); + + EXPECT_CALL(*dbif_mock(), + Db_AddColumnProxy( + Pointee( + AllOf(Field(&GenDb::ColList::cfname_, + g_viz_constants.FLOW_TABLE), + Field(&GenDb::ColList::rowkey_, rowkey))))) + .Times(1) + .WillOnce(Return(true)); + } - GenDb::DbDataValueVec rowkey; - rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); - uint8_t partition_no = 0; - rowkey.push_back(partition_no); - rowkey.push_back((uint8_t)0); //direction + int ttl = ttl_map.find(DbHandler::FLOWDATA_TTL)->second; + GenDb::DbDataValueVec ocolvalue; + ocolvalue.push_back((uint64_t)dit->get_diff_bytes()); + ocolvalue.push_back((uint64_t)dit->get_diff_packets()); + ocolvalue.push_back((uint8_t)0); // short flow + ocolvalue.push_back(flowu); + ocolvalue.push_back(hdr.get_Source()); // vrouter + ocolvalue.push_back(dit->get_sourcevn()); + ocolvalue.push_back(dit->get_destvn()); + ocolvalue.push_back((uint32_t)dit->get_sourceip()); + ocolvalue.push_back((uint32_t)dit->get_destip()); + ocolvalue.push_back((uint8_t)dit->get_protocol()); + ocolvalue.push_back((uint16_t)dit->get_sport()); + ocolvalue.push_back((uint16_t)dit->get_dport()); + ocolvalue.push_back(""); // json + + // set expectations for FLOW_TABLE_SVN_SIP + { + GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); + colname->reserve(4); + colname->push_back(dit->get_sourcevn()); + colname->push_back((uint32_t)dit->get_sourceip()); + colname->push_back((uint32_t)(hdr.get_Timestamp() & + g_viz_constants.RowTimeInMask)); + colname->push_back(flowu); + GenDb::DbDataValueVec *colvalue( + new GenDb::DbDataValueVec(ocolvalue)); + boost::ptr_vector expected_vector = + boost::assign::ptr_list_of + (GenDb::NewCol(colname, colvalue, ttl)); + + GenDb::DbDataValueVec rowkey; + rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> + g_viz_constants.RowTimeInBits)); + uint8_t partition_no = 0; + rowkey.push_back(partition_no); + rowkey.push_back((uint8_t)dit->get_direction_ing()); + + EXPECT_CALL(*dbif_mock(), + Db_AddColumnProxy( + Pointee( + AllOf(Field(&GenDb::ColList::cfname_, + g_viz_constants.FLOW_TABLE_SVN_SIP), + Field(&GenDb::ColList::rowkey_, rowkey), + Field(&GenDb::ColList::columns_, + expected_vector))))) + .Times(1) + .WillOnce(Return(true)); + } - EXPECT_CALL(*dbif_mock(), - Db_AddColumnProxy( - Pointee( - AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.FLOW_TABLE_PROT_DP), - Field(&GenDb::ColList::rowkey_, rowkey), - Field(&GenDb::ColList::columns_, - expected_vector))))) - .Times(1) - .WillOnce(Return(true)); - } + // set expectations for FLOW_TABLE_DVN_DIP + { + GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); + colname->reserve(4); + colname->push_back(dit->get_destvn()); + colname->push_back((uint32_t)dit->get_destip()); + colname->push_back((uint32_t)(hdr.get_Timestamp() & + g_viz_constants.RowTimeInMask)); + colname->push_back(flowu); + GenDb::DbDataValueVec *colvalue( + new GenDb::DbDataValueVec(ocolvalue)); + boost::ptr_vector expected_vector = + boost::assign::ptr_list_of + (GenDb::NewCol(colname, colvalue, ttl)); + + GenDb::DbDataValueVec rowkey; + rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> + g_viz_constants.RowTimeInBits)); + uint8_t partition_no = 0; + rowkey.push_back(partition_no); + rowkey.push_back((uint8_t)dit->get_direction_ing()); + + EXPECT_CALL(*dbif_mock(), + Db_AddColumnProxy( + Pointee( + AllOf(Field(&GenDb::ColList::cfname_, + g_viz_constants.FLOW_TABLE_DVN_DIP), + Field(&GenDb::ColList::rowkey_, rowkey), + Field(&GenDb::ColList::columns_, + expected_vector))))) + .Times(1) + .WillOnce(Return(true)); + } - { - GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); - colname->reserve(4); - colname->push_back(hdr.get_Source()); //vrouter - colname->push_back((uint32_t)(hdr.get_Timestamp() & g_viz_constants.RowTimeInMask)); - colname->push_back(flowu); - GenDb::DbDataValueVec *colvalue(new GenDb::DbDataValueVec(ocolvalue)); - boost::ptr_vector expected_vector = - boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue, ttl)); + // set expectations for FLOW_TABLE_PROT_SP + { + GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); + colname->reserve(4); + colname->push_back((uint8_t)dit->get_protocol()); + colname->push_back((uint16_t)dit->get_sport()); + colname->push_back((uint32_t)(hdr.get_Timestamp() & + g_viz_constants.RowTimeInMask)); + colname->push_back(flowu); + GenDb::DbDataValueVec *colvalue( + new GenDb::DbDataValueVec(ocolvalue)); + boost::ptr_vector expected_vector = + boost::assign::ptr_list_of + (GenDb::NewCol(colname, colvalue, ttl)); + + GenDb::DbDataValueVec rowkey; + rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> + g_viz_constants.RowTimeInBits)); + uint8_t partition_no = 0; + rowkey.push_back(partition_no); + rowkey.push_back((uint8_t)dit->get_direction_ing()); + + EXPECT_CALL(*dbif_mock(), + Db_AddColumnProxy( + Pointee( + AllOf(Field(&GenDb::ColList::cfname_, + g_viz_constants.FLOW_TABLE_PROT_SP), + Field(&GenDb::ColList::rowkey_, rowkey), + Field(&GenDb::ColList::columns_, + expected_vector))))) + .Times(1) + .WillOnce(Return(true)); + } - GenDb::DbDataValueVec rowkey; - rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); - uint8_t partition_no = 0; - rowkey.push_back(partition_no); - rowkey.push_back((uint8_t)0); //direction + // set expectations for FLOW_TABLE_PROT_DP + { + GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); + colname->reserve(4); + colname->push_back((uint8_t)dit->get_protocol()); + colname->push_back((uint16_t)dit->get_dport()); + colname->push_back((uint32_t)(hdr.get_Timestamp() & + g_viz_constants.RowTimeInMask)); + colname->push_back(flowu); + GenDb::DbDataValueVec *colvalue( + new GenDb::DbDataValueVec(ocolvalue)); + boost::ptr_vector expected_vector = + boost::assign::ptr_list_of + (GenDb::NewCol(colname, colvalue, ttl)); + + GenDb::DbDataValueVec rowkey; + rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> + g_viz_constants.RowTimeInBits)); + uint8_t partition_no = 0; + rowkey.push_back(partition_no); + rowkey.push_back((uint8_t)dit->get_direction_ing()); + + EXPECT_CALL(*dbif_mock(), + Db_AddColumnProxy( + Pointee( + AllOf(Field(&GenDb::ColList::cfname_, + g_viz_constants.FLOW_TABLE_PROT_DP), + Field(&GenDb::ColList::rowkey_, rowkey), + Field(&GenDb::ColList::columns_, + expected_vector))))) + .Times(1) + .WillOnce(Return(true)); + } - EXPECT_CALL(*dbif_mock(), - Db_AddColumnProxy( - Pointee( - AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.FLOW_TABLE_VROUTER), - Field(&GenDb::ColList::rowkey_, rowkey), - Field(&GenDb::ColList::columns_, - expected_vector))))) - .Times(1) - .WillOnce(Return(true)); - } + // set expectations for FLOW_TABLE_VROUTER + { + GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); + colname->reserve(4); + colname->push_back(hdr.get_Source()); //vrouter + colname->push_back((uint32_t)(hdr.get_Timestamp() & + g_viz_constants.RowTimeInMask)); + colname->push_back(flowu); + GenDb::DbDataValueVec *colvalue( + new GenDb::DbDataValueVec(ocolvalue)); + boost::ptr_vector expected_vector = + boost::assign::ptr_list_of + (GenDb::NewCol(colname, colvalue, ttl)); + + GenDb::DbDataValueVec rowkey; + rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> + g_viz_constants.RowTimeInBits)); + uint8_t partition_no = 0; + rowkey.push_back(partition_no); + rowkey.push_back((uint8_t)dit->get_direction_ing()); + + EXPECT_CALL(*dbif_mock(), + Db_AddColumnProxy( + Pointee( + AllOf(Field(&GenDb::ColList::cfname_, + g_viz_constants.FLOW_TABLE_VROUTER), + Field(&GenDb::ColList::rowkey_, rowkey), + Field(&GenDb::ColList::columns_, + expected_vector))))) + .Times(1) + .WillOnce(Return(true)); + } + } - db_handler()->FlowTableInsert(msg->GetMessageNode(), - msg->GetHeader()); - delete msg; + db_handler()->FlowTableInsert(msg->GetMessageNode(), + msg->GetHeader()); + } } class UUIDRandomGenTest : public ::testing::Test {