Derecho  0.9
Distributed systems toolkit for RDMA
Persistent.hpp
Go to the documentation of this file.
1 #pragma once
2 #ifndef PERSISTENT_HPP
3 #define PERSISTENT_HPP
4 
6 #include "HLC.hpp"
7 #include "PersistException.hpp"
8 #include "detail/PersistLog.hpp"
9 #include "PersistNoLog.hpp"
10 #include "PersistentTypenames.hpp"
12 #include <functional>
13 #include <inttypes.h>
14 #include <iostream>
15 #include <map>
16 #include <memory>
17 #include <pthread.h>
18 #include <string>
19 #include <sys/types.h>
20 #include <time.h>
21 #include <typeindex>
22 
23 #include <derecho/utils/logger.hpp>
24 
25 #if defined(_PERFORMANCE_DEBUG) || defined(DERECHO_DEBUG) || !defined(NDEBUG)
26  #include <derecho/utils/time.h>
27 #endif //_PERFORMANCE_DEBUG
28 
34 namespace persistent {
35 
36 // #define DEFINE_PERSIST_VAR(_t,_n) DEFINE_PERSIST_VAR(_t,_n,ST_FILE)
37 #define DEFINE_PERSIST_VAR(_t, _n, _s) \
38  Persistent<_t, _s> _n(#_n)
39 #define DECLARE_PERSIST_VAR(_t, _n, _s) \
40  extern DEFINE_PERSIST_VAR(_t, _n, _s)
41 
43 public:
44  virtual const HLC getFrontier() = 0;
45 };
46 
57 template <typename int_type>
58 version_t combine_int32s(const int_type high_bits, const int_type low_bits);
59 
70 template <typename int_type>
71 std::pair<int_type, int_type> unpack_version(const version_t packed_int);
72 
82 public:
83  /* Constructor */
86  const std::type_index& subgroup_type,
87  uint32_t subgroup_index, uint32_t shard_num);
88 
89  /* Destructor */
90  virtual ~PersistentRegistry();
91 
93  void makeVersion(const int64_t& ver, const HLC& mhlc) noexcept(false);
94 
97  const int64_t persist() noexcept(false);
98 
100  void trim(const int64_t& earliest_version) noexcept(false);
101 
103  const int64_t getMinimumLatestPersistedVersion() noexcept(false);
104 
112  static void setEarliestVersionToSerialize(const int64_t& ver) noexcept(true);
113 
115  static void resetEarliestVersionToSerialize() noexcept(true);
116 
118  static int64_t getEarliestVersionToSerialize() noexcept(true);
119 
125  void truncate(const int64_t& last_version);
126 
131  void registerPersist(const char* obj_name,
132  const VersionFunc& vf,
133  const PersistFunc& pf,
134  const TrimFunc& tf,
135  const LatestPersistedGetterFunc& lpgf,
136  const TruncateFunc& tcf) noexcept(false);
137 
141  void unregisterPersist(const char* obj_name) noexcept(false);
142 
146  inline const HLC getFrontier() {
147  if(_temporal_query_frontier_provider != nullptr) {
148 #ifndef NDEBUG
149  const HLC r = _temporal_query_frontier_provider->getFrontier();
150  dbg_default_warn("temporal_query_frontier=HLC({},{})", r.m_rtc_us, r.m_logic);
151  return r;
152 #else
153  return _temporal_query_frontier_provider->getFrontier();
154 #endif //NDEBUG
155  } else {
156  struct timespec t;
157  clock_gettime(CLOCK_REALTIME, &t);
158  return HLC((uint64_t)(t.tv_sec * 1e6 + t.tv_nsec / 1e3), (uint64_t)0);
159  }
160  }
161 
168  void updateTemporalFrontierProvider(ITemporalQueryFrontierProvider* tqfp);
169 
174 
178  PersistentRegistry(const PersistentRegistry&) = delete;
179 
183  const char* get_subgroup_prefix();
184 
192  static std::string generate_prefix(const std::type_index& subgroup_type, uint32_t subgroup_index, uint32_t shard_num) noexcept(true);
193 
201  static bool match_prefix(const std::string str, const std::type_index& subgroup_type, uint32_t subgroup_index, uint32_t shard_num) noexcept(true);
202 
203 protected:
207  const std::string _subgroup_prefix;
208 
213 
217  std::map<std::size_t, std::tuple<VersionFunc, PersistFunc, TrimFunc, LatestPersistedGetterFunc, TruncateFunc>> _registry;
218 
222  template <int funcIdx, typename... Args>
223  void callFunc(Args... args);
224 
228  template <int funcIdx, typename ReturnType, typename... Args>
229  ReturnType callFuncMin(Args... args);
230 
234  static thread_local int64_t earliest_version_to_serialize;
235 };
236 
237 // If the type T in persistent<T> is a big object and the operations are small
238 // updates, for example, an object store or a file system, then T would better
239 // implement the IDeltaSupport interface. This interface allows persistent<T>
240 // only store the delta in the log, avoiding huge duplicated data wasting
241 // storage space as well as I/O bandwidth.
242 //
243 // The idea is that T is responsible of keeping track of the updates in the form
244 // of a byte array - the DELTA, as long as the update should be persisted. Each
245 // time Persistent<T> trying to make a version, it collects the DELTA and write
246 // it to the log. On reloading data from persistent storage, the DELTAs in the
247 // log entries are applied in order. TODO: use checkpointing to accelerate it!
248 //
249 // There are three method included in this interface:
250 // - 'finalizeCurrentDelta' This method is called when Persistent<T> trying to
251 // make a version. Once done, the delta needs to be cleared.
252 // - 'applyDelta' This method is called on object construction from the disk
253 // - 'create' This static method is used to create an empty object from deserialization
254 // manager.
255 using DeltaFinalizer = std::function<void(char const* const, std::size_t)>;
256 
257 template <typename DeltaObjectType>
259 public:
260  static std::unique_ptr<DeltaObjectType> create(mutils::DeserializationManager* dm) {
261  return DeltaObjectType::create(dm);
262  }
263 };
264 
265 template <typename ObjectType>
266 class IDeltaSupport : public IDeltaObjectFactory<ObjectType> {
267 public:
268  virtual void finalizeCurrentDelta(const DeltaFinalizer&) = 0;
269  virtual void applyDelta(char const* const) = 0;
270 };
271 
272 
273 // _NameMaker is a tool makeing the name for the log corresponding to a
274 // given Persistent<ObjectType> object.
275 template <typename ObjectType, StorageType storageType>
276 class _NameMaker {
277 public:
278  // Constructor
279  _NameMaker() noexcept(false);
280 
281  // Destructor
282  virtual ~_NameMaker() noexcept(true);
283 
284  // guess a name
285  std::unique_ptr<std::string> make(const char* prefix) noexcept(false);
286 
287 private:
289  const char* m_sObjectTypeName;
290  pthread_spinlock_t m_oLck;
291 };
292 
293 
294 // Persistent represents a variable backed up by persistent storage. The
295 // backend is PersistLog class. PersistLog handles only raw bytes and this
296 // class is repsonsible for converting it back and forth between raw bytes
297 // and ObjectType. But, the serialization/deserialization functionality is
298 // actually defined by ObjectType and provided by Persistent users.
299 // - ObjectType: user-defined type of the variable it is required to support
300 // serialization and deserialization as follows:
301 // // serialize
302 // void * ObjectType::serialize(const ObjectType & obj, uint64_t *psize)
303 // - obj: obj is the reference to the object to be serialized
304 // - psize: psize is a uint64_t pointer to receive the size of the serialized
305 // data.
306 // - Return value is a pointer to a new malloced buffer with the serialized
307 // //TODO: this may not be efficient for large object...open to be changed.
308 // // deserialize
309 // ObjectType * ObjectType::deserialize(const void *pdata)
310 // - pdata: a buffer of the serialized data
311 // - Return value is a pointer to a new created ObjectType deserialized from
312 // 'pdata' buffer.
313 // - StorageType: storage type is defined in PersistLog. The value could be
314 // ST_FILE/ST_MEM/ST_3DXP ... I will start with ST_FILE and extend it to
315 // other persistent Storage.
316 // TODO:comments
317 //TODO: Persistent<T> has to be serializable, extending from mutils::ByteRepresentable
318 template <typename ObjectType,
319  StorageType storageType = ST_FILE>
321 protected:
322 
326  inline void initialize_log(const char* object_name) noexcept(false);
327 
330  inline void initialize_object_from_log(const std::function<std::unique_ptr<ObjectType>(void)>& object_factory,
332 
335  inline void register_callbacks() noexcept(false);
336 
339  inline void unregister_callbacks() noexcept(false);
340 
341 public:
349  Persistent(
350  const std::function<std::unique_ptr<ObjectType>(void)>& object_factory,
351  const char* object_name = nullptr,
352  PersistentRegistry* persistent_registry = nullptr,
353  mutils::DeserializationManager dm = {{}}) noexcept(false);
354 
359  Persistent(Persistent&& other) noexcept(false);
360 
369  Persistent(
370  const char* object_name,
371  std::unique_ptr<ObjectType>& wrapped_obj_ptr,
372  const char* log_tail = nullptr,
373  PersistentRegistry* persistent_registry = nullptr,
374  mutils::DeserializationManager dm = {{}}) noexcept(false);
375 
378  Persistent(const Persistent&) = delete;
379 
382  virtual ~Persistent() noexcept(true);
383 
388  ObjectType& operator*();
389 
393  ObjectType* operator->();
394 
399  const ObjectType& getConstRef() const;
400 
404  const std::string& getObjectName();
405 
411  template <typename Func>
412  auto get(
413  const Func& fun,
414  mutils::DeserializationManager* dm = nullptr) noexcept(false);
415 
419  std::unique_ptr<ObjectType> get(mutils::DeserializationManager* dm = nullptr);
420 
426  template <typename Func>
427  auto getByIndex(
428  int64_t idx,
429  const Func& fun,
430  mutils::DeserializationManager* dm = nullptr) noexcept(false);
431 
435  std::unique_ptr<ObjectType> getByIndex(
436  int64_t idx,
437  mutils::DeserializationManager* dm = nullptr) noexcept(false);
438 
445  template <typename Func>
446  auto get(
447  const int64_t& ver,
448  const Func& fun,
449  mutils::DeserializationManager* dm = nullptr) noexcept(false);
450 
455  std::unique_ptr<ObjectType> get(
456  const int64_t& ver,
457  mutils::DeserializationManager* dm = nullptr) noexcept(false);
458 
462  template <typename TKey>
463  void trim(const TKey& k) noexcept(false);
464 
469  void truncate(const int64_t& ver);
470 
477  template <typename Func>
478  auto get(
479  const HLC& hlc,
480  const Func& fun,
481  mutils::DeserializationManager* dm = nullptr) noexcept(false);
482 
486  std::unique_ptr<ObjectType> get(
487  const HLC& hlc,
488  mutils::DeserializationManager* dm = nullptr) noexcept(false);
489 
493  std::unique_ptr<ObjectType> operator[](const int64_t ver) noexcept(false) {
494  return this->get(ver);
495  }
496 
500  std::unique_ptr<ObjectType> operator[](const HLC& hlc) noexcept(false) {
501  return this->get(hlc);
502  }
503 
507  virtual int64_t getNumOfVersions() noexcept(false);
508 
512  virtual int64_t getEarliestIndex() noexcept(false);
513 
517  virtual int64_t getEarliestVersion() noexcept(false);
518 
522  virtual int64_t getLatestIndex() noexcept(false);
523 
527  virtual int64_t getLatestVersion() noexcept(false);
528 
532  virtual const int64_t getLastPersisted() noexcept(false);
533 
537  virtual void set(ObjectType& v, const version_t& ver, const HLC& mhlc) noexcept(false);
538 
542  virtual void set(ObjectType& v, const version_t& ver) noexcept(false);
543 
547  virtual void version(const version_t& ver) noexcept(false);
548 
553  virtual const int64_t persist() noexcept(false);
554 
555 public:
556  // wrapped objected
557  std::unique_ptr<ObjectType> m_pWrappedObject;
558 
559 protected:
560  // PersistLog
561  std::unique_ptr<PersistLog> m_pLog;
562  // Persistence Registry
564  // get the static name maker.
565  static _NameMaker<ObjectType,storageType>& getNameMaker(const std::string& prefix = std::string(""));
566 
567  //serialization supports
568 public:
570  // Serialization and Deserialization of Persistent<T>
571  // Serialization of the persistent<T> is packed in the following order
572  // 1) the log name
573  // 2) current state of the object
574  // 3) number of log entries
575  // 4) the log entries from the earliest to the latest
576  // TODO.
577  //Note: this rely on PersistentRegistry::earliest_version_to_serialize
578  std::size_t to_bytes(char* ret) const;
579  std::size_t bytes_size() const;
580  void post_object(const std::function<void(char const* const, std::size_t)>& f) const;
581  // NOTE: we do not set up the registry here. This will only happen in the
582  // construction of Replicated<T>
583  static std::unique_ptr<Persistent> from_bytes(mutils::DeserializationManager* dsm, char const* v);
584  // derived from ByteRepresentable
586  // apply the serialized log tail to existing log
587  // @dsm - deserialization manager
588  // @v - bytes representation of the log tail)
589  void applyLogTail(mutils::DeserializationManager* dsm, char const* v);
590 
591 #if defined(_PERFORMANCE_DEBUG) || !defined(NDEBUG)
592  uint64_t ns_in_persist = 0ul;
593  uint64_t ns_in_set = 0ul;
594  uint64_t cnt_in_persist = 0ul;
595  uint64_t cnt_in_set = 0ul;
596  virtual void print_performance_stat();
597 #endif //_PERFORMANCE_DEBUG
598 };
599 
600 template <typename ObjectType>
601 class Volatile : public Persistent<ObjectType, ST_MEM> {
602 public:
611  const std::function<std::unique_ptr<ObjectType>(void)>& object_factory,
612  const char* object_name = nullptr,
613  PersistentRegistry* persistent_registry = nullptr,
614  mutils::DeserializationManager dm = {{}}) noexcept(false)
615  : Persistent<ObjectType, ST_MEM>(object_factory, object_name, persistent_registry) {}
616 
621  Volatile(Volatile&& other) noexcept(false)
622  : Persistent<ObjectType, ST_MEM>(other) {}
623 
635  const std::function<std::unique_ptr<ObjectType>(void)>& object_factory,
636  const char* object_name,
637  std::unique_ptr<ObjectType>& wrapped_obj_ptr,
638  std::unique_ptr<PersistLog>& log_ptr,
639  PersistentRegistry* persistent_registry = nullptr,
640  mutils::DeserializationManager dm = {{}}) noexcept(false)
641  : Persistent<ObjectType, ST_MEM>(object_factory, object_name, wrapped_obj_ptr, log_ptr, persistent_registry) {}
642 
645  Volatile(const Volatile&) = delete;
646 
647  // destructor:
648  virtual ~Volatile() noexcept(true){
649  // do nothing
650  };
651 };
652 
653 /* Utilities for manage a single "ByteRepresentable" persistent object. */
663 template <typename ObjectType, StorageType storageType = ST_FILE>
664 void saveObject(ObjectType& obj, const char* object_name = nullptr) noexcept(false);
665 
671 template <typename ObjectType, StorageType storageType = ST_FILE>
672 std::unique_ptr<ObjectType> loadObject(const char* object_name = nullptr) noexcept(false);
673 
680 template <StorageType storageType = ST_FILE>
681 const typename std::enable_if<(storageType == ST_FILE || storageType == ST_MEM), version_t>::type getMinimumLatestPersistedVersion(const std::type_index& subgroup_type, uint32_t subgroup_index, uint32_t shard_num);
682 
684 } // namespace persistent
685 
687 
688 #endif //PERSISTENT_HPP
std::enable_if_t< std::is_base_of< ByteRepresentable CMA T >::value, std::unique_ptr< T > > from_bytes(DeserializationManager *ctx, char const *v)
Calls T::from_bytes(ctx,v) when T is a ByteRepresentable.
const char * m_sObjectTypeName
Definition: Persistent.hpp:289
virtual void ensure_registered(mutils::DeserializationManager &)
Definition: Persistent.hpp:585
This file include all common types internal to derecho and not necessarily being known by a client pr...
const std::string _subgroup_prefix
this appears in the first part of storage file for persistent<T>
Definition: Persistent.hpp:207
std::pair< int_type, int_type > unpack_version(const version_t packed_int)
Helper function for unpacking a Persistent version number into two signed or unsigned int32 values...
static thread_local int64_t earliest_version_to_serialize
Set the earliest version to serialize for recovery.
Definition: Persistent.hpp:234
This file defines some type aliases used by the Persistence library.
std::function< void(const version_t &, const HLC &)> VersionFunc
const std::enable_if<(storageType==ST_FILE||storageType==ST_MEM), version_t >::type getMinimumLatestPersistedVersion(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num)
get the minmum latest persisted version for a Replicated<T> identified by
static std::unique_ptr< DeltaObjectType > create(mutils::DeserializationManager *dm)
Definition: Persistent.hpp:260
std::unique_ptr< ObjectType > m_pWrappedObject
Definition: Persistent.hpp:557
virtual ~Volatile() noexcept(true)
Definition: Persistent.hpp:648
pthread_spinlock_t m_oLck
Definition: Persistent.hpp:290
std::function< void(char const *const, std::size_t)> DeltaFinalizer
Definition: Persistent.hpp:255
std::function< const version_t(void)> PersistFunc
A non-POD type which wishes to mark itself byte representable should extend this class.
auto bytes_size(const T &)
Just calls sizeof(T)
Volatile(const std::function< std::unique_ptr< ObjectType >(void)> &object_factory, const char *object_name=nullptr, PersistentRegistry *persistent_registry=nullptr, mutils::DeserializationManager dm={{}}) noexcept(false)
constructor 1 is for building a persistent<T> locally, load/create a log and register itself to a per...
Definition: Persistent.hpp:610
The manager for any RemoteDeserializationContexts.
Volatile(Volatile &&other) noexcept(false)
constructor 2 is move constructor.
Definition: Persistent.hpp:621
std::map< std::size_t, std::tuple< VersionFunc, PersistFunc, TrimFunc, LatestPersistedGetterFunc, TruncateFunc > > _registry
Callback registry.
Definition: Persistent.hpp:217
Volatile(const std::function< std::unique_ptr< ObjectType >(void)> &object_factory, const char *object_name, std::unique_ptr< ObjectType > &wrapped_obj_ptr, std::unique_ptr< PersistLog > &log_ptr, PersistentRegistry *persistent_registry=nullptr, mutils::DeserializationManager dm={{}}) noexcept(false)
constructor 3 is for deserialization.
Definition: Persistent.hpp:634
std::unique_ptr< PersistLog > m_pLog
Definition: Persistent.hpp:561
std::unique_ptr< ObjectType > operator[](const HLC &hlc) noexcept(false)
syntax sugar: get a specified version of T without DSM
Definition: Persistent.hpp:500
std::function< void(const int64_t &)> TruncateFunc
std::function< void(const version_t &)> TrimFunc
Definition: HLC.hpp:7
const HLC getFrontier()
get temporal query frontier
Definition: Persistent.hpp:146
std::unique_ptr< ObjectType > loadObject(const char *object_name) noexcept(false)
loadObject() loads a serializable object from a persistent store
PersistentRegistry * m_pRegistry
Definition: Persistent.hpp:563
std::function< const version_t(void)> LatestPersistedGetterFunc
void saveObject(ObjectType &obj, const char *object_name) noexcept(false)
saveObject() saves a serializable object
version_t combine_int32s(const int_type high_bits, const int_type low_bits)
Helper function for creating Persistent version numbers out of MulticastGroup sequence numbers and Vi...
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
std::unique_ptr< ObjectType > operator[](const int64_t ver) noexcept(false)
syntax sugar: get a specified version of T without DSM
Definition: Persistent.hpp:493
std::size_t to_bytes(const ByteRepresentable &b, char *v)
calls b.to_bytes(v) when b is a ByteRepresentable; calls std::memcpy() when b is POD.
If a class which implements ByteRepresentable requires a context in order to correctly deserialize...
uint64_t m_logic
Definition: HLC.hpp:13
uint64_t m_rtc_us
Definition: HLC.hpp:12
PersistentRegistry is a book for all the Persistent<T> or Volatile<T> variables.
Definition: Persistent.hpp:81
ITemporalQueryFrontierProvider * _temporal_query_frontier_provider
Pointer to an entity providing TemporalQueryFrontier service.
Definition: Persistent.hpp:212
#define dbg_default_warn(...)
Definition: logger.hpp:46