Skip to content

Commit

Permalink
Merge pull request #271 from Derecho-Project/signed_log_bug_fix
Browse files Browse the repository at this point in the history
Fixed signature support to handle Delta-persistent objects
  • Loading branch information
songweijia committed Apr 27, 2024
2 parents 3fd5b4d + 0590177 commit 4a4c41b
Show file tree
Hide file tree
Showing 21 changed files with 837 additions and 340 deletions.
23 changes: 17 additions & 6 deletions include/derecho/core/detail/derecho_sst.hpp
Expand Up @@ -79,13 +79,23 @@ class DerechoSST : public sst::SST<DerechoSST> {
* subgroup.
*/
SSTFieldVector<persistent::version_t> persisted_num;
/**
* This represents the highest persistent version number that has a
* signature in its log at this node, if any persistent fields have
* signatures enabled. Since signatures are added to log entries at
* the same time as persistence, this is usually equal to persisted_num,
* but it may lag behind if there are persistent fields that do not have
* signatures enabled (hence there may be persistent versions with no
* corresponding signature). Contains one entry per subgroup.
*/
SSTFieldVector<persistent::version_t> signed_num;
/**
* This represents the highest persistent version number for which this
* node has verified a signature from all other nodes in the subgroup, if
* signatures are enabled. There is updated by the PersistenceManager, and
* contains one entry per subgroup. It will generally lag behind
* persisted_num, since updates are only verified once they have been
* signed locally.
* any persistent fields have signatures enabled. This is updated by the
* PersistenceManager, and contains one entry per subgroup. It will
* generally lag behind persisted_num, since updates are only verified once
* they have been signed locally.
*/
SSTFieldVector<persistent::version_t> verified_num;

Expand Down Expand Up @@ -181,7 +191,7 @@ class DerechoSST : public sst::SST<DerechoSST> {
* standard SST constructor.
* @param num_subgroups Number of the subgroups
* @param signature_size Size of the signature
* @param num_received_size
* @param num_received_size
* @param slot_size
* @param index_field_size
*/
Expand All @@ -191,6 +201,7 @@ class DerechoSST : public sst::SST<DerechoSST> {
delivered_num(num_subgroups),
signatures(num_subgroups * signature_size),
persisted_num(num_subgroups),
signed_num(num_subgroups),
verified_num(num_subgroups),
suspected(parameters.members.size()),
changes(100 + parameters.members.size()), //The extra 100 entries allows for more joins at startup, when the group is very small
Expand All @@ -208,7 +219,7 @@ class DerechoSST : public sst::SST<DerechoSST> {
index(index_field_size),
local_stability_frontier(num_subgroups) {
SSTInit(seq_num, delivered_num, signatures,
persisted_num, verified_num,
persisted_num, signed_num, verified_num,
vid, suspected, changes, joiner_ips,
joiner_gms_ports, joiner_state_transfer_ports, joiner_sst_ports, joiner_rdmc_ports, joiner_external_ports,
num_changes, num_committed, num_acked, num_installed,
Expand Down
43 changes: 28 additions & 15 deletions include/derecho/core/detail/persistence_manager.hpp
Expand Up @@ -30,49 +30,61 @@ class ViewManager;
*/
class PersistenceManager {
public:
enum class RequestType {
PERSIST,
VERIFY
};
struct ThreadRequest {
RequestType operation;
subgroup_id_t subgroup_id;
persistent::version_t version;
};

private:
/** Pointer to the persistence-module logger */
std::shared_ptr<spdlog::logger> persistence_logger;
/** Thread handle */
/** Thread handle for the persistence thread */
std::thread persist_thread;
/** Thread handle for the verification thread */
std::thread verify_thread;
/**
* A flag to signal the persistent thread to shutdown; set to true when the
* group is destroyed.
* A flag to signal the PersistenceManager's threads to shutdown; set to
* true when the group is destroyed.
*/
std::atomic<bool> thread_shutdown;
/**
* A semaphore that counts the number of persistence requests available for
* the persistence thread to handle
*/
sem_t persistence_request_sem;
/**
* A semaphore that counts the number of verification requests available for
* the verification thread to handle
*/
sem_t verification_request_sem;
/**
* Queue of requests for the persistence thread, which is shared with other
* threads (e.g. the predicates thread) so they can make requests
*/
std::queue<ThreadRequest> persistence_request_queue;
/**
* Queue of requests for the verification thread, which is shared with the
* predicates thread so it can make requests.
*/
std::queue<ThreadRequest> verify_request_queue;
/** A test-and-set lock guarding the persistence request queue */
std::atomic_flag prq_lock = ATOMIC_FLAG_INIT;
/** A test-and-set lock guarding the verification request queue */
std::atomic_flag vrq_lock = ATOMIC_FLAG_INIT;
/**
* The latest version that has been persisted successfully in each subgroup
* (indexed by subgroup number). Updated each time a persistence request completes.
* This is equal to this node's row of the SST field persisted_num, but cached
* in non-SST memory to more easily check if a persistence request is obsolete.
*/
std::vector<persistent::version_t> last_persisted_version;
/**
* The Verifier to use for verifying other replicas' signatures over
* persistent log entries, if signatures are enabled. This will be null if
* signatures are disabled.
* The latest version that has been signed and verified in each subgroup,
* indexed by subgroup number. This is equal to this node's row of the SST field
* verified_num but cached in non-SST memory, analogous to last_persisted_version.
* Remains at -1 for all subgroups if signatures are not enabled in this group.
*/
std::unique_ptr<openssl::Verifier> signature_verifier;
std::vector<persistent::version_t> last_verified_version;
/** The size of a signature (which is a constant), or 0 if signatures are disabled. */
std::size_t signature_size;
/**
Expand Down Expand Up @@ -124,7 +136,7 @@ class PersistenceManager {
/** @return the size of a signature on an update in this group. */
std::size_t get_signature_size() const;

/** Start the persistent thread. */
/** Start the persistence and verification threads. */
void start();

/** post a persistence request */
Expand All @@ -145,8 +157,9 @@ class PersistenceManager {
void make_version(const subgroup_id_t& subgroup_id,
const persistent::version_t& version, const HLC& mhlc);

/** shutdown the thread
* @param wait Wait till the thread finished or not.
/**
* Shutdown the threads
* @param wait Whether to wait until both threads have finished
*/
void shutdown(bool wait);
};
Expand Down
54 changes: 21 additions & 33 deletions include/derecho/core/detail/replicated_impl.hpp
Expand Up @@ -32,7 +32,7 @@ Replicated<T>::Replicated(subgroup_type_id_t type_id, node_id_t nid, subgroup_id
T::register_functions())),
group(group),
current_version(persistent::INVALID_VERSION),
current_hlc(0,0) {
current_hlc(0, 0) {
if constexpr(std::is_base_of_v<GroupReference, T>) {
(**user_object_ptr).set_group_pointers(group, subgroup_index);
}
Expand Down Expand Up @@ -217,32 +217,26 @@ void Replicated<T>::new_view_callback(const View& new_view) {
}
}


template <typename T>
void Replicated<T>::make_version(persistent::version_t ver, const HLC& hlc) {
persistent_registry->makeVersion(ver, hlc);
}

template <typename T>
persistent::version_t Replicated<T>::persist(persistent::version_t version, uint8_t* signature) {
persistent::version_t Replicated<T>::sign(uint8_t* signature_buffer) {
if constexpr(!has_signed_fields_v<T>) {
return persistent::INVALID_VERSION;
}
return persistent_registry->sign(*signer, signature_buffer);
}

template <typename T>
persistent::version_t Replicated<T>::persist(std::optional<persistent::version_t> version) {
if constexpr(!has_persistent_fields_v<T>) {
// for replicated<T> without Persistent fields,
// tell the persistent thread that we are done.
return version;
return persistent::INVALID_VERSION;
}
persistent::version_t persisted_ver = persistent_registry->getMinimumLatestVersion();
persistent::version_t signed_ver;
// Ask PersistentRegistry to sign then persist all the Persistent fields;
// if persist() actually persists a later version than requested, repeat
// until the latest version has been both signed and persisted.
do {
signed_ver = persisted_ver;
if constexpr(has_signed_fields_v<T>) {
persistent_registry->sign(signed_ver, *signer, signature);
}
persisted_ver = persistent_registry->persist(signed_ver);
} while(persisted_ver < version || signed_ver < persisted_ver);
return persisted_ver;

return persistent_registry->persist(version);
};

template <typename T>
Expand Down Expand Up @@ -284,10 +278,10 @@ persistent::version_t Replicated<T>::get_minimum_latest_persisted_version() {
template <typename T>
void Replicated<T>::post_next_version(persistent::version_t version, uint64_t ts_us) {
current_version = version;
if (current_hlc > HLC{ts_us,0}) {
current_hlc.m_logic ++;
if(current_hlc > HLC{ts_us, 0}) {
current_hlc.m_logic++;
} else {
current_hlc = {ts_us,0};
current_hlc = {ts_us, 0};
}
}

Expand All @@ -296,40 +290,34 @@ std::tuple<persistent::version_t, HLC> Replicated<T>::get_current_version() {
return std::tie(current_version, current_hlc);
}

template <typename T>
void Replicated<T>::register_persistent_member(const char* object_name,
persistent::PersistentObject* member_pointer) {
this->persistent_registry->registerPersistent(object_name, member_pointer);
}

template <typename T>
const T& Replicated<T>::get_ref() const {
return **user_object_ptr;
}

template <typename T>
void Replicated<T>::oob_remote_write(const node_id_t& remote_node, const struct iovec* iov, int iovcnt, uint64_t remote_dest_addr, uint64_t rkey, size_t size) {
group_rpc_manager.oob_remote_write(remote_node,iov,iovcnt,remote_dest_addr,rkey,size);
group_rpc_manager.oob_remote_write(remote_node, iov, iovcnt, remote_dest_addr, rkey, size);
}

template <typename T>
void Replicated<T>::oob_remote_read(const node_id_t& remote_node, const struct iovec* iov, int iovcnt, uint64_t remote_src_addr, uint64_t rkey, size_t size) {
group_rpc_manager.oob_remote_read(remote_node,iov,iovcnt,remote_src_addr,rkey,size);
group_rpc_manager.oob_remote_read(remote_node, iov, iovcnt, remote_src_addr, rkey, size);
}

template <typename T>
void Replicated<T>::oob_send(const node_id_t& remote_node, const struct iovec* iov, int iovcnt) {
group_rpc_manager.oob_send(remote_node,iov,iovcnt);
group_rpc_manager.oob_send(remote_node, iov, iovcnt);
}

template <typename T>
void Replicated<T>::oob_recv(const node_id_t& remote_node, const struct iovec* iov, int iovcnt) {
group_rpc_manager.oob_recv(remote_node,iov,iovcnt);
group_rpc_manager.oob_recv(remote_node, iov, iovcnt);
}

template <typename T>
void Replicated<T>::wait_for_oob_op(const node_id_t& remote_node, uint32_t op, uint64_t timeout_us) {
group_rpc_manager.wait_for_oob_op(remote_node,op,timeout_us);
group_rpc_manager.wait_for_oob_op(remote_node, op, timeout_us);
}

template <typename T>
Expand Down
18 changes: 12 additions & 6 deletions include/derecho/core/detail/replicated_interface.hpp
Expand Up @@ -4,6 +4,9 @@
#include "derecho/tcp/tcp.hpp"
#include "derecho_internal.hpp"

#include <optional>
#include <vector>

namespace derecho {

/**
Expand All @@ -13,20 +16,23 @@ namespace derecho {
*/
class ReplicatedObject {
public:
/* ---- Public-facing API functions; subset of the public functions defined in Replicated<T> ---- */
virtual ~ReplicatedObject() = default;
virtual bool is_valid() const = 0;
virtual std::size_t object_size() const = 0;
virtual void send_object(tcp::socket& receiver_socket) const = 0;
virtual void send_object_raw(tcp::socket& receiver_socket) const = 0;
virtual std::size_t receive_object(uint8_t* buffer) = 0;
virtual bool is_persistent() const = 0;
virtual bool is_signed() const = 0;
virtual void make_version(persistent::version_t ver, const HLC& hlc) = 0;
virtual persistent::version_t get_minimum_latest_persisted_version() = 0;
virtual persistent::version_t persist(persistent::version_t version, uint8_t* signature) = 0;
virtual std::vector<uint8_t> get_signature(persistent::version_t version) = 0;
virtual bool verify_log(persistent::version_t version, openssl::Verifier& verifier,
const uint8_t* signature) = 0;
/* ---- Internal-only API ---- */
virtual std::size_t object_size() const = 0;
virtual void send_object(tcp::socket& receiver_socket) const = 0;
virtual void send_object_raw(tcp::socket& receiver_socket) const = 0;
virtual std::size_t receive_object(uint8_t* buffer) = 0;
virtual void make_version(persistent::version_t ver, const HLC& hlc) = 0;
virtual persistent::version_t sign(uint8_t* signature_buffer) = 0;
virtual persistent::version_t persist(std::optional<persistent::version_t> version = std::nullopt) = 0;
virtual void truncate(persistent::version_t latest_version) = 0;
virtual void post_next_version(persistent::version_t version, uint64_t msg_ts) = 0;
};
Expand Down

0 comments on commit 4a4c41b

Please sign in to comment.