Skip to content

Commit

Permalink
Merge branch 'master' into use_zerocopy_delta_api
Browse files Browse the repository at this point in the history
  • Loading branch information
songweijia committed May 9, 2024
2 parents 1e31d58 + 797b4cd commit a317c96
Show file tree
Hide file tree
Showing 15 changed files with 71 additions and 71 deletions.
Expand Up @@ -16,7 +16,7 @@ std::string get_description() {
}

class ConsolePrinterOCDPO: public OffCriticalDataPathObserver {
virtual void operator () (const node_id_t sender,
virtual void operator () (const derecho::node_id_t sender,
const std::string& key_string,
const uint32_t prefix_length,
persistent::version_t version,
Expand Down
2 changes: 1 addition & 1 deletion src/applications/standalone/dairy_farm/filter_udl.cpp
Expand Up @@ -39,7 +39,7 @@ std::string get_description() {
class DairyFarmFilterOCDPO: public OffCriticalDataPathObserver {
std::mutex p2p_send_mutex;

virtual void operator () (const node_id_t,
virtual void operator () (const derecho::node_id_t,
const std::string& key_string,
const uint32_t prefix_length,
persistent::version_t version,
Expand Down
2 changes: 1 addition & 1 deletion src/applications/standalone/dairy_farm/infer_udl.cpp
Expand Up @@ -239,7 +239,7 @@ class DairyFarmInferOCDPO: public OffCriticalDataPathObserver {
private:
mutable std::mutex p2p_send_mutex;

virtual void operator () (const node_id_t,
virtual void operator () (const derecho::node_id_t,
const std::string& key_string,
const uint32_t prefix_length,
persistent::version_t version,
Expand Down
2 changes: 1 addition & 1 deletion src/applications/standalone/dairy_farm/storage_udl.cpp
Expand Up @@ -20,7 +20,7 @@ std::string get_description() {

class DairyFarmStorageOCDPO: public OffCriticalDataPathObserver {

virtual void operator () (const node_id_t,
virtual void operator () (const derecho::node_id_t,
const std::string&,
const uint32_t,
persistent::version_t,
Expand Down
6 changes: 3 additions & 3 deletions src/applications/standalone/dds/src/dds_udl.cpp
Expand Up @@ -26,7 +26,7 @@ class DDSOCDPO: public OffCriticalDataPathObserver {
/* control plane suffix */
std::string control_plane_suffix;
/* subscriber registry */
std::unordered_map<std::string,std::unordered_set<node_id_t>> subscriber_registry;
std::unordered_map<std::string,std::unordered_set<derecho::node_id_t>> subscriber_registry;
#ifdef USE_DDS_TIMESTAMP_LOG
#define INIT_TIMESTAMP_SLOTS (262144)
/* log the server timestamp, they are grouped by topic name */
Expand All @@ -35,7 +35,7 @@ class DDSOCDPO: public OffCriticalDataPathObserver {
/* Is shared_mutex fast enough? */
mutable std::shared_mutex subscriber_registry_mutex;

virtual void operator () (const node_id_t sender,
virtual void operator () (const derecho::node_id_t sender,
const std::string& key_string,
const uint32_t prefix_length,
persistent::version_t, // version
Expand Down Expand Up @@ -64,7 +64,7 @@ class DDSOCDPO: public OffCriticalDataPathObserver {
if (command.command_type == DDSCommand::SUBSCRIBE) {
std::unique_lock<std::shared_mutex> wlock(this->subscriber_registry_mutex);
if (subscriber_registry.find(command.topic) == subscriber_registry.end()) {
subscriber_registry.emplace(command.topic,std::unordered_set<node_id_t>{});
subscriber_registry.emplace(command.topic,std::unordered_set<derecho::node_id_t>{});
#ifdef USE_DDS_TIMESTAMP_LOG
// only add log for new topic.
server_timestamp.emplace(command.topic,std::vector<uint64_t>{});
Expand Down
Expand Up @@ -17,7 +17,7 @@ std::string get_description() {
}

class NotificationOCDPO: public OffCriticalDataPathObserver {
virtual void operator () (const node_id_t sender,
virtual void operator () (const derecho::node_id_t sender,
const std::string& key_string,
const uint32_t prefix_length,
persistent::version_t,
Expand Down
Expand Up @@ -59,7 +59,7 @@ static void client_help() {
// put_type = 1 : persistent
// put_type = 2 : trigger
static void client_put(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
node_id_t member,
derecho::node_id_t member,
const std::vector<std::string>& tokens,
bool is_persistent) {
if (tokens.size() != 3) {
Expand Down Expand Up @@ -88,7 +88,7 @@ static void client_put(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
}

static void client_trigger_put(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
node_id_t member,
derecho::node_id_t member,
const std::vector<std::string>& tokens) {
if (tokens.size() != 3) {
std::cout << "Invalid format of 'put' command." << std::endl;
Expand All @@ -107,7 +107,7 @@ static void client_trigger_put(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,

// get
static void client_get(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
node_id_t member,
derecho::node_id_t member,
const std::vector<std::string>& tokens,
bool is_persistent,
bool is_stable) {
Expand Down Expand Up @@ -152,7 +152,7 @@ static void client_get(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,

// list
static void client_list(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
node_id_t member,
derecho::node_id_t member,
const std::vector<std::string>& tokens,
bool is_persistent) {
uint64_t ver = CURRENT_VERSION;
Expand Down Expand Up @@ -195,7 +195,7 @@ static void client_list(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,

// remove
static void client_remove(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
node_id_t member,
derecho::node_id_t member,
const std::vector<std::string>& tokens,
bool is_persistent) {
if (tokens.size() != 2) {
Expand Down Expand Up @@ -226,28 +226,28 @@ void do_client() {
std::cout << "Finished constructing ExternalGroupClient." << std::endl;

/** 2 - get members */
std::vector<node_id_t> g_members = group.get_members();
std::vector<derecho::node_id_t> g_members = group.get_members();
std::cout << "Members in top derecho group:[ ";
for(auto& nid:g_members) {
std::cout << nid << " ";
}
std::cout << "]" << std::endl;

std::vector<node_id_t> vcs_members = group.template get_shard_members<VCS>(0,0);
std::vector<derecho::node_id_t> vcs_members = group.template get_shard_members<VCS>(0,0);
std::cout << "Members in the single shard of Volatile Cascade Store:[ ";
for (auto& nid:vcs_members) {
std::cout << nid << " ";
}
std::cout << "]" << std::endl;

std::vector<node_id_t> pcs_members = group.template get_shard_members<PCS>(0,0);
std::vector<derecho::node_id_t> pcs_members = group.template get_shard_members<PCS>(0,0);
std::cout << "Members in the single shard of Persistent Cascade Store:[ ";
for (auto& nid:pcs_members) {
std::cout << nid << " ";
}
std::cout << "]" << std::endl;

std::vector<node_id_t> tcs_members = group.template get_shard_members<TCS>(0,0);
std::vector<derecho::node_id_t> tcs_members = group.template get_shard_members<TCS>(0,0);
std::cout << "Members in the single shard of Trigger Cascade (NO)Store:[ ";
for (auto& nid:tcs_members) {
std::cout << nid << " ";
Expand Down
4 changes: 2 additions & 2 deletions src/applications/tests/cascade_as_subgroup_classes/perf.cpp
Expand Up @@ -343,7 +343,7 @@ int do_client(int argc,char** args) {

ExternalClientCaller<PCS,std::remove_reference<decltype(group)>::type>& pcs_ec = group.get_subgroup_caller<PCS>();
auto members = group.template get_shard_members<PCS>(0,0);
node_id_t server_id = members[my_node_id % members.size()];
derecho::node_id_t server_id = members[my_node_id % members.size()];

for(uint64_t i = 0; i < num_messages; i++) {
ObjectWithUInt64Key o(randomize_key(i)%max_distinct_objects,Blob(bbuf, msg_size));
Expand All @@ -363,7 +363,7 @@ int do_client(int argc,char** args) {

ExternalClientCaller<VCS,std::remove_reference<decltype(group)>::type>& vcs_ec = group.get_subgroup_caller<VCS>();
auto members = group.template get_shard_members<VCS>(0,0);
node_id_t server_id = members[my_node_id % members.size()];
derecho::node_id_t server_id = members[my_node_id % members.size()];

for(uint64_t i = 0; i < num_messages; i++) {
ObjectWithUInt64Key o(randomize_key(i)%max_distinct_objects,Blob(bbuf, msg_size));
Expand Down
2 changes: 1 addition & 1 deletion src/applications/tests/pipeline/pipeline_client.cpp
Expand Up @@ -108,7 +108,7 @@ int main(int argc, char** argv) {
uint64_t next_ns = 0;
uint64_t end_ns = now_ns + duration_sec*1e9;
#ifdef ENABLE_EVALUATION
node_id_t my_node_id = capi.get_my_id();
derecho::node_id_t my_node_id = capi.get_my_id();
uint64_t msg_id = 0;
#endif

Expand Down
14 changes: 7 additions & 7 deletions src/service/client.cpp
Expand Up @@ -125,7 +125,7 @@ void print_member_selection_policy(ServiceClientAPI& capi, uint32_t subgroup_ind

template <typename SubgroupType>
void set_member_selection_policy(ServiceClientAPI& capi, uint32_t subgroup_index, uint32_t shard_index,
ShardMemberSelectionPolicy policy, node_id_t user_specified_node_id) {
ShardMemberSelectionPolicy policy, derecho::node_id_t user_specified_node_id) {
capi.template set_member_selection_policy<SubgroupType>(subgroup_index,shard_index,policy,user_specified_node_id);
}

Expand Down Expand Up @@ -332,7 +332,7 @@ void op_trigger_put(ServiceClientAPI& capi, const std::string& key, const std::s
}

template <typename SubgroupType>
void collective_trigger_put(ServiceClientAPI& capi, const std::string& key, const std::string& value, uint32_t subgroup_index, std::vector<node_id_t> nodes) {
void collective_trigger_put(ServiceClientAPI& capi, const std::string& key, const std::string& value, uint32_t subgroup_index, std::vector<derecho::node_id_t> nodes) {
typename SubgroupType::ObjectType obj;
if constexpr (std::is_same<typename SubgroupType::KeyType,uint64_t>::value) {
obj.key = static_cast<uint64_t>(std::stol(key,nullptr,0));
Expand All @@ -344,7 +344,7 @@ void collective_trigger_put(ServiceClientAPI& capi, const std::string& key, cons
}

obj.blob = Blob(reinterpret_cast<const uint8_t*>(value.c_str()),value.length());
std::unordered_map<node_id_t,std::unique_ptr<derecho::rpc::QueryResults<void>>> nodes_and_futures;
std::unordered_map<derecho::node_id_t,std::unique_ptr<derecho::rpc::QueryResults<void>>> nodes_and_futures;
for (auto& nid: nodes) {
nodes_and_futures.emplace(nid,nullptr);
}
Expand Down Expand Up @@ -1079,9 +1079,9 @@ std::vector<command_entry_t> commands =
print_red("Invalid policy name:" + cmd_tokens[4]);
return false;
}
node_id_t user_specified_node_id = INVALID_NODE_ID;
derecho::node_id_t user_specified_node_id = INVALID_NODE_ID;
if (cmd_tokens.size() >= 6) {
user_specified_node_id = static_cast<node_id_t>(std::stoi(cmd_tokens[5],nullptr,0));
user_specified_node_id = static_cast<derecho::node_id_t>(std::stoi(cmd_tokens[5],nullptr,0));
}
on_subgroup_type(cmd_tokens[1],set_member_selection_policy,capi,subgroup_index,shard_index,policy,user_specified_node_id);
return true;
Expand Down Expand Up @@ -1300,12 +1300,12 @@ std::vector<command_entry_t> commands =
"collective_trigger_put <type> <key> <value> <subgroup_index> <node id 1> [node id 2, ...] \n"
" type := " SUBGROUP_TYPE_LIST,
[](ServiceClientAPI& capi, const std::vector<std::string>& cmd_tokens) {
std::vector<node_id_t> nodes;
std::vector<derecho::node_id_t> nodes;
CHECK_FORMAT(cmd_tokens,6);
uint32_t subgroup_index = static_cast<uint32_t>(std::stoi(cmd_tokens[4],nullptr,0));
size_t arg_idx = 5;
while(arg_idx < cmd_tokens.size()) {
nodes.push_back(static_cast<node_id_t>(std::stoi(cmd_tokens[arg_idx++],nullptr,0)));
nodes.push_back(static_cast<derecho::node_id_t>(std::stoi(cmd_tokens[arg_idx++],nullptr,0)));
}
on_subgroup_type(cmd_tokens[1],collective_trigger_put,capi,cmd_tokens[2]/*key*/,cmd_tokens[3]/*value*/,subgroup_index,nodes);
return true;
Expand Down
24 changes: 12 additions & 12 deletions src/service/cs/cascade_client_cs.cpp
Expand Up @@ -456,16 +456,16 @@ struct TwoDimensionalNodeList {
struct PolicyMetadataInternal {
const char* policyString;
ShardMemberSelectionPolicy policy;
node_id_t userNode;
derecho::node_id_t userNode;
};

TwoDimensionalNodeList convert_2d_vector(std::vector<std::vector<node_id_t>> vector) {
TwoDimensionalNodeList convert_2d_vector(std::vector<std::vector<derecho::node_id_t>> vector) {
// heap-allocated so that they persist into the managed code without being destructed
auto flattened_list = new std::vector<node_id_t>();
auto flattened_list = new std::vector<derecho::node_id_t>();
auto vector_sizes = new std::vector<uint64_t>();
for (const auto& inner_list : vector) {
vector_sizes->push_back(inner_list.size());
for (const node_id_t node : inner_list) {
for (const derecho::node_id_t node : inner_list) {
flattened_list->push_back(node);
}
}
Expand Down Expand Up @@ -512,7 +512,7 @@ EXPORT bool deleteStringVectorPointer(std::vector<std::string>* ptr) {
return true;
}

EXPORT bool deleteNodeIdVectorPointer(std::vector<node_id_t>* ptr) {
EXPORT bool deleteNodeIdVectorPointer(std::vector<derecho::node_id_t>* ptr) {
delete ptr;
return true;
}
Expand Down Expand Up @@ -540,25 +540,25 @@ EXPORT uint32_t EXPORT_getMyId(ServiceClientAPI& capi) {

EXPORT StdVectorWrapper EXPORT_getMembers(ServiceClientAPI& capi) {
// heap-allocated so that it persists into the managed code without being destructed
auto vec = new std::vector<node_id_t>(capi.get_members());
auto vec = new std::vector<derecho::node_id_t>(capi.get_members());
return {vec->data(), vec, vec->size()};
}

EXPORT TwoDimensionalNodeList EXPORT_getSubgroupMembers(ServiceClientAPI& capi, char* serviceType, uint32_t subgroupIndex) {
std::vector<std::vector<node_id_t>> members;
std::vector<std::vector<derecho::node_id_t>> members;
on_all_subgroup_type(std::string(serviceType), members = capi.template get_subgroup_members, subgroupIndex);
return convert_2d_vector(members);
}

EXPORT TwoDimensionalNodeList EXPORT_getSubgroupMembersByObjectPool(ServiceClientAPI& capi, char* objectPoolPathname) {
std::vector<std::vector<node_id_t>> members = capi.get_subgroup_members(objectPoolPathname);
std::vector<std::vector<derecho::node_id_t>> members = capi.get_subgroup_members(objectPoolPathname);
return convert_2d_vector(members);
}

EXPORT StdVectorWrapper EXPORT_getShardMembers(ServiceClientAPI& capi, char* serviceType, uint32_t subgroupIndex, uint32_t shardIndex) {
// heap-allocated so that it persists into the managed code without being destructed
auto members_ptr = new std::vector<node_id_t>();
std::vector<node_id_t> members;
auto members_ptr = new std::vector<derecho::node_id_t>();
std::vector<derecho::node_id_t> members;
on_all_subgroup_type(std::string(serviceType), members = capi.template get_shard_members, subgroupIndex, shardIndex);
for (auto member : members) {
members_ptr->push_back(member);
Expand All @@ -569,7 +569,7 @@ EXPORT StdVectorWrapper EXPORT_getShardMembers(ServiceClientAPI& capi, char* ser

EXPORT StdVectorWrapper EXPORT_getShardMembersByObjectPool(ServiceClientAPI& capi, char* objectPoolPathname, uint32_t shardIndex) {
// heap-allocated so that it persists into the managed code without being destructed
auto members = new std::vector<node_id_t>();
auto members = new std::vector<derecho::node_id_t>();
for (auto member : capi.get_shard_members(objectPoolPathname, shardIndex)) {
members->push_back(member);
}
Expand All @@ -588,7 +588,7 @@ EXPORT uint32_t EXPORT_getNumberOfShards(ServiceClientAPI& capi, char* serviceTy
return num_shards;
}

EXPORT void EXPORT_setMemberSelectionPolicy(ServiceClientAPI& capi, char* serviceType, uint32_t subgroupIndex, uint32_t shardIndex, char* policy, node_id_t userNode) {
EXPORT void EXPORT_setMemberSelectionPolicy(ServiceClientAPI& capi, char* serviceType, uint32_t subgroupIndex, uint32_t shardIndex, char* policy, derecho::node_id_t userNode) {
ShardMemberSelectionPolicy real_policy = parse_policy_name(std::string(policy));
on_all_subgroup_type(std::string(serviceType), capi.template set_member_selection_policy, subgroupIndex, shardIndex, real_policy, userNode);
}
Expand Down
10 changes: 5 additions & 5 deletions src/service/java/jni/cascade_jni.cpp
Expand Up @@ -92,7 +92,7 @@ derecho::cascade::ServiceClientAPI *get_api(JNIEnv *env, jobject obj)
/**
* Translate a C++ int vector to a Java Integer List.
*/
jobject cpp_int_vector_to_java_list(JNIEnv *env, std::vector<node_id_t> vec)
jobject cpp_int_vector_to_java_list(JNIEnv *env, std::vector<derecho::node_id_t> vec)
{
// create a Java array list
jclass arr_list_cls = env->FindClass("java/util/ArrayList");
Expand All @@ -108,7 +108,7 @@ jobject cpp_int_vector_to_java_list(JNIEnv *env, std::vector<node_id_t> vec)
jmethodID integer_init_mid = env->GetMethodID(integer_cls, "<init>", "(I)V");

// fill everything in
for (node_id_t id : vec)
for (derecho::node_id_t id : vec)
{
jobject int_obj = env->NewObject(integer_cls, integer_init_mid, id);
env->CallObjectMethod(arr_obj, list_add_mid, int_obj);
Expand All @@ -125,7 +125,7 @@ JNIEXPORT jobject JNICALL Java_io_cascade_Client_getMembers(JNIEnv *env, jobject
{
// get members first!
derecho::cascade::ServiceClientAPI *capi = get_api(env, obj);
std::vector<node_id_t> members = capi->get_members();
std::vector<derecho::node_id_t> members = capi->get_members();

// create an array list
return cpp_int_vector_to_java_list(env, members);
Expand All @@ -141,7 +141,7 @@ JNIEXPORT jobject JNICALL Java_io_cascade_Client_getShardMembers__JJ(JNIEnv *env
{
// get shard members in C++
derecho::cascade::ServiceClientAPI *capi = get_api(env, obj);
std::vector<node_id_t> members = capi->get_shard_members(subgroupID, shardID);
std::vector<derecho::node_id_t> members = capi->get_shard_members(subgroupID, shardID);
// create an array list
return cpp_int_vector_to_java_list(env, members);
Expand All @@ -168,7 +168,7 @@ JNIEXPORT jobject JNICALL Java_io_cascade_Client_getShardMembers(JNIEnv *env, jo
// get the value of the service type
int service_type = get_int_value(env, j_service_type);
derecho::cascade::ServiceClientAPI *capi = get_api(env, obj);
std::vector<node_id_t> members;
std::vector<derecho::node_id_t> members;

// call get shard members
on_service_type(service_type, members = capi->get_shard_members, subgroup_index, shard_index);
Expand Down
4 changes: 2 additions & 2 deletions src/service/python/cascade_client_py.cpp
Expand Up @@ -465,7 +465,7 @@ PYBIND11_MODULE(member_client, m) {
.def(
"get_subgroup_members",
[](ServiceClientAPI_PythonWrapper& capi, std::string service_type, uint32_t subgroup_index) {
std::vector<std::vector<node_id_t>> members;
std::vector<std::vector<derecho::node_id_t>> members;
on_all_subgroup_type(service_type, members = capi.ref.template get_subgroup_members, subgroup_index);
return members;
},
Expand All @@ -488,7 +488,7 @@ PYBIND11_MODULE(member_client, m) {
.def(
"get_shard_members",
[](ServiceClientAPI_PythonWrapper& capi, std::string service_type, uint32_t subgroup_index, uint32_t shard_index) {
std::vector<node_id_t> members;
std::vector<derecho::node_id_t> members;
on_all_subgroup_type(service_type, members = capi.ref.template get_shard_members, subgroup_index, shard_index);
return members;
},
Expand Down

0 comments on commit a317c96

Please sign in to comment.