1 #ifndef PERSISTENT_IMPL_HPP 2 #define PERSISTENT_IMPL_HPP 6 template <
typename int_type>
8 return static_cast<version_t>((
static_cast<uint64_t
>(high_bits) << 32) | (0xffffffffll & low_bits));
11 template <
typename int_type>
13 return std::make_pair(static_cast<int_type>(packed_int >> 32), static_cast<int_type>(0xffffffffll & packed_int));
20 template <
int funcIdx,
typename... Args>
24 std::get<funcIdx>(itr->second)(args...);
28 template <
int funcIdx,
typename ReturnType,
typename... Args>
30 ReturnType min_ret = -1;
33 ReturnType ret = std::get<funcIdx>(itr->second)(args...);
36 }
else if(min_ret > ret) {
46 template <
typename ObjectType, StorageType storageType>
49 if(pthread_spin_init(&this->m_oLck, PTHREAD_PROCESS_SHARED) != 0) {
54 template <
typename ObjectType, StorageType storageType>
56 pthread_spin_destroy(&this->m_oLck);
59 template <
typename ObjectType, StorageType storageType>
62 if(pthread_spin_lock(&this->m_oLck) != 0) {
65 cnt = this->m_iCounter++;
66 if(pthread_spin_unlock(&this->m_oLck) != 0) {
69 std::unique_ptr<std::string> ret = std::make_unique<std::string>();
72 sprintf(buf,
"%s-%d-%s-%d", (prefix) ? prefix :
"none", storageType, this->m_sObjectTypeName, cnt);
81 template <
typename ObjectType,
85 this->m_pLog =
nullptr;
89 this->m_pLog = std::make_unique<FilePersistLog>(object_name);
90 if(this->m_pLog ==
nullptr) {
98 if(this->m_pLog ==
nullptr) {
109 template <
typename ObjectType,
113 if(this->getNumOfVersions() > 0) {
115 this->m_pWrappedObject = this->getByIndex(this->getLatestIndex(), dm);
117 this->m_pWrappedObject = object_factory();
121 template <
typename ObjectType,
124 if(this->m_pRegistry !=
nullptr) {
125 this->m_pRegistry->registerPersist(
126 this->m_pLog->m_sName.c_str(),
136 template <
typename ObjectType,
139 if(this->m_pRegistry !=
nullptr && this->m_pLog !=
nullptr) {
140 this->m_pRegistry->unregisterPersist(this->m_pLog->m_sName.c_str());
144 template <
typename ObjectType,
147 const std::function<std::unique_ptr<ObjectType>(
void)>& object_factory,
148 const char* object_name,
151 : m_pRegistry(persistent_registry) {
153 initialize_log((object_name ==
nullptr) ? (*
Persistent::getNameMaker().make(persistent_registry ? persistent_registry->get_subgroup_prefix() :
nullptr)).c_str() : object_name);
155 initialize_object_from_log(object_factory, &dm);
157 register_callbacks();
160 template <
typename ObjectType,
163 this->m_pWrappedObject = std::move(other.m_pWrappedObject);
164 this->m_pLog = std::move(other.m_pLog);
165 this->m_pRegistry = other.m_pRegistry;
166 register_callbacks();
169 template <
typename ObjectType,
172 const char* object_name,
173 std::unique_ptr<ObjectType>& wrapped_obj_ptr,
174 const char* log_tail,
177 : m_pRegistry(persistent_registry) {
179 initialize_log(object_name);
181 if(log_tail !=
nullptr) {
182 this->m_pLog->applyLogTail(log_tail);
185 assert(wrapped_obj_ptr !=
nullptr);
186 this->m_pWrappedObject = std::move(wrapped_obj_ptr);
188 register_callbacks();
191 template <
typename ObjectType,
202 unregister_callbacks();
205 template <
typename ObjectType,
208 return *this->m_pWrappedObject;
211 template <
typename ObjectType,
214 return this->m_pWrappedObject.get();
217 template <
typename ObjectType,
220 return *this->m_pWrappedObject;
223 template <
typename ObjectType,
226 return this->m_pLog->m_sName;
229 template <
typename ObjectType,
231 template <
typename Func>
235 return this->getByIndex(-1L, fun, dm);
238 template <
typename ObjectType,
242 return this->getByIndex(-1L, dm);
245 template <
typename ObjectType,
247 template <
typename Func>
254 return f(*this->getByIndex(idx, dm));
257 return mutils::deserialize_and_run<ObjectType>(dm, (
char*)this->m_pLog->getEntryByIndex(idx), fun);
261 template <
typename ObjectType,
269 std::unique_ptr<ObjectType> p = ObjectType::create(dm);
271 for(int64_t i = this->m_pLog->getEarliestIndex(); i <= idx; i++) {
272 const char* entry_data = (
const char*)this->m_pLog->getEntryByIndex(i);
273 p->applyDelta(entry_data);
279 return mutils::from_bytes<ObjectType>(dm, (
char const*)this->m_pLog->getEntryByIndex(idx));
283 template <
typename ObjectType,
285 template <
typename Func>
290 char* pdat = (
char*)this->m_pLog->getEntry(ver);
291 if(pdat ==
nullptr) {
297 return f(*this->
get(ver, dm));
300 return mutils::deserialize_and_run<ObjectType>(dm, pdat, fun);
304 template <
typename ObjectType,
309 int64_t idx = this->m_pLog->getVersionIndex(ver);
316 return getByIndex(idx, dm);
319 return mutils::from_bytes<ObjectType>(dm, (
const char*)this->m_pLog->getEntryByIndex(idx));
323 template <
typename ObjectType,
325 template <
typename TKey>
328 this->m_pLog->trim(k);
332 template <
typename ObjectType,
336 this->m_pLog->truncate(ver);
340 template <
typename ObjectType,
342 template <
typename Func>
348 if(m_pRegistry !=
nullptr && m_pRegistry->getFrontier() <= hlc) {
354 int64_t idx = this->m_pLog->getHLCIndex(hlc);
358 return getByIndex(idx, fun, dm);
361 char* pdat = (
char*)this->m_pLog->getEntry(hlc);
362 if(pdat ==
nullptr) {
365 return mutils::deserialize_and_run<ObjectType>(dm, pdat, fun);
369 template <
typename ObjectType,
375 if(m_pRegistry !=
nullptr && m_pRegistry->getFrontier() <= hlc) {
380 int64_t idx = this->m_pLog->getHLCIndex(hlc);
384 return getByIndex(idx, dm);
387 char const* pdat = (
char const*)this->m_pLog->getEntry(hlc);
388 if(pdat ==
nullptr) {
391 return mutils::from_bytes<ObjectType>(dm, pdat);
395 template <
typename ObjectType,
398 return this->m_pLog->getLength();
401 template <
typename ObjectType,
404 return this->m_pLog->getEarliestIndex();
407 template <
typename ObjectType,
410 return this->m_pLog->getEarliestVersion();
413 template <
typename ObjectType,
416 return this->m_pLog->getLatestIndex();
419 template <
typename ObjectType,
422 return this->m_pLog->getLatestVersion();
425 template <
typename ObjectType,
428 return this->m_pLog->getLastPersisted();
431 template <
typename ObjectType,
434 dbg_default_trace(
"append to log with ver({}),hlc({},{})", ver, mhlc.m_rtc_us, mhlc.m_logic);
437 v.finalizeCurrentDelta([&](
char const*
const buf,
size_t len) {
438 this->m_pLog->append((
const void*
const)buf, len, ver, mhlc);
444 char* buf =
new char[size];
447 this->m_pLog->append((
void*)buf, size, ver, mhlc);
452 template <
typename ObjectType,
456 #if defined(_PERFORMANCE_DEBUG) || defined(DERECHO_DEBUG) 457 struct timespec t1, t2;
458 clock_gettime(CLOCK_REALTIME, &t1);
460 this->
set(v, ver, mhlc);
462 #if defined(_PERFORMANCE_DEBUG) || defined(DERECHO_DEBUG) 463 clock_gettime(CLOCK_REALTIME, &t2);
465 ns_in_set += ((t2.tv_sec - t1.tv_sec) * 1000000000ul + t2.tv_nsec - t1.tv_nsec);
469 template <
typename ObjectType,
473 this->
set(*this->m_pWrappedObject, ver);
476 template <
typename ObjectType,
479 #if defined(_PERFORMANCE_DEBUG) || defined(DERECHO_DEBUG) 480 struct timespec t1, t2;
481 clock_gettime(CLOCK_REALTIME, &t1);
482 const int64_t ret = this->m_pLog->persist();
483 clock_gettime(CLOCK_REALTIME, &t2);
485 ns_in_persist += ((t2.tv_sec - t1.tv_sec) * 1000000000ul + t2.tv_nsec - t1.tv_nsec);
488 return this->m_pLog->persist();
489 #endif //_PERFORMANCE_DEBUG 492 template <
typename ObjectType,
497 dbg_default_trace(
"{0}[{1}] object_name starts at {2}", this->m_pLog->m_sName, __func__, sz);
500 dbg_default_trace(
"{0}[{1}] wrapped_object starts at {2}", this->m_pLog->m_sName, __func__, sz);
503 dbg_default_trace(
"{0}[{1}] log starts at {2}", this->m_pLog->m_sName, __func__, sz);
508 template <
typename ObjectType,
514 template <
typename ObjectType,
523 template <
typename ObjectType,
528 auto obj_name = mutils::from_bytes<std::string>(dsm, v);
532 auto wrapped_obj = mutils::from_bytes<ObjectType>(dsm, v + ofst);
540 dbg_default_trace(
"{0}[{1}] create object from serialized bytes.", obj_name->c_str(), __func__);
541 return std::make_unique<Persistent>(obj_name->data(), wrapped_obj, v + ofst,
pr);
544 template <
typename ObjectType,
547 this->m_pLog->applyLogTail(v);
550 #if defined(_PERFORMANCE_DEBUG) || defined(DERECHO_DEBUG) 551 template <
typename ObjectType,
554 std::cout <<
"Performance Statistic of Persistent<" 555 <<
typeid(ObjectType).name() <<
">,var_name=" 556 << this->m_pLog->m_sName <<
":" << std::endl;
557 std::cout <<
"\tpersist:\t" << ns_in_persist <<
" ns/" 558 << cnt_in_persist <<
" ops" << std::endl;
560 std::cout <<
"\tset:\t" << ns_in_set <<
" ns/" 561 << cnt_in_set <<
" ops" << std::endl;
564 #endif //_PERFORMANCE_DEBUG 566 template <
typename ObjectType, StorageType storageType>
568 static std::map<std::string, _NameMaker<ObjectType, storageType>> name_makers;
570 auto search = name_makers.find(prefix);
571 if(search == name_makers.end()) {
575 return name_makers[prefix];
578 template <
typename ObjectType, StorageType storageType>
579 void saveObject(ObjectType& obj,
const char* object_name) noexcept(
false) {
580 switch(storageType) {
596 template <
typename ObjectType, StorageType storageType>
597 std::unique_ptr<ObjectType>
loadObject(
const char* object_name) noexcept(
false) {
598 switch(storageType) {
601 return loadNoLogObjectFromFile<ObjectType>(object_name);
604 return loadNoLogObjectFromMem<ObjectType>(object_name);
610 template <StorageType storageType>
611 const typename std::enable_if<(storageType == ST_FILE || storageType == ST_MEM), version_t>::type
getMinimumLatestPersistedVersion(
const std::type_index& subgroup_type, uint32_t subgroup_index, uint32_t shard_num) {
625 #endif //PERSISTENT_IMPL_HPP This file includes all common types internal to derecho and not necessarily being known by a client pr...
#define PERSIST_EXP_SPIN_LOCK(x)
virtual int64_t getLatestIndex() noexcept(false)
get the latest index excluding truncated ones.
const ObjectType & getConstRef() const
get a const reference to the wrapped object
virtual int64_t getEarliestIndex() noexcept(false)
get the earliest index excluding trimmed ones.
virtual int64_t getEarliestVersion() noexcept(false)
get the earliest version excluding trimmed ones.
std::pair< int_type, int_type > unpack_version(const version_t packed_int)
Helper function for unpacking a Persistent version number into two signed or unsigned int32 values...
virtual void version(const version_t &ver) noexcept(false)
make a version with only a version number, using the current state.
virtual int64_t getNumOfVersions() noexcept(false)
get the number of versions excluding trimmed ones.
virtual int64_t getLatestVersion() noexcept(false)
get the latest version excluding truncated ones.
#define PERSIST_EXP_NEW_FAILED_UNKNOWN
std::size_t to_bytes(char *ret) const
Write this class's marshalled representation into the array found at ret.
void trim(const TKey &k) noexcept(false)
trim by version or index
void truncate(const int64_t &ver)
truncate the log
ReturnType callFuncMin(Args... args)
Helper function II.
auto bytes_size(const T &)
Just calls sizeof(T)
virtual ~_NameMaker() noexcept(true)
#define PERSIST_EXP_SPIN_INIT(x)
void saveNoLogObjectInMem(ObjectType &obj, const char *object_name) noexcept(false)
#define PERSIST_EXP_BEYOND_GSF
The manager for any RemoteDeserializationContexts.
const int64_t getMinimumLatestPersistedVersion() noexcept(false)
Returns the minimum of the latest persisted versions among all Persistent fields. ...
std::unique_ptr< std::string > make(const char *prefix) noexcept(false)
std::size_t bytes_size() const
the size of the marshalled representation of this object.
ObjectType * operator->()
overload the '->' operator to access the wrapped object
void initialize_object_from_log(const std::function< std::unique_ptr< ObjectType >(void)> &object_factory, mutils::DeserializationManager *dm)
initialize the object from log
static int64_t getEarliestVersionToSerialize() noexcept(true)
Returns the earliest version for serialization.
std::map< std::size_t, std::tuple< VersionFunc, PersistFunc, TrimFunc, LatestPersistedGetterFunc, TruncateFunc > > _registry
Callback registry.
virtual ~Persistent() noexcept(true)
destructor: release the resources
#define PERSIST_EXP_STORAGE_TYPE_UNKNOWN(x)
virtual const int64_t persist() noexcept(false)
persist till version
#define PERSIST_EXP_INV_VERSION
static std::unique_ptr< Persistent > from_bytes(mutils::DeserializationManager *dsm, char const *v)
static _NameMaker< ObjectType, storageType > & getNameMaker(const std::string &prefix=std::string(""))
#define dbg_default_trace(...)
void callFunc(Args... args)
Helper function I.
void post_object(const std::function< void(char const *const, std::size_t)> &f) const
Pass a pointer to a buffer containing this class's marshalled representation into the function f...
void applyLogTail(mutils::DeserializationManager *dsm, char const *v)
void initialize_log(const char *object_name) noexcept(false)
initialize from local state.
PersistentRegistry pr(nullptr, typeid(ReplicatedT), 123, 321)
std::unique_ptr< ObjectType > loadObject(const char *object_name) noexcept(false)
loadObject() loads a serializable object from a persistent store
_NameMaker() noexcept(false)
void unregister_callbacks() noexcept(false)
unregister the callbacks.
void saveObject(ObjectType &obj, const char *object_name) noexcept(false)
saveObject() saves a serializable object
#define PERSIST_EXP_INV_HLC
void saveNoLogObjectInFile(ObjectType &obj, const char *object_name) noexcept(false)
save object in file
version_t combine_int32s(const int_type high_bits, const int_type low_bits)
Helper function for creating Persistent version numbers out of MulticastGroup sequence numbers and Vi...
void register_callbacks() noexcept(false)
register the callbacks.
std::string getPersRamdiskPath()
virtual void print_performance_stat()
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
auto getByIndex(int64_t idx, const Func &fun, mutils::DeserializationManager *dm=nullptr) noexcept(false)
get a version of Value T.
std::size_t to_bytes(const ByteRepresentable &b, char *v)
calls b.to_bytes(v) when b is a ByteRepresentable; calls std::memcpy() when b is POD.
static const uint64_t getMinimumLatestPersistedVersion(const std::string &prefix)
Get the minimum latest persisted version for a subgroup/shard with prefix prefix the subgroup/shard ...
Persistent(const std::function< std::unique_ptr< ObjectType >(void)> &object_factory, const char *object_name=nullptr, PersistentRegistry *persistent_registry=nullptr, mutils::DeserializationManager dm={{}}) noexcept(false)
constructor 1 is for building a persistent<T> locally, load/create a log and register itself to a per...
auto get(const Func &fun, mutils::DeserializationManager *dm=nullptr) noexcept(false)
get the latest Value of T.
#define PERSIST_EXP_SPIN_UNLOCK(x)
virtual const int64_t getLastPersisted() noexcept(false)
get the last persisted index.
virtual void set(ObjectType &v, const version_t &ver, const HLC &mhlc) noexcept(false)
make a version with a version number and mhlc clock
const std::string & getObjectName()
get object name
static std::string generate_prefix(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num) noexcept(true)
prefix generator prefix format: [hex of subgroup_type]-[subgroup_index]-[shard_num] ...
PersistentRegistry is a book for all the Persistent<T> or Volatile<T> variables.