Correctly placed node_id_t inside derecho namespace
While debugging the compile errors caused by enabling USE_VERBS_API, I
noticed that connection_manager.hpp was improperly putting node_id_t
into the global namespace even though we intended it to be scoped within
derecho. This was inadvertently hiding a bunch of improper uses of
node_id_t that should have been caught back when we first put node_id_t
inside the derecho namespace.
etremel committed May 4, 2024
1 parent 7629d2d commit 58a7b5b
Showing 29 changed files with 84 additions and 61 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -395,7 +395,7 @@ Here is an example of a JSON layout string that uses "reserved_node_ids_by_shard
if(curr_view.num_members < 3) {
throw derecho::subgroup_provisioning_exception();
}
-std::vector<node_id_t> first_3_nodes(&curr_view.members[0], &curr_view.members[0] + 3);
+std::vector<derecho::node_id_t> first_3_nodes(&curr_view.members[0], &curr_view.members[0] + 3);
//Put the desired SubView at subgroup_layout[0][0] since there's one subgroup with one shard
subgroup_layout[0].emplace_back(curr_view.make_subview(first_3_nodes));
//Advance next_unassigned_rank by 3, unless it was already beyond 3, since we assigned the first 3 nodes
@@ -405,7 +405,7 @@ Here is an example of a JSON layout string that uses "reserved_node_ids_by_shard
if(curr_view.num_members < 6) {
throw derecho::subgroup_provisioning_exception();
}
-std::vector<node_id_t> next_3_nodes(&curr_view.members[3], &curr_view.members[3] + 3);
+std::vector<derecho::node_id_t> next_3_nodes(&curr_view.members[3], &curr_view.members[3] + 3);
subgroup_layout[0].emplace_back(curr_view.make_subview(next_3_nodes));
curr_view.next_unassigned_rank += 3;
}
@@ -453,7 +453,7 @@ PeerCaller<Cache>& p2p_cache_handle = group->get_nonmember_subgroup<Cache>(1);
When invoking a P2P send, the caller must specify, as the first argument, the ID of the node to communicate with. The caller must ensure that this node is actually a member of the subgroup that the PeerCaller targets (though it can be in any shard of that subgroup). Nodes can find out the current membership of a subgroup by calling the `get_subgroup_members` method on the Group, which uses the same template parameter and argument as `get_subgroup` to select a subgroup by type and index. For example, assuming Cache subgroups are not sharded, this is how a non-member process could make a call to `get`, targeting the first node in the second subgroup of type Cache:

```cpp
-std::vector<node_id_t> cache_members = group.get_subgroup_members<Cache>(1)[0];
+std::vector<derecho::node_id_t> cache_members = group.get_subgroup_members<Cache>(1)[0];
derecho::rpc::QueryResults<std::string> results = p2p_cache_handle.p2p_send<RPC_NAME(get)>(cache_members[0], "Foo");
```

2 changes: 1 addition & 1 deletion include/derecho/core/detail/connection_manager.hpp
@@ -9,10 +9,10 @@
#include <map>
#include <mutex>

-using node_id_t = derecho::node_id_t;

namespace tcp {

+using node_id_t = derecho::node_id_t;
using ip_addr_t = derecho::ip_addr_t;

class tcp_connections {
1 change: 1 addition & 0 deletions src/applications/archive/custom_subgroup_profiles_test.cpp
@@ -12,6 +12,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;
using namespace persistent;
1 change: 1 addition & 0 deletions src/applications/archive/external_client_perf_test.cpp
@@ -8,6 +8,7 @@

using derecho::ExternalClientCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;
using derecho::Bytes;
1 change: 1 addition & 0 deletions src/applications/archive/long_subgroup_test.cpp
@@ -7,6 +7,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;
using namespace persistent;
1 change: 1 addition & 0 deletions src/applications/archive/notification_test.cpp
@@ -7,6 +7,7 @@
#include <derecho/persistent/Persistent.hpp>

using derecho::ExternalClientCaller;
+using derecho::node_id_t;
using std::cout;
using std::endl;

1 change: 1 addition & 0 deletions src/applications/archive/scaling_subgroup_test.cpp
@@ -9,6 +9,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;
using namespace persistent;
2 changes: 2 additions & 0 deletions src/applications/archive/smart_membership_function_test.cpp
@@ -21,6 +21,8 @@
#define RPC_NAME(...) 0ULL
#endif

+using derecho::node_id_t;

class Cache : public mutils::ByteRepresentable {
std::map<std::string, std::string> cache_map;

1 change: 1 addition & 0 deletions src/applications/archive/subgroup_test.cpp
@@ -10,6 +10,7 @@
#include <derecho/core/derecho.hpp>

using derecho::RawObject;
+using derecho::node_id_t;
using std::cin;
using std::cout;
using std::endl;
1 change: 1 addition & 0 deletions src/applications/archive/typed_subgroup_test.cpp
@@ -12,6 +12,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;
using namespace persistent;
34 changes: 17 additions & 17 deletions src/applications/demos/oob_rdma.cpp
@@ -1,7 +1,7 @@
/**
* @file oob_rdma.cpp
*
-* This test creates one subgroup demonstrating OOB mechanism
+* This test creates one subgroup demonstrating OOB mechanism
* - between external clients and a group member, and
* - between derecho members
*/
@@ -76,7 +76,7 @@ class OOBRDMA : public mutils::ByteRepresentable,
* @param size the size of the data
*
* @return true for success, otherwise false.
-*/
+*/
bool get(const uint64_t& callee_addr, const uint64_t& caller_addr, const uint64_t rkey, const uint64_t size) const;

/**
@@ -104,7 +104,7 @@ class OOBRDMA : public mutils::ByteRepresentable,
bool recv(const uint64_t& callee_addr, const uint64_t size) const;

// constructors
-OOBRDMA(void* _oob_mr_ptr, size_t _oob_mr_size) :
+OOBRDMA(void* _oob_mr_ptr, size_t _oob_mr_size) :
oob_mr_ptr(_oob_mr_ptr),
oob_mr_size(_oob_mr_size) {}

@@ -149,7 +149,7 @@ uint64_t OOBRDMA::put(const uint64_t& caller_addr, const uint64_t rkey, const ui

bool OOBRDMA::get(const uint64_t& callee_addr, const uint64_t& caller_addr, const uint64_t rkey, const uint64_t size) const {
// STEP 1 - validate the memory size
-if ((callee_addr < reinterpret_cast<uint64_t>(oob_mr_ptr)) ||
+if ((callee_addr < reinterpret_cast<uint64_t>(oob_mr_ptr)) ||
((callee_addr+size) > reinterpret_cast<uint64_t>(oob_mr_ptr) + oob_mr_size)) {
std::cerr << "callee address:0x" << std::hex << callee_addr << " or size " << size << " is invalid." << std::dec << std::endl;
return false;
@@ -192,7 +192,7 @@ bool OOBRDMA::recv(const uint64_t& callee_addr, const uint64_t size) const {
// STEP 2 - do RDMA send
auto& subgroup_handle = group->template get_subgroup<OOBRDMA>(this->subgroup_index);
struct iovec iov;
-iov.iov_base = reinterpret_cast<void*>(callee_addr);
+iov.iov_base = reinterpret_cast<void*>(callee_addr);
iov.iov_len = static_cast<size_t>(size);
subgroup_handle.oob_send(group->get_rpc_caller_id(),&iov,1);
subgroup_handle.wait_for_oob_op(group->get_rpc_caller_id(),OOB_OP_SEND,1000);
@@ -217,7 +217,7 @@ static void print_data (void* addr,size_t size) {

template <typename SubgroupRefT>
void do_send_recv_test(SubgroupRefT& subgroup_handle,
-node_id_t nid,
+derecho::node_id_t nid,
void* send_buffer_laddr,
void* recv_buffer_laddr,
size_t oob_data_size
@@ -282,7 +282,7 @@ void do_send_recv_test(SubgroupRefT& subgroup_handle,
iov.iov_len = oob_data_size;
// 3.1 - post oob buffer for receive
subgroup_handle.oob_recv(nid,&iov,1);
-// 3.2 - post p2p_send
+// 3.2 - post p2p_send
auto recv_results = subgroup_handle.template p2p_send<RPC_NAME(recv)>(nid,remote_addr,oob_data_size);
// 3.3 - wait until oob received.
subgroup_handle.wait_for_oob_op(nid,OOB_OP_RECV,1000);
@@ -326,7 +326,7 @@ void do_send_recv_test(SubgroupRefT& subgroup_handle,
}

template <typename P2PCaller>
-void do_test (P2PCaller& p2p_caller, node_id_t nid, uint64_t rkey, void* put_buffer_laddr, void* get_buffer_laddr,
+void do_test (P2PCaller& p2p_caller, derecho::node_id_t nid, uint64_t rkey, void* put_buffer_laddr, void* get_buffer_laddr,
size_t oob_data_size
#ifdef CUDA_FOUND
, bool use_gpu_mem
@@ -383,7 +383,7 @@ void do_test (P2PCaller& p2p_caller, node_id_t nid, uint64_t rkey, void* put_buf
auto results = p2p_caller.template p2p_send<RPC_NAME(get)>(nid,remote_addr,reinterpret_cast<uint64_t>(get_buffer_laddr),rkey,oob_data_size);
std::cout << "Wait for return" << std::endl;
results.get().get(nid);
-std::cout << "Data get from remote address @" << std::hex << remote_addr
+std::cout << "Data get from remote address @" << std::hex << remote_addr
<< " to local address @" << reinterpret_cast<uint64_t>(get_buffer_laddr) << std::endl;
}
// print 16 bytes of contents
@@ -418,7 +418,7 @@ void do_test (P2PCaller& p2p_caller, node_id_t nid, uint64_t rkey, void* put_buf
#endif
}

-const char* help_string =
+const char* help_string =
"--server,-s Run as server, otherwise, run as client by default.\n"
#ifdef CUDA_FOUND
"--gpu,-g Using GPU memory, otherwise, use CPU memory by default.\n"
@@ -514,7 +514,7 @@ int main(int argc, char** argv) {
std::cerr << "Unknown CUDA device:" << cuda_device << ". We found only " << n_devices << std::endl;
return -1;
}
-// initialize cuda_ctxt;
+// initialize cuda_ctxt;
ASSERTDRV(cuDeviceGet(&cuda_ctxt.device, cuda_device));
ASSERTDRV(cuDevicePrimaryCtxRetain(&cuda_ctxt.context, cuda_ctxt.device));
ASSERTDRV(cuCtxSetCurrent(cuda_ctxt.context));
@@ -556,23 +556,23 @@ int main(int argc, char** argv) {
if (server_mode) {
// Read configurations from the command line options as well as the default config file
derecho::Conf::initialize(argc, argv);

// Define subgroup member ship using the default subgroup allocator function.
// When constructed using make_subgroup_allocator with no arguments, this will check the config file
// for either the json_layout or json_layout_file options, and use whichever one is present to define
// the mapping from types to subgroup allocation parameters.
derecho::SubgroupInfo subgroup_function{derecho::make_subgroup_allocator<OOBRDMA>()};

// oobrdma_factory
auto oobrdma_factory = [&oob_mr_ptr,&oob_mr_size](persistent::PersistentRegistry*, derecho::subgroup_id_t) {
return std::make_unique<OOBRDMA>(oob_mr_ptr,oob_mr_size);
};

// group
-derecho::Group<OOBRDMA> group(derecho::UserMessageCallbacks{}, subgroup_function,
+derecho::Group<OOBRDMA> group(derecho::UserMessageCallbacks{}, subgroup_function,
{&dsm},
std::vector<derecho::view_upcall_t>{}, oobrdma_factory);

std::cout << "Finished constructing/joining Group." << std::endl;
#ifdef CUDA_FOUND
if (use_gpu_mem) {
@@ -636,7 +636,7 @@ int main(int argc, char** argv) {
void* get_buffer_laddr = reinterpret_cast<void*>(((reinterpret_cast<uint64_t>(oob_mr_ptr) + oob_data_size + 4095)>>12)<<12);

for (uint32_t i=1;i<=count;i++) {
-node_id_t nid = i%external_group.get_members().size();
+derecho::node_id_t nid = i%external_group.get_members().size();
do_test(external_caller,nid,rkey,put_buffer_laddr,get_buffer_laddr,oob_data_size
#ifdef CUDA_FOUND
,use_gpu_mem
1 change: 1 addition & 0 deletions src/applications/demos/overlapping_replicated_objects.cpp
@@ -22,6 +22,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;

10 changes: 5 additions & 5 deletions src/applications/demos/signed_store_mockup.cpp
@@ -110,13 +110,13 @@ ClientTier::ClientTier(){};
std::tuple<persistent::version_t, uint64_t, std::vector<uint8_t>> ClientTier::submit_update(const Blob& data) const {
derecho::PeerCaller<ObjectStore>& storage_subgroup = group->template get_nonmember_subgroup<ObjectStore>();
derecho::PeerCaller<SignatureStore>& signature_subgroup = group->template get_nonmember_subgroup<SignatureStore>();
-std::vector<std::vector<node_id_t>> storage_members = group->get_subgroup_members<ObjectStore>();
-std::vector<std::vector<node_id_t>> signature_members = group->get_subgroup_members<SignatureStore>();
+std::vector<std::vector<derecho::node_id_t>> storage_members = group->get_subgroup_members<ObjectStore>();
+std::vector<std::vector<derecho::node_id_t>> signature_members = group->get_subgroup_members<SignatureStore>();
std::uniform_int_distribution<> storage_distribution(0, storage_members[0].size() - 1);
std::uniform_int_distribution<> signature_distribution(0, signature_members[0].size() - 1);
//Choose a random member of each subgroup to contact with the P2P message
-const node_id_t storage_member_to_contact = storage_members[0][storage_distribution(random_engine)];
-const node_id_t signature_member_to_contact = signature_members[0][signature_distribution(random_engine)];
+const derecho::node_id_t storage_member_to_contact = storage_members[0][storage_distribution(random_engine)];
+const derecho::node_id_t signature_member_to_contact = signature_members[0][signature_distribution(random_engine)];
//Send the new data to the storage subgroup
dbg_default_debug("Sending update data to node {}", storage_member_to_contact);
auto storage_query_results = storage_subgroup.p2p_send<RPC_NAME(update)>(storage_member_to_contact, data);
@@ -213,7 +213,7 @@ Blob ObjectStore::get_latest() const {
/* -------------------------------------------------------------------- */

//Determines whether a node ID is a member of any shard in a list of shards
-bool member_of_shards(node_id_t node_id, const std::vector<std::vector<node_id_t>>& shard_member_lists) {
+bool member_of_shards(derecho::node_id_t node_id, const std::vector<std::vector<derecho::node_id_t>>& shard_member_lists) {
for(const auto& shard_members : shard_member_lists) {
if(std::find(shard_members.begin(), shard_members.end(), node_id) != shard_members.end()) {
return true;
1 change: 1 addition & 0 deletions src/applications/demos/simple_replicated_objects.cpp
@@ -22,6 +22,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;

1 change: 1 addition & 0 deletions src/applications/demos/simple_replicated_objects_json.cpp
@@ -23,6 +23,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;
using json = nlohmann::json;
@@ -13,6 +13,7 @@

using derecho::PeerCaller;
using derecho::Replicated;
+using derecho::node_id_t;
using std::cout;
using std::endl;

2 changes: 1 addition & 1 deletion src/applications/tests/performance_tests/oob_perf.cpp
@@ -155,7 +155,7 @@ bool OOBRDMA::get(const uint64_t& callee_addr, const uint64_t& caller_addr, cons
template <typename P2PCaller>
void perf_test (
P2PCaller& p2p_caller,
-node_id_t nid,
+derecho::node_id_t nid,
uint64_t rkey,
void* put_buffer_laddr,
void* get_buffer_laddr,
1 change: 1 addition & 0 deletions src/applications/tests/performance_tests/p2p_bw_test.cpp
@@ -15,6 +15,7 @@
using std::endl;
using test::Bytes;
using namespace std::chrono;
+using derecho::node_id_t;

/**
* RPC Object with a single function that returns a byte array over P2P
@@ -9,6 +9,7 @@ using std::cout;
using std::endl;
using std::string;
using std::chrono::duration_cast;
+using derecho::node_id_t;

class TestObject : public mutils::ByteRepresentable {
int state;
26 changes: 13 additions & 13 deletions src/applications/tests/performance_tests/signed_store_test.cpp
@@ -113,13 +113,13 @@ ClientTier::ClientTier(std::size_t test_data_size)
std::tuple<persistent::version_t, uint64_t, std::vector<unsigned char>> ClientTier::submit_update(const Blob& data) const {
derecho::PeerCaller<ObjectStore>& storage_subgroup = group->template get_nonmember_subgroup<ObjectStore>();
derecho::PeerCaller<SignatureStore>& signature_subgroup = group->template get_nonmember_subgroup<SignatureStore>();
-std::vector<std::vector<node_id_t>> storage_members = group->get_subgroup_members<ObjectStore>();
-std::vector<std::vector<node_id_t>> signature_members = group->get_subgroup_members<SignatureStore>();
+std::vector<std::vector<derecho::node_id_t>> storage_members = group->get_subgroup_members<ObjectStore>();
+std::vector<std::vector<derecho::node_id_t>> signature_members = group->get_subgroup_members<SignatureStore>();
std::uniform_int_distribution<> storage_distribution(0, storage_members[0].size() - 1);
std::uniform_int_distribution<> signature_distribution(0, signature_members[0].size() - 1);
//Choose a random member of each subgroup to contact with the P2P message
-const node_id_t storage_member_to_contact = storage_members[0][storage_distribution(random_engine)];
-const node_id_t signature_member_to_contact = signature_members[0][signature_distribution(random_engine)];
+const derecho::node_id_t storage_member_to_contact = storage_members[0][storage_distribution(random_engine)];
+const derecho::node_id_t signature_member_to_contact = signature_members[0][signature_distribution(random_engine)];
//Send the new data to the storage subgroup
auto storage_query_results = storage_subgroup.p2p_send<RPC_NAME(update)>(storage_member_to_contact, data);
//Meanwhile, start hashing the update (this might take a long time)
@@ -152,13 +152,13 @@ bool ClientTier::update_batch_test(const int& num_updates) const {
using namespace std::chrono;
derecho::PeerCaller<ObjectStore>& storage_subgroup = group->template get_nonmember_subgroup<ObjectStore>();
derecho::PeerCaller<SignatureStore>& signature_subgroup = group->template get_nonmember_subgroup<SignatureStore>();
-const std::vector<std::vector<node_id_t>> storage_members = group->get_subgroup_members<ObjectStore>();
-const std::vector<std::vector<node_id_t>> signature_members = group->get_subgroup_members<SignatureStore>();
+const std::vector<std::vector<derecho::node_id_t>> storage_members = group->get_subgroup_members<ObjectStore>();
+const std::vector<std::vector<derecho::node_id_t>> signature_members = group->get_subgroup_members<SignatureStore>();
std::uniform_int_distribution<> storage_distribution(0, storage_members[0].size() - 1);
std::uniform_int_distribution<> signature_distribution(0, signature_members[0].size() - 1);
//Choose a random member of each subgroup to contact with the P2P message
-const node_id_t storage_member_to_contact = storage_members[0][storage_distribution(random_engine)];
-const node_id_t signature_member_to_contact = signature_members[0][signature_distribution(random_engine)];
+const derecho::node_id_t storage_member_to_contact = storage_members[0][storage_distribution(random_engine)];
+const derecho::node_id_t signature_member_to_contact = signature_members[0][signature_distribution(random_engine)];
/* Note: This currently doesn't work. It gets "stuck" waiting for completion of the
* await-persistence RPC calls, even though the storage subgroup nodes have in fact
* finished persisting all of the updates. I think this is because of the P2P message
@@ -320,7 +320,7 @@ std::unique_ptr<ObjectStore> ObjectStore::from_bytes(mutils::DeserializationMan
/* -------------------------------------------------------------------- */

//Determines whether a node ID is a member of any shard in a list of shards
-bool member_of_shards(node_id_t node_id, const std::vector<std::vector<node_id_t>>& shard_member_lists) {
+bool member_of_shards(derecho::node_id_t node_id, const std::vector<std::vector<derecho::node_id_t>>& shard_member_lists) {
for(const auto& shard_members : shard_member_lists) {
if(std::find(shard_members.begin(), shard_members.end(), node_id) != shard_members.end()) {
return true;
@@ -432,13 +432,13 @@ int main(int argc, char** argv) {
"data_signed_store_test");
//One node in the client tier should send the "end test" message to all the storage members,
//which will signal the main thread to call group.leave() and exit
-std::vector<node_id_t> storage_members = group.get_subgroup_members<ObjectStore>(0)[0];
-std::vector<node_id_t> signature_members = group.get_subgroup_members<SignatureStore>(0)[0];
+std::vector<derecho::node_id_t> storage_members = group.get_subgroup_members<ObjectStore>(0)[0];
+std::vector<derecho::node_id_t> signature_members = group.get_subgroup_members<SignatureStore>(0)[0];
if(group.get_subgroup_members<ClientTier>()[0][0] == my_id) {
-for(node_id_t nid : storage_members) {
+for(derecho::node_id_t nid : storage_members) {
object_store_subgroup.p2p_send<RPC_NAME(end_test)>(nid);
}
-for(node_id_t nid : signature_members) {
+for(derecho::node_id_t nid : signature_members) {
signature_store_subgroup.p2p_send<RPC_NAME(end_test)>(nid);
}
}
