Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed signature support to handle Delta-persistent objects #271

Merged
merged 15 commits into from Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 17 additions & 6 deletions include/derecho/core/detail/derecho_sst.hpp
Expand Up @@ -79,13 +79,23 @@ class DerechoSST : public sst::SST<DerechoSST> {
* subgroup.
*/
SSTFieldVector<persistent::version_t> persisted_num;
/**
* This represents the highest persistent version number that has a
* signature in its log at this node, if any persistent fields have
* signatures enabled. Since signatures are added to log entries at
* the same time as persistence, this is usually equal to persisted_num,
* but it may lag behind if there are persistent fields that do not have
* signatures enabled (hence there may be persistent versions with no
* corresponding signature). Contains one entry per subgroup.
*/
SSTFieldVector<persistent::version_t> signed_num;
/**
* This represents the highest persistent version number for which this
* node has verified a signature from all other nodes in the subgroup, if
* signatures are enabled. There is updated by the PersistenceManager, and
* contains one entry per subgroup. It will generally lag behind
* persisted_num, since updates are only verified once they have been
* signed locally.
* any persistent fields have signatures enabled. This is updated by the
* PersistenceManager, and contains one entry per subgroup. It will
* generally lag behind persisted_num, since updates are only verified once
* they have been signed locally.
*/
SSTFieldVector<persistent::version_t> verified_num;

Expand Down Expand Up @@ -181,7 +191,7 @@ class DerechoSST : public sst::SST<DerechoSST> {
* standard SST constructor.
* @param num_subgroups Number of the subgroups
* @param signature_size Size of the signature
* @param num_received_size
* @param num_received_size
* @param slot_size
* @param index_field_size
*/
Expand All @@ -191,6 +201,7 @@ class DerechoSST : public sst::SST<DerechoSST> {
delivered_num(num_subgroups),
signatures(num_subgroups * signature_size),
persisted_num(num_subgroups),
signed_num(num_subgroups),
verified_num(num_subgroups),
suspected(parameters.members.size()),
changes(100 + parameters.members.size()), //The extra 100 entries allows for more joins at startup, when the group is very small
Expand All @@ -208,7 +219,7 @@ class DerechoSST : public sst::SST<DerechoSST> {
index(index_field_size),
local_stability_frontier(num_subgroups) {
SSTInit(seq_num, delivered_num, signatures,
persisted_num, verified_num,
persisted_num, signed_num, verified_num,
vid, suspected, changes, joiner_ips,
joiner_gms_ports, joiner_state_transfer_ports, joiner_sst_ports, joiner_rdmc_ports, joiner_external_ports,
num_changes, num_committed, num_acked, num_installed,
Expand Down
43 changes: 28 additions & 15 deletions include/derecho/core/detail/persistence_manager.hpp
Expand Up @@ -30,49 +30,61 @@ class ViewManager;
*/
class PersistenceManager {
public:
enum class RequestType {
PERSIST,
VERIFY
};
struct ThreadRequest {
RequestType operation;
subgroup_id_t subgroup_id;
persistent::version_t version;
};

private:
/** Pointer to the persistence-module logger */
std::shared_ptr<spdlog::logger> persistence_logger;
/** Thread handle */
/** Thread handle for the persistence thread */
std::thread persist_thread;
/** Thread handle for the verification thread */
std::thread verify_thread;
/**
* A flag to signal the persistent thread to shutdown; set to true when the
* group is destroyed.
* A flag to signal the PersistenceManager's threads to shutdown; set to
* true when the group is destroyed.
*/
std::atomic<bool> thread_shutdown;
/**
* A semaphore that counts the number of persistence requests available for
* the persistence thread to handle
*/
sem_t persistence_request_sem;
/**
* A semaphore that counts the number of verification requests available for
* the verification thread to handle
*/
sem_t verification_request_sem;
/**
* Queue of requests for the persistence thread, which is shared with other
* threads (e.g. the predicates thread) so they can make requests
*/
std::queue<ThreadRequest> persistence_request_queue;
/**
* Queue of requests for the verification thread, which is shared with the
* predicates thread so it can make requests.
*/
std::queue<ThreadRequest> verify_request_queue;
/** A test-and-set lock guarding the persistence request queue */
std::atomic_flag prq_lock = ATOMIC_FLAG_INIT;
/** A test-and-set lock guarding the verification request queue */
std::atomic_flag vrq_lock = ATOMIC_FLAG_INIT;
/**
* The latest version that has been persisted successfully in each subgroup
* (indexed by subgroup number). Updated each time a persistence request completes.
* This is equal to this node's row of the SST field persisted_num, but cached
* in non-SST memory to more easily check if a persistence request is obsolete.
*/
std::vector<persistent::version_t> last_persisted_version;
/**
* The Verifier to use for verifying other replicas' signatures over
* persistent log entries, if signatures are enabled. This will be null if
* signatures are disabled.
* The latest version that has been signed and verified in each subgroup,
* indexed by subgroup number. This is equal to this node's row of the SST field
* verified_num but cached in non-SST memory, analogous to last_persisted_version.
* Remains at -1 for all subgroups if signatures are not enabled in this group.
*/
std::unique_ptr<openssl::Verifier> signature_verifier;
std::vector<persistent::version_t> last_verified_version;
/** The size of a signature (which is a constant), or 0 if signatures are disabled. */
std::size_t signature_size;
/**
Expand Down Expand Up @@ -124,7 +136,7 @@ class PersistenceManager {
/** @return the size of a signature on an update in this group. */
std::size_t get_signature_size() const;

/** Start the persistent thread. */
/** Start the persistence and verification threads. */
void start();

/** post a persistence request */
Expand All @@ -145,8 +157,9 @@ class PersistenceManager {
void make_version(const subgroup_id_t& subgroup_id,
const persistent::version_t& version, const HLC& mhlc);

/** shutdown the thread
* @param wait Wait till the thread finished or not.
/**
* Shutdown the threads
* @param wait Whether to wait until both threads have finished
*/
void shutdown(bool wait);
};
Expand Down
54 changes: 21 additions & 33 deletions include/derecho/core/detail/replicated_impl.hpp
Expand Up @@ -32,7 +32,7 @@ Replicated<T>::Replicated(subgroup_type_id_t type_id, node_id_t nid, subgroup_id
T::register_functions())),
group(group),
current_version(persistent::INVALID_VERSION),
current_hlc(0,0) {
current_hlc(0, 0) {
if constexpr(std::is_base_of_v<GroupReference, T>) {
(**user_object_ptr).set_group_pointers(group, subgroup_index);
}
Expand Down Expand Up @@ -217,32 +217,26 @@ void Replicated<T>::new_view_callback(const View& new_view) {
}
}


template <typename T>
void Replicated<T>::make_version(persistent::version_t ver, const HLC& hlc) {
persistent_registry->makeVersion(ver, hlc);
}

template <typename T>
persistent::version_t Replicated<T>::persist(persistent::version_t version, uint8_t* signature) {
persistent::version_t Replicated<T>::sign(uint8_t* signature_buffer) {
if constexpr(!has_signed_fields_v<T>) {
return persistent::INVALID_VERSION;
}
return persistent_registry->sign(*signer, signature_buffer);
}

template <typename T>
persistent::version_t Replicated<T>::persist(std::optional<persistent::version_t> version) {
if constexpr(!has_persistent_fields_v<T>) {
// for replicated<T> without Persistent fields,
// tell the persistent thread that we are done.
return version;
return persistent::INVALID_VERSION;
}
persistent::version_t persisted_ver = persistent_registry->getMinimumLatestVersion();
persistent::version_t signed_ver;
// Ask PersistentRegistry to sign then persist all the Persistent fields;
// if persist() actually persists a later version than requested, repeat
// until the latest version has been both signed and persisted.
do {
signed_ver = persisted_ver;
if constexpr(has_signed_fields_v<T>) {
persistent_registry->sign(signed_ver, *signer, signature);
}
persisted_ver = persistent_registry->persist(signed_ver);
} while(persisted_ver < version || signed_ver < persisted_ver);
return persisted_ver;

return persistent_registry->persist(version);
};

template <typename T>
Expand Down Expand Up @@ -284,10 +278,10 @@ persistent::version_t Replicated<T>::get_minimum_latest_persisted_version() {
template <typename T>
void Replicated<T>::post_next_version(persistent::version_t version, uint64_t ts_us) {
current_version = version;
if (current_hlc > HLC{ts_us,0}) {
current_hlc.m_logic ++;
if(current_hlc > HLC{ts_us, 0}) {
current_hlc.m_logic++;
} else {
current_hlc = {ts_us,0};
current_hlc = {ts_us, 0};
}
}

Expand All @@ -296,40 +290,34 @@ std::tuple<persistent::version_t, HLC> Replicated<T>::get_current_version() {
return std::tie(current_version, current_hlc);
}

template <typename T>
void Replicated<T>::register_persistent_member(const char* object_name,
persistent::PersistentObject* member_pointer) {
this->persistent_registry->registerPersistent(object_name, member_pointer);
}

template <typename T>
const T& Replicated<T>::get_ref() const {
return **user_object_ptr;
}

template <typename T>
void Replicated<T>::oob_remote_write(const node_id_t& remote_node, const struct iovec* iov, int iovcnt, uint64_t remote_dest_addr, uint64_t rkey, size_t size) {
group_rpc_manager.oob_remote_write(remote_node,iov,iovcnt,remote_dest_addr,rkey,size);
group_rpc_manager.oob_remote_write(remote_node, iov, iovcnt, remote_dest_addr, rkey, size);
}

template <typename T>
void Replicated<T>::oob_remote_read(const node_id_t& remote_node, const struct iovec* iov, int iovcnt, uint64_t remote_src_addr, uint64_t rkey, size_t size) {
group_rpc_manager.oob_remote_read(remote_node,iov,iovcnt,remote_src_addr,rkey,size);
group_rpc_manager.oob_remote_read(remote_node, iov, iovcnt, remote_src_addr, rkey, size);
}

template <typename T>
void Replicated<T>::oob_send(const node_id_t& remote_node, const struct iovec* iov, int iovcnt) {
group_rpc_manager.oob_send(remote_node,iov,iovcnt);
group_rpc_manager.oob_send(remote_node, iov, iovcnt);
}

template <typename T>
void Replicated<T>::oob_recv(const node_id_t& remote_node, const struct iovec* iov, int iovcnt) {
group_rpc_manager.oob_recv(remote_node,iov,iovcnt);
group_rpc_manager.oob_recv(remote_node, iov, iovcnt);
}

template <typename T>
void Replicated<T>::wait_for_oob_op(const node_id_t& remote_node, uint32_t op, uint64_t timeout_us) {
group_rpc_manager.wait_for_oob_op(remote_node,op,timeout_us);
group_rpc_manager.wait_for_oob_op(remote_node, op, timeout_us);
}

template <typename T>
Expand Down
18 changes: 12 additions & 6 deletions include/derecho/core/detail/replicated_interface.hpp
Expand Up @@ -4,6 +4,9 @@
#include "derecho/tcp/tcp.hpp"
#include "derecho_internal.hpp"

#include <optional>
#include <vector>

namespace derecho {

/**
Expand All @@ -13,20 +16,23 @@ namespace derecho {
*/
class ReplicatedObject {
public:
/* ---- Public-facing API functions; subset of the public functions defined in Replicated<T> ---- */
virtual ~ReplicatedObject() = default;
virtual bool is_valid() const = 0;
virtual std::size_t object_size() const = 0;
virtual void send_object(tcp::socket& receiver_socket) const = 0;
virtual void send_object_raw(tcp::socket& receiver_socket) const = 0;
virtual std::size_t receive_object(uint8_t* buffer) = 0;
virtual bool is_persistent() const = 0;
virtual bool is_signed() const = 0;
virtual void make_version(persistent::version_t ver, const HLC& hlc) = 0;
virtual persistent::version_t get_minimum_latest_persisted_version() = 0;
virtual persistent::version_t persist(persistent::version_t version, uint8_t* signature) = 0;
virtual std::vector<uint8_t> get_signature(persistent::version_t version) = 0;
virtual bool verify_log(persistent::version_t version, openssl::Verifier& verifier,
const uint8_t* signature) = 0;
/* ---- Internal-only API ---- */
virtual std::size_t object_size() const = 0;
virtual void send_object(tcp::socket& receiver_socket) const = 0;
virtual void send_object_raw(tcp::socket& receiver_socket) const = 0;
virtual std::size_t receive_object(uint8_t* buffer) = 0;
virtual void make_version(persistent::version_t ver, const HLC& hlc) = 0;
virtual persistent::version_t sign(uint8_t* signature_buffer) = 0;
virtual persistent::version_t persist(std::optional<persistent::version_t> version = std::nullopt) = 0;
virtual void truncate(persistent::version_t latest_version) = 0;
virtual void post_next_version(persistent::version_t version, uint64_t msg_ts) = 0;
};
Expand Down