Skip to content

Commit

Permalink
Config read from cassandra
Browse files Browse the repository at this point in the history
(Prkash's Commit + build errors + fix failing tests)

1. Pass obj type for read and validation
2. Store linkattr_ attribute for parsing links with attribute
3. Set ack needed for rabbit messages
4. Delete queue on first connect to rabbitmq
5. Pupulate control-node config with data from cassandra

Ananth
1. Update script to produce json files for all xml files used in
   ifmap_server_parser_test
2. Add All ifmap_server_parser_test uts to config_json_parser_test (disabled)
3. Fix many build (linker errors) We may need to clean up this in a better way
   to reduce dependency on cassandra, rabbit, etc. for unreleated tests such
   as bgp
4. Add ifmap-id to rabbitmq message

Change-Id: Ia47bdb5e834fed7cd5180e38c736bd140434365c
Partial-Bug: #1632470
  • Loading branch information
ananth-at-camphor-networks committed Jan 11, 2017
1 parent 28fd7dc commit 6e92f67
Show file tree
Hide file tree
Showing 64 changed files with 4,510 additions and 437 deletions.
13 changes: 13 additions & 0 deletions lib/SimpleAmqpClient/SConscript
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
# -*- mode: python; -*-
import os
vpath = '#/third_party/SimpleAmqpClient'

env = DefaultEnvironment()
SimpleAmqpClient_path = '#/third_party/SimpleAmqpClient'

def Symlink(target, source):
parent = os.path.dirname(target[0].abspath)
if not os.path.exists(parent):
os.makedirs(parent)

if not os.path.exists(target[0].abspath):
os.symlink(source[0].abspath, target[0].abspath)

cmd = ('(cd ' + Dir('.').abspath + ';' +
'cmake -DENABLE_SSL_SUPPORT=ON' +
Expand All @@ -19,3 +29,6 @@ env.SideEffect(products, libSimpleAmqpClient)

env.Requires(libSimpleAmqpClient, '#/build/include/boost')
env.Requires(libSimpleAmqpClient, '#/build/lib/librabbitmq.a')

Symlink([Dir('#/build/include/SimpleAmqpClient')],
[Dir(SimpleAmqpClient_path + '/src/SimpleAmqpClient')])
12 changes: 11 additions & 1 deletion src/bgp/ermvpn/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ env.Append(LIBPATH = env['TOP'] + '/bgp/security_group')
env.Append(LIBPATH = env['TOP'] + '/bgp/tunnel_encap')
env.Append(LIBPATH = env['TOP'] + '/control-node')
env.Append(LIBPATH = env['TOP'] + '/db')
env.Append(LIBPATH = env['TOP'] + '/discovery/client')
env.Append(LIBPATH = env['TOP'] + '/io')
env.Append(LIBPATH = env['TOP'] + '/ifmap')
env.Append(LIBPATH = env['TOP'] + '/net')
Expand Down Expand Up @@ -86,7 +87,16 @@ env.Prepend(LIBS = [
'xmpp_enet',
'xml',
'pugixml',
'boost_regex'
'boost_regex',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc',
])

if sys.platform != 'darwin':
Expand Down
12 changes: 11 additions & 1 deletion src/bgp/evpn/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ env.Append(LIBPATH = env['TOP'] + '/bgp/rtarget')
env.Append(LIBPATH = env['TOP'] + '/bgp/security_group')
env.Append(LIBPATH = env['TOP'] + '/bgp/tunnel_encap')
env.Append(LIBPATH = env['TOP'] + '/control-node')
env.Append(LIBPATH = env['TOP'] + '/discovery/client')
env.Append(LIBPATH = env['TOP'] + '/db')
env.Append(LIBPATH = env['TOP'] + '/io')
env.Append(LIBPATH = env['TOP'] + '/ifmap')
Expand Down Expand Up @@ -86,7 +87,16 @@ env.Prepend(LIBS = [
'xmpp_enet',
'xml',
'pugixml',
'boost_regex'
'boost_regex',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc',
])

if sys.platform != 'darwin':
Expand Down
12 changes: 11 additions & 1 deletion src/bgp/inet/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ env.Append(LIBPATH = env['TOP'] + '/bgp/security_group')
env.Append(LIBPATH = env['TOP'] + '/bgp/tunnel_encap')
env.Append(LIBPATH = env['TOP'] + '/control-node')
env.Append(LIBPATH = env['TOP'] + '/db')
env.Append(LIBPATH = env['TOP'] + '/discovery/client')
env.Append(LIBPATH = env['TOP'] + '/io')
env.Append(LIBPATH = env['TOP'] + '/ifmap')
env.Append(LIBPATH = env['TOP'] + '/net')
Expand Down Expand Up @@ -86,7 +87,16 @@ env.Prepend(LIBS = [
'xml',
'pugixml',
'boost_regex',
'process_info'
'process_info',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc',
])

if sys.platform != 'darwin':
Expand Down
12 changes: 11 additions & 1 deletion src/bgp/inet6/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ env.Append(LIBPATH = env['TOP'] + '/bgp/security_group')
env.Append(LIBPATH = env['TOP'] + '/bgp/tunnel_encap')
env.Append(LIBPATH = env['TOP'] + '/control-node')
env.Append(LIBPATH = env['TOP'] + '/db')
env.Append(LIBPATH = env['TOP'] + '/discovery/client')
env.Append(LIBPATH = env['TOP'] + '/io')
env.Append(LIBPATH = env['TOP'] + '/ifmap')
env.Append(LIBPATH = env['TOP'] + '/net')
Expand Down Expand Up @@ -86,7 +87,16 @@ env.Prepend(LIBS = [
'xml',
'pugixml',
'boost_regex',
'process_info'
'process_info',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc',
])

if sys.platform != 'darwin':
Expand Down
12 changes: 11 additions & 1 deletion src/bgp/inet6vpn/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ env.Append(LIBPATH = env['TOP'] + '/bgp/rtarget')
env.Append(LIBPATH = env['TOP'] + '/bgp/security_group')
env.Append(LIBPATH = env['TOP'] + '/bgp/tunnel_encap')
env.Append(LIBPATH = env['TOP'] + '/control-node')
env.Append(LIBPATH = env['TOP'] + '/discovery/client')
env.Append(LIBPATH = env['TOP'] + '/db')
env.Append(LIBPATH = env['TOP'] + '/io')
env.Append(LIBPATH = env['TOP'] + '/ifmap')
Expand Down Expand Up @@ -87,7 +88,16 @@ env.Prepend(LIBS = [
'ssl',
'boost_regex',
'process_info',
'extended_community'
'extended_community',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc',
])

if sys.platform != 'darwin':
Expand Down
12 changes: 11 additions & 1 deletion src/bgp/l3vpn/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ env.Append(LIBPATH = env['TOP'] + '/bgp/rtarget')
env.Append(LIBPATH = env['TOP'] + '/bgp/security_group')
env.Append(LIBPATH = env['TOP'] + '/bgp/tunnel_encap')
env.Append(LIBPATH = env['TOP'] + '/control-node')
env.Append(LIBPATH = env['TOP'] + '/discovery/client')
env.Append(LIBPATH = env['TOP'] + '/db')
env.Append(LIBPATH = env['TOP'] + '/io')
env.Append(LIBPATH = env['TOP'] + '/ifmap')
Expand Down Expand Up @@ -86,7 +87,16 @@ env.Prepend(LIBS = [
'xmpp_enet',
'xml',
'pugixml',
'boost_regex'
'boost_regex',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc',
])

if sys.platform != 'darwin':
Expand Down
13 changes: 12 additions & 1 deletion src/bgp/rtarget/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ env.Append(LIBPATH = [env['TOP'] + '/base', env['TOP'] + '/base/test',
env['TOP'] + '/bgp/rtarget', env['TOP'] + '/bgp/security_group',
env['TOP'] + '/bgp/test', env['TOP'] + '/bgp/tunnel_encap',
env['TOP'] + '/control-node', env['TOP'] + '/control-node/test',
env['TOP'] + '/discovery/test',
env['TOP'] + '/db', env['TOP'] + '/db/test',
env['TOP'] + '/discovery/client', env['TOP'] + '/http/client',
env['TOP'] + '/ifmap', env['TOP'] + '/ifmap/test',
Expand Down Expand Up @@ -76,7 +77,17 @@ else:


if sys.platform != 'darwin':
env.Append(LIBS=['rt'])
env.Append(LIBS=['rt',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc'
])


rtarget_address_test = env.UnitTest('rtarget_address_test',
Expand Down
10 changes: 10 additions & 0 deletions src/bgp/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ env.Append(LIBPATH = ['#/' + Dir('..').path,
'../../bgp/inet6vpn',
'../../bgp/l3vpn',
'../../control-node',
'../../discovery/client',
'../../db',
'../../io',
'../../ifmap',
Expand Down Expand Up @@ -121,6 +122,15 @@ env.Append(LIBS = [
'bgp_test_factory',
'rtarget',
'control_node_uve',
'io',
'ifmapio',
'cassandra_cql',
'SimpleAmqpClient',
'rabbitmq',
'cassandra',
'gendb',
'ds',
'httpc',
])

env.Append(LIBS = ['route', 'routing_instance', 'routing_policy', 'net'])
Expand Down
46 changes: 39 additions & 7 deletions src/ifmap/client/config_amqp_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@

#include "config_amqp_client.h"

#include <boost/algorithm/string/find.hpp>
#include <stdio.h>

#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include "rapidjson/document.h"

#include "base/task.h"
#include "ifmap/ifmap_config_options.h"
#include "config_cassandra_client.h"
#include "config_client_manager.h"
#include "config_db_client.h"

using namespace boost;
using namespace std;
using namespace rapidjson;

Expand All @@ -31,7 +35,7 @@ class ConfigAmqpClient::RabbitMQReader : public Task {
ConfigAmqpClient *amqpclient_;
AmqpClient::Channel::ptr_t channel_;
string consumer_tag_;
bool ConnectToRabbitMQ();
bool ConnectToRabbitMQ(bool queue_delete = true);
};

ConfigAmqpClient::ConfigAmqpClient(ConfigClientManager *mgr, string hostname,
Expand Down Expand Up @@ -66,21 +70,33 @@ void ConfigAmqpClient::EnqueueUUIDRequest(string uuid_str, string obj_type,
}

string ConfigAmqpClient::FormAmqpUri() const {
return string("amqp://" + rabbitmq_user() + ":" + rabbitmq_password() +
"@" + rabbitmq_ip() + ":" + rabbitmq_port());
string uri = string("amqp://" + rabbitmq_user() + ":" + rabbitmq_password() +
"@" + rabbitmq_ip() + ":" + rabbitmq_port());
if (rabbitmq_vhost() != "") {
uri += "/" + rabbitmq_vhost();
}
return uri;
}

bool ConfigAmqpClient::RabbitMQReader::ConnectToRabbitMQ() {
bool ConfigAmqpClient::RabbitMQReader::ConnectToRabbitMQ(bool queue_delete) {
string uri = amqpclient_->FormAmqpUri();
try {
channel_ = AmqpClient::Channel::CreateFromUri(uri);
// passive = false, durable = false
// passive = false, durable = false, auto_delete = false
channel_->DeclareExchange("vnc_config.object-update",
AmqpClient::Channel::EXCHANGE_TYPE_FANOUT);
AmqpClient::Channel::EXCHANGE_TYPE_FANOUT, false, false, false);
string queue_name = string("control-node.") + amqpclient_->hostname();

if (queue_delete) {
channel_->DeleteQueue(queue_name, true, true);
}

string queue = channel_->DeclareQueue(queue_name);
channel_->BindQueue(queue, "vnc_config.object-update");
consumer_tag_ = channel_->BasicConsume(queue);
// no_local = true, no_ack = false,
// exclusive = true, message_prefetch_count = 0
consumer_tag_ = channel_->BasicConsume(queue, queue_name,
true, false, true, 0);
} catch (std::exception &e) {
static std::string what = e.what();
std::cout << "Caught fatal exception while connecting to RabbitMQ: " << what << std::endl;
Expand Down Expand Up @@ -109,20 +125,35 @@ bool ConfigAmqpClient::ProcessMessage(const string &json_message) {
string oper = "";
string uuid_str = "";
string obj_type = "";
string obj_name = "";
for (Value::ConstMemberIterator itr = document.MemberBegin();
itr != document.MemberEnd(); ++itr) {
string key(itr->name.GetString());
if (key == "oper") {
oper = itr->value.GetString();
} else if (key == "type") {
obj_type = itr->value.GetString();
} else if (key == "imid") {
string temp_imid = itr->value.GetString();
iterator_range<string::iterator> r = find_nth(temp_imid, ":", 1);
if (r.empty()) {
std::cout << "FAIL " << std::endl;
continue;
}
obj_name =
temp_imid.substr(distance(temp_imid.begin(), r.begin())+1);
} else if (key == "uuid") {
uuid_str = itr->value.GetString();
}
}
if ((oper == "") || (uuid_str == "") || (obj_type == "")) {
assert(0);
}
if (oper == "CREATE") {
assert(obj_name != "");
config_manager()->config_db_client()->AddFQNameCache(uuid_str,
obj_name);
}
EnqueueUUIDRequest(uuid_str, obj_type, oper);
}
return true;
Expand All @@ -137,6 +168,7 @@ bool ConfigAmqpClient::RabbitMQReader::Run() {
AmqpClient::Envelope::ptr_t envelope;
while (envelope = channel_->BasicConsumeMessage(consumer_tag_)) {
amqpclient_->ProcessMessage(envelope->Message()->Body());
channel_->BasicAck(envelope);
}
return true;
}
Expand Down
6 changes: 6 additions & 0 deletions src/ifmap/client/config_amqp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ class ConfigAmqpClient {
bool ProcessMessage(const std::string &json_message);
static void set_disable(bool disable) { disable_ = disable; }

ConfigClientManager *config_manager() const {
return mgr_;
}
ConfigClientManager *config_manager() {
return mgr_;
}
private:
// A Job for reading the rabbitmq
class RabbitMQReader;
Expand Down

0 comments on commit 6e92f67

Please sign in to comment.