Skip to content

Commit

Permalink
This fix changes the sandesh send queue size based on
Browse files Browse the repository at this point in the history
     the actual message size upon enque/dequeue.

    Compiler:
    ---------
    Sandesh compiler is changed to generate GetSize()
    which returns the size of the sandesh

    Library:
    -------
    Introducing a new structure Element which consists of the
    sandesh and GetSize function to return size of sandesh

    Change the sandesh_queue_.Enqueue and Dequeue to take
    Element as argument and not sandesh

    Test if Sandesh GetSize function returns value properly.
    Test if sandesh queue size changes
    by the message size upon enqueue/dequeue

Change-Id: Idcd4a9e47e2e0a14601a1963098aa83c2f8c2d93
Closes-Bug: #1464040
  • Loading branch information
arvindvis committed Sep 16, 2015
1 parent 74205ef commit 5ed5df0
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 9 deletions.
154 changes: 154 additions & 0 deletions compiler/generate/t_cpp_generator.cc
Expand Up @@ -119,6 +119,13 @@ class t_cpp_generator : public t_oop_generator {
void generate_logger_map_element (std::ofstream& out, t_map* tmap, string iter, bool log_value_only);
void generate_logger_set_element (std::ofstream& out, t_set* tset, string iter, bool log_value_only);
void generate_logger_list_element (std::ofstream& out, t_list* tlist, string iter, bool log_value_only);
void generate_sandesh_get_size (std::ofstream& out, t_sandesh* tsandesh);
void generate_get_size_field (std::ofstream& out, t_field *tfield);
void generate_get_size_struct (std::ofstream& out, t_struct *tstruct, string name);
void generate_get_size_container (std::ofstream& out, t_type* ttype, string name);
void generate_get_size_map_element (std::ofstream& out, t_map* tmap, string name);
void generate_get_size_list_element(std::ofstream& out, t_list* ttype, string name);
void generate_get_size_set_element (std::ofstream& out, t_set* ttype, string name);
void generate_sandesh_trace (std::ofstream& out, t_sandesh* tsandesh);
void generate_sandesh_context (std::ofstream& out, t_sandesh* tsandesh, string val);
void generate_sandesh_seqnum(std::ofstream& out, t_sandesh* tsandesh);
Expand Down Expand Up @@ -160,6 +167,8 @@ class t_cpp_generator : public t_oop_generator {
void generate_static_const_string_definition(std::ofstream& out, t_struct* tstruct);
void generate_struct_logger (ofstream& out, const string& name,
const vector<t_field*>& fields);
void generate_struct_get_size (ofstream& out, const string& name,
const vector<t_field*>& fields);
#endif

/**
Expand Down Expand Up @@ -941,6 +950,7 @@ void t_cpp_generator::generate_cpp_struct(t_struct* tstruct, bool is_exception)
generate_struct_writer(out, tstruct);
#ifdef SANDESH
generate_struct_logger(out, tstruct->get_name(), tstruct->get_members());
generate_struct_get_size(out, tstruct->get_name(), tstruct->get_members());
#endif
}

Expand Down Expand Up @@ -974,6 +984,7 @@ void t_cpp_generator::generate_cpp_sandesh(t_sandesh* tsandesh) {
generate_sandesh_reader(out, tsandesh);
generate_sandesh_writer(out, tsandesh);
generate_sandesh_loggers(out, tsandesh);
generate_sandesh_get_size(out, tsandesh);

if (!is_trace) {
generate_sandesh_static_seqnum_def(out, tsandesh);
Expand Down Expand Up @@ -1866,6 +1877,8 @@ void t_cpp_generator::generate_sandesh_definition(ofstream& out,

out << indent() << "std::string ToString() const;" << endl;

out << indent() << "size_t GetSize() const;" << endl;

// Private members
out << endl;
out << "private:" << endl << endl;
Expand Down Expand Up @@ -2244,6 +2257,7 @@ void t_cpp_generator::generate_struct_definition(ofstream& out,
}
#ifdef SANDESH
out << indent() << "std::string log() const;" << endl;
out << indent() << "size_t GetSize() const;" << endl;
#endif
out << endl;

Expand Down Expand Up @@ -2695,6 +2709,28 @@ void t_cpp_generator::generate_struct_writer(ofstream& out,
}

#ifdef SANDESH
/**
* Generate get size for structs
*
* @param out Stream to write to
* @param tstruct The struct tstruct->get_members() tstruct->get_name()
*/
void t_cpp_generator::generate_struct_get_size(ofstream& out, const string& name,
const vector<t_field*>& fields) {
//Generate GetSize function to return size of sandesh
indent(out) << "size_t " << name <<
"::GetSize() const {" << endl;
indent_up();
indent(out) << "size_t size = 0;" << endl;
vector<t_field*>::const_iterator f_iter;
for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
generate_get_size_field(out, *f_iter);
}
indent(out) << "return size;" << endl;
indent_down();
indent(out) << "}" << endl << endl;
}

/**
* Generate logger for structs
*
Expand Down Expand Up @@ -3253,6 +3289,35 @@ void t_cpp_generator::generate_sandesh_creator(ofstream& out,
return;
}

/**
* finds size of a field of any type.
*/
void t_cpp_generator::generate_get_size_field(ofstream& out, t_field *tfield) {
t_type* type = get_true_type(tfield->get_type());
string name = tfield->get_name();
// Handle optional elements
if (tfield->get_req() == t_field::T_OPTIONAL) {
out << indent() << "if (__isset." << name << ") {" <<
endl;
indent_up();
}
if (type->is_struct()) {
generate_get_size_struct(out, (t_struct *)type, name);
} else if (type->is_container()) {
generate_get_size_container(out, type, name);
} else if (type->is_string() || type->is_xml() ||
type->is_static_const_string()) {
out << indent() << "size += " << name << ".length();" << endl;
} else {
out << indent() << "size += sizeof(" + name +");" << endl;
}
// Handle optional elements
if (tfield->get_req() == t_field::T_OPTIONAL) {
indent_down();
out << indent() << "}" << endl;
}
}

/**
* Logs a field of any type.
*/
Expand Down Expand Up @@ -3328,6 +3393,62 @@ void t_cpp_generator::generate_logger_field(ofstream& out,
}
}

/**
* Generate code to find size of a container
*/
void t_cpp_generator::generate_get_size_container(ofstream& out,
t_type* ttype,
string name) {
scope_up(out);
string iter = tmp("_iter");
out << indent() << type_name(ttype) << "::const_iterator " << iter
<< ";" << endl;
out << indent() << "for (" << iter << " = " << name << ".begin(); "
<< iter << " != " << name << ".end(); ++" << iter << ")" << endl;
scope_up(out);
if (ttype->is_map()) {
generate_get_size_map_element(out, (t_map *)ttype, iter);
} else if(ttype->is_set()) {
generate_get_size_set_element(out, (t_set *)ttype, iter);
} else if(ttype->is_list()) {
generate_get_size_list_element(out, (t_list *)ttype, iter);
}
scope_down(out);
scope_down(out);
}

/**
* Generate code to find size of a map element
*/
void t_cpp_generator::generate_get_size_map_element(ofstream& out,
t_map* tmap,
string iter) {
t_field kfield(tmap->get_key_type(), iter + "->first");
generate_get_size_field(out, &kfield);
t_field vfield(tmap->get_val_type(), iter + "->second");
generate_get_size_field(out, &vfield);
}

/**
* Generate code to find size of list element
*/
void t_cpp_generator::generate_get_size_list_element(ofstream& out,
t_list* tlist,
string iter) {
t_field efield(tlist->get_elem_type(), "(*" + iter + ")");
generate_get_size_field(out, &efield);
}

/**
* Generate code to find size of set element
*/
void t_cpp_generator::generate_get_size_set_element(ofstream& out,
t_set* tset,
string iter) {
t_field efield(tset->get_elem_type(), "(*" + iter + ")");
generate_get_size_field(out, &efield);
}

/**
* Generate code to log a container
*/
Expand Down Expand Up @@ -3394,6 +3515,16 @@ void t_cpp_generator::generate_logger_list_element(ofstream& out,
generate_logger_field(out, &efield, prefix, log_value_only, true);
}

/**
* Generate code to find size of a struct.
*/
void t_cpp_generator::generate_get_size_struct(ofstream& out,
t_struct *tstruct,
string name) {
(void) tstruct;
out << indent() << "size += " << name << ".GetSize();" << endl;
}

/**
* Generate code to log a struct.
*/
Expand Down Expand Up @@ -3525,6 +3656,29 @@ void t_cpp_generator::generate_sandesh_logger(ofstream& out,
}
}

/**
* Generate GetSize for sandesh
*
* @param out The output stream
* @param tsandesh The sandesh
*/
void t_cpp_generator::generate_sandesh_get_size(ofstream& out,
t_sandesh* tsandesh) {
//Generate GetSize function to return size of sandesh
indent(out) << "size_t " << tsandesh->get_name() <<
"::GetSize() const {" << endl;
indent_up();
indent(out) << "size_t size = 0;" << endl;
const vector<t_field*>& fields = tsandesh->get_members();
vector<t_field*>::const_iterator f_iter;
for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
generate_get_size_field(out, *f_iter);
}
indent(out) << "return size;" << endl;
indent_down();
indent(out) << "}" << endl << endl;
}

/**
* Generate loggers for sandesh
*
Expand Down
19 changes: 18 additions & 1 deletion library/cpp/sandesh.cc
Expand Up @@ -457,7 +457,9 @@ bool Sandesh::Enqueue(SandeshQueue *queue) {
Release();
return false;
}
if (!queue->Enqueue(this)) {
//Frame an elemet object and enqueue it
SandeshElement elem(this);
if (!queue->Enqueue(elem)) {
// XXX Change when WorkQueue implements bounded queues
return true;
}
Expand Down Expand Up @@ -828,3 +830,18 @@ bool SandeshSystem::HandleTest(SandeshLevel::type level,
}
return false;
}

template<>
size_t Sandesh::SandeshQueue::AtomicIncrementQueueCount(
SandeshElement *element)
{
size_t sandesh_size = element->GetSize();
return count_.fetch_and_add(sandesh_size) + sandesh_size;
}

template<>
size_t Sandesh::SandeshQueue::AtomicDecrementQueueCount(
SandeshElement *element) {
size_t sandesh_size = element->GetSize();
return count_.fetch_and_add((size_t)(0-sandesh_size)) - sandesh_size;
}
29 changes: 28 additions & 1 deletion library/cpp/sandesh.h
Expand Up @@ -120,10 +120,12 @@ class SandeshMessageStats;
class SandeshConnection;
class SandeshRequest;

struct SandeshElement;

class Sandesh {
public:
typedef WorkQueue<SandeshRequest *> SandeshRxQueue;
typedef WorkQueue<Sandesh *> SandeshQueue;
typedef WorkQueue<SandeshElement> SandeshQueue;
typedef WorkQueue<
boost::shared_ptr<contrail::sandesh::transport::TMemoryBuffer> >
SandeshBufferQueue;
Expand Down Expand Up @@ -201,6 +203,9 @@ class Sandesh {
static std::string LoggingCategory() { return logging_category_; }
static void SendLoggingResponse(std::string context);

//GetSize method to report the size
virtual size_t GetSize() const = 0;

// Send queue processing
static void SetSendQueue(bool enable);
static inline bool IsSendQueueEnabled() {
Expand Down Expand Up @@ -293,6 +298,7 @@ class Sandesh {
}
static uint32_t get_send_rate_limit() { return sandesh_send_ratelimit_; }


protected:
void set_timestamp(time_t timestamp) { timestamp_ = timestamp; }
void set_type(SandeshType::type type) { type_ = type; }
Expand Down Expand Up @@ -390,6 +396,27 @@ class Sandesh {
static tbb::atomic<uint32_t> sandesh_send_ratelimit_;
};

struct SandeshElement {
Sandesh *snh_;
//Explicit constructor creating only if Sandesh is passed as arg
explicit SandeshElement(Sandesh *snh):snh_(snh),size_(snh->GetSize()) {
}
SandeshElement():size_(0) { }
size_t GetSize() const {
return size_;
}
private:
size_t size_;
};

template<>
size_t Sandesh::SandeshQueue::AtomicIncrementQueueCount(
SandeshElement *element);

template<>
size_t Sandesh::SandeshQueue::AtomicDecrementQueueCount(
SandeshElement *element);

#define SANDESH_LOG(_Level, _Msg) \
do { \
if (LoggingDisabled()) break; \
Expand Down
3 changes: 2 additions & 1 deletion library/cpp/sandesh_connection.cc
Expand Up @@ -82,7 +82,8 @@ bool SandeshConnection::SendSandesh(Sandesh *snh) {
return false;
}
// XXX No bounded work queue
session_->send_queue()->Enqueue(snh);
SandeshElement element(snh);
session_->send_queue()->Enqueue(element);
return true;
}

Expand Down
6 changes: 4 additions & 2 deletions library/cpp/sandesh_session.cc
Expand Up @@ -285,7 +285,8 @@ SandeshSession::SandeshSession(TcpServer *client, Socket *socket,
reader_(new SandeshReader(this)),
send_queue_(new Sandesh::SandeshQueue(writer_task_id,
task_instance,
boost::bind(&SandeshSession::SendMsg, this, _1))),
boost::bind(&SandeshSession::SendMsg, this, _1),
kQueueSize)),
keepalive_idle_time_(kSessionKeepaliveIdleTime),
keepalive_interval_(kSessionKeepaliveInterval),
keepalive_probes_(kSessionKeepaliveProbes),
Expand Down Expand Up @@ -350,7 +351,8 @@ void SandeshSession::OnRead(Buffer buffer) {
reader_->OnRead(buffer);
}

bool SandeshSession::SendMsg(Sandesh *sandesh) {
bool SandeshSession::SendMsg(SandeshElement element) {
Sandesh *sandesh = element.snh_;
tbb::mutex::scoped_lock lock(send_mutex_);
if (!IsEstablished()) {
if (sandesh->IsLoggingDroppedAllowed()) {
Expand Down
3 changes: 2 additions & 1 deletion library/cpp/sandesh_session.h
Expand Up @@ -223,8 +223,9 @@ class SandeshSession : public TcpSession {
static const int kSessionKeepaliveInterval = 3; // in seconds
static const int kSessionKeepaliveProbes = 5; // count
static const int kSessionTcpUserTimeout = 30000; // ms
static const int kQueueSize = 200 * 1024 * 1024; // 200 MB

bool SendMsg(Sandesh *sandesh);
bool SendMsg(SandeshElement element);
bool SendBuffer(boost::shared_ptr<TMemoryBuffer> sbuffer);
bool SessionSendReady();

Expand Down
9 changes: 9 additions & 0 deletions library/cpp/test/SConscript
Expand Up @@ -26,13 +26,15 @@ SandeshMessageTestGenFiles = env.SandeshGenCpp('sandesh_message_test.sandesh')
SandeshTraceTestGenFiles = env.SandeshGenCpp('sandesh_trace_test.sandesh')
SandeshHttpTestGenFiles = env.SandeshGenCpp('sandesh_http_test.sandesh')
SandeshPerfTestGenFiles = env.SandeshGenCpp('sandesh_perf_test.sandesh')
SandeshSendQueueTestGenFiles = env.SandeshGenCpp('sandesh_send_queue_test.sandesh')

SandeshRWTestGenSrcs = env.ExtractCpp(SandeshRWTestGenFiles)
SandeshMessageTestGenSrcs = env.ExtractCpp(SandeshMessageTestGenFiles)
SandeshTraceTestGenSrcs = env.ExtractCpp(SandeshTraceTestGenFiles)
SandeshTraceTestGenObjs = env.Object(SandeshTraceTestGenSrcs)
SandeshHttpTestGenSrcs = env.ExtractCpp(SandeshHttpTestGenFiles)
SandeshPerfTestGenSrcs = env.ExtractCpp(SandeshPerfTestGenFiles)
SandeshSendQueueTestGenSrcs = env.ExtractCpp(SandeshSendQueueTestGenFiles)

SandeshLibPath = ['#/build/lib',
Dir(env['TOP']).abspath + '/base',
Expand Down Expand Up @@ -79,6 +81,12 @@ if sys.platform != 'darwin':
if sys.platform.startswith('freebsd'):
env.Append(LIBS = ['z', 'lzma', 'iconv'])

sandesh_send_queue_test = env.UnitTest('sandesh_send_queue_test',
SandeshSendQueueTestGenSrcs +
['sandesh_send_queue_test.cc'],
)
env.Alias('src/sandesh:sandesh_send_queue_test', sandesh_send_queue_test)

sandesh_perf_test = env.UnitTest('sandesh_perf_test',
SandeshPerfTestGenSrcs +
['sandesh_perf_test.cc'],
Expand Down Expand Up @@ -155,6 +163,7 @@ test_suite = [sandesh_message_test,
sandesh_client_test,
sandesh_statistics_test,
sandesh_request_test,
sandesh_send_queue_test,
]

test = env.TestSuite('sandesh-test', test_suite)
Expand Down

0 comments on commit 5ed5df0

Please sign in to comment.