Derecho  0.9
Distributed systems toolkit for RDMA
ObjectStore.cpp
Go to the documentation of this file.
3 #include <algorithm>
4 #include <errno.h>
5 #include <iostream>
6 #include <map>
7 #include <optional>
8 
9 namespace objectstore {
10 
11 /*
12  The ObjectStore is composed of two kinds of Derecho nodes: the replicas and
13  clients. The replicas are responsible for maintaining object data, while
14  the clients provide an API layer to the application. The replicas are in
15  subgroup 0 managed by class ObjectStore. The clients are only in top-level
16  group and access the Replica's subgroup with ExternCaller.
17 
18  A short summary of the classes:
19 
20  - DeltaObjectStoreCore
21  Delta feature is enabled.
22 
23  - VolatileUnloggedObjectStore (Type for a derecho subgroup)
24  The implementation of an object store without persistence or log.
25 
26  - PersistentUnloggedObjectStore (Type for a derecho subgroup)
27  The implementation of an object store with persistence. Operations are
28  unlogged.
29 
30  - PersistentLoggedObjectStore (Type for a derecho subgroup)
31  The implementation of an object store with both persistence and log. We
32  do not explicitly support a "VolatileLoggedObjectStore" Type right now.
33 
34  - IObjectStoreAPI
35  The interface for p2p_send between clients and replicas.
36 
37  - IReplica
38  The interface for operations provided by the replica subgroup.
39 
40  - IObjectStoreService
41  The core APIs exposed to object store users.
42 
43  - ObjectStoreService
44  IObjectStoreService implementation.
45 
46  */
47 
48 /*
49  Object store configurations.
50  */
51 #define CONF_OBJECTSTORE_MIN_REPLICATION_FACTOR "OBJECTSTORE/min_replication_factor"
52 #define CONF_OBJECTSTORE_REPLICAS "OBJECTSTORE/replicas"
53 #define CONF_OBJECTSTORE_PERSISTED "OBJECTSTORE/persisted"
54 #define CONF_OBJECTSTORE_LOGGED "OBJECTSTORE/logged"
55 
57 public:
58  // insert or update a new object
59  // @PARAM object - reference to the object to be inserted or updated.
60  // object.ver is not used and will be assigned a value after
61  // 'put' operation finish.
62  // @RETURN
63  // return the version of the new object
64  virtual std::tuple<version_t,uint64_t> put(const Object& object) = 0;
65  // remove an object
66  // @PARAM oid
67  // the object id
68  // @RETURN
69  // return the version of the remove operation.
70  virtual std::tuple<version_t,uint64_t> remove(const OID& oid) = 0;
71  // get an object
72  // @PARAM oid
73  // the object id
74  // @PARAM ver
75  // the version of the object requested. An ordered get will be performed
76  // if the version is INVALID_VERSION.
77  // @RETURN
78  // return the object. If an invalid object is returned, one of the
79  // following happens:
80  // - oid is not found
81  // - ver is not found
82  // - ver is not INVALID_VERSION, but versions are not logged.
83  virtual const Object get(const OID& oid, const version_t& ver) = 0;
84  // @PARAM oid
85  // the object id
86  // @PARAM ts_us
87  // timestamp in microsecond
88  // @RETURN
89  // return the object. If an invalid object is returned, oid is not
90  // found or is not found at time ts_us.
91  virtual const Object get_by_time(const OID& oid, const uint64_t& ts_us) = 0;
92 };
93 
94 class IReplica {
95 public:
96  // Perform an ordered 'put' in the subgroup
97  //
98  // @PARAM oid
99  // @RETURN
100  // return the version of the new object
101  virtual std::tuple<version_t,uint64_t> orderedPut(const Object& object) = 0;
102  // Perform an ordered 'remove' in the subgroup
103  // @PARAM oid
104  // the object id
105  // @RETURN
106  // return the version of the remove operation
107  virtual std::tuple<version_t,uint64_t> orderedRemove(const OID& oid) = 0;
108  // Perform an ordered 'get' in the subgroup
109  // @PARAM oid
110  // the object id
111  // @RETURN
112  // return the object. If an invalid object is returned, oid is not
113  // found.
114  virtual const Object orderedGet(const OID& oid) = 0;
115 };
116 
120  public IObjectStoreAPI {
121 public:
123  std::map<OID, Object> objects;
126 
128  orderedPut,
129  orderedRemove,
130  orderedGet,
131  put,
132  remove,
133  get,
134  get_by_time);
135 
136  inline std::tuple<version_t,uint64_t> get_version() {
137  derecho::Replicated<VolatileUnloggedObjectStore>& subgroup_handle = group->template get_subgroup<VolatileUnloggedObjectStore>();
138  return subgroup_handle.get_next_version();
139  }
140 
141  // @override IObjectStoreAPI::put
142  virtual std::tuple<version_t,uint64_t> put(const Object& object) {
143  derecho::Replicated<VolatileUnloggedObjectStore>& subgroup_handle = group->template get_subgroup<VolatileUnloggedObjectStore>();
144  auto results = subgroup_handle.ordered_send<RPC_NAME(orderedPut)>(object);
145  decltype(results)::ReplyMap& replies = results.get();
146  std::tuple<version_t,uint64_t> vRet(INVALID_VERSION,0);
147  // TODO: should we verify consistency of the versions?
148  for(auto& reply_pair : replies) {
149  vRet = reply_pair.second.get();
150  }
151  return vRet;
152  }
153  // @override IObjectStoreAPI::remove
154  virtual std::tuple<version_t,uint64_t> remove(const OID& oid) {
155  auto& subgroup_handle = group->template get_subgroup<VolatileUnloggedObjectStore>();
156  derecho::rpc::QueryResults<std::tuple<version_t,uint64_t>> results = subgroup_handle.template ordered_send<RPC_NAME(orderedRemove)>(oid);
157  decltype(results)::ReplyMap& replies = results.get();
158  std::tuple<version_t,uint64_t> vRet(INVALID_VERSION,0);
159  // TODO: should we verify consistency of the versions?
160  for(auto& reply_pair : replies) {
161  vRet = reply_pair.second.get();
162  }
163  return vRet;
164  }
165  // @override IObjectStoreAPI::get
166  virtual const Object get(const OID& oid, const version_t& ver) {
167  // check version
168  if(ver != INVALID_VERSION) {
169  dbg_default_info("{}:{} does not support versioned query ( oid = {}; ver = 0x{:x} ). Return with an invalid object.",
170  typeid(*this).name(), __func__, oid, ver);
171  return inv_obj;
172  }
173 
174  auto& subgroup_handle = group->template get_subgroup<VolatileUnloggedObjectStore>();
175  derecho::rpc::QueryResults<const Object> results = subgroup_handle.template ordered_send<RPC_NAME(orderedGet)>(oid);
176  decltype(results)::ReplyMap& replies = results.get();
177  // here we only check the first reply.
178  // Should we verify the consistency of all replies?
179  return replies.begin()->second.get();
180  }
181  //@overrid IObjectStoreAPI::get_by_time
182  virtual const Object get_by_time(const OID& oid, const uint64_t& ts_us) {
183  dbg_default_info("{}:{} does not support temporal query (oid = {}; timestamp = {} us). Return with an invalid object.",
184  typeid(*this).name(), __func__, oid, ts_us);
185  return inv_obj;
186  }
187 
188  // This is for REGISTER_RPC_FUNCTIONS
189  // @override IReplica::orderedPut
190  virtual std::tuple<version_t,uint64_t> orderedPut(const Object& object) {
191  std::tuple<version_t,uint64_t> version = get_version();
192  dbg_default_info("orderedPut object:{},version:0x{:x},timestamp:{}", object.oid, std::get<0>(version), std::get<1>(version));
193  this->objects.erase(object.oid);
194  object.ver = version;
195  this->objects.emplace(object.oid, object); // copy constructor
196  // call object watcher
197  if(object_watcher) {
198  object_watcher(object.oid, object);
199  }
200  return object.ver;
201  }
202  // @override IReplica::orderedRemove
203  virtual std::tuple<version_t,uint64_t> orderedRemove(const OID& oid) {
204  auto version = get_version();
205  dbg_default_info("orderedRemove object:{},version:0x{:x},timestamp:{}", oid, std::get<0>(version), std::get<1>(version));
206  if(this->objects.erase(oid)) {
207  object_watcher(oid, inv_obj);
208  }
209  return version;
210  }
211  // @override IReplica::orderedGet
212  virtual const Object orderedGet(const OID& oid) {
213  auto version = get_version();
214  dbg_default_info("orderedGet object:{},version:0x{:x},timestamp:{}", oid, std::get<0>(version), std::get<1>(version));
215  if(objects.find(oid) != objects.end()) {
216  return objects.at(oid);
217  } else {
218  return this->inv_obj;
219  }
220  }
221 
222  DEFAULT_SERIALIZE(objects);
223 
224  static std::unique_ptr<VolatileUnloggedObjectStore> from_bytes(mutils::DeserializationManager* dsm, char const* buf) {
225 // OPTION ONE to test
226 // return std::make_unique<VolatileUnloggedObjectStore>(
227 // std::move(*mutils::from_bytes<decltype(objects)>(dsm, buf)),
228 // dsm->mgr<IObjectStoreService>().getObjectWatcher());
229  auto ptr_to_objects = mutils::from_bytes<decltype(objects)>(dsm, buf);
230  auto ptr_to_return = std::make_unique<VolatileUnloggedObjectStore>(std::move(*ptr_to_objects),
231  dsm->mgr<IObjectStoreService>().getObjectWatcher());
232  ptr_to_objects.release(); // to avoid double free.
233  return ptr_to_return;
234  }
235 
237 
239 
240  // constructors
241  VolatileUnloggedObjectStore(const ObjectWatcher& ow) : object_watcher(ow) {}
242  VolatileUnloggedObjectStore(const std::map<OID, Object>& _objects, const ObjectWatcher& ow) : objects(_objects), object_watcher(ow) {}
243  VolatileUnloggedObjectStore(std::map<OID, Object>&& _objects, const ObjectWatcher& ow) : objects(std::move(_objects)), object_watcher(ow) {}
244 };
245 
246 // Enable the Delta feature
248  public persistent::IDeltaSupport<DeltaObjectStoreCore> {
249 #define DEFAULT_DELTA_BUFFER_CAPACITY (4096)
250  enum _OPID {
252  REMOVE
253  };
254  // _dosc_delta is a name used only for struct constructor.
255  struct {
256  size_t capacity;
257  size_t len;
258  char* buffer;
259  inline void setOpid(_OPID opid) {
260  assert(buffer != nullptr);
261  assert(capacity >= sizeof(uint32_t));
262  *(_OPID*)buffer = opid;
263  }
264  inline void setDataLen(const size_t& dlen) {
265  assert(capacity >= (dlen + sizeof(uint32_t)));
266  this->len = dlen + sizeof(uint32_t);
267  }
268  inline char* dataPtr() {
269  assert(buffer != nullptr);
270  assert(capacity > sizeof(uint32_t));
271  return buffer + sizeof(uint32_t);
272  }
273  inline void calibrate(const size_t& dlen) {
274  size_t new_cap = dlen + sizeof(uint32_t);
275  if(this->capacity >= new_cap) {
276  return;
277  }
278  // calculate new capacity
279  int width = sizeof(size_t) << 3;
280  int right_shift_bits = 1;
281  new_cap--;
282  while(right_shift_bits < width) {
283  new_cap |= new_cap >> right_shift_bits;
284  right_shift_bits = right_shift_bits << 1;
285  }
286  new_cap++;
287  // resize
288  this->buffer = (char*)realloc(buffer, new_cap);
289  if(this->buffer == nullptr) {
290  dbg_default_crit("{}:{} Failed to allocate delta buffer. errno={}", __FILE__, __LINE__, errno);
291  throw derecho::derecho_exception("Failed to allocate delta buffer.");
292  } else {
293  this->capacity = new_cap;
294  }
295  }
296  inline bool isEmpty() {
297  return (this->len == 0);
298  }
299  inline void clean() {
300  this->len = 0;
301  }
302  inline void destroy() {
303  if(this->capacity > 0) {
304  free(this->buffer);
305  }
306  }
307  } delta;
308 
310  delta.buffer = (char*)malloc(DEFAULT_DELTA_BUFFER_CAPACITY);
311  if(delta.buffer == nullptr) {
312  dbg_default_crit("{}:{} Failed to allocate delta buffer. errno={}", __FILE__, __LINE__, errno);
313  throw derecho::derecho_exception("Failed to allocate delta buffer.");
314  }
315  delta.capacity = DEFAULT_DELTA_BUFFER_CAPACITY;
316  delta.len = 0;
317  }
318 
319 public:
320  std::map<OID, Object> objects;
324  // Object Store Delta is represented by an operation id and a list of
325  // argument. The operation id (OPID) is a 4 bytes integer.
326  // 1) put(const Object& object):
327  // [OPID:PUT] [object]
328  // 2) remove(const OID& oid)
329  // [OPID:REMOVE][oid]
330  // 3) get(const OID& oid)
331  // no need to prepare a delta
333  // @override IDeltaSupport::finalizeCurrentDelta()
334  virtual void finalizeCurrentDelta(const DeltaFinalizer& df) {
335  df(this->delta.buffer, this->delta.len);
336  this->delta.clean();
337  }
338  // @override IDeltaSupport::applyDelta()
339  virtual void applyDelta(char const* const delta) {
340  const char* data = (delta + sizeof(const uint32_t));
341  switch(*(const uint32_t*)delta) {
342  case PUT:
343  applyOrderedPut(*mutils::from_bytes<Object>(nullptr, data));
344  break;
345  case REMOVE:
346  applyOrderedRemove(*(const OID*)data);
347  break;
348  default:
349  std::cerr << __FILE__ << ":" << __LINE__ << ":" << __func__ << " " << std::endl;
350  };
351  }
352 
353  // @override IDeltaSupport::create()
354  static std::unique_ptr<DeltaObjectStoreCore> create(mutils::DeserializationManager* dm) {
355  if(dm != nullptr) {
356  try {
357  return std::make_unique<DeltaObjectStoreCore>(dm->mgr<IObjectStoreService>().getObjectWatcher());
358  } catch(...) {
359  }
360  }
361  return std::make_unique<DeltaObjectStoreCore>((ObjectWatcher){});
362  }
363 
364  inline void applyOrderedPut(const Object& object) {
365  // put
366  this->objects.erase(object.oid);
367  this->objects.emplace(object.oid, object);
368  // call object watcher
369  if(object_watcher) {
370  object_watcher(object.oid, object);
371  }
372  }
373  inline bool applyOrderedRemove(const OID& oid) {
374  bool bRet = false;
375  // remove
376  if(this->objects.erase(oid)) {
377  // call object watcher
378  if(object_watcher) {
379  object_watcher(oid, inv_obj);
380  }
381  bRet = true;
382  }
383  return bRet;
384  }
385 
386  // Can we get the serialized operation representation from Derecho?
387  virtual bool orderedPut(const Object& object) {
388  // create delta.
389  assert(this->delta.isEmpty());
390  this->delta.calibrate(object.bytes_size());
391  object.to_bytes(this->delta.dataPtr());
392  this->delta.setDataLen(object.bytes_size());
393  this->delta.setOpid(PUT);
394  // apply orderedPut
395  applyOrderedPut(object);
396  return true;
397  }
398  // Can we get the serialized operation representation from Derecho?
399  virtual bool orderedRemove(const OID& oid) {
400  // create delta
401  assert(this->delta.isEmpty());
402  this->delta.calibrate(sizeof(OID));
403  *(OID*)this->delta.dataPtr() = oid;
404  this->delta.setDataLen(sizeof(OID));
405  this->delta.setOpid(REMOVE);
406  // remove
407  return applyOrderedRemove(oid);
408  }
409 
410  virtual const Object orderedGet(const OID& oid) {
411  if(objects.find(oid) != objects.end()) {
412  return objects.at(oid);
413  } else {
414  return this->inv_obj;
415  }
416  }
417 
418  // Not going to register them as RPC functions because DeltaObjectStoreCore
419  // works with PersistedObjectStore instead of the type for Replicated<T>.
420  // REGISTER_RPC_FUNCTIONS(ObjectStore, put, remove, get);
421 
422  // DEFAULT_SERIALIZATION_SUPPORT(DeltaObjectStoreCore, objects);
423 
424  DEFAULT_SERIALIZE(objects);
425 
426  static std::unique_ptr<DeltaObjectStoreCore> from_bytes(mutils::DeserializationManager* dsm, char const* buf) {
427  if(dsm != nullptr) {
428  try {
429  return std::make_unique<DeltaObjectStoreCore>(
430  std::move(*mutils::from_bytes<decltype(objects)>(dsm, buf).get()),
431  dsm->mgr<IObjectStoreService>().getObjectWatcher());
432  } catch(...) {
433  }
434  }
435  return std::make_unique<DeltaObjectStoreCore>(
436  std::move(*mutils::from_bytes<decltype(objects)>(dsm, buf).get()),
437  (ObjectWatcher){});
438  }
439 
441 
443 
444  // constructor
445  DeltaObjectStoreCore(const ObjectWatcher& ow) : object_watcher(ow) {
446  initialize_delta();
447  }
448  DeltaObjectStoreCore(const std::map<OID, Object>& _objects, const ObjectWatcher& ow) : objects(_objects), object_watcher(ow) {
449  initialize_delta();
450  }
451  DeltaObjectStoreCore(std::map<OID, Object>&& _objects, const ObjectWatcher& ow) : objects(_objects), object_watcher(ow) {
452  initialize_delta();
453  }
455  if(delta.buffer != nullptr) {
456  free(delta.buffer);
457  }
458  }
459 };
460 
464  public IObjectStoreAPI,
465  public IReplica {
466 private:
468 
469 public:
472 
474  orderedPut,
475  orderedRemove,
476  orderedGet,
477  put,
478  remove,
479  get,
480  get_by_time);
481 
482  // @override IReplica::orderedPut
483  virtual std::tuple<version_t,uint64_t> orderedPut(const Object& object) {
484  auto& subgroup_handle = group->template get_subgroup<PersistentLoggedObjectStore>();
485  object.ver = subgroup_handle.get_next_version();
486  dbg_default_info("orderedPut object:{},version:0x{:x},timestamp:{}", object.oid, std::get<0>(object.ver), std::get<1>(object.ver));
487  this->persistent_objectstore->orderedPut(object);
488  return object.ver;
489  }
490  // @override IReplica::orderedRemove
491  virtual std::tuple<version_t,uint64_t> orderedRemove(const OID& oid) {
492  auto& subgroup_handle = group->template get_subgroup<PersistentLoggedObjectStore>();
493  std::tuple<version_t,uint64_t> vRet = subgroup_handle.get_next_version();
494  dbg_default_info("orderedRemove object:{},version:0x{:x},timestamp:{}", oid, std::get<0>(vRet), std::get<1>(vRet));
495  this->persistent_objectstore->orderedRemove(oid);
496  return vRet;
497  }
498  // @override IReplica::orderedGet
499  virtual const Object orderedGet(const OID& oid) {
500 #ifndef NDEBUG
501  auto& subgroup_handle = group->template get_subgroup<PersistentLoggedObjectStore>();
502  auto version = subgroup_handle.get_next_version();
503  dbg_default_info("orderedGet object:{},version:0x{:x},timestamp:{}", oid, std::get<0>(version), std::get<1>(version));
504 #endif
505  return this->persistent_objectstore->orderedGet(oid);
506  }
507  // @override IObjectStoreAPI::put
508  virtual std::tuple<version_t,uint64_t> put(const Object& object) {
509  auto& subgroup_handle = group->template get_subgroup<PersistentLoggedObjectStore>();
510  auto results = subgroup_handle.ordered_send<RPC_NAME(orderedPut)>(object);
511  decltype(results)::ReplyMap& replies = results.get();
512  std::tuple<version_t,uint64_t> vRet(INVALID_VERSION,0);
513  for(auto& reply_pair : replies) {
514  vRet = reply_pair.second.get();
515  }
516  return vRet;
517  }
518  // @override IObjectStoreAPI::remove
519  virtual std::tuple<version_t,uint64_t> remove(const OID& oid) {
520  auto& subgroup_handle = group->template get_subgroup<PersistentLoggedObjectStore>();
521  derecho::rpc::QueryResults<std::tuple<version_t,uint64_t>> results = subgroup_handle.template ordered_send<RPC_NAME(orderedRemove)>(oid);
522  decltype(results)::ReplyMap& replies = results.get();
523  std::tuple<version_t,uint64_t> vRet(INVALID_VERSION,0);
524  for(auto& reply_pair : replies) {
525  vRet = reply_pair.second.get();
526  }
527  return vRet;
528  }
529  // @override IObjectStoreAPI::get
530  virtual const Object get(const OID& oid, const version_t& ver) {
531  if(ver == INVALID_VERSION) {
532  auto& subgroup_handle = group->template get_subgroup<PersistentLoggedObjectStore>();
533  derecho::rpc::QueryResults<const Object> results = subgroup_handle.template ordered_send<RPC_NAME(orderedGet)>(oid);
534 
535  decltype(results)::ReplyMap& replies = results.get();
536  // Here we only wait for the first reply.
537  // Should we verify the consistency of replies?
538  return replies.begin()->second.get();
539  } else { // do a versioned get
540  // 1 - check if the version is valid
541  version_t lv = persistent_objectstore.getLatestVersion();
542  if(ver > lv || ver < 0) {
543  dbg_default_info("{}::{} failed with invalid version ( oid={}, ver = 0x{:x}, latest_version=0x{:x}), returning an invalid object.",
544  typeid(*this).name(), __func__, oid, ver, lv);
545  return inv_obj;
546  }
547  // 2 - return the object
548  // TODO:
549  // - avoid copy here!!!
550  // - delta implementation needs to be improved!!! The existing
551  // implementation of Persistent<T> will Rebuild the whole object
552  // store on each versioned call. Fix this in persistent/
553  // Persistent.hpp
554  try {
555  return persistent_objectstore[ver]->objects.at(oid);
556  } catch(std::out_of_range& ex) {
557  return inv_obj;
558  }
559  }
560  }
561  // @override IObjectStoreAPI::get_by_time
562  virtual const Object get_by_time(const OID& oid, const uint64_t& ts_us) {
563  dbg_default_debug("get_by_time, oid={}, ts={}.", oid, ts_us);
564  const HLC hlc(ts_us,0ull); // generate a hybrid logical clock: TODO: do we have to use HLC????
565  try{
566  return persistent_objectstore.get(hlc)->objects.at(oid);
567  } catch (const int64_t &ex) {
568  dbg_default_warn("temporal query throws exception:0x{:x}. oid={}, ts={}", ex, oid, ts_us);
569  } catch (...) {
570  dbg_default_warn("temporal query throws unknown exception. oid={}, ts={}", oid, ts_us);
571  }
572  return inv_obj;
573  }
574 
575  // DEFAULT_SERIALIZATION_SUPPORT(PersistentLoggedObjectStore,persistent_objectstore);
576 
577  DEFAULT_SERIALIZE(persistent_objectstore);
578 
579  static std::unique_ptr<PersistentLoggedObjectStore> from_bytes(mutils::DeserializationManager* dsm, char const*
580  buf) {
581 // OPTION ONE to be tested
582 // return std::make_unique<PersistentLoggedObjectStore>(
583 // std::move(*mutils::from_bytes<decltype(persistent_objectstore)>(dsm, buf)));
584 // OPTION TWO
585  auto ptr_to_persistent_objectstore = mutils::from_bytes<decltype(persistent_objectstore)>(dsm, buf);
586  auto ptr_to_return = std::make_unique<PersistentLoggedObjectStore>(std::move(*ptr_to_persistent_objectstore));
587  ptr_to_persistent_objectstore.release(); // to avoid double free.
588  return ptr_to_return;
589  }
590 
592 
594 
595  // constructors TODO: how to pass ObjectWatcher to Persistent? ==>
597  [&]() {
598  return std::make_unique<DeltaObjectStoreCore>(oss.getObjectWatcher());
599  },
600  nullptr,
601  pr,
603  // Persistent<T> does not allow copy constructor.
604  // PersistentLoggedObjectStore(Persistent<DeltaObjectStoreCore>& _persistent_objectstore) :
605  // persistent_objectstore(_persistent_objectstore) {}
606  PersistentLoggedObjectStore(Persistent<DeltaObjectStoreCore>&& _persistent_objectstore) : persistent_objectstore(std::move(_persistent_objectstore)) {}
607 };
608 
609 // ==============================================================
610 
611 // helper functions
612 // get replica list
613 // @PARAM replica_str
614 // a list of replicas in string representation like: 1,2,5-7,100
615 // @RETURN
616 // a vector of replicas
617 static std::vector<node_id_t> parseReplicaList(
618  const std::string& replica_str) {
619  std::string::size_type s = 0, e;
620  std::vector<node_id_t> replicas;
621  while(s < replica_str.size()) {
622  e = replica_str.find(',', s);
623  if(e == std::string::npos) {
624  e = replica_str.size();
625  }
626  if(e > s) {
627  std::string range = replica_str.substr(s, e - s);
628  std::string::size_type hyphen_pos = range.find('-');
629  if(hyphen_pos != std::string::npos) {
630  // range is "a-b"
631  node_id_t rsid = std::stol(range.substr(0, hyphen_pos));
632  node_id_t reid = std::stol(range.substr(hyphen_pos + 1));
633  while(rsid <= reid) {
634  replicas.push_back(rsid);
635  rsid++;
636  }
637  } else {
638  replicas.push_back((node_id_t)std::stol(range));
639  }
640  }
641  s = e + 1;
642  }
643  return replicas;
644 }
645 
647 private:
648  enum OSSMode {
652  PERSISTENT_LOGGED
653  };
656  std::vector<node_id_t> replicas;
657  const bool bReplica;
660  // TODO: WHY do I need "write_mutex"? I should be able to update the data
661  // concurrently from multiple threads. Right?
662  std::mutex write_mutex;
663 
664 public:
665  // constructor
667  derecho::getConfBoolean(CONF_OBJECTSTORE_PERSISTED) ? (derecho::getConfBoolean(CONF_OBJECTSTORE_LOGGED) ? PERSISTENT_LOGGED : PERSISTENT_UNLOGGED) : (derecho::getConfBoolean(CONF_OBJECTSTORE_LOGGED) ? VOLATILE_LOGGED : VOLATILE_UNLOGGED)),
668  object_watcher(ow),
669  replicas(parseReplicaList(derecho::getConfString(CONF_OBJECTSTORE_REPLICAS))),
670  bReplica(std::find(replicas.begin(), replicas.end(),
672  != replicas.end()),
674  group(
675  {}, // callback set
676  // derecho::SubgroupInfo
677  {
678  [this](const std::vector<std::type_index>& subgroup_type_order,
679  const std::unique_ptr<derecho::View>& prev_view,
680  derecho::View& curr_view) {
681  derecho::subgroup_allocation_map_t subgroup_allocation;
682  for(const auto& subgroup_type : subgroup_type_order) {
683  if(subgroup_type == std::type_index(typeid(VolatileUnloggedObjectStore)) || subgroup_type == std::type_index(typeid(PersistentLoggedObjectStore))) {
684  std::vector<node_id_t> active_replicas;
685  for(uint32_t i = 0; i < curr_view.members.size(); i++) {
686  const node_id_t id = curr_view.members[i];
687  if(!curr_view.failed[i] && std::find(replicas.begin(), replicas.end(), id) != replicas.end()) {
688  active_replicas.push_back(id);
689  }
690  }
693  }
694 
695  derecho::subgroup_shard_layout_t subgroup_vector(1);
696  subgroup_vector[0].emplace_back(curr_view.make_subview(active_replicas));
697  curr_view.next_unassigned_rank += active_replicas.size();
698  subgroup_allocation.emplace(subgroup_type, std::move(subgroup_vector));
699  } else {
700  subgroup_allocation.emplace(subgroup_type, derecho::subgroup_shard_layout_t{});
701  }
702  }
703  return subgroup_allocation;
704  }},
705  this,
706  std::vector<derecho::view_upcall_t>{}, // view up-calls
707  // factories ...
708  [this](persistent::PersistentRegistry*) { return std::make_unique<VolatileUnloggedObjectStore>(object_watcher); },
709  [this](persistent::PersistentRegistry* pr) { return std::make_unique<PersistentLoggedObjectStore>(pr, *this); }) {
710  // Unimplemented yet:
711  if(mode == PERSISTENT_UNLOGGED || mode == VOLATILE_LOGGED) {
712  // log it
713  dbg_default_error("ObjectStoreService mode {} is not supported yet.", mode);
714  throw derecho::derecho_exception("Unimplmented ObjectStoreService mode: persistent_unlogged/volatile_logged.");
715  }
716  }
717 
718  virtual const bool isReplica() {
719  return bReplica;
720  }
721 
722  template <typename T>
724  std::lock_guard<std::mutex> guard(write_mutex);
725  if(bReplica && !force_client) {
726  // replica server can do ordered send
727  derecho::Replicated<T>& os_rpc_handle = group.template get_subgroup<T>();
728  return std::move(os_rpc_handle.template ordered_send<RPC_NAME(orderedPut)>(object));
729  } else {
730  // send request to a static mapped replica. Use random mapping for load-balance?
731  node_id_t target = replicas[myid % replicas.size()];
732  derecho::ExternalCaller<T>& os_p2p_handle = group.get_nonmember_subgroup<T>();
733  return std::move(os_p2p_handle.template p2p_send<RPC_NAME(put)>(target, object));
734  }
735  }
736 
737  template <typename T>
738  std::tuple<version_t,uint64_t> _bio_put(const Object& object, const bool& force_client) {
739  derecho::rpc::QueryResults<std::tuple<version_t,uint64_t>> results = this->template _aio_put<T>(object, force_client);
740  decltype(results)::ReplyMap& replies = results.get();
741 
742  std::tuple<version_t,uint64_t> vRet(INVALID_VERSION,0);
743  for(auto& reply_pair : replies) {
744  vRet = reply_pair.second.get();
745  }
746  return vRet;
747  }
748 
749  // blocking put
750  virtual std::tuple<version_t,uint64_t> bio_put(const Object& object, const bool& force_client) {
751  dbg_default_debug("bio_put object id={}, mode={}, force_client={}", object.oid, mode, force_client);
752  std::tuple<version_t,uint64_t> vRet(INVALID_VERSION,0);
753  switch(this->mode) {
754  case VOLATILE_UNLOGGED:
755  vRet = this->template _bio_put<VolatileUnloggedObjectStore>(object, force_client);
756  break;
757  case PERSISTENT_LOGGED:
758  vRet = this->template _bio_put<PersistentLoggedObjectStore>(object, force_client);
759  break;
760  default:
761  dbg_default_error("Cannot execute 'put' in unsupported mode {}.", mode);
762  }
763  return vRet;
764  }
765 
766  // non-blocking put
767  virtual derecho::rpc::QueryResults<std::tuple<version_t,uint64_t>> aio_put(const Object& object, const bool& force_client) {
768  dbg_default_debug("aio_put object id={}, mode={}, force_client={}", object.oid, mode, force_client);
769  switch(this->mode) {
770  case VOLATILE_UNLOGGED:
771  return this->template _aio_put<VolatileUnloggedObjectStore>(object, force_client);
772  case PERSISTENT_LOGGED:
773  return this->template _aio_put<PersistentLoggedObjectStore>(object, force_client);
774  default:
775  dbg_default_error("Cannot execute 'put' in unsupported mode {}.", mode);
776  throw derecho::derecho_exception("Cannot execute 'put' in unsupported mode");
777  }
778  }
779 
780  template <typename T>
782  std::lock_guard<std::mutex> guard(write_mutex);
783  if(bReplica && !force_client) {
784  // replica server can do ordered send
785  derecho::Replicated<T>& os_rpc_handle = group.template get_subgroup<T>();
786  return std::move(os_rpc_handle.template ordered_send<RPC_NAME(orderedRemove)>(oid));
787  } else {
788  // send request to a static mapped replica. Use random mapping for load-balance?
789  node_id_t target = replicas[myid % replicas.size()];
790  derecho::ExternalCaller<T>& os_p2p_handle = group.get_nonmember_subgroup<T>();
791  return std::move(os_p2p_handle.template p2p_send<RPC_NAME(remove)>(target, oid));
792  }
793  }
794 
795  template <typename T>
796  std::tuple<version_t,uint64_t> _bio_remove(const OID& oid, const bool& force_client) {
797  derecho::rpc::QueryResults<std::tuple<version_t,uint64_t>> results = this->template _aio_remove<T>(oid, force_client);
798  decltype(results)::ReplyMap& replies = results.get();
799 
800  std::tuple<version_t,uint64_t> vRet(INVALID_VERSION,0);
801  for(auto& reply_pair : replies) {
802  vRet = reply_pair.second.get();
803  }
804  return vRet;
805  }
806 
807  // blocking remove
808  virtual std::tuple<version_t,uint64_t> bio_remove(const OID& oid, const bool& force_client) {
809  dbg_default_debug("bio_remove object id={}, mode={}, force_client={}", oid, mode, force_client);
810  switch(this->mode) {
811  case VOLATILE_UNLOGGED:
812  return this->template _bio_remove<VolatileUnloggedObjectStore>(oid, force_client);
813  case PERSISTENT_LOGGED:
814  return this->template _bio_remove<PersistentLoggedObjectStore>(oid, force_client);
815  default:
816  dbg_default_error("Cannot execute 'remove' in unsupported mode {}.", mode);
817  throw derecho::derecho_exception("Cannot execute 'remove' in unsupported mode {}.'");
818  }
819  }
820 
821  // non-blocking remove
822  virtual derecho::rpc::QueryResults<std::tuple<version_t,uint64_t>> aio_remove(const OID& oid, const bool& force_client) {
823  dbg_default_debug("aio_remove object id={}, mode={}, force_client={}", oid, mode, force_client);
824  switch(this->mode) {
825  case VOLATILE_UNLOGGED:
826  return this->template _aio_remove<VolatileUnloggedObjectStore>(oid, force_client);
827  case PERSISTENT_LOGGED:
828  return this->template _aio_remove<PersistentLoggedObjectStore>(oid, force_client);
829  default:
830  dbg_default_error("Cannot execute 'remove' in unsupported mode {}.", mode);
831  throw derecho::derecho_exception("Cannot execute 'remove' in unsupported mode {}.'");
832  }
833  }
834 
835  // get
836  template <typename T>
837  derecho::rpc::QueryResults<const Object> _aio_get(const OID& oid, const version_t& ver, const bool& force_client) {
838  std::lock_guard<std::mutex> guard(write_mutex);
839  if(bReplica && !force_client) {
840  derecho::Replicated<T>& os_rpc_handle = group.template get_subgroup<T>();
841  if(ver == INVALID_VERSION) {
842  // replica server can do ordered send
843  return std::move(os_rpc_handle.template ordered_send<RPC_NAME(orderedGet)>(oid));
844  } else {
845  // send a local p2p send/query to access history versions
846  return std::move(os_rpc_handle.template p2p_send<RPC_NAME(get)>(myid, oid, ver));
847  }
848  } else {
849  // Send request to a static mapped replica. Use random mapping for load-balance?
850  node_id_t target = replicas[myid % replicas.size()];
851  derecho::ExternalCaller<T>& os_p2p_handle = group.template get_nonmember_subgroup<T>();
852  return std::move(os_p2p_handle.template p2p_send<RPC_NAME(get)>(target, oid, ver));
853  }
854  }
855 
856  template <typename T>
857  derecho::rpc::QueryResults<const Object> _aio_get(const OID& oid, const uint64_t& ts_us) {
858  std::lock_guard<std::mutex> guard(write_mutex);
859  if (bReplica) {
860  // send to myself.
861  derecho::Replicated<T>& os_rpc_handle = group.template get_subgroup<T>();
862  return std::move(os_rpc_handle.template p2p_send<RPC_NAME(get_by_time)>(myid, oid, ts_us));
863  } else {
864  // Send request to a static mapped replica. Use random mapping for load-balance?
865  node_id_t target = replicas[myid % replicas.size()];
866  derecho::ExternalCaller<T>& os_p2p_handle = group.template get_nonmember_subgroup<T>();
867  return std::move(os_p2p_handle.template p2p_send<RPC_NAME(get_by_time)>(target, oid, ts_us));
868  }
869  }
870 
871  template <typename T>
872  Object _bio_get(const OID& oid, const version_t& ver, const bool& force_client) {
873  derecho::rpc::QueryResults<const Object> results = this->template _aio_get<T>(oid, ver, force_client);
874  decltype(results)::ReplyMap& replies = results.get();
875  // should we check reply consistency?
876  return replies.begin()->second.get();
877  }
878 
879  template <typename T>
880  Object _bio_get(const OID& oid, const uint64_t& ts_us) {
881  derecho::rpc::QueryResults<const Object> results = this->template _aio_get<T>(oid, ts_us);
882  decltype(results)::ReplyMap& replies = results.get();
883  return replies.begin()->second.get();
884  }
885 
886  virtual Object bio_get(const OID& oid, const version_t& ver, const bool& force_client) {
887  dbg_default_debug("bio_get object id={}, ver={}, mode={}, force_client={}", oid, ver, mode, force_client);
888  switch(this->mode) {
889  case VOLATILE_UNLOGGED:
890  return this->template _bio_get<VolatileUnloggedObjectStore>(oid, ver, force_client);
891  case PERSISTENT_LOGGED:
892  return this->template _bio_get<PersistentLoggedObjectStore>(oid, ver, force_client);
893  default:
894  dbg_default_error("Cannot execute 'get' in unsupported mode {}.", mode);
895  throw derecho::derecho_exception("Cannot execute 'get' in unsupported mode {}.'");
896  }
897  }
898 
899  virtual Object bio_get(const OID& oid, const uint64_t& ts_us) {
900  dbg_default_debug("bio_get object id={}, ts_us={}.", oid, ts_us);
901  switch(this->mode) {
902  case VOLATILE_UNLOGGED:
903  return this->template _bio_get<VolatileUnloggedObjectStore>(oid, ts_us);
904  case PERSISTENT_LOGGED:
905  return this->template _bio_get<PersistentLoggedObjectStore>(oid, ts_us);
906  default:
907  dbg_default_error("Cannot execute 'get' in unsupported mode {}.", mode);
908  throw derecho::derecho_exception("Cannot execute 'get' in unsupported mode {}.'");
909  }
910  }
911 
912  virtual derecho::rpc::QueryResults<const Object> aio_get(const OID& oid, const version_t& ver, const bool& force_client) {
913  dbg_default_debug("aio_get object id={}, ver={}, mode={}, force_client={}", oid, ver, mode, force_client);
914  switch(this->mode) {
915  case VOLATILE_UNLOGGED:
916  return this->template _aio_get<VolatileUnloggedObjectStore>(oid, ver, force_client);
917  case PERSISTENT_LOGGED:
918  return this->template _aio_get<PersistentLoggedObjectStore>(oid, ver, force_client);
919  default:
920  dbg_default_error("Cannot execute 'get' in unsupported mode {}.", mode);
921  throw derecho::derecho_exception("Cannot execute 'get' in unsupported mode {}.'");
922  }
923  }
924 
925  virtual derecho::rpc::QueryResults<const Object> aio_get(const OID& oid, const uint64_t& ts_us) {
926  dbg_default_debug("aio_get object id={}, ts_us={}.", oid, ts_us);
927  switch(this->mode) {
928  case VOLATILE_UNLOGGED:
929  return this->template _aio_get<VolatileUnloggedObjectStore>(oid, ts_us);
930  case PERSISTENT_LOGGED:
931  return this->template _aio_get<PersistentLoggedObjectStore>(oid, ts_us);
932  default:
933  dbg_default_error("Cannot execute 'get' in unsupported mode {}.", mode);
934  throw derecho::derecho_exception("Cannot execute 'get' in unsupported mode {}.'");
935  }
936  }
937 
938  virtual void leave(bool group_shutdown) {
939  if(group_shutdown) {
940  group.barrier_sync();
941  }
942  group.leave(group_shutdown);
943  }
944 
945  virtual const ObjectWatcher& getObjectWatcher() {
946  return this->object_watcher;
947  }
948 
949  // Static accessor returning a reference to the singleton service; `ow` defaults to an empty ObjectWatcher. Declaration only: no definition is visible in this part of the file.
950  static IObjectStoreService& get(int argc, char** argv, const ObjectWatcher& ow = {});
951 };
952 
953 // Storage for the singleton instance, held by a std::shared_ptr (initially empty).
954 std::shared_ptr<IObjectStoreService> IObjectStoreService::singleton;
955 
956 // get the singleton, creating it on first use
957 // NOTE: the caller only gets access to this member object. The ownership of
958 // this object is NOT transferred.
960  if(IObjectStoreService::singleton.get() == nullptr) {
961  // step 1: initialize the configuration
962  derecho::Conf::initialize(argc, argv);
963  // step 2: create the group resources
964  IObjectStoreService::singleton = std::make_shared<ObjectStoreService>(ow);
965  }
966 
967  return *IObjectStoreService::singleton.get();
968 };
969 
970 } // namespace objectstore
std::map< OID, Object > objects
virtual const ObjectWatcher & getObjectWatcher()=0
derecho::rpc::QueryResults< const Object > _aio_get(const OID &oid, const version_t &ver, const bool &force_client)
virtual derecho::rpc::QueryResults< const Object > aio_get(const OID &oid, const version_t &ver, const bool &force_client)
virtual void leave(bool group_shutdown)
virtual std::tuple< version_t, uint64_t > bio_put(const Object &object, const bool &force_client)
void barrier_sync()
Waits until all members of the group have called this function.
Definition: group_impl.hpp:431
The top-level object for creating a Derecho group.
Definition: group.hpp:122
std::enable_if_t< std::is_base_of< ByteRepresentable CMA T >::value, std::unique_ptr< T > > from_bytes(DeserializationManager *ctx, char const *v)
Calls T::from_bytes(ctx,v) when T is a ByteRepresentable.
DeltaObjectStoreCore(const std::map< OID, Object > &_objects, const ObjectWatcher &ow)
virtual derecho::rpc::QueryResults< std::tuple< version_t, uint64_t > > aio_put(const Object &object, const bool &force_client)
static IObjectStoreService & getObjectStoreService(int argc, char **argv, const ObjectWatcher &ow={})
virtual const Object get_by_time(const OID &oid, const uint64_t &ts_us)
void applyOrderedPut(const Object &object)
virtual std::tuple< version_t, uint64_t > orderedPut(const Object &object)
std::vector< std::vector< SubView > > subgroup_shard_layout_t
The data structure used to store a subgroups-and-shards layout for a single subgroup type (i...
derecho::rpc::QueryResults< std::tuple< version_t, uint64_t > > _aio_remove(const OID &oid, const bool &force_client)
const std::string & getConfString(const std::string &key)
Definition: conf.cpp:110
virtual int64_t getLatestVersion() noexcept(false)
Get the latest version, excluding truncated ones.
persistent::version_t version_t
Definition: ObjectStore.hpp:11
#define CONF_OBJECTSTORE_REPLICAS
Definition: ObjectStore.cpp:52
DeltaObjectStoreCore(const ObjectWatcher &ow)
void ensure_registered(mutils::DeserializationManager &)
static std::unique_ptr< DeltaObjectStoreCore > create(mutils::DeserializationManager *dm)
int argc
const ObjectWatcher & object_watcher
STL namespace.
ExternalCaller< SubgroupType > & get_nonmember_subgroup(uint32_t subgroup_index=0)
Gets the "handle" for a subgroup of the specified type and index, assuming this node is not a member ...
Definition: group_impl.hpp:335
virtual Object bio_get(const OID &oid, const version_t &ver, const bool &force_client)
void leave(bool group_shutdown=true)
Causes this node to cleanly leave the group by setting itself to "failed".
Definition: group_impl.hpp:396
const ObjectWatcher object_watcher
const uint32_t getConfUInt32(const std::string &key)
Definition: conf.cpp:118
VolatileUnloggedObjectStore(const std::map< OID, Object > &_objects, const ObjectWatcher &ow)
virtual const Object orderedGet(const OID &oid)
derecho::Group< VolatileUnloggedObjectStore, PersistentLoggedObjectStore > group
virtual void finalizeCurrentDelta(const DeltaFinalizer &df)
std::tuple< version_t, uint64_t > get_version()
#define dbg_default_debug(...)
Definition: logger.hpp:42
#define DEFAULT_DESERIALIZE_NOALLOC(Name)
#define dbg_default_error(...)
Definition: logger.hpp:48
std::tuple< version_t, uint64_t > _bio_remove(const OID &oid, const bool &force_client)
std::tuple< version_t, uint64_t > _bio_put(const Object &object, const bool &force_client)
std::function< void(char const *const, std::size_t)> DeltaFinalizer
Definition: Persistent.hpp:255
A non-POD type which wishes to mark itself byte representable should extend this class.
auto bytes_size(const T &)
Just calls sizeof(T)
An exception that indicates that a subgroup membership function was unable to finish executing becaus...
std::map< std::type_index, subgroup_shard_layout_t > subgroup_allocation_map_t
The data structure used to store the subgroups-and-shards layouts for all subgroup types in a Group (...
virtual std::tuple< version_t, uint64_t > orderedRemove(const OID &oid)
virtual void applyDelta(char const *const delta)
PersistentLoggedObjectStore(persistent::PersistentRegistry *pr, IObjectStoreService &oss)
#define DEFAULT_DELTA_BUFFER_CAPACITY
static void initialize(int argc, char *argv[], const char *conf_file=nullptr)
Definition: conf.cpp:67
static std::unique_ptr< DeltaObjectStoreCore > from_bytes(mutils::DeserializationManager *dsm, char const *buf)
virtual const Object get_by_time(const OID &oid, const uint64_t &ts_us)
char ** argv
The manager for any RemoteDeserializationContexts.
virtual std::tuple< version_t, uint64_t > put(const Object &object)=0
virtual const Object get_by_time(const OID &oid, const uint64_t &ts_us)=0
PersistentLoggedObjectStore(Persistent< DeltaObjectStoreCore > &&_persistent_objectstore)
#define DEFAULT_SERIALIZE(...)
VolatileUnloggedObjectStore(const ObjectWatcher &ow)
derecho::rpc::QueryResults< const Object > _aio_get(const OID &oid, const uint64_t &ts_us)
T & mgr()
Lookup the context registered at this DeserializationManager whose type is T.
static std::unique_ptr< PersistentLoggedObjectStore > from_bytes(mutils::DeserializationManager *dsm, char const *buf)
void ensure_registered(mutils::DeserializationManager &)
#define CONF_OBJECTSTORE_MIN_REPLICATION_FACTOR
Definition: ObjectStore.cpp:51
Object _bio_get(const OID &oid, const uint64_t &ts_us)
#define INVALID_VERSION
Definition: PersistLog.hpp:28
virtual std::tuple< version_t, uint64_t > put(const Object &object)
virtual std::tuple< version_t, uint64_t > bio_remove(const OID &oid, const bool &force_client)
virtual const Object orderedGet(const OID &oid)
std::function< void(const OID &, const Object &)> ObjectWatcher
Definition: ObjectStore.hpp:10
#define RPC_NAME(...)
This macro generates the Derecho-registered name of an RPC function, for use in the template paramete...
virtual const bool isReplica()
This is a marker interface for user-defined Replicated Objects (i.e.
Definition: replicated.hpp:35
virtual derecho::rpc::QueryResults< const Object > aio_get(const OID &oid, const uint64_t &ts_us)
virtual std::tuple< persistent::version_t, uint64_t > get_next_version()
Get the next version to be handled.
Definition: replicated.hpp:298
virtual bool orderedPut(const Object &object)
#define dbg_default_crit(...)
Definition: logger.hpp:50
Definition: HLC.hpp:7
virtual Object bio_get(const OID &oid, const uint64_t &ts_us)
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
VolatileUnloggedObjectStore(std::map< OID, Object > &&_objects, const ObjectWatcher &ow)
const bool getConfBoolean(const std::string &key)
Definition: conf.cpp:146
PersistentRegistry pr(nullptr, typeid(ReplicatedT), 123, 321)
ObjectStoreService(const ObjectWatcher &ow)
static std::shared_ptr< IObjectStoreService > singleton
Definition: ObjectStore.hpp:15
virtual const Object orderedGet(const OID &oid)
virtual std::tuple< version_t, uint64_t > put(const Object &object)
void ensure_registered(mutils::DeserializationManager &)
#define dbg_default_info(...)
Definition: logger.hpp:44
uint64_t OID
Definition: Object.hpp:70
virtual const ObjectWatcher & getObjectWatcher()
std::vector< node_id_t > replicas
DeltaObjectStoreCore(std::map< OID, Object > &&_objects, const ObjectWatcher &ow)
#define CONF_DERECHO_LOCAL_ID
Definition: conf.hpp:24
derecho::rpc::QueryResults< std::tuple< version_t, uint64_t > > _aio_put(const Object &object, const bool &force_client)
Object _bio_get(const OID &oid, const version_t &ver, const bool &force_client)
#define CONF_OBJECTSTORE_LOGGED
Definition: ObjectStore.cpp:54
bool applyOrderedRemove(const OID &oid)
virtual std::tuple< version_t, uint64_t > orderedPut(const Object &object)
Data structure that (indirectly) holds a set of futures for a single RPC function call; there is one ...
Definition: rpc_utils.hpp:158
ReplyMap & get()
Block until the ReplyMap is fulfilled, then return the map by reference.
Definition: rpc_utils.hpp:235
auto ordered_send(Args &&... args)
Sends a multicast to the entire subgroup that replicates this Replicated<T>, invoking the RPC functio...
Base exception class for all exceptions raised by Derecho.
auto get(const Func &fun, mutils::DeserializationManager *dm=nullptr) noexcept(false)
get the latest Value of T.
const uint64_t getConfUInt64(const std::string &key)
Definition: conf.cpp:134
virtual bool orderedRemove(const OID &oid)
#define CONF_OBJECTSTORE_PERSISTED
Definition: ObjectStore.cpp:53
Persistent< DeltaObjectStoreCore > persistent_objectstore
static std::unique_ptr< VolatileUnloggedObjectStore > from_bytes(mutils::DeserializationManager *dsm, char const *buf)
virtual derecho::rpc::QueryResults< std::tuple< version_t, uint64_t > > aio_remove(const OID &oid, const bool &force_client)
PersistentRegistry is a book for all the Persistent<T> or Volatile<T> variables.
Definition: Persistent.hpp:81
virtual std::tuple< version_t, uint64_t > orderedRemove(const OID &oid)
#define REGISTER_RPC_FUNCTIONS(...)
This macro automatically generates a register_functions() method for a Derecho Replicated Object...
#define dbg_default_warn(...)
Definition: logger.hpp:46