Derecho  0.9
Distributed systems toolkit for RDMA
persistence_manager.cpp
Go to the documentation of this file.
1 
7 
8 namespace derecho {
9 
14  std::map<subgroup_id_t, std::reference_wrapper<ReplicatedObject>>& objects_map,
15  const persistence_callback_t& _persistence_callback)
16  : thread_shutdown(false),
17  persistence_callback(_persistence_callback),
18  objects_by_subgroup_id(objects_map) {
19  // initialize semaphore
20  if(sem_init(&persistence_request_sem, 1, 0) != 0) {
21  throw derecho_exception("Cannot initialize persistent_request_sem:errno=" + std::to_string(errno));
22  }
23 }
24 
25 
29  sem_destroy(&persistence_request_sem);
30 }
31 
33  this->view_manager = &view_manager;
34 }
35 
38  //skip for raw subgroups -- NO, DON'T
39  // if(replicated_objects == nullptr) return;
40 
41  this->persist_thread = std::thread{[this]() {
42  pthread_setname_np(pthread_self(), "persist");
43  do {
44  // wait for semaphore
45  sem_wait(&persistence_request_sem);
46  while(prq_lock.test_and_set(std::memory_order_acquire)) // acquire lock
47  ; // spin
48  if(this->persistence_request_queue.empty()) {
49  prq_lock.clear(std::memory_order_release); // release lock
50  if(this->thread_shutdown) {
51  break;
52  }
53  continue;
54  }
55 
56  subgroup_id_t subgroup_id = std::get<0>(persistence_request_queue.front());
57  persistent::version_t version = std::get<1>(persistence_request_queue.front());
59  prq_lock.clear(std::memory_order_release); // release lock
60 
61  // persist
62  try {
63  auto search = objects_by_subgroup_id.find(subgroup_id);
64  if(search != objects_by_subgroup_id.end()) {
65  search->second.get().persist(version);
66  }
67  // read lock the view
68  std::shared_lock<std::shared_timed_mutex> read_lock(view_manager->view_mutex);
69  // update the persisted_num in SST
70 
71  View& Vc = *view_manager->curr_view;
72  Vc.gmsSST->persisted_num[Vc.gmsSST->get_local_index()][subgroup_id] = version;
73  Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_id),
74  (char*)std::addressof(Vc.gmsSST->persisted_num[0][subgroup_id]) - Vc.gmsSST->getBaseAddress(),
75  sizeof(long long int));
76  } catch(uint64_t exp) {
77  dbg_default_debug("exception on persist():subgroup={},ver={},exp={}.", subgroup_id, version, exp);
78  std::cout << "exception on persistent:subgroup=" << subgroup_id << ",ver=" << version << "exception=0x" << std::hex << exp << std::endl;
79  }
80 
81  // callback
82  if(this->persistence_callback != nullptr) {
83  this->persistence_callback(subgroup_id, version);
84  }
85 
86  if(this->thread_shutdown) {
87  while(prq_lock.test_and_set(std::memory_order_acquire)) // acquire lock
88  ; // spin
89  if(persistence_request_queue.empty()) {
90  prq_lock.clear(std::memory_order_release); // release lock
91  break; // finish
92  }
93  prq_lock.clear(std::memory_order_release); // release lock
94  }
95  } while(true);
96  }};
97 }
98 
101  // request enqueue
102  while(prq_lock.test_and_set(std::memory_order_acquire)) // acquire lock
103  ; // spin
104  persistence_request_queue.push(std::make_tuple(subgroup_id, version));
105  prq_lock.clear(std::memory_order_release); // release lock
106  // post semaphore
107  sem_post(&persistence_request_sem);
108 }
109 
112  const persistent::version_t& version, const HLC& mhlc) {
113  auto search = objects_by_subgroup_id.find(subgroup_id);
114  if(search != objects_by_subgroup_id.end()) {
115  search->second.get().make_version(version, mhlc);
116  }
117 }
118 
123  // if(replicated_objects == nullptr) return; //skip for raw subgroups - NO DON'T
124 
125  thread_shutdown = true;
126  sem_post(&persistence_request_sem); // kick the persistence thread in case it is sleeping
127 
128  if(wait) {
129  this->persist_thread.join();
130  }
131 }
132 
137  return std::make_tuple(
138  [this](const subgroup_id_t& subgroup_id,
139  const persistent::version_t& ver,
140  const HLC& mhlc) {
141  this->make_version(subgroup_id, ver, mhlc);
142  },
143  [this](const subgroup_id_t& subgroup_id,
144  const persistent::version_t& ver) {
145  this->post_persist_request(subgroup_id, ver);
146  });
147 }
148 } // namespace derecho
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::atomic< bool > thread_shutdown
A flag to signal the persistence thread to shut down; set to true when the group is destroyed...
PersistenceManager(std::map< subgroup_id_t, std::reference_wrapper< ReplicatedObject >> &objects_map, const persistence_callback_t &_persistence_callback)
Constructor.
void shutdown(bool wait)
Shut down the persistence thread; if wait is true, block until the thread has finished, otherwise return immediately.
std::queue< persistence_request_t > persistence_request_queue
a queue for the requests
persistence_manager_callbacks_t get_callbacks()
get the persistence callbacks.
std::unique_ptr< MulticastGroup > multicast_group
RDMC manager object used for sending multicasts.
Definition: view.hpp:126
void post_persist_request(const subgroup_id_t &subgroup_id, const persistent::version_t &version)
post a persistence request
virtual ~PersistenceManager()
default Destructor
#define dbg_default_debug(...)
Definition: logger.hpp:42
void start()
Start the persistence thread.
std::shared_ptr< DerechoSST > gmsSST
Pointer to the SST instance used by the GMS in this View.
Definition: view.hpp:128
void make_version(const subgroup_id_t &subgroup_id, const persistent::version_t &version, const HLC &mhlc)
make a version
void set_view_manager(ViewManager &view_manager)
std::function< void(subgroup_id_t, persistent::version_t)> persistence_callback_t
std::map< subgroup_id_t, std::reference_wrapper< ReplicatedObject > > & objects_by_subgroup_id
Replicated objects handle. TODO: make it safer.
ViewManager * view_manager
View Manager pointer.
Definition: HLC.hpp:7
std::shared_timed_mutex view_mutex
Controls access to curr_view.
sem_t persistence_request_sem
The semaphore used to notify the persistence thread of pending persistence requests.
persistence_callback_t persistence_callback
persistence callback
std::atomic_flag prq_lock
lock for persistence request queue
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
std::unique_ptr< View > curr_view
The current View, containing the state of the managed group.
Base exception class for all exceptions raised by Derecho.
std::thread persist_thread
Thread handle.