Derecho  0.9
Distributed systems toolkit for RDMA
sst_impl.hpp
Go to the documentation of this file.
1 
7 #pragma once
8 
9 #include <chrono>
10 #include <condition_variable>
11 #include <memory>
12 #include <mutex>
13 #include <pthread.h>
14 #include <sys/time.h>
15 #include <thread>
16 #include <time.h>
17 #include <vector>
18 
19 #include "poll_utils.hpp"
20 #include "../predicates.hpp"
21 #include "../sst.hpp"
22 
23 namespace sst {
24 
29 template <typename DerivedSST>
31  thread_shutdown = true;
32  for(auto& thread : background_threads) {
33  if(thread.joinable()) thread.join();
34  }
35 
36  if(rows != nullptr) {
37  delete[](const_cast<char*>(rows));
38  }
39 }
40 
46 template <typename DerivedSST>
48  std::lock_guard<std::mutex> lock(thread_start_mutex);
49  thread_start = true;
50  thread_start_cv.notify_all();
51 }
52 
60 template <typename DerivedSST>
62  pthread_setname_np(pthread_self(), "sst_detect");
63  if(!thread_start) {
64  std::unique_lock<std::mutex> lock(thread_start_mutex);
65  thread_start_cv.wait(lock, [this]() { return thread_start; });
66  }
67  struct timespec last_time, cur_time;
68  clock_gettime(CLOCK_REALTIME, &last_time);
69 
70  while(!thread_shutdown) {
71  bool predicate_fired = false;
72  // Take the predicate lock before reading the predicate lists
73  std::unique_lock<std::mutex> predicates_lock(predicates.predicate_mutex);
74 
75  // one time predicates need to be evaluated only until they become true
76  for(auto& pred : predicates.one_time_predicates) {
77  if(pred != nullptr && (pred->first(*derived_this) == true)) {
78  predicate_fired = true;
79  // Copy the trigger pointer locally, so it can continue running without
80  // segfaulting even if this predicate gets deleted when we unlock predicates_lock
81  std::shared_ptr<typename Predicates<DerivedSST>::trig> trigger(pred->second);
82  predicates_lock.unlock();
83  (*trigger)(*derived_this);
84  predicates_lock.lock();
85  // erase the predicate as it was just found to be true
86  pred.reset();
87  }
88  }
89 
90  // recurrent predicates are evaluated each time they are found to be true
91  for(auto& pred : predicates.recurrent_predicates) {
92  if(pred != nullptr && (pred->first(*derived_this) == true)) {
93  predicate_fired = true;
94  std::shared_ptr<typename Predicates<DerivedSST>::trig> trigger(pred->second);
95  predicates_lock.unlock();
96  (*trigger)(*derived_this);
97  predicates_lock.lock();
98  }
99  }
100 
101  // transition predicates are only evaluated when they change from false to true
102  // We need to use iterators here because we need to iterate over two lists in parallel
103  auto pred_it = predicates.transition_predicates.begin();
104  auto pred_state_it = predicates.transition_predicate_states.begin();
105  while(pred_it != predicates.transition_predicates.end()) {
106  if(*pred_it != nullptr) {
107  //*pred_state_it is the previous state of the predicate at *pred_it
108  bool curr_pred_state = (*pred_it)->first(*derived_this);
109  if(curr_pred_state == true && *pred_state_it == false) {
110  predicate_fired = true;
111  std::shared_ptr<typename Predicates<DerivedSST>::trig> trigger(
112  (*pred_it)->second);
113  predicates_lock.unlock();
114  (*trigger)(*derived_this);
115  predicates_lock.lock();
116  }
117  *pred_state_it = curr_pred_state;
118 
119  ++pred_it;
120  ++pred_state_it;
121  }
122  }
123 
124  if(predicate_fired) {
125  // update last time
126  clock_gettime(CLOCK_REALTIME, &last_time);
127  } else {
128  clock_gettime(CLOCK_REALTIME, &cur_time);
129  // check if the system has been inactive for enough time to induce sleep
130  double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
131  + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
132  if(time_elapsed_in_ms > 1) {
133  predicates_lock.unlock();
134  using namespace std::chrono_literals;
135  std::this_thread::sleep_for(1ms);
136  predicates_lock.lock();
137  }
138  }
139  //Still to do: Clean up deleted predicates
140  }
141 }
142 
143 template <typename DerivedSST>
144 void SST<DerivedSST>::put(const std::vector<uint32_t> receiver_ranks, size_t offset, size_t size) {
145  assert(offset + size <= rowLen);
146  for(auto index : receiver_ranks) {
147  // don't write to yourself or a frozen row
148  if(index == my_index || row_is_frozen[index]) {
149  continue;
150  }
151  // perform a remote RDMA write on the owner of the row
152  res_vec[index]->post_remote_write(offset, size);
153  }
154  return;
155 }
156 
157 template <typename DerivedSST>
158 void SST<DerivedSST>::put_with_completion(const std::vector<uint32_t> receiver_ranks, size_t offset, size_t size) {
159  assert(offset + size <= rowLen);
160  unsigned int num_writes_posted = 0;
161  std::vector<bool> posted_write_to(num_members, false);
162 
163  const auto tid = std::this_thread::get_id();
164  // get id first
165  uint32_t ce_idx = util::polling_data.get_index(tid);
166 
168 #ifdef USE_VERBS_API
169  struct verbs_sender_ctxt sctxt[receiver_ranks.size()];
170 #else
171  struct lf_sender_ctxt sctxt[receiver_ranks.size()];
172 #endif
173  for(auto index : receiver_ranks) {
174  // don't write to yourself or a frozen row
175  if(index == my_index || row_is_frozen[index]) {
176  continue;
177  }
178  // perform a remote RDMA write on the owner of the row
179  sctxt[index].remote_id = index;
180  sctxt[index].ce_idx = ce_idx;
181  res_vec[index]->post_remote_write_with_completion(&sctxt[index], offset, size);
182  posted_write_to[index] = true;
183  num_writes_posted++;
184  }
185 
186  // track which nodes respond successfully
187  std::vector<bool> polled_successfully_from(num_members, false);
188 
189  std::vector<uint32_t> failed_node_indexes;
190 
191  unsigned long start_time_msec;
192  unsigned long cur_time_msec;
193  struct timeval cur_time;
194 
195  // wait for completions for a while but eventually give up on it
196  gettimeofday(&cur_time, NULL);
197  start_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
198 
199  // poll for a single completion for each write request submitted
200  for(unsigned int index = 0; index < num_writes_posted; ++index) {
201  std::optional<std::pair<int32_t, int32_t>> ce;
202 
203  while(true) {
204  // check if polling result is available
206  if(ce) {
207  break;
208  }
209  gettimeofday(&cur_time, NULL);
210  cur_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
211  if((cur_time_msec - start_time_msec) >= poll_cq_timeout_ms) {
212  break;
213  }
214  }
215  // if waiting for a completion entry timed out
216  if(!ce) {
217  // mark all nodes that have not yet responded as failed
218  for(unsigned int index2 = 0; index2 < num_members; ++index2) {
219  if(!posted_write_to[index2] || polled_successfully_from[index2]) {
220  continue;
221  }
222  failed_node_indexes.push_back(index2);
223  }
224  break;
225  }
226 
227  auto ce_v = ce.value();
228  int remote_id = ce_v.first;
229  int result = ce_v.second;
230  if(result == 1) {
231  polled_successfully_from[remote_id] = true;
232  } else if(result == -1) {
233  if(!row_is_frozen[index]) {
234  failed_node_indexes.push_back(remote_id);
235  }
236  }
237  }
238 
240 
241  for(auto index : failed_node_indexes) {
242  freeze(index);
243  }
244 }
245 
246 template <typename DerivedSST>
247 void SST<DerivedSST>::freeze(int row_index) {
248  {
249  std::lock_guard<std::mutex> lock(freeze_mutex);
250  if(row_is_frozen[row_index]) {
251  return;
252  }
253  row_is_frozen[row_index] = true;
254  }
255  num_frozen++;
256  //BUG: deleting from res_vec here creates a race with put(), which blindly
257  //dereferences res_vec[index] after checking fow_is_frozen
258 // res_vec[row_index].reset();
259  if(failure_upcall) {
260  failure_upcall(members[row_index]);
261  }
262 }
263 
271 template <typename DerivedSST>
273  unsigned int node_id, sst_index;
274  for(auto const& id_index : members_by_id) {
275  std::tie(node_id, sst_index) = id_index;
276  if(sst_index != my_index && !row_is_frozen[sst_index]) {
277  sync(node_id);
278  }
279  }
280 }
281 
285 template <typename DerivedSST>
286 void SST<DerivedSST>::sync_with_members(std::vector<uint32_t> row_indices) const {
287  for(auto const& row_index : row_indices) {
288  if(row_index == my_index) {
289  continue;
290  }
291  if(!row_is_frozen[row_index]) {
292  sync(members[row_index]);
293  }
294  }
295 }
296 } // namespace sst
uint32_t remote_id
Definition: lf.hpp:24
uint32_t get_index(const std::thread::id id)
Definition: poll_utils.cpp:39
uint32_t ce_idx
Definition: lf.hpp:23
PollingData polling_data
Definition: poll_utils.cpp:17
void set_waiting(const std::thread::id id)
Definition: poll_utils.cpp:49
void reset_waiting(const std::thread::id id)
Definition: poll_utils.cpp:56
~SST()
Destructor for the SST object; sets thread_shutdown to true and waits for background threads to exit ...
Definition: sst_impl.hpp:30
void sync_with_members() const
Does a TCP sync with each member of the SST.
Definition: sst_impl.hpp:272
bool sync(uint32_t r_id)
Blocks the current thread until both this node and a remote node reach this function, which exchanges some trivial data over a TCP connection.
Definition: lf.cpp:565
void put_with_completion()
Definition: sst.hpp:329
void freeze(int row_index)
Marks a row as frozen, so it will no longer update, and its corresponding node will not receive write...
Definition: sst_impl.hpp:247
void detect()
This function is run in a detached background thread to detect predicate events.
Definition: sst_impl.hpp:61
void start_predicate_evaluation()
Starts the predicate evaluation loop.
Definition: sst_impl.hpp:47
std::optional< std::pair< int32_t, int32_t > > get_completion_entry(const std::thread::id id)
Definition: poll_utils.cpp:28
void put()
Writes the entire local row to all remote nodes.
Definition: sst.hpp:325