Skip to content

Commit

Permalink
Enchance the protobuf server implementation to:
Browse files Browse the repository at this point in the history
1. Use TelemetryFieldOptions which are an extension of
   protobuf FieldOptions to decide which of the elements
   of a message should be considered as tags/key.
Added unit tests for verification of protobuf message
extensions sent in self describing message.
Closes-Bug: #1450736

Change-Id: I72a367963a1b6a1c433a181f6547da8376faf8b4
  • Loading branch information
Megh Bhatt committed May 4, 2015
1 parent 77813e9 commit 43375be
Show file tree
Hide file tree
Showing 7 changed files with 816 additions and 174 deletions.
2 changes: 1 addition & 1 deletion src/analytics/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ RedisLuaBuild(AnalyticsEnv, 'uveupdate_st')
RedisLuaBuild(AnalyticsEnv, 'uvedelete')
RedisLuaBuild(AnalyticsEnv, 'flushuves')

ProtobufGenFiles = AnalyticsEnv.ProtocGenCpp('self_describing_message.proto')
ProtobufGenFiles = AnalyticsEnv.ProtocGenCpp('protobuf_schema.proto')
ProtobufGenSrcs = AnalyticsEnv.ExtractCpp(ProtobufGenFiles)
ProtobufGenObjs = AnalyticsEnv.Object(ProtobufGenSrcs)
AnalyticsEnv['ANALYTICS_PROTOBUF_GEN_OBJS'] = ProtobufGenObjs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,15 @@ message SelfDescribingMessage {
// The message data.
required bytes message_data = 4;
}

message TelemetryFieldOptions {
// Key
optional bool is_key = 1;

// Timestamp
optional bool is_timestamp = 2;
}

extend google.protobuf.FieldOptions {
optional TelemetryFieldOptions telemetry_options = 33333;
}
97 changes: 60 additions & 37 deletions src/analytics/protobuf_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
#include <io/udp_server.h>

#include "analytics/diffstats.h"
#include "analytics/self_describing_message.pb.h"
#include "analytics/protobuf_schema.pb.h"
#include "analytics/protobuf_server.h"
#include "analytics/protobuf_server_impl.h"

using ::google::protobuf::FileDescriptorSet;
using ::google::protobuf::FileDescriptor;
using ::google::protobuf::FileDescriptorProto;
using ::google::protobuf::FieldDescriptor;
using ::google::protobuf::FieldOptions;
using ::google::protobuf::EnumValueDescriptor;
using ::google::protobuf::Descriptor;
using ::google::protobuf::DescriptorPool;
Expand Down Expand Up @@ -87,7 +88,7 @@ bool ProtobufReader::ParseSelfDescribingMessage(const uint8_t *data,
parse_failure_cb(msg_type);
}
LOG(ERROR, "SelfDescribingMessage: " << msg_type << ": Descriptor " <<
" not FOUND");
"not FOUND");
return false;
}
// Parse the message.
Expand Down Expand Up @@ -132,6 +133,16 @@ void PopulateProtobufTopLevelTags(const Message& message,
for (size_t i = 0; i < fields.size(); i++) {
// Gather tags
const FieldDescriptor *field(fields[i]);
const FieldOptions &foptions(field->options());
bool is_tag(false);
if (foptions.HasExtension(telemetry_options)) {
const TelemetryFieldOptions &toptions(
foptions.GetExtension(telemetry_options));
is_tag = toptions.has_is_key() && toptions.is_key();
}
if (is_tag == false) {
continue;
}
const FieldDescriptor::CppType ftype(field->cpp_type());
const std::string &fname(field->name());
switch (ftype) {
Expand Down Expand Up @@ -208,6 +219,19 @@ void PopulateProtobufTopLevelTags(const Message& message,
}
}

static inline void PopulateAttribsAndTags(DbHandler::AttribMap *attribs,
StatWalker::TagMap *tags, bool is_tag, const std::string &name,
DbHandler::Var value) {
// Insert into the attribute map
attribs->insert(make_pair(name, value));
if (is_tag) {
// Insert into the tag map
StatWalker::TagVal tvalue;
tvalue.val = value;
tags->insert(make_pair(name, tvalue));
}
}

void PopulateProtobufStats(const Message& message,
const std::string &stat_attr_name, StatWalker *stat_walker) {
// At the top level the stat walker already has the tags so
Expand All @@ -226,76 +250,75 @@ void PopulateProtobufStats(const Message& message,
const FieldDescriptor *field(fields[i]);
const FieldDescriptor::CppType ftype(field->cpp_type());
const std::string &fname(field->name());
const FieldOptions &foptions(field->options());
bool is_tag(false);
if (foptions.HasExtension(telemetry_options)) {
const TelemetryFieldOptions &toptions(
foptions.GetExtension(telemetry_options));
is_tag = toptions.has_is_key() && toptions.is_key();
}
switch (ftype) {
case FieldDescriptor::CPPTYPE_INT32: {
// Insert into the attribute map
DbHandler::Var avalue(static_cast<uint64_t>(
// Insert into the attribute and tag map
DbHandler::Var value(static_cast<uint64_t>(
reflection->GetInt32(message, field)));
attribs.insert(make_pair(fname, avalue));
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_INT64: {
// Insert into the attribute map
DbHandler::Var avalue(static_cast<uint64_t>(
// Insert into the attribute and tag map
DbHandler::Var value(static_cast<uint64_t>(
reflection->GetInt64(message, field)));
attribs.insert(make_pair(fname, avalue));
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_UINT32: {
// Insert into the attribute map
DbHandler::Var avalue(static_cast<uint64_t>(
// Insert into the attribute and tag map
DbHandler::Var value(static_cast<uint64_t>(
reflection->GetUInt32(message, field)));
attribs.insert(make_pair(fname, avalue));
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_UINT64: {
// Insert into the attribute map
DbHandler::Var avalue(reflection->GetUInt64(message, field));
attribs.insert(make_pair(fname, avalue));
// Insert into the attribute and tag map
DbHandler::Var value(reflection->GetUInt64(message, field));
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_DOUBLE: {
// Insert into the attribute map
DbHandler::Var avalue(reflection->GetDouble(message, field));
attribs.insert(make_pair(fname, avalue));
// Insert into the attribute and tag map
DbHandler::Var value(reflection->GetDouble(message, field));
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_FLOAT: {
// Insert into the attribute map
DbHandler::Var avalue(static_cast<double>(
// Insert into the attribute and tag map
DbHandler::Var value(static_cast<double>(
reflection->GetFloat(message, field)));
attribs.insert(make_pair(fname, avalue));
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_BOOL: {
// Insert into the attribute map
DbHandler::Var avalue(static_cast<uint64_t>(
// Insert into the attribute and tag map
DbHandler::Var value(static_cast<uint64_t>(
reflection->GetBool(message, field)));
attribs.insert(make_pair(fname, avalue));
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_ENUM: {
const EnumValueDescriptor *edesc(reflection->GetEnum(message,
field));
const std::string &svalue(edesc->name());
// Insert into the attribute map
DbHandler::Var avalue(svalue);
attribs.insert(make_pair(fname, avalue));
// Insert into the tag map
StatWalker::TagVal tvalue;
tvalue.val = svalue;
tags.insert(make_pair(fname, tvalue));
// Insert into the attribute and tag map
DbHandler::Var value(svalue);
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_STRING: {
const std::string &svalue(reflection->GetString(message, field));
// Insert into the attribute map
DbHandler::Var avalue(svalue);
attribs.insert(make_pair(fname, avalue));
// Insert into the tag map
StatWalker::TagVal tvalue;
tvalue.val = svalue;
tags.insert(make_pair(fname, tvalue));
// Insert into the attribute and tag map
DbHandler::Var value(svalue);
PopulateAttribsAndTags(&attribs, &tags, is_tag, fname, value);
break;
}
case FieldDescriptor::CPPTYPE_MESSAGE: {
Expand Down
5 changes: 4 additions & 1 deletion src/analytics/test/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ env.Alias('src/analytics:options_test', options_test)

protobufEnv = env.Clone()
protobuf_test_gen_files = protobufEnv.ProtocGenCpp('test_message.proto')
protobuf_test_gen_files += protobufEnv.ProtocGenCpp('test_message_extensions.proto')
protobuf_test_gen_srcs = protobufEnv.ExtractCpp(protobuf_test_gen_files)
protobuf_test_desc_files = protobufEnv.ProtocGenDesc('test_message.proto')
protobuf_test_desc_files += protobufEnv.ProtocGenDesc('test_message_extensions.proto')
protobuf_test = protobufEnv.UnitTest('protobuf_test', protobuf_test_gen_srcs +
protobufEnv['ANALYTICS_PROTOBUF_GEN_OBJS'] +
['protobuf_test.cc', '../protobuf_server.o', '../stat_walker.o'])
env.Requires(protobuf_test, env['TOP'] + '/analytics/test/test_message.desc')
env.Requires(protobuf_test, [env['TOP'] + '/analytics/test/test_message.desc',
env['TOP'] + '/analytics/test/test_message_extensions.desc'])
protobufEnv.Alias('src/analytics:protobuf_test', protobuf_test)

test_suite = [
Expand Down

0 comments on commit 43375be

Please sign in to comment.