diff --git a/include/derecho/core/detail/derecho_sst.hpp b/include/derecho/core/detail/derecho_sst.hpp index 31e1ce71..109376d8 100644 --- a/include/derecho/core/detail/derecho_sst.hpp +++ b/include/derecho/core/detail/derecho_sst.hpp @@ -79,13 +79,23 @@ class DerechoSST : public sst::SST { * subgroup. */ SSTFieldVector persisted_num; + /** + * This represents the highest persistent version number that has a + * signature in its log at this node, if any persistent fields have + * signatures enabled. Since signatures are added to log entries at + * the same time as persistence, this is usually equal to persisted_num, + * but it may lag behind if there are persistent fields that do not have + * signatures enabled (hence there may be persistent versions with no + * corresponding signature). Contains one entry per subgroup. + */ + SSTFieldVector signed_num; /** * This represents the highest persistent version number for which this * node has verified a signature from all other nodes in the subgroup, if - * signatures are enabled. There is updated by the PersistenceManager, and - * contains one entry per subgroup. It will generally lag behind - * persisted_num, since updates are only verified once they have been - * signed locally. + * any persistent fields have signatures enabled. This is updated by the + * PersistenceManager, and contains one entry per subgroup. It will + * generally lag behind persisted_num, since updates are only verified once + * they have been signed locally. */ SSTFieldVector verified_num; @@ -181,7 +191,7 @@ class DerechoSST : public sst::SST { * standard SST constructor. 
* @param num_subgroups Number of the subgroups * @param signature_size Size of the signature - * @param num_received_size + * @param num_received_size * @param slot_size * @param index_field_size */ @@ -191,6 +201,7 @@ class DerechoSST : public sst::SST { delivered_num(num_subgroups), signatures(num_subgroups * signature_size), persisted_num(num_subgroups), + signed_num(num_subgroups), verified_num(num_subgroups), suspected(parameters.members.size()), changes(100 + parameters.members.size()), //The extra 100 entries allows for more joins at startup, when the group is very small @@ -208,7 +219,7 @@ class DerechoSST : public sst::SST { index(index_field_size), local_stability_frontier(num_subgroups) { SSTInit(seq_num, delivered_num, signatures, - persisted_num, verified_num, + persisted_num, signed_num, verified_num, vid, suspected, changes, joiner_ips, joiner_gms_ports, joiner_state_transfer_ports, joiner_sst_ports, joiner_rdmc_ports, joiner_external_ports, num_changes, num_committed, num_acked, num_installed, diff --git a/include/derecho/core/detail/persistence_manager.hpp b/include/derecho/core/detail/persistence_manager.hpp index a52e1f1a..71372708 100644 --- a/include/derecho/core/detail/persistence_manager.hpp +++ b/include/derecho/core/detail/persistence_manager.hpp @@ -30,12 +30,7 @@ class ViewManager; */ class PersistenceManager { public: - enum class RequestType { - PERSIST, - VERIFY - }; struct ThreadRequest { - RequestType operation; subgroup_id_t subgroup_id; persistent::version_t version; }; @@ -43,11 +38,13 @@ class PersistenceManager { private: /** Pointer to the persistence-module logger */ std::shared_ptr persistence_logger; - /** Thread handle */ + /** Thread handle for the persistence thread */ std::thread persist_thread; + /** Thread handle for the verification thread */ + std::thread verify_thread; /** - * A flag to signal the persistent thread to shutdown; set to true when the - * group is destroyed. 
+ * A flag to signal the PersistenceManager's threads to shutdown; set to + * true when the group is destroyed. */ std::atomic thread_shutdown; /** @@ -55,24 +52,39 @@ class PersistenceManager { * the persistence thread to handle */ sem_t persistence_request_sem; + /** + * A semaphore that counts the number of verification requests available for + * the verification thread to handle + */ + sem_t verification_request_sem; /** * Queue of requests for the persistence thread, which is shared with other * threads (e.g. the predicates thread) so they can make requests */ std::queue persistence_request_queue; + /** + * Queue of requests for the verification thread, which is shared with the + * predicates thread so it can make requests. + */ + std::queue verify_request_queue; /** A test-and-set lock guarding the persistence request queue */ std::atomic_flag prq_lock = ATOMIC_FLAG_INIT; + /** A test-and-set lock guarding the verification request queue */ + std::atomic_flag vrq_lock = ATOMIC_FLAG_INIT; /** * The latest version that has been persisted successfully in each subgroup * (indexed by subgroup number). Updated each time a persistence request completes. + * This is equal to this node's row of the SST field persisted_num, but cached + * in non-SST memory to more easily check if a persistence request is obsolete. */ std::vector last_persisted_version; /** - * The Verifier to use for verifying other replicas' signatures over - * persistent log entries, if signatures are enabled. This will be null if - * signatures are disabled. + * The latest version that has been signed and verified in each subgroup, + * indexed by subgroup number. This is equal to this node's row of the SST field + * verified_num but cached in non-SST memory, analogous to last_persisted_version. + * Remains at -1 for all subgroups if signatures are not enabled in this group. 
*/ - std::unique_ptr signature_verifier; + std::vector last_verified_version; /** The size of a signature (which is a constant), or 0 if signatures are disabled. */ std::size_t signature_size; /** @@ -124,7 +136,7 @@ class PersistenceManager { /** @return the size of a signature on an update in this group. */ std::size_t get_signature_size() const; - /** Start the persistent thread. */ + /** Start the persistence and verification threads. */ void start(); /** post a persistence request */ @@ -145,8 +157,9 @@ class PersistenceManager { void make_version(const subgroup_id_t& subgroup_id, const persistent::version_t& version, const HLC& mhlc); - /** shutdown the thread - * @param wait Wait till the thread finished or not. + /** + * Shutdown the threads + * @param wait Whether to wait until both threads have finished */ void shutdown(bool wait); }; diff --git a/include/derecho/core/detail/replicated_impl.hpp b/include/derecho/core/detail/replicated_impl.hpp index 487df509..21525291 100644 --- a/include/derecho/core/detail/replicated_impl.hpp +++ b/include/derecho/core/detail/replicated_impl.hpp @@ -32,7 +32,7 @@ Replicated::Replicated(subgroup_type_id_t type_id, node_id_t nid, subgroup_id T::register_functions())), group(group), current_version(persistent::INVALID_VERSION), - current_hlc(0,0) { + current_hlc(0, 0) { if constexpr(std::is_base_of_v) { (**user_object_ptr).set_group_pointers(group, subgroup_index); } @@ -217,32 +217,26 @@ void Replicated::new_view_callback(const View& new_view) { } } - template void Replicated::make_version(persistent::version_t ver, const HLC& hlc) { persistent_registry->makeVersion(ver, hlc); } template -persistent::version_t Replicated::persist(persistent::version_t version, uint8_t* signature) { +persistent::version_t Replicated::sign(uint8_t* signature_buffer) { + if constexpr(!has_signed_fields_v) { + return persistent::INVALID_VERSION; + } + return persistent_registry->sign(*signer, signature_buffer); +} + +template 
+persistent::version_t Replicated::persist(std::optional version) { if constexpr(!has_persistent_fields_v) { - // for replicated without Persistent fields, - // tell the persistent thread that we are done. - return version; + return persistent::INVALID_VERSION; } - persistent::version_t persisted_ver = persistent_registry->getMinimumLatestVersion(); - persistent::version_t signed_ver; - // Ask PersistentRegistry to sign then persist all the Persistent fields; - // if persist() actually persists a later version than requested, repeat - // until the latest version has been both signed and persisted. - do { - signed_ver = persisted_ver; - if constexpr(has_signed_fields_v) { - persistent_registry->sign(signed_ver, *signer, signature); - } - persisted_ver = persistent_registry->persist(signed_ver); - } while(persisted_ver < version || signed_ver < persisted_ver); - return persisted_ver; + + return persistent_registry->persist(version); }; template @@ -284,10 +278,10 @@ persistent::version_t Replicated::get_minimum_latest_persisted_version() { template void Replicated::post_next_version(persistent::version_t version, uint64_t ts_us) { current_version = version; - if (current_hlc > HLC{ts_us,0}) { - current_hlc.m_logic ++; + if(current_hlc > HLC{ts_us, 0}) { + current_hlc.m_logic++; } else { - current_hlc = {ts_us,0}; + current_hlc = {ts_us, 0}; } } @@ -296,12 +290,6 @@ std::tuple Replicated::get_current_version() { return std::tie(current_version, current_hlc); } -template -void Replicated::register_persistent_member(const char* object_name, - persistent::PersistentObject* member_pointer) { - this->persistent_registry->registerPersistent(object_name, member_pointer); -} - template const T& Replicated::get_ref() const { return **user_object_ptr; @@ -309,27 +297,27 @@ const T& Replicated::get_ref() const { template void Replicated::oob_remote_write(const node_id_t& remote_node, const struct iovec* iov, int iovcnt, uint64_t remote_dest_addr, uint64_t rkey, size_t size) { - 
group_rpc_manager.oob_remote_write(remote_node,iov,iovcnt,remote_dest_addr,rkey,size); + group_rpc_manager.oob_remote_write(remote_node, iov, iovcnt, remote_dest_addr, rkey, size); } template void Replicated::oob_remote_read(const node_id_t& remote_node, const struct iovec* iov, int iovcnt, uint64_t remote_src_addr, uint64_t rkey, size_t size) { - group_rpc_manager.oob_remote_read(remote_node,iov,iovcnt,remote_src_addr,rkey,size); + group_rpc_manager.oob_remote_read(remote_node, iov, iovcnt, remote_src_addr, rkey, size); } template void Replicated::oob_send(const node_id_t& remote_node, const struct iovec* iov, int iovcnt) { - group_rpc_manager.oob_send(remote_node,iov,iovcnt); + group_rpc_manager.oob_send(remote_node, iov, iovcnt); } template void Replicated::oob_recv(const node_id_t& remote_node, const struct iovec* iov, int iovcnt) { - group_rpc_manager.oob_recv(remote_node,iov,iovcnt); + group_rpc_manager.oob_recv(remote_node, iov, iovcnt); } template void Replicated::wait_for_oob_op(const node_id_t& remote_node, uint32_t op, uint64_t timeout_us) { - group_rpc_manager.wait_for_oob_op(remote_node,op,timeout_us); + group_rpc_manager.wait_for_oob_op(remote_node, op, timeout_us); } template diff --git a/include/derecho/core/detail/replicated_interface.hpp b/include/derecho/core/detail/replicated_interface.hpp index 80ed6aa7..8ef2afd1 100644 --- a/include/derecho/core/detail/replicated_interface.hpp +++ b/include/derecho/core/detail/replicated_interface.hpp @@ -4,6 +4,9 @@ #include "derecho/tcp/tcp.hpp" #include "derecho_internal.hpp" +#include +#include + namespace derecho { /** @@ -13,20 +16,23 @@ namespace derecho { */ class ReplicatedObject { public: +/* ---- Public-facing API functions; subset of the public functions defined in Replicated ---- */ virtual ~ReplicatedObject() = default; virtual bool is_valid() const = 0; - virtual std::size_t object_size() const = 0; - virtual void send_object(tcp::socket& receiver_socket) const = 0; - virtual void 
send_object_raw(tcp::socket& receiver_socket) const = 0; - virtual std::size_t receive_object(uint8_t* buffer) = 0; virtual bool is_persistent() const = 0; virtual bool is_signed() const = 0; - virtual void make_version(persistent::version_t ver, const HLC& hlc) = 0; virtual persistent::version_t get_minimum_latest_persisted_version() = 0; - virtual persistent::version_t persist(persistent::version_t version, uint8_t* signature) = 0; virtual std::vector get_signature(persistent::version_t version) = 0; virtual bool verify_log(persistent::version_t version, openssl::Verifier& verifier, const uint8_t* signature) = 0; +/* ---- Internal-only API ---- */ + virtual std::size_t object_size() const = 0; + virtual void send_object(tcp::socket& receiver_socket) const = 0; + virtual void send_object_raw(tcp::socket& receiver_socket) const = 0; + virtual std::size_t receive_object(uint8_t* buffer) = 0; + virtual void make_version(persistent::version_t ver, const HLC& hlc) = 0; + virtual persistent::version_t sign(uint8_t* signature_buffer) = 0; + virtual persistent::version_t persist(std::optional version = std::nullopt) = 0; virtual void truncate(persistent::version_t latest_version) = 0; virtual void post_next_version(persistent::version_t version, uint64_t msg_ts) = 0; }; diff --git a/include/derecho/core/replicated.hpp b/include/derecho/core/replicated.hpp index 2364b7ea..ef44ce9b 100644 --- a/include/derecho/core/replicated.hpp +++ b/include/derecho/core/replicated.hpp @@ -17,8 +17,10 @@ #include #include #include +#include #include #include +#include namespace derecho { @@ -78,12 +80,12 @@ class GetsViewChangeCallback { /** * A template whose member field "value" will be true if type T inherits * from GetsViewChangeCallback. 
-*/ -template + */ +template using view_callback_enabled = std::is_base_of; /** Shortcut for view_callback_enabled::value */ -template +template inline constexpr bool view_callback_enabled_v = view_callback_enabled::value; /** @@ -206,6 +208,8 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro Replicated(const Replicated&) = delete; virtual ~Replicated(); + /* ---- Public API Functions - can be called by user code ---- */ + /** * @return The value of has_persistent_fields for this Replicated's * template parameter. This is true if any field of the user object T is @@ -229,7 +233,7 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro * replicated object, false if it is "empty" because this node is not a * member of the subgroup that replicates T. */ - bool is_valid() const { + virtual bool is_valid() const { return *user_object_ptr && true; } @@ -280,41 +284,6 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro */ void send(unsigned long long int payload_size, const std::function& msg_generator); - /** - * @return The serialized size of the object, of type T, that holds the - * state of this Replicated. - */ - std::size_t object_size() const; - - /** - * Serializes and sends the state of the "wrapped" object (of type T) for - * this Replicated over the given socket. (This includes sending the - * object's size before its data, so the receiver knows the size of buffer - * to allocate). - * @param receiver_socket - */ - void send_object(tcp::socket& receiver_socket) const; - - /** - * Serializes and sends the state of the "wrapped" object (of type T) for - * this Replicated over the given socket *without* first sending its size. - * Should only be used when sending a list of objects, preceded by their - * total size, otherwise the recipient will have no way of knowing how large - * a buffer to allocate for this object. 
- * @param receiver_socket - */ - void send_object_raw(tcp::socket& receiver_socket) const; - - /** - * Updates the state of the "wrapped" object by replacing it with the object - * serialized in a buffer. Returns the number of bytes read from the buffer, - * in case the caller needs to know. - * @param buffer A buffer containing a serialized T, which will replace this - * Replicated's wrapped T - * @return The number of bytes read from the buffer. - */ - std::size_t receive_object(uint8_t* buffer); - /** * A function called by Group to notify this Replicated object that a new * view has been installed. Forwards the notification to the wrapped object @@ -324,9 +293,19 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro */ void new_view_callback(const View& new_view); + /** + * Computes the minimum timestamp, in nanoseconds, that corresponds to a stable + * message across all members of the Group. This is the latest timestamp that + * can be considered stable, i.e., no messages will be delivered with an earlier + * timestamp. Note that this is derived from the local stability frontier at each + * replica, which is updated only by the timeout-checking thread, so its + * granularity depends on the configured Derecho heartbeat timeout. + * + * @return The stability frontier timestamp in nanoseconds + */ const uint64_t compute_global_stability_frontier(); - inline const HLC getFrontier() { + virtual inline const HLC getFrontier() { // transform from ns to us: HLC hlc(this->compute_global_stability_frontier() / INT64_1E3, 0); return hlc; @@ -343,50 +322,30 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro * Returns the current global persistence frontier, aka, stable frontier that will survive whole system restart. * Please note this applies to persistent data ONLY. The data not in Persistent<> are not saved. 
*/ - virtual persistent::version_t get_global_persistence_frontier(); + persistent::version_t get_global_persistence_frontier(); /** * Wait until the current global persistence frontier advanced beyond a version. * @param version the version * @return false if the given version is beyond the latest atomic broadcast. */ - virtual bool wait_for_global_persistence_frontier(persistent::version_t version); + bool wait_for_global_persistence_frontier(persistent::version_t version); /** * Returns the current global verified frontier, aka, stable frontier that will survive whole system restart. * Please note this applies to persistent data ONLY. The data not in Persistent<> are not saved. */ - virtual persistent::version_t get_global_verified_frontier(); - - /** - * make a version for all the persistent members. - * @param ver the version number to be made - * @param hlc the hybrid clock - */ - virtual void make_version(persistent::version_t ver, const HLC& hlc); + persistent::version_t get_global_verified_frontier(); /** - * Persists the object's data up to at least the specified version; due to - * batching, a later version may actually be persisted if it is available. - * Returns the latest version actually persisted. If the signed log is - * enabled, also returns the signature over the latest persisted version in - * the provided buffer. - * @param version The version to persist up to. - * @param signature The byte array in which to put the signature, assumed to be - * the correct length for this node's signing key. - * @return The version actually persisted (and signed) - */ - virtual persistent::version_t persist(persistent::version_t version, - uint8_t* signature); - - /** - * Retreives a copy of the signature in the persistent log for a specified + * Retrieves a copy of the signature in the persistent log for a specified * version of this object. 
* @param version The logged version to retrieve the signature for * @return The signature in the log for the requested version, or an empty * vector if signatures are disabled or the requested version doesn't exist */ virtual std::vector get_signature(persistent::version_t version); + /** * Verifies the persistent log entry at the specified version against the * provided signature. @@ -400,29 +359,6 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro openssl::Verifier& verifier, const uint8_t* other_signature); - /** - * trim the logs to a version, inclusively. - * @param earliest_version - the version number, before which, logs are - * going to be trimmed - */ - virtual void trim(persistent::version_t earliest_version); - - /** - * Truncate the logs of all Persistent members back to the version - * specified. This deletes recently-used data, so it should only be called - * during failure recovery when some versions must be rolled back. - * @param latest_version The latest version number that should remain in the logs - */ - virtual void truncate(persistent::version_t latest_version); - - /** - * Post the next version to be assigned to an update. Called immediately - * before invoking an ordered_send RPC function to update current_version. - * @return version The new update's persistent version number - * @return ts_us The new update's timestamp in microseconds - */ - virtual void post_next_version(persistent::version_t version, uint64_t ts_us); - /** * Get the current version, set by the most recent ordered_send update. * During the execution of an ordered_send RPC method, this represents the @@ -433,13 +369,7 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro * the value of current_version. 
* @return an ordered pair (version number, HLC) */ - virtual std::tuple get_current_version(); - - /** - * Register a persistent member - */ - virtual void register_persistent_member(const char* object_name, - persistent::PersistentObject* member_pointer); + std::tuple get_current_version(); /** * @brief Expose reference to the wrapped user object. @@ -450,7 +380,9 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro * * @return a reference to the wrapped user object. */ - virtual const T& get_ref() const; + const T& get_ref() const; + + /* ---- Out-Of-Band (OOB) Memory API functions, also part of the public API ---- */ /** * @brief One-sided RDMA write to remote OOB memory. @@ -470,7 +402,7 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro */ virtual void oob_remote_write( const node_id_t& remote_node, - const struct iovec* iov, int iovcnt, + const struct iovec* iov, int iovcnt, uint64_t remote_dest_addr, uint64_t rkey, size_t size); /** @@ -533,6 +465,105 @@ class Replicated : public ReplicatedObject, public persistent::ITemporalQueryFro * @throw derecho::derecho_exception on error */ virtual void wait_for_oob_op(const node_id_t& remote_node, uint32_t op, uint64_t timeout_us); + + /* ---- End public API ---- */ + /* ---- Internal-Only API Functions - used by Derecho components but should NOT be called by user code ---- */ + + /** + * @return The serialized size of the object, of type T, that holds the + * state of this Replicated. + */ + virtual std::size_t object_size() const; + + /** + * Serializes and sends the state of the "wrapped" object (of type T) for + * this Replicated over the given socket. (This includes sending the + * object's size before its data, so the receiver knows the size of buffer + * to allocate). 
+ * @param receiver_socket + */ + virtual void send_object(tcp::socket& receiver_socket) const; + + /** + * Serializes and sends the state of the "wrapped" object (of type T) for + * this Replicated over the given socket *without* first sending its size. + * Should only be used when sending a list of objects, preceded by their + * total size, otherwise the recipient will have no way of knowing how large + * a buffer to allocate for this object. + * @param receiver_socket + */ + virtual void send_object_raw(tcp::socket& receiver_socket) const; + + /** + * Updates the state of the "wrapped" object by replacing it with the object + * serialized in a buffer. Returns the number of bytes read from the buffer, + * in case the caller needs to know. + * @param buffer A buffer containing a serialized T, which will replace this + * Replicated's wrapped T + * @return The number of bytes read from the buffer. + */ + virtual std::size_t receive_object(uint8_t* buffer); + + /** + * make a version for all the persistent members. + * @param ver the version number to be made + * @param hlc the hybrid clock + */ + virtual void make_version(persistent::version_t ver, const HLC& hlc); + + /** + * Adds signatures to the object's versioned data up through the latest + * version, and places the signature over the latest signed version in the + * provided buffer. Returns the latest version actually signed, which may + * be earlier than the current version if the current version does not + * exist for any signed fields (even though it does exist in persistent-but- + * not-signed fields). + * + * @param signature_buffer The byte array in which to put the signature, + * assumed to be the correct length for this node's signing key. + * @return The latest version actually signed + */ + virtual persistent::version_t sign(uint8_t* signature_buffer); + + /** + * Persists the object's data. 
If the optional latest_version parameter is + * specified, the log will be persisted only up to that version (and no + * further). If the parameter is std::nullopt, the log will be persisted + * up to the current version in memory. Returns the latest version persisted, + * which will either be equal to the parameter (if provided) or the latest + * in-memory version at the time this method was called. + * + * @param latest_version If provided, the latest logged version to write to + * persistent storage. If not provided, all new in-memory versions (since + * the last call to persist) will be persisted. The default is std::nullopt + * since it is more efficient to persist the log in as large a batch as + * possible (up to the current version). + * @return The latest version actually persisted. + */ + virtual persistent::version_t persist(std::optional latest_version = std::nullopt); + + /** + * trim the logs to a version, inclusively. + * @param earliest_version - the version number, before which, logs are + * going to be trimmed + */ + virtual void trim(persistent::version_t earliest_version); + + /** + * Truncate the logs of all Persistent members back to the version + * specified. This deletes recently-used data, so it should only be called + * during failure recovery when some versions must be rolled back. + * @param latest_version The latest version number that should remain in the logs + */ + virtual void truncate(persistent::version_t latest_version); + + /** + * Post the next version to be assigned to an update. Called immediately + * before invoking an ordered_send RPC function to update current_version.
+ * @param version The new update's persistent version number + * @param ts_us The new update's timestamp in microseconds + */ + virtual void post_next_version(persistent::version_t version, uint64_t ts_us); }; template diff --git a/include/derecho/persistent/Persistent.hpp b/include/derecho/persistent/Persistent.hpp index ff9ade53..15884012 100644 --- a/include/derecho/persistent/Persistent.hpp +++ b/include/derecho/persistent/Persistent.hpp @@ -6,12 +6,12 @@ #include "PersistException.hpp" #include "PersistNoLog.hpp" #include "PersistentInterface.hpp" -#include -#include -#include #include "detail/FilePersistLog.hpp" #include "detail/PersistLog.hpp" #include "detail/logger.hpp" +#include "derecho/mutils-serialization/SerializationSupport.hpp" +#include "derecho/utils/logger.hpp" +#include "derecho/utils/time.h" #include #include @@ -106,11 +106,14 @@ class PersistentRegistry : public mutils::RemoteDeserializationContext { void makeVersion(version_t ver, const HLC& mhlc); /** - * Returns the minumum of the latest version across all Persistent fields. - * This is effectively the "current version" of the object, since all the - * Persistent fields should advance their version numbers at the same rate. + * Returns the minimum value of getCurrentVersion() across all Persistent + * fields, which is their current in-memory version regardless of whether + * it has any associated log data. This represents the current version of + * the whole object, since all the Persistent fields should advance their + * "current" version numbers at the same rate even if they don't all create + * log entries for every version. */ - version_t getMinimumLatestVersion(); + version_t getCurrentVersion() const; /** * Returns the minimum value of getNextVersionOf(version) across all @@ -119,19 +122,22 @@ class PersistentRegistry : public mutils::RemoteDeserializationContext { * versions. Returns INVALID_VERSION if there is no valid version later * than the argument.
*/ - version_t getMinimumVersionAfter(version_t version); + version_t getMinimumVersionAfter(version_t version) const; /** - * Adds signatures to the log up to the specified version, and returns the - * signature for the latest version. The version specified should be the - * result of calling getMinimumLatestVersion(). - * @param latest_version The version to add signatures up through + * Adds signatures to the log up to the current version, and returns the + * signature for the latest version signed. The latest version signed might + * be earlier than the current version, if the current version only exists + * in non-signed fields, but after calling this method all signed fields will + * have signatures up through their latest versions. * @param signer The Signer object to use for generating signatures, * initialized with the appropriate private key * @param signature_buffer A byte buffer in which the latest signature will * be placed after running this function + * @return The largest version actually signed, which may be earlier than the + * current (latest) version if the current version only exists in non-signed fields */ - void sign(version_t latest_version, openssl::Signer& signer, uint8_t* signature_buffer); + version_t sign(openssl::Signer& signer, uint8_t* signature_buffer); /** * Retrieves a signature from the log for a specific version of the object, @@ -157,14 +163,15 @@ class PersistentRegistry : public mutils::RemoteDeserializationContext { bool verify(version_t version, openssl::Verifier& verifier, const uint8_t* signature); /** - * Persist versions up to a specified version, which should be the result of - * calling getMinimumLatestVersion(). + * Persist versions either up to a specified version, if provided, or up + * to the current version. * - * @param latest_version The version to persist up to. + * @param latest_version The version to persist up to, or std::nullopt if + * the fields should be persisted up to their current in-memory version.
* - * @return a version equal to getMinimumLatestPersistedVersion() + * @return The latest version persisted. */ - version_t persist(version_t latest_version); + version_t persist(std::optional latest_version); /** Trims the log of all versions earlier than the argument. */ void trim(version_t earliest_version); @@ -225,7 +232,7 @@ class PersistentRegistry : public mutils::RemoteDeserializationContext { return m_temporalQueryFrontierProvider->getFrontier(); #endif //NDEBUG } else { - return HLC(get_walltime()/INT64_1E3, (uint64_t)0); + return HLC(get_walltime() / INT64_1E3, (uint64_t)0); } } @@ -262,7 +269,7 @@ class PersistentRegistry : public mutils::RemoteDeserializationContext { static std::string generate_prefix(const std::type_index& subgroup_type, uint32_t subgroup_index, uint32_t shard_num); /** match prefix - * @param str a string begin with a prefix like + * @param str a string beginning with a prefix like * [hex64 of subgroup_type]-[subgroup_index]-[shard_num]- * @param subgroup_type the type information of a subgroup * @param subgroup_index the index of a subgroup @@ -308,6 +315,14 @@ class PersistentRegistry : public mutils::RemoteDeserializationContext { * Set the earliest version to serialize for recovery. */ static thread_local int64_t earliest_version_to_serialize; + + /** + * Determines the next version in any signed field after the provided version, + * skipping both nonexistent versions and versions that only exist in non- + * signed fields. Similar to getMinimumVersionAfter but only considers signed + * fields. Only used internally by this class's sign() method.
+ */ + version_t getNextSignedVersion(version_t version) const; }; /* ---------------------------- DeltaSupport Interface ---------------------------- */ @@ -874,9 +889,10 @@ class Persistent : public PersistentObject, public mutils::ByteRepresentable { virtual int64_t getEarliestIndex() const; /** - * getEarlisestVersion() + * getEarliestVersion() * - * Get the earliest version excluding trimmed ones. + * Get the earliest version excluding trimmed ones. This is the version + * corresponding to getEarliestIndex(). * * @return the earliest version. */ @@ -894,12 +910,26 @@ class Persistent : public PersistentObject, public mutils::ByteRepresentable { /** * getLatestVersion() * - * Get the lastest version excluding truncated ones. + * Get the latest logged version excluding truncated ones. This is the version + * corresponding to getLatestIndex(). * - * @return the latest version. + * @return the latest version with log data */ virtual version_t getLatestVersion() const; + /** + * getCurrentVersion() + * + * Get the current in-memory version of the object, regardless of whether + * it has any log data associated with it. This may or may not match the + * version returned by getLatestVersion(), depending on whether ObjectType + * implements the IDeltaSupport interface (IDeltaSupport objects can create + * versions with no log data). + * + * @return the current version + */ + virtual version_t getCurrentVersion() const; + /** * getLastPersistedVersion() * @@ -990,12 +1020,16 @@ class Persistent : public PersistentObject, public mutils::ByteRepresentable { /** * persist(version_t) * - * Persist log entries up to the specified version. To avoid inefficiency, this - * should be the latest version. + * Persist log entries up to either the specified version, if present, or + * the latest version, if the argument is std::nullopt. It is more + * efficient to persist up to the current version, so the argument defaults + * to std::nullopt. 
* - * @param latest_version The version to persist up to + * @param latest_version Either the version to persist up to, or + * std::nullopt if the latest (current) version should be persisted. + * @return The latest version actually persisted. */ - virtual version_t persist(version_t latest_version); + virtual version_t persist(std::optional latest_version = std::nullopt); /** * Update the provided Signer with the state of T at the specified version. diff --git a/include/derecho/persistent/PersistentInterface.hpp b/include/derecho/persistent/PersistentInterface.hpp index ecb852a1..9ebbf93c 100644 --- a/include/derecho/persistent/PersistentInterface.hpp +++ b/include/derecho/persistent/PersistentInterface.hpp @@ -5,6 +5,7 @@ #include #include +#include namespace persistent { @@ -71,12 +72,16 @@ class PersistentObject { */ virtual void updateVerifier(version_t version, openssl::Verifier& verifier) = 0; /** - * Persists versions to persistent storage, up to the provided version. - * @param version The highest version number to persist + * Persists versions to persistent storage. If the optional argument is specified, + * only persists up to the provided version. If the argument is std::nullopt, + * persists up to the latest version in the log. * - * @return the real persisted version which might be higher than the requested one. + * @param latest_version Either the highest version number to persist, or + * std::nullopt to indicate that the latest in-memory version should be persisted. + * @return The version actually persisted; either the requested version, or + * the latest version if the argument was std::nullopt.
*/ - virtual version_t persist(version_t version) = 0; + virtual version_t persist(std::optional latest_version = std::nullopt) = 0; /** * Trims the beginning (oldest part) of the log, discarding versions older * than the specified version @@ -84,7 +89,11 @@ class PersistentObject { */ virtual void trim(version_t earliest_version) = 0; /** - * @return the Persistent object's current version number + * @return the Persistent object's current in-memory version number + */ + virtual version_t getCurrentVersion() const = 0; + /** + * @return the Persistent object's newest version with log data */ virtual version_t getLatestVersion() const = 0; /** diff --git a/include/derecho/persistent/detail/FilePersistLog.hpp b/include/derecho/persistent/detail/FilePersistLog.hpp index 250a7fa6..d708ac24 100644 --- a/include/derecho/persistent/detail/FilePersistLog.hpp +++ b/include/derecho/persistent/detail/FilePersistLog.hpp @@ -21,8 +21,8 @@ namespace persistent { // meta header format union MetaHeader { struct { - int64_t head; // the head index - int64_t tail; // the tail index + int64_t head; // the head index: The smallest valid log index. + int64_t tail; // the tail index: One greater than the last valid log index. int64_t ver; // the latest version number. } fields; uint8_t bytes[META_HEADER_SIZE]; @@ -65,20 +65,20 @@ union LogEntry { // helpers: ///// READ or WRITE LOCK on LOG REQUIRED to use the following MACROs!!!! 
-#define LOG_ENTRY_ARRAY ((LogEntry*)(this->m_pLog)) +#define LOG_ENTRY_ARRAY (reinterpret_cast(this->m_pLog)) #define NUM_USED_SLOTS (m_currMetaHeader.fields.tail - m_currMetaHeader.fields.head) // #define NUM_USED_SLOTS_PERS (m_persMetaHeader.tail - m_persMetaHeader.head) #define NUM_FREE_SLOTS (MAX_LOG_ENTRY - 1 - NUM_USED_SLOTS) // #define NUM_FREE_SLOTS_PERS (MAX_LOG_ENTRY - 1 - NUM_USERD_SLOTS_PERS) -#define LOG_ENTRY_AT(idx) (LOG_ENTRY_ARRAY + (int)((idx) % MAX_LOG_ENTRY)) +#define LOG_ENTRY_AT(idx) (LOG_ENTRY_ARRAY + static_cast((idx) % MAX_LOG_ENTRY)) #define NEXT_LOG_ENTRY LOG_ENTRY_AT(m_currMetaHeader.fields.tail) #define NEXT_LOG_ENTRY_PERS LOG_ENTRY_AT( \ MAX(m_persMetaHeader.fields.tail, m_currMetaHeader.fields.head)) #define CURR_LOG_IDX ((NUM_USED_SLOTS == 0) ? INVALID_INDEX : m_currMetaHeader.fields.tail - 1) -#define LOG_ENTRY_DATA(e) ((void*)((uint8_t*)this->m_pData + ((e)->fields.ofst + this->signature_size) % MAX_DATA_SIZE)) -#define LOG_ENTRY_SIGNATURE(e) ((void*)((uint8_t*)this->m_pData + ((e)->fields.ofst) % MAX_DATA_SIZE)) +#define LOG_ENTRY_DATA(e) ((void*)(reinterpret_cast(this->m_pData) + ((e)->fields.ofst + this->signature_size) % MAX_DATA_SIZE)) +#define LOG_ENTRY_SIGNATURE(e) ((void*)(reinterpret_cast(this->m_pData) + ((e)->fields.ofst) % MAX_DATA_SIZE)) #define NEXT_DATA_OFST ((CURR_LOG_IDX == INVALID_INDEX) ? 
0 : (LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ofst + LOG_ENTRY_AT(CURR_LOG_IDX)->fields.sdlen)) #define NEXT_DATA ((void*)(reinterpret_cast(this->m_pData) + NEXT_DATA_OFST % MAX_DATA_SIZE)) @@ -198,11 +198,12 @@ class FilePersistLog : public PersistLog { virtual version_t getNextVersionOf(const version_t ver) override; virtual version_t getEarliestVersion() override; virtual version_t getLatestVersion() override; + virtual version_t getCurrentVersion() override; virtual version_t getLastPersistedVersion() override; virtual const void* getEntryByIndex(int64_t eno) override; virtual const void* getEntry(version_t ver, bool exact = false) override; virtual const void* getEntry(const HLC& hlc) override; - virtual version_t persist(version_t ver, + virtual version_t persist(std::optional latest_version, bool preLocked = false) override; virtual void processEntryAtVersion(version_t ver, const std::function& func) override; virtual void addSignature(version_t ver, const uint8_t* signature, version_t previous_signed_version) override; @@ -239,13 +240,7 @@ class FilePersistLog : public PersistLog { m_currMetaHeader.fields.head = (idx + 1); FPL_PERS_LOCK; try { - // What version number should be supplied to persist in this case? - // CAUTION: - // The persist API is changed for Edward's convenience by adding a version parameter - // This has a widespreading on the design and needs extensive test before replying on - // it. - version_t ver = LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ver; - persist(ver, true); + persist(std::nullopt, true); } catch(std::exception& e) { FPL_UNLOCK; FPL_PERS_UNLOCK; diff --git a/include/derecho/persistent/detail/PersistLog.hpp b/include/derecho/persistent/detail/PersistLog.hpp index 9edc1063..1847fb04 100644 --- a/include/derecho/persistent/detail/PersistLog.hpp +++ b/include/derecho/persistent/detail/PersistLog.hpp @@ -87,7 +87,7 @@ class PersistLog { * @param enable_signatures True if this log should sign every entry, false * if there are no signatures. 
*/ - PersistLog(const std::string& name, bool enable_signatures) noexcept(true); + PersistLog(const std::string& name, bool enable_signatures); virtual ~PersistLog() noexcept(true); /** Persistent Append * @param pdata - serialized data to be append @@ -105,10 +105,9 @@ class PersistLog { = 0; /** - * Advance the version number without appendding a log. This is useful + * Advance the version number without appending a log. This is useful * to create gap between versions. */ - // virtual void advanceVersion(const __int128 & ver) = 0; virtual void advanceVersion(version_t ver) = 0; // Get the length of the log @@ -140,9 +139,17 @@ class PersistLog { // Get the Earlist version virtual version_t getEarliestVersion() = 0; - // Get the Latest version + /** + * Get the latest version with a log entry + */ virtual version_t getLatestVersion() = 0; + /** + * Get the current in-memory version, even if it has no corresponding log + * entry due to a call to advanceVersion(). + */ + virtual version_t getCurrentVersion() = 0; + // return the last persisted version virtual version_t getLastPersistedVersion() = 0; @@ -169,12 +176,18 @@ class PersistLog { virtual void processEntryAtVersion(version_t ver, const std::function& func) = 0; /** - * Persist the log till specified version - * @return - the version till which has been persisted. - * Note that the return value could be higher than the the version asked - * is lower than the log that has been actually persisted. + * Persist the log, either until the specified version or until the latest version + * @param latest_version - Optional version number. If provided, persist() + * will only persist the log up through this version. If std::nullopt, + * persist() will persist the log up through the current version + * @param preLocked - True if the calling function has already acquired both + * the mutex lock and the read lock on the persistent log. 
Default is false, + * which means this function will acquire the locks before accessing the log. + * @return - the version till which has been persisted. Note that this will + * be equal to either the requested version or the current version at the time + * persist() was called. */ - virtual version_t persist(version_t version, + virtual version_t persist(std::optional latest_version, bool preLocked = false) = 0; /** diff --git a/include/derecho/persistent/detail/PersistNoLog_impl.hpp b/include/derecho/persistent/detail/PersistNoLog_impl.hpp index 4a6cf6f0..fbc03cb7 100644 --- a/include/derecho/persistent/detail/PersistNoLog_impl.hpp +++ b/include/derecho/persistent/detail/PersistNoLog_impl.hpp @@ -1,6 +1,8 @@ #ifndef PERSIST_NO_LOG_IMPL_HPP #define PERSIST_NO_LOG_IMPL_HPP +#include "../PersistNoLog.hpp" + #define _NOLOG_OBJECT_DIR_ ((storageType == ST_MEM) ? getPersRamdiskPath().c_str() : getPersFilePath().c_str()) #define _NOLOG_OBJECT_NAME_ ((object_name == nullptr) ? typeid(ObjectType).name() : object_name) diff --git a/include/derecho/persistent/detail/Persistent_impl.hpp b/include/derecho/persistent/detail/Persistent_impl.hpp index 07a8ac59..c7df2491 100644 --- a/include/derecho/persistent/detail/Persistent_impl.hpp +++ b/include/derecho/persistent/detail/Persistent_impl.hpp @@ -1,6 +1,8 @@ #ifndef PERSISTENT_IMPL_HPP #define PERSISTENT_IMPL_HPP +#include "derecho/persistent/Persistent.hpp" + #include "derecho/openssl/hash.hpp" #include @@ -496,6 +498,12 @@ version_t Persistent::getLatestVersion() const { return this->m_pLog->getLatestVersion(); } +template +version_t Persistent::getCurrentVersion() const { + return this->m_pLog->getCurrentVersion(); +} + template version_t Persistent::getLastPersistedVersion() const { @@ -585,7 +593,9 @@ void Persistent::version(const version_t ver) { template std::size_t Persistent::updateSignature(version_t ver, openssl::Signer& signer) { + dbg_trace(m_logger, "In Persistent: update signature (ver={})", ver); 
if(this->m_pLog->signature_size == 0) { + dbg_trace(m_logger, "Returning 0 because signatures are disabled for this object"); return 0; } std::size_t bytes_added = 0; @@ -625,6 +635,7 @@ std::size_t Persistent::getSignatureSize() const { template void Persistent::updateVerifier(version_t ver, openssl::Verifier& verifier) { + dbg_trace(m_logger, "In Persistent: update verifier (ver={})", ver); if(this->m_pLog->signature_size == 0) { return; } @@ -637,7 +648,7 @@ void Persistent::updateVerifier(version_t ver, openssl: template -version_t Persistent::persist(version_t ver) { +version_t Persistent::persist(std::optional ver) { #if defined(_PERFORMANCE_DEBUG) struct timespec t1, t2; clock_gettime(CLOCK_REALTIME, &t1); @@ -648,7 +659,7 @@ version_t Persistent::persist(version_t ver) { return ret; #else version_t persisted_ver = this->m_pLog->persist(ver); - dbg_debug(m_logger, "{} persist({}), actually persisted version {}", this->m_pLog->m_sName, ver, persisted_ver); + dbg_debug(m_logger, "{} persist({}), actually persisted version {}", this->m_pLog->m_sName, ver ? 
*ver : 0, persisted_ver); return persisted_ver; #endif //_PERFORMANCE_DEBUG } diff --git a/include/derecho/sst/detail/sst_impl.hpp b/include/derecho/sst/detail/sst_impl.hpp index 114b3c29..4d61f518 100644 --- a/include/derecho/sst/detail/sst_impl.hpp +++ b/include/derecho/sst/detail/sst_impl.hpp @@ -9,6 +9,7 @@ #include "../sst.hpp" #include "../predicates.hpp" +#include "derecho/utils/time.h" #include "poll_utils.hpp" #include #include @@ -19,7 +20,6 @@ #include #include #include -#include namespace sst { diff --git a/src/applications/tests/unit_tests/signature_chain_test.cpp b/src/applications/tests/unit_tests/signature_chain_test.cpp index 94f66416..f6554cd3 100644 --- a/src/applications/tests/unit_tests/signature_chain_test.cpp +++ b/src/applications/tests/unit_tests/signature_chain_test.cpp @@ -42,17 +42,16 @@ int main(int argc, char** argv) { version_t new_ver = versions[i]; uint64_t timestamp = std::chrono::system_clock::now().time_since_epoch().count(); registry.makeVersion(new_ver, HLC{timestamp,0}); - //Simulate Replicated::persist + //Simulate a persistence request std::vector signature(signer.get_max_signature_size()); - version_t next_persisted_ver = registry.getMinimumLatestVersion(); - registry.sign(next_persisted_ver, signer, signature.data()); - registry.persist(next_persisted_ver); - assert(next_persisted_ver == new_ver); - dbg_default_info("Signature on version {}: {}", next_persisted_ver, spdlog::to_hex(signature)); + version_t signed_version = registry.sign(signer, signature.data()); + registry.persist(signed_version); + assert(signed_version == new_ver); + dbg_default_info("Signature on version {}: {}", signed_version, spdlog::to_hex(signature)); } for(version_t cur_version : versions) { - //Simulate a verification request + //Verify each signature to make sure they are all valid std::vector signature(signer.get_max_signature_size()); bool got_signature = registry.getSignature(cur_version, signature.data()); if(!got_signature) { diff --git 
a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index 61943daf..a4dc7778 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -21,11 +21,18 @@ /* --- TestState implementation --- */ -void TestState::notify_update_delivered(uint64_t update_counter, persistent::version_t version) { +void TestState::notify_update_delivered(uint64_t update_counter, persistent::version_t version, bool is_signed) { dbg_default_debug("Update {}/{} delivered", update_counter, subgroup_total_updates); - if(update_counter == subgroup_total_updates) { - dbg_default_info("Final update (#{}) delivered, version is {}", update_counter, version); + // For signed subgroups, record every signed-update version, in case the last update is not a signed update + if(!my_subgroup_is_unsigned && is_signed) { last_version = version; + } + if(update_counter == subgroup_total_updates) { + // For signed subgroups, only update last_version if this is a signed update + if(my_subgroup_is_unsigned || is_signed) { + last_version = version; + } + dbg_default_info("Final update (#{}) delivered, last version is {}", update_counter, last_version); last_version_ready = true; } } @@ -46,7 +53,7 @@ void TestState::notify_global_verified(derecho::subgroup_id_t subgroup_id, persi dbg_default_flush(); // Each node should only be placed in one subgroup, so this callback should not be invoked for any other subgroup IDs assert(subgroup_id == my_subgroup_id); - if(last_version_ready && version == last_version) { + if(last_version_ready && version >= last_version) { { std::unique_lock finish_lock(finish_mutex); subgroup_finished = true; @@ -55,6 +62,45 @@ void TestState::notify_global_verified(derecho::subgroup_id_t subgroup_id, persi } } +/* --- StringWithDelta implementation --- */ + +StringWithDelta::StringWithDelta(const std::string& init_string) : current_state(init_string) {} + +void 
StringWithDelta::append(const std::string& str_val) { + dbg_default_trace("StringWithDelta: append called, adding {} to delta", str_val); + current_state.append(str_val); + delta.append(str_val); +} + +std::string StringWithDelta::get_current_state() const { + return current_state; +} + +void StringWithDelta::finalizeCurrentDelta(const persistent::DeltaFinalizer& finalizer) { + if(delta.size() == 0) { + dbg_default_trace("StringWithDelta: Calling finalizer with null buffer"); + finalizer(nullptr, 0); + } else { + // Serialize the string to a byte buffer and give that buffer to the DeltaFinalizer + // (this will create an unnecessary extra copy of the string, but efficiency isn't important here) + std::vector delta_buffer(mutils::bytes_size(delta)); + dbg_default_trace("StringWithDelta: Serializing delta string to a buffer of size {}", delta_buffer.size()); + mutils::to_bytes(delta, delta_buffer.data()); + finalizer(delta_buffer.data(), delta_buffer.size()); + delta.clear(); + } +} + +void StringWithDelta::applyDelta(uint8_t const* const data) { + dbg_default_trace("StringWithDelta: Appending a serialized string in applyDelta"); + // The delta is a serialized std::string that we want to pass to the string::append function on current_state + mutils::deserialize_and_run(nullptr, data, [this](const std::string& arg) { current_state.append(arg); }); +} + +std::unique_ptr StringWithDelta::create(mutils::DeserializationManager* dm) { + return std::make_unique(); +} + /* --- OneFieldObject implementation --- */ OneFieldObject::OneFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) @@ -76,10 +122,12 @@ OneFieldObject::OneFieldObject(persistent::Persistent& other_value, void OneFieldObject::update_state(const std::string& new_value) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); - auto version_and_timestamp = this_subgroup_reference.get_current_version(); + auto version_and_hlc = 
this_subgroup_reference.get_current_version(); + dbg_default_debug("OneFieldObject: Entering update (RPC function), current version is {}", std::get<0>(version_and_hlc)); ++updates_delivered; *string_field = new_value; - test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_hlc), true); + dbg_default_debug("OneFieldObject: Leaving update"); } // Custom deserializer that retrieves the TestState pointer from the DeserializationManager @@ -116,11 +164,13 @@ TwoFieldObject::TwoFieldObject(persistent::Persistent& other_foo, void TwoFieldObject::update(const std::string& new_foo, const std::string& new_bar) { auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); - auto version_and_timestamp = this_subgroup_reference.get_current_version(); + auto version_and_hlc = this_subgroup_reference.get_current_version(); + dbg_default_debug("TwoFieldObject: Entering update (RPC function), current version is {}", std::get<0>(version_and_hlc)); ++updates_delivered; *foo = new_foo; *bar = new_bar; - test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_hlc), true); + dbg_default_debug("TwoFieldObject: Leaving update"); } std::unique_ptr TwoFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { @@ -135,6 +185,89 @@ std::unique_ptr TwoFieldObject::from_bytes(mutils::Deserializati return std::make_unique(*foo_ptr, *bar_ptr, *update_counter_ptr, test_state_ptr); } +/* --- MixedFieldObject implementation --- */ + +MixedFieldObject::MixedFieldObject(persistent::PersistentRegistry* registry, TestState* test_state) + : unsigned_field(std::make_unique, "MixedFieldObjectUnsignedField", registry, false), + signed_delta_field(std::make_unique, "MixedFieldObjectDeltaString", registry, true), + 
updates_delivered(0), + test_state(test_state) { + assert(test_state); +} + +MixedFieldObject::MixedFieldObject(persistent::Persistent& other_unsigned_field, + persistent::Persistent& other_delta_field, + std::string& other_non_persistent_field, + uint64_t other_updates_delivered, + TestState* test_state) + : unsigned_field(std::move(other_unsigned_field)), + signed_delta_field(std::move(other_delta_field)), + non_persistent_field(std::move(other_non_persistent_field)), + updates_delivered(other_updates_delivered), + test_state(test_state) { + assert(test_state); +} + +std::unique_ptr MixedFieldObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { + std::size_t bytes_read = 0; + auto unsigned_field_ptr = mutils::from_bytes>(dsm, buffer + bytes_read); + bytes_read += mutils::bytes_size(*unsigned_field_ptr); + auto delta_field_ptr = mutils::from_bytes>(dsm, buffer + bytes_read); + bytes_read += mutils::bytes_size(*delta_field_ptr); + auto non_persistent_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + bytes_read += mutils::bytes_size(*non_persistent_ptr); + auto update_counter_ptr = mutils::from_bytes(dsm, buffer + bytes_read); + assert(dsm); + assert(dsm->registered()); + TestState* test_state_ptr = &(dsm->mgr()); + return std::make_unique(*unsigned_field_ptr, *delta_field_ptr, + *non_persistent_ptr, *update_counter_ptr, test_state_ptr); +} + +void MixedFieldObject::unsigned_update(const std::string& new_value) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_hlc = this_subgroup_reference.get_current_version(); + dbg_default_debug("MixedFieldObject: Entering unsigned_update, current version is {}", std::get<0>(version_and_hlc)); + ++updates_delivered; + *unsigned_field = new_value; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_hlc), false); + dbg_default_debug("MixedFieldObject: Leaving unsigned_update"); +} + +void 
MixedFieldObject::signed_delta_update(const std::string& append_value) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_hlc = this_subgroup_reference.get_current_version(); + dbg_default_debug("MixedFieldObject: Entering signed_delta_update, current version is {}", std::get<0>(version_and_hlc)); + ++updates_delivered; + signed_delta_field->append(append_value); + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_hlc), true); + dbg_default_debug("MixedFieldObject: Leaving signed_delta_update"); +} + +void MixedFieldObject::non_persistent_update(const std::string& new_value) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_hlc = this_subgroup_reference.get_current_version(); + dbg_default_debug("MixedFieldObject: Entering non_persistent_update, current version is {}", std::get<0>(version_and_hlc)); + ++updates_delivered; + non_persistent_field = new_value; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_hlc), false); + dbg_default_debug("MixedFieldObject: Leaving non_persistent_update"); +} + +void MixedFieldObject::update_all(const std::string& new_unsigned, + const std::string& delta_append_value, + const std::string& new_non_persistent) { + auto& this_subgroup_reference = this->group->template get_subgroup(this->subgroup_index); + auto version_and_hlc = this_subgroup_reference.get_current_version(); + dbg_default_debug("MixedFieldObject: Entering update_all, current version is {}", std::get<0>(version_and_hlc)); + ++updates_delivered; + *unsigned_field = new_unsigned; + signed_delta_field->append(delta_append_value); + non_persistent_field = new_non_persistent; + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_hlc), true); + dbg_default_debug("MixedFieldObject: Leaving update_all"); +} + /* --- UnsignedObject implementation --- */ 
UnsignedObject::UnsignedObject(persistent::PersistentRegistry* registry, TestState* test_state) @@ -158,7 +291,7 @@ void UnsignedObject::update_state(const std::string& new_value) { auto version_and_timestamp = this_subgroup_reference.get_current_version(); ++updates_delivered; *string_field = new_value; - test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp)); + test_state->notify_update_delivered(updates_delivered, std::get<0>(version_and_timestamp), false); } std::unique_ptr UnsignedObject::from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer) { @@ -175,6 +308,7 @@ std::unique_ptr UnsignedObject::from_bytes(mutils::Deserializati * Command-line arguments: * one_field_size: Maximum size of the subgroup that replicates the one-field signed object * two_field_size: Maximum size of the subgroup that replicates the two-field signed object + * mixed_field_size: Maximum size of the subgroup that replicates the mixed-signed-and-unsigned-field object * unsigned_size: Maximum size of the subgroup that replicates the persistent-but-not-signed object * num_updates: Number of randomly-generated 32-byte updates to send to each subgroup */ @@ -183,16 +317,17 @@ int main(int argc, char** argv) { const std::string characters("abcdefghijklmnopqrstuvwxyz"); std::mt19937 random_generator(getpid()); std::uniform_int_distribution char_distribution(0, characters.size() - 1); - const int num_args = 4; + const int num_args = 5; if(argc < (num_args + 1) || (argc > (num_args + 1) && strcmp("--", argv[argc - (num_args + 1)]) != 0)) { std::cout << "Invalid command line arguments." 
<< std::endl; - std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size unsigned_size num_updates" << std::endl; + std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates" << std::endl; return -1; } const unsigned int subgroup_1_size = std::stoi(argv[argc - num_args]); const unsigned int subgroup_2_size = std::stoi(argv[argc - num_args + 1]); - const unsigned int subgroup_unsigned_size = std::stoi(argv[argc - num_args + 2]); + const unsigned int subgroup_mixed_size = std::stoi(argv[argc - num_args + 2]); + const unsigned int subgroup_unsigned_size = std::stoi(argv[argc - num_args + 3]); const unsigned int num_updates = std::stoi(argv[argc - 1]); derecho::Conf::initialize(argc, argv); @@ -203,14 +338,18 @@ int main(int argc, char** argv) { {std::type_index(typeid(TwoFieldObject)), derecho::one_subgroup_policy(derecho::flexible_even_shards( 1, 1, subgroup_2_size))}, + {std::type_index(typeid(MixedFieldObject)), + derecho::one_subgroup_policy(derecho::flexible_even_shards( + 1, 1, subgroup_mixed_size))}, {std::type_index(typeid(UnsignedObject)), derecho::one_subgroup_policy(derecho::flexible_even_shards( 1, 1, subgroup_unsigned_size))}})); //Count the total number of messages to be delivered in each subgroup, so we can //identify when the last message has been delivered and discover what version it got - std::array subgroup_total_messages = {subgroup_1_size * num_updates, + std::array subgroup_total_messages = {subgroup_1_size * num_updates, subgroup_2_size * num_updates, + subgroup_mixed_size * num_updates, subgroup_unsigned_size * num_updates}; TestState test_state; @@ -226,7 +365,7 @@ int main(int argc, char** argv) { dbg_default_info("Now on View {}", view.vid); }; // Pass test_state to the Group constructor as a DeserializationContext - derecho::Group group( + derecho::Group group( {nullptr, nullptr, global_persist_callback, 
global_verified_callback}, subgroup_info, {&test_state}, {new_view_callback}, [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { @@ -235,12 +374,16 @@ int main(int argc, char** argv) { [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr, &test_state); }, + [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { + return std::make_unique(pr, &test_state); + }, [&](persistent::PersistentRegistry* pr, derecho::subgroup_id_t id) { return std::make_unique(pr, &test_state); }); //Figure out which subgroup this node got assigned to int32_t my_shard_subgroup_1 = group.get_my_shard(); int32_t my_shard_subgroup_2 = group.get_my_shard(); + int32_t my_shard_mixed_subgroup = group.get_my_shard(); int32_t my_shard_unsigned_subgroup = group.get_my_shard(); if(my_shard_subgroup_1 != -1) { std::cout << "In the OneFieldObject subgroup" << std::endl; @@ -271,10 +414,29 @@ int main(int argc, char** argv) { [&]() { return characters[char_distribution(random_generator)]; }); object_handle.ordered_send(new_foo, new_bar); } + } else if(my_shard_mixed_subgroup != -1) { + std::cout << "In the MixedFieldObject subgroup" << std::endl; + derecho::Replicated& object_handle = group.get_subgroup(); + test_state.subgroup_total_updates = subgroup_total_messages[2]; + test_state.my_subgroup_id = object_handle.get_subgroup_id(); + test_state.my_subgroup_is_unsigned = false; + //Send random updates, alternating between the signed, unsigned, and nonpersistent fields + for(unsigned counter = 0; counter < num_updates; ++counter) { + std::string new_string_value('a', 32); + std::generate(new_string_value.begin(), new_string_value.end(), + [&]() { return characters[char_distribution(random_generator)]; }); + if(counter % 3 == 0) { + object_handle.ordered_send(new_string_value); + } else if(counter % 3 == 1) { + object_handle.ordered_send(new_string_value); + } else { + object_handle.ordered_send(new_string_value); + } + } } else 
if(my_shard_unsigned_subgroup != -1) { std::cout << "In the UnsignedObject subgroup" << std::endl; derecho::Replicated& object_handle = group.get_subgroup(); - test_state.subgroup_total_updates = subgroup_total_messages[2]; + test_state.subgroup_total_updates = subgroup_total_messages[3]; test_state.my_subgroup_id = object_handle.get_subgroup_id(); test_state.my_subgroup_is_unsigned = true; //Send random updates @@ -284,6 +446,8 @@ int main(int argc, char** argv) { [&]() { return characters[char_distribution(random_generator)]; }); object_handle.ordered_send(new_string); } + } else { + std::cout << "WARNING: Not assigned to any subgroup. This node won't be able to detect when the test is finished." << std::endl; } //Wait for all updates to finish being verified, using the condition variables updated by the callback std::cout << "Waiting for final version to be verified" << std::endl; diff --git a/src/applications/tests/unit_tests/signed_log_test.hpp b/src/applications/tests/unit_tests/signed_log_test.hpp index 9bdd5b39..9fb1e348 100644 --- a/src/applications/tests/unit_tests/signed_log_test.hpp +++ b/src/applications/tests/unit_tests/signed_log_test.hpp @@ -28,13 +28,37 @@ struct TestState : public derecho::DeserializationContext { // Boolean to set to true when signaling the condition variable bool subgroup_finished; // Called from replicated object update_state methods to notify the main thread that an update was delivered - void notify_update_delivered(uint64_t update_counter, persistent::version_t version); + void notify_update_delivered(uint64_t update_counter, persistent::version_t version, bool is_signed); // Called by Derecho's global persistence callback void notify_global_persistence(derecho::subgroup_id_t subgroup_id, persistent::version_t version); // Called by Derecho's global verified callback void notify_global_verified(derecho::subgroup_id_t subgroup_id, persistent::version_t version); }; +/** + * A simple Delta-supporting object that stores a 
string and has a method that + * appends to the string. All data appended since the last call to finalizeCurrentDelta + * is stored in the delta, and applyDelta appends the delta to the current string. + * (There's no way to delete or truncate the string since this is just for test + * purposes). Note that if append() has not been called since the last call to + * finalizeCurrentDelta, the delta entry will be empty. + */ +class StringWithDelta : public mutils::ByteRepresentable, + public persistent::IDeltaSupport { + std::string current_state; + std::string delta; + +public: + StringWithDelta() = default; + StringWithDelta(const std::string& init_string); + void append(const std::string& str_val); + std::string get_current_state() const; + virtual void finalizeCurrentDelta(const persistent::DeltaFinalizer& finalizer); + virtual void applyDelta(uint8_t const* const data); + static std::unique_ptr create(mutils::DeserializationManager* dm); + DEFAULT_SERIALIZATION_SUPPORT(StringWithDelta, current_state); +}; + /** * Test object with one signed persistent field */ @@ -106,6 +130,53 @@ class TwoFieldObject : public mutils::ByteRepresentable, static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); }; +/** + * Test object that has both a non-signed persistent field and a signed field + * with delta support (and also an RPC function that updates a field with no + * persistent log at all). The delta-supporting field creates empty deltas if + * it has not been updated, so it's possible for this object to create a log + * entry that has nothing to sign. 
+ */ +class MixedFieldObject : public mutils::ByteRepresentable, + public derecho::GroupReference, + public derecho::SignedPersistentFields { + persistent::Persistent unsigned_field; + persistent::Persistent signed_delta_field; + std::string non_persistent_field; + uint64_t updates_delivered; + TestState* test_state; + +public: + /** Factory constructor */ + MixedFieldObject(persistent::PersistentRegistry* registry, TestState* test_state); + /** Deserialization constructor */ + MixedFieldObject(persistent::Persistent& other_unsigned_field, + persistent::Persistent& other_delta_field, + std::string& other_non_persistent_field, + uint64_t other_updates_delivered, + TestState* test_state); + + std::string get_signed_value() const { + return signed_delta_field->get_current_state(); + } + std::string get_unsigned_value() const { + return *unsigned_field; + } + + void unsigned_update(const std::string& new_value); + void signed_delta_update(const std::string& append_value); + void non_persistent_update(const std::string& new_value); + void update_all(const std::string& new_unsigned, + const std::string& delta_append_value, + const std::string& new_non_persistent); + + REGISTER_RPC_FUNCTIONS(MixedFieldObject, P2P_TARGETS(get_signed_value, get_unsigned_value), + ORDERED_TARGETS(unsigned_update, signed_delta_update, non_persistent_update, update_all)); + DEFAULT_SERIALIZE(unsigned_field, signed_delta_field, non_persistent_field, updates_delivered); + DEFAULT_DESERIALIZE_NOALLOC(MixedFieldObject); + static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); +}; + /** * Test object with one un-signed persistent field */ @@ -133,4 +204,4 @@ class UnsignedObject : public mutils::ByteRepresentable, DEFAULT_SERIALIZE(string_field, updates_delivered); DEFAULT_DESERIALIZE_NOALLOC(UnsignedObject); static std::unique_ptr from_bytes(mutils::DeserializationManager* dsm, uint8_t const* buffer); -}; \ No newline at end of file +}; diff --git 
a/src/core/git_version.cpp b/src/core/git_version.cpp index dca2dfb3..b9ff2925 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 3; const int PATCH_VERSION = 0; -const int COMMITS_AHEAD_OF_VERSION = 190; +const int COMMITS_AHEAD_OF_VERSION = 229; const char* VERSION_STRING = "2.3.0"; -const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+190"; +const char* VERSION_STRING_PLUS_COMMITS = "2.3.0+229"; } diff --git a/src/core/persistence_manager.cpp b/src/core/persistence_manager.cpp index 885eea56..2a71b5eb 100644 --- a/src/core/persistence_manager.cpp +++ b/src/core/persistence_manager.cpp @@ -22,23 +22,28 @@ PersistenceManager::PersistenceManager( signature_size(0), persistence_callbacks{user_persistence_callback}, objects_by_subgroup_id(objects_map) { - // initialize semaphore + // initialize semaphores if(sem_init(&persistence_request_sem, 1, 0) != 0) { throw derecho_exception("Cannot initialize persistent_request_sem:errno=" + std::to_string(errno)); } + if(sem_init(&verification_request_sem, 1, 0) != 0) { + throw derecho_exception("Cannot initialize verification_request_sem: errno=" + std::to_string(errno)); + } if(any_signed_objects) { openssl::EnvelopeKey signing_key = openssl::EnvelopeKey::from_pem_private(getConfString(Conf::PERS_PRIVATE_KEY_FILE)); signature_size = signing_key.get_max_size(); - //The Verifier only needs the public key, but we loaded both public and private components from the private key file - signature_verifier = std::make_unique(signing_key, openssl::DigestAlgorithm::SHA256); } } PersistenceManager::~PersistenceManager() { - if (this->persist_thread.joinable()) { + if(this->persist_thread.joinable()) { this->persist_thread.join(); } + if(verify_thread.joinable()) { + verify_thread.join(); + } sem_destroy(&persistence_request_sem); + sem_destroy(&verification_request_sem); } void PersistenceManager::set_view_manager(ViewManager& 
view_manager) { @@ -56,11 +61,12 @@ void PersistenceManager::add_persistence_callback(const persistence_callback_t& void PersistenceManager::start() { //Initialize this vector now that ViewManager is set up and we know the number of subgroups last_persisted_version.resize(view_manager->get_current_view().get().subgroup_shard_views.size(), -1); - //Start the thread + last_verified_version.resize(last_persisted_version.size(), -1); + //Start the persistence thread this->persist_thread = std::thread{[this]() { pthread_setname_np(pthread_self(), "persist"); dbg_debug(persistence_logger, "PersistenceManager thread started"); - do { + while(true) { // wait for semaphore sem_wait(&persistence_request_sem); while(prq_lock.test_and_set(std::memory_order_acquire)) // acquire lock @@ -72,16 +78,11 @@ void PersistenceManager::start() { } continue; } - ThreadRequest request = persistence_request_queue.front(); persistence_request_queue.pop(); prq_lock.clear(std::memory_order_release); // release lock - if(request.operation == RequestType::PERSIST) { - handle_persist_request(request.subgroup_id, request.version); - } else if(request.operation == RequestType::VERIFY) { - handle_verify_request(request.subgroup_id, request.version); - } + handle_persist_request(request.subgroup_id, request.version); if(this->thread_shutdown) { while(prq_lock.test_and_set(std::memory_order_acquire)) // acquire lock ; // spin @@ -91,7 +92,35 @@ void PersistenceManager::start() { } prq_lock.clear(std::memory_order_release); // release lock } - } while(true); + } + }}; + // Start the verification thread + this->verify_thread = std::thread{[this]() { + pthread_setname_np(pthread_self(), "verify"); + while(true) { + // wait for semaphore + sem_wait(&verification_request_sem); + while(vrq_lock.test_and_set(std::memory_order_acquire)); + if(verify_request_queue.empty()) { + vrq_lock.clear(std::memory_order_release); + if(thread_shutdown) { + break; + } + continue; + } + ThreadRequest request = 
verify_request_queue.front(); + verify_request_queue.pop(); + vrq_lock.clear(std::memory_order_release); + handle_verify_request(request.subgroup_id, request.version); + if(thread_shutdown) { + while(vrq_lock.test_and_set(std::memory_order_acquire)); + if(verify_request_queue.empty()) { + vrq_lock.clear(std::memory_order_release); + break; + } + vrq_lock.clear(std::memory_order_release); + } + } }}; } @@ -102,19 +131,36 @@ void PersistenceManager::handle_persist_request(subgroup_id_t subgroup_id, persi return; } persistent::version_t persisted_version = version; + persistent::version_t signed_version = version; // persist try { - //To reduce the time this thread holds the View lock, put the signature in a local array - //and copy it into the SST once signing is done. (We could use the SST signatures field - //directly as the signature array, but that would require holding the lock for longer.) + // To reduce the time this thread holds the View lock, put the signature in a local array + // and copy it into the SST once signing is done uint8_t signature[signature_size]; + memset(signature, 0, signature_size); bool object_has_signature = false; auto search = objects_by_subgroup_id.find(subgroup_id); if(search != objects_by_subgroup_id.end()) { - object_has_signature = search->second->is_signed(); - //Update persisted_version to the version actually persisted, which might be greater than the requested version - persisted_version = search->second->persist(version, signature); + // Don't bother doing anything if the object has no persistent fields; + // persisted_version and signed_version will remain version (the argument) + if(search->second->is_persistent()) { + object_has_signature = search->second->is_signed(); + if(object_has_signature) { + signed_version = search->second->sign(signature); + dbg_trace(persistence_logger, "PersistenceManager: Asked Replicated to sign latest version, version actually signed = {}", signed_version); + // Request to persist the same 
version that was signed, not the "latest available" version, + // to avoid persisting a version that still needs to be signed. However, if the argument to + // this persistence request is later than the signed version, that means the argument version + // definitely does not need a signature (or we would have just signed it) so it's safe to persist. + persisted_version = search->second->persist(std::max(signed_version, version)); + dbg_trace(persistence_logger, "PersistenceManager: Asked Replicated to persist version {}, version actually persisted = {}", std::max(signed_version, version), persisted_version); + } else { + persisted_version = search->second->persist(); + dbg_trace(persistence_logger, "PersistenceManager: Asked Replicated to persist latest version, version actually persisted = {}", persisted_version); + } + assert(persisted_version >= version); + } } // Call the local persistence callbacks before updating the SST // (as soon as the SST is updated, the global persistence callback may fire) @@ -125,16 +171,23 @@ void PersistenceManager::handle_persist_request(subgroup_id_t subgroup_id, persi } // read lock the view SharedLockedReference view_and_lock = view_manager->get_current_view(); - dbg_debug(persistence_logger, "PersistenceManager: updating subgroup {} persisted_num to {}", subgroup_id, persisted_version); - // update the signature and persisted_num in SST View& Vc = view_and_lock.get(); - if(object_has_signature) { + dbg_debug(persistence_logger, "PersistenceManager: updating subgroup {} persisted_num to {} (and signed_num to {} if applicable) ", subgroup_id, persisted_version, signed_version); + // Only update the signature and signed_num in SST if signed_num has in fact advanced + if(object_has_signature + && Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id] < signed_version) { gmssst::set(&(Vc.gmsSST->signatures[Vc.gmsSST->get_local_index()][subgroup_id * signature_size]), signature, signature_size); + 
gmssst::set(Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id], signed_version); Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_id), (uint8_t*)(&Vc.gmsSST->signatures[0][subgroup_id * signature_size]) - Vc.gmsSST->getBaseAddress(), signature_size); + // Put signed_num separately after the signature to ensure the signature arrives first + Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_id), + Vc.gmsSST->signed_num, + subgroup_id); } + // Update persisted_num in SST gmssst::set(Vc.gmsSST->persisted_num[Vc.gmsSST->get_local_index()][subgroup_id], persisted_version); Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_id), Vc.gmsSST->persisted_num, @@ -148,6 +201,12 @@ void PersistenceManager::handle_persist_request(subgroup_id_t subgroup_id, persi void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persistent::version_t version) { dbg_debug(persistence_logger, "PersistenceManager: handling verify request for subgroup {} version {}", subgroup_id, version); + // If this request is already obsolete due to batching, don't do anything + if(last_verified_version[subgroup_id] > version) { + return; + } + // Note: The version parameter has no effect on this function. It just examines the SST and verifies the signatures + // on whatever versions are posted to the signed_num column by the other members of the shard. 
auto search = objects_by_subgroup_id.find(subgroup_id); if(search != objects_by_subgroup_id.end()) { ReplicatedObject* subgroup_object = search->second; @@ -160,36 +219,48 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis View& Vc = view_and_lock.get(); std::vector shard_member_ranks = Vc.multicast_group->get_shard_sst_indices(subgroup_id); persistent::version_t minimum_verified_version = std::numeric_limits::max(); - // Special case for a shard of size 1: There are no other members to verify, so just advance verified_num to match persisted_num + persistent::version_t my_signed_version = Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id]; + // Special case for a shard of size 1: There are no other members to verify, so just advance verified_num to match signed_num if(shard_member_ranks.size() == 1) { - minimum_verified_version = Vc.gmsSST->persisted_num[Vc.gmsSST->get_local_index()][subgroup_id]; + minimum_verified_version = my_signed_version; } //For each other member of this node's shard, try to verify the signature in its SST row for(const uint32_t shard_member_rank : shard_member_ranks) { if(shard_member_rank == Vc.gmsSST->get_local_index()) { continue; } - //The signature in the other node's "signatures" column should correspond to the version in its "persisted_num" column - const persistent::version_t other_signed_version = Vc.gmsSST->persisted_num[shard_member_rank][subgroup_id]; + // The signature in the other node's "signatures" column corresponds to the version in its "signed_num" column + const persistent::version_t other_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; + // If this node hasn't finished signing that version yet, we won't be able to check that the other node's signature + // matches the local signature. It also means the minimum signed version can't advance past my_signed_version anyway. 
+ if(other_signed_version > my_signed_version) { + dbg_debug(persistence_logger, "PersistenceManager: Skipping signature check on version {} from node {} because this node hasn't signed that version yet", other_signed_version, Vc.members[shard_member_rank]); + continue; + } //Copy out the signature so it can't change during verification std::vector other_signature(signature_size); gmssst::set(other_signature.data(), &Vc.gmsSST->signatures[shard_member_rank][subgroup_id * signature_size], signature_size); - assert(other_signed_version >= version); - assert(subgroup_object->get_minimum_latest_persisted_version() >= other_signed_version); - bool verification_success = subgroup_object->verify_log( - other_signed_version, *signature_verifier, other_signature.data()); - if(verification_success) { + // Retrieve this node's signature on that version + std::vector my_signature = subgroup_object->get_signature(other_signed_version); + if(my_signature.size() == 0) { + dbg_warn(persistence_logger, "PersistenceManager: Could not find a local signature on version {} even though this node's highest signed version is {}", other_signed_version, my_signed_version); + continue; + } + if(my_signature == other_signature) { + dbg_debug(persistence_logger, "PersistenceManager: Signature for version {} from node {} matched", other_signed_version, Vc.members[shard_member_rank]); minimum_verified_version = std::min(minimum_verified_version, other_signed_version); } else { - dbg_warn(persistence_logger, "Verification of version {} from node {} failed! 
{}", other_signed_version, Vc.members[shard_member_rank], openssl::get_error_string(ERR_get_error(), "OpenSSL error")); + dbg_warn(persistence_logger, "Signature for version {} from node {} did not match my signature!", other_signed_version, Vc.members[shard_member_rank]); } } //Update verified_num to the lowest version number that successfully verified across all shard members if(minimum_verified_version != std::numeric_limits::max()) { gmssst::set(Vc.gmsSST->verified_num[Vc.gmsSST->get_local_index()][subgroup_id], minimum_verified_version); Vc.gmsSST->put(shard_member_ranks, Vc.gmsSST->verified_num, subgroup_id); + dbg_debug(persistence_logger, "PersistenceManager: Updated verified_num to {}", minimum_verified_version); + last_verified_version[subgroup_id] = minimum_verified_version; } } } @@ -199,7 +270,7 @@ void PersistenceManager::post_persist_request(const subgroup_id_t& subgroup_id, // request enqueue while(prq_lock.test_and_set(std::memory_order_acquire)) // acquire lock ; // spin - persistence_request_queue.push({RequestType::PERSIST, subgroup_id, version}); + persistence_request_queue.push({subgroup_id, version}); prq_lock.clear(std::memory_order_release); // release lock // post semaphore sem_post(&persistence_request_sem); @@ -210,11 +281,11 @@ void PersistenceManager::post_verify_request(const subgroup_id_t& subgroup_id, c if(signature_size == 0) { return; } - while(prq_lock.test_and_set(std::memory_order_acquire)) // acquire lock + while(vrq_lock.test_and_set(std::memory_order_acquire)) // acquire lock ; // spin - persistence_request_queue.push({RequestType::VERIFY, subgroup_id, version}); - prq_lock.clear(std::memory_order_release); // release lock - sem_post(&persistence_request_sem); + verify_request_queue.push({subgroup_id, version}); + vrq_lock.clear(std::memory_order_release); // release lock + sem_post(&verification_request_sem); } /** make a version */ @@ -226,19 +297,22 @@ void PersistenceManager::make_version(const subgroup_id_t& 
subgroup_id, } } -/** shutdown the thread - */ void PersistenceManager::shutdown(bool wait) { // if(replicated_objects == nullptr) return; //skip for raw subgroups - NO DON'T dbg_debug(persistence_logger, "PersistenceManager thread shutting down"); thread_shutdown = true; - sem_post(&persistence_request_sem); // kick the persistence thread in case it is sleeping + // Wake up the threads in case they are waiting on semaphores + sem_post(&persistence_request_sem); + sem_post(&verification_request_sem); if(wait) { - if (this->persist_thread.joinable()) { + if(this->persist_thread.joinable()) { this->persist_thread.join(); } + if(verify_thread.joinable()) { + verify_thread.join(); + } } } } // namespace derecho diff --git a/src/persistent/FilePersistLog.cpp b/src/persistent/FilePersistLog.cpp index 7416ad59..9b2d02b6 100644 --- a/src/persistent/FilePersistLog.cpp +++ b/src/persistent/FilePersistLog.cpp @@ -294,7 +294,7 @@ void FilePersistLog::advanceVersion(version_t ver) { FPL_UNLOCK; } -version_t FilePersistLog::persist(version_t ver, bool preLocked) { +version_t FilePersistLog::persist(std::optional latest_version, bool preLocked) { int64_t ver_ret = INVALID_VERSION; if(!preLocked) { FPL_PERS_LOCK; @@ -302,10 +302,11 @@ version_t FilePersistLog::persist(version_t ver, bool preLocked) { } if(m_currMetaHeader == m_persMetaHeader) { - // if(CURR_LOG_IDX != INVALID_INDEX) { - //ver_ret = LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ver; - // } - ver_ret = m_persMetaHeader.fields.ver; + if(latest_version) { + ver_ret = *latest_version; + } else { + ver_ret = m_persMetaHeader.fields.ver; + } if(!preLocked) { FPL_UNLOCK; FPL_PERS_UNLOCK; @@ -317,33 +318,62 @@ version_t FilePersistLog::persist(version_t ver, bool preLocked) { //flush data dbg_trace(m_logger, "{0} flush data,log,and meta.", this->m_sName); try { + void *flush_data_start = nullptr, *flush_log_start = nullptr; + size_t flush_data_len = 0, flush_log_len = 0; // shadow the current state - void *flush_dstart = nullptr, 
*flush_lstart = nullptr; - size_t flush_dlen = 0, flush_llen = 0; MetaHeader shadow_header = m_currMetaHeader; - if((NUM_USED_SLOTS > 0) && (NEXT_LOG_ENTRY > NEXT_LOG_ENTRY_PERS)) { - flush_dlen = (LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ofst + LOG_ENTRY_AT(CURR_LOG_IDX)->fields.sdlen - NEXT_LOG_ENTRY_PERS->fields.ofst); - // flush data - flush_dstart = ALIGN_TO_PAGE(NEXT_DATA_PERS); - flush_dlen += ((int64_t)NEXT_DATA_PERS) % getpagesize(); - // flush log - flush_lstart = ALIGN_TO_PAGE(NEXT_LOG_ENTRY_PERS); - flush_llen = ((size_t)NEXT_LOG_ENTRY - (size_t)NEXT_LOG_ENTRY_PERS) + ((int64_t)NEXT_LOG_ENTRY_PERS) % getpagesize(); + if(latest_version) { + // Find the nearest index corresponding to the requested latest version + int64_t requested_index = getVersionIndex(*latest_version, false); + // If the requested index is earlier than the last persisted index, + // we can't do anything because it was already persisted + if(requested_index < m_persMetaHeader.fields.tail - 1) { + // Make the shadow header equal the persisted header, so the rest of the function is a no-op + shadow_header.fields.tail = m_persMetaHeader.fields.tail; + shadow_header.fields.ver = m_persMetaHeader.fields.ver; + } else { + // Make the shadow header's "current" log entry equal to the requested version's log entry + shadow_header.fields.tail = requested_index + 1; + // If the requested version is not exactly equal to the log entry's version, use the later one + shadow_header.fields.ver = std::max(LOG_ENTRY_AT(requested_index)->fields.ver, *latest_version); + } + dbg_trace(m_logger, "{}: Adjusted shadow header tail to {}, version to {}", this->m_sName, shadow_header.fields.tail, shadow_header.fields.ver); + } + // Ensure the shadow header's log is non-empty + if(shadow_header.fields.tail - shadow_header.fields.head > 0) { + // Determine if there are any un-persisted log entries + LogEntry* current_log_tail_entry = LOG_ENTRY_AT(shadow_header.fields.tail); + LogEntry* persisted_log_tail_entry = 
LOG_ENTRY_AT(std::max(m_persMetaHeader.fields.tail, shadow_header.fields.head)); + if(current_log_tail_entry > persisted_log_tail_entry) { + dbg_trace(m_logger, "{}: Distance from persisted log tail to current log tail is {} bytes, {} indexes", this->m_sName, + reinterpret_cast(current_log_tail_entry) - reinterpret_cast(persisted_log_tail_entry), + shadow_header.fields.tail - std::max(m_persMetaHeader.fields.tail, shadow_header.fields.head)); + LogEntry* curr_log_entry = LOG_ENTRY_AT(shadow_header.fields.tail - 1); + dbg_trace(m_logger, "{}: Flushing log entries up through version {}", this->m_sName, curr_log_entry->fields.ver); + // Compute start and length for the data buffer + void* persisted_data_tail = LOG_ENTRY_DATA(persisted_log_tail_entry); + flush_data_start = ALIGN_TO_PAGE(persisted_data_tail); + flush_data_len = (curr_log_entry->fields.ofst + curr_log_entry->fields.sdlen + - persisted_log_tail_entry->fields.ofst) + + reinterpret_cast(persisted_data_tail) % getpagesize(); + // Compute start and length for the log buffer + flush_log_start = ALIGN_TO_PAGE(persisted_log_tail_entry); + flush_log_len = (reinterpret_cast(current_log_tail_entry) + - reinterpret_cast(persisted_log_tail_entry)) + + reinterpret_cast(persisted_log_tail_entry) % getpagesize(); + dbg_trace(m_logger, "{}: flush data length = {}, flush log length = {}", this->m_sName, flush_data_len, flush_log_len); + } } - // if(NUM_USED_SLOTS > 0) { - //get the latest flushed version - //ver_ret = LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ver; - // } if(!preLocked) { FPL_UNLOCK; } - if(flush_dlen > 0) { - if(msync(flush_dstart, flush_dlen, MS_SYNC) != 0) { + if(flush_data_len > 0) { + if(msync(flush_data_start, flush_data_len, MS_SYNC) != 0) { throw persistent_file_error("msync failed.", errno); } } - if(flush_llen > 0) { - if(msync(flush_lstart, flush_llen, MS_SYNC) != 0) { + if(flush_log_len > 0) { + if(msync(flush_log_start, flush_log_len, MS_SYNC) != 0) { throw persistent_file_error("msync failed.", 
errno); } } @@ -481,6 +511,13 @@ version_t FilePersistLog::getLatestVersion() { return ver; } +version_t FilePersistLog::getCurrentVersion() { + FPL_RDLOCK; + version_t ver = m_currMetaHeader.fields.ver; + FPL_UNLOCK; + return ver; +} + version_t FilePersistLog::getLastPersistedVersion() { version_t last_persisted = INVALID_VERSION; ; @@ -633,7 +670,7 @@ version_t FilePersistLog::getNextVersionOf(version_t ver) { } FPL_UNLOCK; } - + dbg_trace(m_logger, "{} getNextVersionOf({}): returning {}", this->m_sName, ver, next_ver); return next_ver; } @@ -675,7 +712,10 @@ void FilePersistLog::processEntryAtVersion(version_t ver, FPL_UNLOCK; if(ple != nullptr && ple->fields.ver == ver) { + dbg_trace(m_logger, "{} - calling process function on log entry {} of size {}", m_sName, ple->fields.ver, static_cast(ple->fields.sdlen - this->signature_size)); func(LOG_ENTRY_DATA(ple), static_cast(ple->fields.sdlen - this->signature_size)); + } else { + dbg_trace(m_logger, "{} - no log entry to process at version {}", m_sName, ver); } } @@ -700,12 +740,7 @@ void FilePersistLog::trimByIndex(int64_t idx) { } m_currMetaHeader.fields.head = idx + 1; try { - //What version number should be supplied to persist in this case? - // CAUTION: - // The persist API is changed for Edward's convenience by adding a version parameter - // This has a widespreading on the design and needs extensive test before replying on - // it. - persist(LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ver, true); + persist(std::nullopt, true); } catch(std::exception& e) { FPL_UNLOCK; FPL_PERS_UNLOCK; diff --git a/src/persistent/PersistLog.cpp b/src/persistent/PersistLog.cpp index 61292b43..db6870dc 100644 --- a/src/persistent/PersistLog.cpp +++ b/src/persistent/PersistLog.cpp @@ -6,7 +6,7 @@ namespace persistent { -PersistLog::PersistLog(const std::string& name, bool enable_signatures) noexcept(true) +PersistLog::PersistLog(const std::string& name, bool enable_signatures) : m_sName(name), signature_size(enable_signatures ? 
openssl::EnvelopeKey::from_pem_private(derecho::getConfString(derecho::Conf::PERS_PRIVATE_KEY_FILE)).get_max_size() diff --git a/src/persistent/Persistent.cpp b/src/persistent/Persistent.cpp index 36fc8ae0..b88c80e4 100644 --- a/src/persistent/Persistent.cpp +++ b/src/persistent/Persistent.cpp @@ -32,21 +32,22 @@ void PersistentRegistry::makeVersion(version_t ver, const HLC& mhlc) { } }; -version_t PersistentRegistry::getMinimumLatestVersion() { +version_t PersistentRegistry::getCurrentVersion() const { version_t min = -1; for(auto itr = m_registry.begin(); itr != m_registry.end(); ++itr) { - version_t ver = itr->second->getLatestVersion(); + version_t ver = itr->second->getCurrentVersion(); if(itr == m_registry.begin()) { min = ver; } else if(min > ver) { min = ver; } } + dbg_trace(m_logger, "PersistentRegistry: getCurrentVersion() returning {}", min); return min; } -version_t PersistentRegistry::getMinimumVersionAfter(version_t version) { +version_t PersistentRegistry::getMinimumVersionAfter(version_t version) const { version_t min = -1; for(auto registry_itr = m_registry.begin(); registry_itr != m_registry.end(); ++registry_itr) { version_t field_next_ver = registry_itr->second->getNextVersionOf(version); @@ -55,6 +56,25 @@ version_t PersistentRegistry::getMinimumVersionAfter(version_t version) { min = field_next_ver; } } + dbg_trace(m_logger, "PersistentRegistry: getMinimumVersionAfter({}) returning {}", version, min); + return min; +} + +version_t PersistentRegistry::getNextSignedVersion(version_t version) const { + version_t min = -1; + bool min_initialized = false; + for(auto registry_itr = m_registry.begin(); registry_itr != m_registry.end(); ++registry_itr) { + // Skip non-signed fields + if(registry_itr->second->getSignatureSize() == 0) { + continue; + } + version_t field_next_ver = registry_itr->second->getNextVersionOf(version); + if(field_next_ver != INVALID_VERSION && (!min_initialized || field_next_ver < min)) { + min = field_next_ver; + 
min_initialized = true; + } + } + dbg_trace(m_logger, "PersistentRegistry: getNextSignedVersion({}) returning {}", version, min); return min; } @@ -72,35 +92,43 @@ void PersistentRegistry::initializeLastSignature(version_t version, } } -void PersistentRegistry::sign(version_t latest_version, openssl::Signer& signer, uint8_t* signature_buffer) { - version_t cur_nonempty_version = getMinimumVersionAfter(m_lastSignedVersion); - dbg_debug(m_logger, "PersistentRegistry: sign() called with lastSignedVersion = {}, latest_version = {}. First version to sign = {}", m_lastSignedVersion, latest_version, cur_nonempty_version); - while(cur_nonempty_version != INVALID_VERSION && cur_nonempty_version <= latest_version) { - dbg_trace(m_logger, "PersistentRegistry: Attempting to sign version {} out of {}", cur_nonempty_version, latest_version); +version_t PersistentRegistry::sign(openssl::Signer& signer, uint8_t* signature_buffer) { + version_t current_version = getCurrentVersion(); + version_t cur_signable_version = getNextSignedVersion(m_lastSignedVersion); + dbg_debug(m_logger, "PersistentRegistry: sign() called with lastSignedVersion = {}, current_version = {}. 
First version to sign = {}", m_lastSignedVersion, current_version, cur_signable_version); + // If there is nothing to do because the latest non-empty version has already been signed, + // ensure the latest signature still gets returned in the output buffer + if(cur_signable_version == INVALID_VERSION || cur_signable_version > current_version) { + memcpy(signature_buffer, m_lastSignature.data(), m_lastSignature.size()); + } + while(cur_signable_version != INVALID_VERSION && cur_signable_version <= current_version) { + dbg_trace(m_logger, "PersistentRegistry: Attempting to sign version {} out of {}", cur_signable_version, current_version); signer.init(); std::size_t bytes_signed = 0; for(auto& field : m_registry) { - bytes_signed += field.second->updateSignature(cur_nonempty_version, signer); + dbg_trace(m_logger, "PersistentRegistry: Signing persistent field at {}", fmt::ptr(field.second)); + bytes_signed += field.second->updateSignature(cur_signable_version, signer); } if(bytes_signed == 0) { - // That version did not exist in any field, so there was nothing to sign. This should not happen with getMinimumVersionAfter(). - dbg_warn(m_logger, "Logic error in PersistentRegistry: Version {} was returned by getMinimumVersionAfter(), but it did not exist in any field", cur_nonempty_version); - cur_nonempty_version = getMinimumVersionAfter(cur_nonempty_version); + // That version did not exist in any field, so there was nothing to sign. This should not happen with getNextSignedVersion(). + dbg_warn(m_logger, "Logic error in PersistentRegistry: Version {} was returned by getNextSignedVersion(), but no field signed any data for it", cur_signable_version); + cur_signable_version = getNextSignedVersion(cur_signable_version); continue; } signer.add_bytes(m_lastSignature.data(), m_lastSignature.size()); signer.finalize(signature_buffer); // After computing a signature over all fields of the object, go back and // tell each field to add that signature to its log. 
- dbg_debug(m_logger, "PersistentRegistry: Adding signature to log in version {}, setting its previous signed version to {}", cur_nonempty_version, m_lastSignedVersion); + dbg_debug(m_logger, "PersistentRegistry: Adding signature to log in version {}, setting its previous signed version to {}", cur_signable_version, m_lastSignedVersion); for(auto& field : m_registry) { - field.second->addSignature(cur_nonempty_version, signature_buffer, m_lastSignedVersion); + field.second->addSignature(cur_signable_version, signature_buffer, m_lastSignedVersion); } memcpy(m_lastSignature.data(), signature_buffer, m_lastSignature.size()); - m_lastSignedVersion = cur_nonempty_version; + m_lastSignedVersion = cur_signable_version; // Advance the current version to the next non-empty version, or INVALID_VERSION if it is already at the latest version - cur_nonempty_version = getMinimumVersionAfter(cur_nonempty_version); + cur_signable_version = getNextSignedVersion(cur_signable_version); } + return m_lastSignedVersion; } bool PersistentRegistry::getSignature(version_t version, uint8_t* signature_buffer) { @@ -121,6 +149,7 @@ bool PersistentRegistry::verify(version_t version, openssl::Verifier& verifier, dbg_debug(m_logger, "PersistentRegistry: Verifying signature on version {}", version); verifier.init(); for(auto& field : m_registry) { + // Only adds bytes to the verifier for fields that have signatures enabled field.second->updateVerifier(version, verifier); } const std::size_t signature_size = verifier.get_max_signature_size(); @@ -129,6 +158,7 @@ bool PersistentRegistry::verify(version_t version, openssl::Verifier& verifier, // On that field, get the previous signed version, and retrieve that signature uint8_t current_sig[signature_size]; uint8_t previous_sig[signature_size]; + bool signature_found = false; for(auto& field : m_registry) { if(field.second->getSignature(version, current_sig, prev_signed_version)) { if(prev_signed_version == INVALID_VERSION) { @@ -139,14 +169,18 @@ 
bool PersistentRegistry::verify(version_t version, openssl::Verifier& verifier, version_t dummy; field.second->getSignature(prev_signed_version, previous_sig, dummy); } + signature_found = true; break; } } + if(!signature_found) { + dbg_warn(m_logger, "PersistentRegistry: Version {} had no fields with a signature! Unable to verify!", version); + } verifier.add_bytes(previous_sig, signature_size); return verifier.finalize(signature, signature_size); } -version_t PersistentRegistry::persist(version_t latest_version) { +version_t PersistentRegistry::persist(std::optional latest_version) { version_t min = INVALID_VERSION; for(auto& entry : m_registry) { version_t ver = entry.second->persist(latest_version); @@ -175,6 +209,7 @@ int64_t PersistentRegistry::getMinimumLatestPersistedVersion() { min = ver; } } + dbg_trace(m_logger, "PersistentRegistry: getMinimumLatestPersistedVersion() returning {}", min); return min; } diff --git a/src/persistent/test.cpp b/src/persistent/test.cpp index 7868cb3a..22129267 100644 --- a/src/persistent/test.cpp +++ b/src/persistent/test.cpp @@ -220,10 +220,10 @@ static void eval_write(std::size_t osize, int nops, bool batch) { clock_gettime(CLOCK_REALTIME, &ts); while(cnt-- > 0) { pvar.set(writeMe, ver++); - if(!batch) pvar.persist(ver-1); + if(!batch) pvar.persist(); } if(batch) { - pvar.persist(ver); + pvar.persist(); } #if defined(_PERFORMANCE_DEBUG) @@ -366,8 +366,10 @@ int main(int argc, char** argv) { std::cout << "signature=" << std::endl; dump_binary_buffer(sig_buf,sig_size); npx.addSignature(ver,sig_buf,prev_ver); + npx.persist(ver); + } else { + npx.persist(); } - npx.persist(ver); } else if(strcmp(argv[1], "verify") == 0) { if (!use_signature) { std::cout << "unable to verify without signature...exit." 
<< std::endl; @@ -396,7 +398,7 @@ int main(int argc, char** argv) { memcpy((*npx_logtail).buf, v, strlen(v) + 1); (*npx_logtail).data_len = strlen(v) + 1; npx_logtail.version(ver); - npx_logtail.persist(ver); + npx_logtail.persist(); } #define LOGTAIL_FILE "logtail.ser" else if(strcmp(argv[1], "logtail-serialize") == 0) { @@ -461,17 +463,17 @@ int main(int argc, char** argv) { X x; x.x = 1; px2.set(x, ver++); - px2.persist(ver-1); + px2.persist(); cout << "after set 1" << endl; listvar(px2); x.x = 10; px2.set(x, ver++); - px2.persist(ver-1); + px2.persist(); cout << "after set 10" << endl; listvar(px2); x.x = 100; px2.set(x, ver++); - px2.persist(ver-1); + px2.persist(); cout << "after set 100" << endl; listvar(px2); } else if(strcmp(argv[1], "hlc") == 0) { @@ -516,8 +518,10 @@ int main(int argc, char** argv) { std::cout << "signature=" << std::endl; dump_binary_buffer(sig_buf,sig_size); dx.addSignature(ver,sig_buf,prev_ver); + dx.persist(ver); + } else { + dx.persist(); } - dx.persist(ver); } else if(strcmp(argv[1], "delta-sub") == 0) { int op = std::stoi(argv[2]); int64_t ver = (int64_t)atoi(argv[3]); @@ -537,8 +541,10 @@ int main(int argc, char** argv) { std::cout << "signature=" << std::endl; dump_binary_buffer(sig_buf,sig_size); dx.addSignature(ver,sig_buf,prev_ver); + dx.persist(ver); + } else { + dx.persist(); } - dx.persist(ver); } else if(strcmp(argv[1], "delta-list") == 0) { cout << "Persistent:" << endl; listvar(dx);