Skip to content

Commit

Permalink
Adapt to the new IDeltaSupport API to save a copy during persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
songweijia committed Apr 30, 2024
1 parent 37980ac commit 52e98f9
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 116 deletions.
101 changes: 101 additions & 0 deletions .github/workflows/build_use_zerocopy_delta_api.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
name: General Build Test

on:
push:
branches: [ "use_zerocopy_delta_api" ]
pull_request:
branches: [ "use_zerocopy_delta_api" ]

jobs:
build-check:
runs-on: ubuntu-latest
steps:
- run: echo "The job is automatically triggered by a ${{github.event_name}} event."
- name: "Install APT packages"
run: >
sudo apt update;
sudo apt install libssl-dev librdmacm-dev libibverbs-dev libspdlog-dev -y;
sudo apt install libboost-all-dev ragel python3.10 python3-pip -y
- run: g++ --version
- run: cmake --version
- run: lscpu
- run: df -ha
- run: cat /proc/meminfo
- name: Checkout Derecho code
uses: actions/checkout@v3
with:
repository: 'songweijia/derecho'
ref: 'master'
path: derecho
- run: echo "The Derecho HEAD repository has been clone to the runner."
- name: Checkout Cascade code
uses: actions/checkout@v3
with:
path: 'cascade'
- run: echo "The ${{ github.repository }} repository has been cloned to the runner."
- name: Install libfabric
run: ${{github.workspace}}/derecho/scripts/prerequisites/install-libfabric.sh ${{github.workspace}}/opt
- name: Install json library
run: ${{github.workspace}}/derecho/scripts/prerequisites/install-json.sh ${{github.workspace}}/opt
- name: Install mutils
run: ${{github.workspace}}/derecho/scripts/prerequisites/install-mutils.sh ${{github.workspace}}/opt
- name: Install mutils-containers
run: ${{github.workspace}}/derecho/scripts/prerequisites/install-mutils-containers.sh ${{github.workspace}}/opt
- run: mkdir ${{github.workspace}}/derecho/build-Release
- name: Build Derecho
run: >
cd ${{github.workspace}}/derecho/build-Release;
export PREFIX=${{ github.workspace }}/opt;
export CMAKE_PREFIX_PATH=$PREFIX;
export C_INCLUDE_PATH=$PREFIX/include/;
export CPLUS_INCLUDE_PATH=$PREFIX/include/;
export LIBRARY_PATH=$PREFIX/lib/:$PREFIX/lib64/;
export LD_LIBRARY_PATH=$PREFIX/lib/:$PREFIX/lib64/;
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_EXPORT_COMPILE_COMMANDS=ON -DCMAKE_INSTALL_PREFIX=${PREFIX} ..;
make -j2;
make install
- name: Install rpclib
run: ${{github.workspace}}/cascade/scripts/prerequisites/install-rpclib.sh ${{github.workspace}}/opt
- name: Install hyperscan
run: ${{github.workspace}}/cascade/scripts/prerequisites/install-hyperscan.sh ${{github.workspace}}/opt
- name: Install libwsong
run: ${{github.workspace}}/cascade/scripts/prerequisites/install-libwsong.sh ${{github.workspace}}/opt
- name: Install python dependencies
run: >
pip3 install -r ${{github.workspace}}/cascade/src/service/python/requirements.txt;
pip3 install -r ${{github.workspace}}/cascade/src/udl_zoo/python/requirements.txt
- run: mkdir ${{github.workspace}}/cascade/build-Release
- name: Build Cascade
run: >
cd ${{github.workspace}}/cascade/build-Release;
export PREFIX=${{github.workspace}}/opt;
export CMAKE_PREFIX_PATH=$PREFIX;
export C_INCLUDE_PATH=$PREFIX/include/;
export CPLUS_INCLUDE_PATH=$PREFIX/include/;
export LIBRARY_PATH=$PREFIX/lib/:$PREFIX/lib64/;
export LD_LIBRARY_PATH=$PREFIX/lib/:$PREFIX/lib64/;
export PATH=$PATH:$HOME/.local/bin/;
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_EXPORT_COMPILE_COMMANDS=ON -DCMAKE_INSTALL_PREFIX=${PREFIX} -Dpybind11_DIR=`pybind11-config --cmakedir` ..;
make -j2;
make install;
cd ${{github.workspace}}/cascade/src/applications/standalone/console_printer_udl;
mkdir build;
cd build;
cmake ..;
make -j2;
cd ${{github.workspace}}/cascade/src/applications/standalone/dds;
mkdir build;
cd build;
cmake ..;
make -j2;
cd ${{github.workspace}}/cascade/src/applications/standalone/kvs_client;
mkdir build;
cd build;
cmake ..;
make -j2;
cd ${{github.workspace}}/cascade/src/applications/standalone/notification;
mkdir build;
cd build;
cmake ..;
make -j2
- run: echo "The job's status is ${{job.status}}."
45 changes: 9 additions & 36 deletions include/cascade/detail/delta_store_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@ namespace cascade {
template <typename KT, typename VT, KT* IK, VT* IV>
class DeltaCascadeStoreCore : public mutils::ByteRepresentable,
public persistent::IDeltaSupport<DeltaCascadeStoreCore<KT, VT, IK, IV>> {
// TODO: use max payload size in subgroup configuration.
#define DEFAULT_DELTA_BUFFER_CAPACITY (4096)
/**
* Initialize the delta data structure.
*/
void initialize_delta();

private:
#if defined(__i386__) || defined(__x86_64__) || defined(_M_AMD64) || defined(_M_IX86)
mutable std::atomic<persistent::version_t> lockless_v1;
Expand All @@ -36,39 +29,19 @@ class DeltaCascadeStoreCore : public mutils::ByteRepresentable,
#endif

public:
// delta
typedef struct {
size_t capacity;
size_t len;
uint8_t* buffer;
// methods
inline void set_data_len(const size_t& dlen);
inline uint8_t* data_ptr();
inline void calibrate(const size_t& dlen);
inline bool is_empty();
inline void clean();
inline void destroy();
} _Delta;
_Delta delta;

struct DeltaBytesFormat {
uint32_t op;
uint8_t first_data_byte;
};

/** The delta is a list of keys for the objects that are changed by put or remove. */
std::vector<KT> delta;
/** The KV map */
std::map<KT, VT> kv_map;

//////////////////////////////////////////////////////////////////////////
// Delta is represented by an operation id and a list of
// argument. The operation id (OPID) is a 4 bytes integer.
// 1) put(const Object& object):
// [OPID:PUT] [value]
// 2) remove(const KT& key)
// [OPID:REMOVE][key]
// 3) get(const KT& key)
// no need to prepare a delta
// Delta is represented by a list of objects for both put and remove
// operations, where the latter one is a list of objects with only key but
// is empty. Get operations will not create a delta. The first 4 bytes of
// the delta is the number of deltas.
///////////////////////////////////////////////////////////////////////////
virtual void finalizeCurrentDelta(const persistent::DeltaFinalizer& df) override;
virtual size_t currentDeltaSize() override;
virtual size_t currentDeltaToBytes(uint8_t * const buf, size_t buf_size) override;
virtual void applyDelta(uint8_t const* const delta) override;
static std::unique_ptr<DeltaCascadeStoreCore<KT, VT, IK, IV>> create(mutils::DeserializationManager* dm);
/**
Expand Down
117 changes: 38 additions & 79 deletions include/cascade/detail/delta_store_core_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "debug_util.hpp"

#include <derecho/core/derecho.hpp>
#include <derecho/mutils-serialization/SerializationSupport.hpp>
#include <derecho/persistent/Persistent.hpp>

#ifdef ENABLE_EVALUATION
Expand All @@ -20,82 +21,50 @@

namespace derecho {
namespace cascade {
template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::_Delta::set_data_len(const size_t& dlen) {
assert(capacity >= dlen);
this->len = dlen;
}

template <typename KT, typename VT, KT* IK, VT* IV>
uint8_t* DeltaCascadeStoreCore<KT, VT, IK, IV>::_Delta::data_ptr() {
assert(buffer != nullptr);
return buffer;
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::_Delta::calibrate(const size_t& dlen) {
size_t new_cap = dlen;
if(this->capacity >= new_cap) {
return;
}
// calculate new capacity
int width = sizeof(size_t) << 3;
int right_shift_bits = 1;
new_cap--;
while(right_shift_bits < width) {
new_cap |= new_cap >> right_shift_bits;
right_shift_bits = right_shift_bits << 1;
}
new_cap++;
// resize
this->buffer = (uint8_t*)realloc(buffer, new_cap);
if(this->buffer == nullptr) {
dbg_default_crit("{}:{} Failed to allocate delta buffer. errno={}", __FILE__, __LINE__, errno);
throw derecho::derecho_exception("Failed to allocate delta buffer.");
} else {
this->capacity = new_cap;
size_t DeltaCascadeStoreCore<KT, VT, IK, IV>::currentDeltaSize() {
size_t delta_size = 0;
if (delta.size() > 0) {
delta_size += mutils::bytes_size(delta.size());
for (const auto& k:delta) {
delta_size+=mutils::bytes_size(this->kv_map[k]);
}
}
return delta_size;
}

template <typename KT, typename VT, KT* IK, VT* IV>
bool DeltaCascadeStoreCore<KT, VT, IK, IV>::_Delta::is_empty() {
return (this->len == 0);
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::_Delta::clean() {
this->len = 0;
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::_Delta::destroy() {
if(this->capacity > 0) {
free(this->buffer);
size_t DeltaCascadeStoreCore<KT, VT, IK, IV>::currentDeltaToBytes(uint8_t * const buf, size_t buf_size) {
size_t delta_size = currentDeltaSize();
if (delta_size == 0) return 0;
if (delta_size > buf_size) {
dbg_default_error("{}: failed because we need {} bytes for delta, but only a buffer with {} bytes given.\n",
__PRETTY_FUNCTION__, delta_size, buf_size);
}
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::initialize_delta() {
delta.buffer = (uint8_t*)malloc(DEFAULT_DELTA_BUFFER_CAPACITY);
if(delta.buffer == nullptr) {
dbg_default_crit("{}:{} Failed to allocate delta buffer. errno={}", __FILE__, __LINE__, errno);
throw derecho::derecho_exception("Failed to allocate delta buffer.");
size_t offset = mutils::to_bytes(delta.size(),buf);
for(const auto& k:delta) {
offset += mutils::to_bytes(this->kv_map[k],buf+offset);
}
delta.capacity = DEFAULT_DELTA_BUFFER_CAPACITY;
delta.len = 0;
return offset;
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::finalizeCurrentDelta(const persistent::DeltaFinalizer& df) {
df(this->delta.buffer, this->delta.len);
this->delta.clean();
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::applyDelta(uint8_t const* const delta) {
mutils::deserialize_and_run(nullptr, delta, [this](const VT& value) {
this->apply_ordered_put(value);
});
void DeltaCascadeStoreCore<KT, VT, IK, IV>::applyDelta(uint8_t const* const serialized_delta) {
auto num_objects =
*mutils::from_bytes<
std::result_of_t<decltype(&std::vector<KT>::size)(std::vector<KT>)>
>(nullptr,serialized_delta);
size_t offset = mutils::bytes_size(num_objects);
while (num_objects--) {
offset +=
mutils::deserialize_and_run(nullptr, serialized_delta + offset,
[this](const VT& value) {
this->apply_ordered_put(value);
return mutils::bytes_size(value);
}
);
}
}

template <typename KT, typename VT, KT* IK, VT* IV>
Expand Down Expand Up @@ -166,10 +135,8 @@ bool DeltaCascadeStoreCore<KT, VT, IK, IV>::ordered_put(const VT& value, persist
value.set_previous_version(prev_ver, prev_ver_by_key);
}
// create delta.
assert(this->delta.is_empty());
this->delta.calibrate(mutils::bytes_size(value));
mutils::to_bytes(value, this->delta.data_ptr());
this->delta.set_data_len(mutils::bytes_size(value));
assert(this->delta.empty());
this->delta.push_back(value.get_key_ref());
// apply_ordered_put
apply_ordered_put(value);
return true;
Expand All @@ -191,10 +158,8 @@ bool DeltaCascadeStoreCore<KT, VT, IK, IV>::ordered_remove(const VT& value, pers
value.set_previous_version(prev_ver, kv_map.at(key).get_version());
}
// create delta.
assert(this->delta.is_empty());
this->delta.calibrate(mutils::bytes_size(value));
mutils::to_bytes(value, this->delta.data_ptr());
this->delta.set_data_len(mutils::bytes_size(value));
assert(this->delta.empty());
this->delta.push_back(key);
// apply_ordered_put
apply_ordered_put(value);
return true;
Expand Down Expand Up @@ -346,28 +311,22 @@ uint64_t DeltaCascadeStoreCore<KT, VT, IK, IV>::lockless_get_size(const KT& key)
template <typename KT, typename VT, KT* IK, VT* IV>
DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaCascadeStoreCore() : lockless_v1(persistent::INVALID_VERSION),
lockless_v2(persistent::INVALID_VERSION) {
initialize_delta();
}

template <typename KT, typename VT, KT* IK, VT* IV>
DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaCascadeStoreCore(const std::map<KT, VT>& _kv_map) : lockless_v1(persistent::INVALID_VERSION),
lockless_v2(persistent::INVALID_VERSION),
kv_map(_kv_map) {
initialize_delta();
}

template <typename KT, typename VT, KT* IK, VT* IV>
DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaCascadeStoreCore(std::map<KT, VT>&& _kv_map) : lockless_v1(persistent::INVALID_VERSION),
lockless_v2(persistent::INVALID_VERSION),
kv_map(std::move(_kv_map)) {
initialize_delta();
}

template <typename KT, typename VT, KT* IK, VT* IV>
DeltaCascadeStoreCore<KT, VT, IK, IV>::~DeltaCascadeStoreCore() {
if(this->delta.buffer != nullptr) {
free(this->delta.buffer);
}
}

} // namespace cascade
Expand Down
2 changes: 1 addition & 1 deletion include/cascade/detail/persistent_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ const VT PersistentCascadeStore<KT, VT, IK, IV, ST>::get(const KT& key, const pe
return *IV;
} else {
// fall back to the slow path.
// following the backward chain until its version is behine requested_version.
// following the backward chain until its version is behind requested_version.
// TODO: We can introduce a per-key version index to achieve a better performance
// with a 64bit per log entry memory overhead.
VT o = persistent_core->lockless_get(key);
Expand Down

0 comments on commit 52e98f9

Please sign in to comment.