Derecho  0.9
Distributed systems toolkit for RDMA
sst.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <bitset>
5 #include <cassert>
6 #include <condition_variable>
7 #include <functional>
8 #include <iostream>
9 #include <list>
10 #include <memory>
11 #include <mutex>
12 #include <numeric>
13 #include <stdexcept>
14 #include <string.h>
15 #include <string>
16 #include <thread>
17 #include <vector>
18 
19 #include "predicates.hpp"
20 #include <derecho/conf/conf.hpp>
21 
22 #ifdef USE_VERBS_API
23 #include "detail/verbs.hpp"
24 #else //LIBFABRIC
25 #include "detail/lf.hpp"
26 #endif
27 
28 using sst::resources;
29 
30 namespace sst {
31 
/** Alignment granularity, in bytes, applied to every field stored in an SST row. */
const int alignTo = sizeof(long);

/**
 * Rounds a field length up to the amount of storage it occupies inside a row.
 *
 * The result is the smallest non-zero multiple of alignTo that is at least
 * len, so fields laid out back to back all start on an alignTo-byte boundary.
 *
 * NOTE(review): this value determines field offsets within a row; presumably
 * every node sharing a table must be built with the same definition - confirm.
 *
 * @param len Unpadded length of the field, in bytes.
 * @return The padded length, in bytes; never less than alignTo.
 */
constexpr size_t padded_len(const size_t& len) {
    // Round up with integer division. The earlier form,
    // (len + alignTo) | (alignTo - 1), forced the low bits on and therefore
    // returned e.g. 23 for an 8-byte field when alignTo == 8, which is not a
    // multiple of alignTo and misaligned every subsequent field.
    const size_t align = static_cast<size_t>(alignTo);
    return (len < align) ? align : ((len + align - 1) / align) * align;
}
37 
39 class _SSTField {
40 public:
41  volatile char* base;
42  size_t rowLen;
43  size_t field_len;
44 
45  _SSTField(const size_t field_len) : base(nullptr), rowLen(0), field_len(field_len) {}
46 
47  size_t set_base(volatile char* const base) {
48  this->base = base;
49  return padded_len(field_len);
50  }
51 
52  char* get_base() {
53  return const_cast<char*>(base);
54  }
55 
56  void set_rowLen(const size_t& _rowLen) { rowLen = _rowLen; }
57 };
58 
64 template <typename T>
65 class SSTField : public _SSTField {
66 public:
67  using _SSTField::base;
69  using _SSTField::rowLen;
70 
71  SSTField() : _SSTField(sizeof(T)) {
72  }
73 
74  // Tracks down the appropriate row
75  volatile T& operator[](const size_t row_idx) const { return ((T&)base[row_idx * rowLen]); }
76 
77  // Getter
78  volatile T const& operator()(const size_t row_idx) const {
79  return *(T*)(base + row_idx * rowLen);
80  }
81 
82  // Setter
83  void operator()(const size_t row_idx, T const v) { *(T*)(base + row_idx * rowLen) = v; }
84 };
85 
92 template <typename T>
93 class SSTFieldVector : public _SSTField {
94 private:
95  const size_t length;
96 
97 public:
98  using _SSTField::base;
100  using _SSTField::rowLen;
101  using value_type = T;
102 
103  SSTFieldVector(size_t num_elements) : _SSTField(num_elements * sizeof(T)), length(num_elements) {
104  }
105 
106  // Tracks down the appropriate row
107  volatile T* operator[](const size_t& idx) const { return (T*)(base + idx * rowLen); }
108 
110  size_t size() const { return length; }
111 
112  void __attribute__((noinline)) debug_print(size_t row_num) {
113  volatile T* arr = (*this)[row_num];
114  for(size_t j = 0; j < length; ++j) {
115  std::cout << arr[j] << " ";
116  }
117  std::cout << std::endl;
118  }
119 };
120 
/**
 * Type of the callback invoked when a remote node appears to have failed.
 * It receives a single uint32_t - presumably identifying the failed node;
 * confirm against the code that invokes it.
 */
typedef std::function<void(uint32_t)> failure_upcall_t;

/**
 * Constructor parameter pack for SST.
 */
struct SSTParams {
    /**
     * List of nodes in the SST; indexes are row numbers, values are node IDs.
     * Held by reference: the vector passed to the constructor must outlive
     * this object (and any SST built from it, which keeps the same reference).
     */
    const std::vector<uint32_t>& members;
    /** ID of the local node. */
    const uint32_t my_node_id;
    /** The function to call when a remote node appears to have failed; may be empty. */
    const failure_upcall_t failure_upcall;
    /**
     * Either empty, or one entry per row; a non-zero entry marks that row as
     * already failed.
     */
    const std::vector<char> already_failed;
    /** Copied into SST::thread_start by the SST constructor. */
    const bool start_predicate_thread;

    /**
     * @param _members Node IDs in row order; stored by reference, not copied.
     * @param my_node_id ID of the local node.
     * @param failure_upcall Callback for suspected failures (default: none).
     * @param already_failed Per-row failure flags (default: empty, meaning
     * no row is marked).
     * @param start_predicate_thread Value stored in start_predicate_thread
     * (default: true).
     */
    SSTParams(const std::vector<uint32_t>& _members,
              const uint32_t my_node_id,
              const failure_upcall_t failure_upcall = nullptr,
              const std::vector<char> already_failed = {},
              const bool start_predicate_thread = true)
            : members(_members),
              my_node_id(my_node_id),
              failure_upcall(failure_upcall),
              already_failed(already_failed),
              start_predicate_thread(start_predicate_thread) {}
};
158 
159 template <class DerivedSST>
160 class SST {
161 private:
162  template <typename... Fields>
163  void init_SSTFields(Fields&... fields) {
164  rowLen = 0;
165  compute_rowLen(rowLen, fields...);
166  rows = new char[rowLen * num_members];
167  // snapshot = new char[rowLen * num_members];
168  volatile char* base = rows;
169  set_bases_and_rowLens(base, rowLen, fields...);
170  }
171 
172  DerivedSST* derived_this;
173 
174  std::vector<std::thread> background_threads;
175  std::atomic<bool> thread_shutdown;
176 
177  void detect();
178 
179 public:
181  friend class Predicates<DerivedSST>;
182 
183 private:
185  const uint32_t poll_cq_timeout_ms;
187  volatile char* rows;
188  // char* snapshot;
190  size_t rowLen;
192  const std::vector<uint32_t>& members;
194  const unsigned int num_members;
195  std::vector<uint32_t> all_indices;
197  unsigned int my_index;
199  std::map<uint32_t, int, std::greater<uint32_t>> members_by_id;
201  uint32_t my_node_id;
203  // std::map<int, int> qp_num_to_index;
204 
207  std::vector<bool> row_is_frozen;
209  int num_frozen{0};
211  failure_upcall_t failure_upcall;
213  std::mutex freeze_mutex;
214 
216  std::vector<std::unique_ptr<resources>> res_vec;
217 
222  std::mutex thread_start_mutex;
224  std::condition_variable thread_start_cv;
225 
226 public:
227  SST(DerivedSST* derived_class_pointer, const SSTParams& params)
228  : derived_this(derived_class_pointer),
229  thread_shutdown(false),
231  members(params.members),
232  num_members(members.size()),
233  all_indices(num_members),
234  my_node_id(params.my_node_id),
235  row_is_frozen(num_members),
236  failure_upcall(params.failure_upcall),
237  res_vec(num_members),
238  thread_start(params.start_predicate_thread) {
239  //Figure out my SST index
240  my_index = (uint)-1;
241  for(uint32_t i = 0; i < num_members; ++i) {
242  if(members[i] == my_node_id) {
243  my_index = i;
244  break;
245  }
246  }
247  assert(my_index != (uint)-1);
248 
249  std::iota(all_indices.begin(), all_indices.end(), 0);
250 
251  if(!params.already_failed.empty()) {
252  assert(params.already_failed.size() == num_members);
253  for(size_t index = 0; index < params.already_failed.size(); ++index) {
254  if(params.already_failed[index]) {
255  row_is_frozen[index] = true;
256  }
257  }
258  }
259 
260  // sort members descending by node ID, while keeping track of their
261  // specified index in the SST
262  for(unsigned int sst_index = 0; sst_index < num_members; ++sst_index) {
263  members_by_id[members[sst_index]] = sst_index;
264  }
265  }
266 
267  template <typename... Fields>
268  void SSTInit(Fields&... fields) {
269  //Initialize rows and set the "base" field of each SSTField
270  init_SSTFields(fields...);
271 
272  //Initialize res_vec with the correct offsets for each row
273  unsigned int node_rank, sst_index;
274  for(auto const& rank_index : members_by_id) {
275  std::tie(node_rank, sst_index) = rank_index;
276  char *write_addr, *read_addr;
277  write_addr = const_cast<char*>(rows) + rowLen * sst_index;
278  read_addr = const_cast<char*>(rows) + rowLen * my_index;
279  if(sst_index != my_index) {
280  if(row_is_frozen[sst_index]) {
281  continue;
282  }
283 #ifdef USE_VERBS_API
284  res_vec[sst_index] = std::make_unique<resources>(
285  node_rank, write_addr, read_addr, rowLen, rowLen);
286 #else // use libfabric api by default
287  res_vec[sst_index] = std::make_unique<resources>(
288  node_rank, write_addr, read_addr, rowLen, rowLen, (my_node_id < node_rank));
289 #endif
290  // update qp_num_to_index
291  // qp_num_to_index[res_vec[sst_index].get()->qp->qp_num] = sst_index;
292  }
293  }
294 
295  std::thread detector(&SST::detect, this);
296  background_threads.push_back(std::move(detector));
297  }
298 
299  ~SST();
300 
302  void start_predicate_evaluation();
303 
305  void sync_with_members() const;
306 
308  void sync_with_members(std::vector<uint32_t> row_indices) const;
309 
312  void freeze(int row_index);
313 
315  unsigned int get_num_rows() const { return num_members; }
316 
318  unsigned int get_local_index() const { return my_index; }
319 
320  const char* getBaseAddress() {
321  return const_cast<char*>(rows);
322  }
323 
325  void put() {
326  put(all_indices, 0, rowLen);
327  }
328 
330  put_with_completion(all_indices, 0, rowLen);
331  }
332 
334  void put(const std::vector<uint32_t> receiver_ranks) {
335  put(receiver_ranks, 0, rowLen);
336  }
337 
338  void put_with_completion(const std::vector<uint32_t> receiver_ranks) {
339  put_with_completion(receiver_ranks, 0, rowLen);
340  }
341 
343  void put(size_t offset, size_t size) {
344  put(all_indices, offset, size);
345  }
346 
348  template <typename T>
349  void put(SSTField<T>& field) {
350  put(all_indices, field.get_base() - getBaseAddress(), sizeof(field[0]));
351  }
352 
354  template <typename T>
355  void put(SSTFieldVector<T>& vec_field) {
356  put(all_indices, vec_field.get_base() - getBaseAddress(),
357  sizeof(vec_field[0][0]) * vec_field.size());
358  }
359 
361  template <typename T>
362  void put(SSTFieldVector<T>& vec_field, std::size_t index) {
363  put(all_indices,
364  const_cast<char*>(reinterpret_cast<volatile char*>(std::addressof(vec_field[0][index])))
365  - getBaseAddress(),
366  sizeof(vec_field[0][index]));
367  }
368 
370  template <typename T>
371  void put(const std::vector<uint32_t> receiver_ranks,
372  SSTFieldVector<T>& vec_field, std::size_t index) {
373  put(receiver_ranks,
374  const_cast<char*>(reinterpret_cast<volatile char*>(std::addressof(vec_field[0][index])))
375  - getBaseAddress(),
376  sizeof(vec_field[0][index]));
377  }
378 
379  void put_with_completion(size_t offset, size_t size) {
380  put_with_completion(all_indices, offset, size);
381  }
382 
384  void put(const std::vector<uint32_t> receiver_ranks, size_t offset, size_t size);
385 
386  void put_with_completion(const std::vector<uint32_t> receiver_ranks, size_t offset, size_t size);
387 
388 private:
389  using char_p = volatile char*;
390 
391  void compute_rowLen(size_t&) {}
392 
393  template <typename Field, typename... Fields>
394  void compute_rowLen(size_t& rowLen, Field& f, Fields&... rest) {
395  rowLen += padded_len(f.field_len);
396  compute_rowLen(rowLen, rest...);
397  }
398 
399  void set_bases_and_rowLens(char_p&, const size_t) {}
400 
401  template <typename Field, typename... Fields>
402  void set_bases_and_rowLens(char_p& base, const size_t rlen, Field& f, Fields&... rest) {
403  base += f.set_base(base);
404  f.set_rowLen(rlen);
405  set_bases_and_rowLens(base, rlen, rest...);
406  }
407 
408  // void take_snapshot() {
409  // memcpy(snapshot, const_cast<char*>(rows), rowLen * num_members);
410  // }
411 
412  // // returns snapshot == current
413  // bool compare_snapshot_and_current() {
414  // int res = memcmp(const_cast<char*>(rows), snapshot, rowLen * num_members);
415  // if(res == 0) {
416  // return true;
417  // }
418  // return false;
419  // }
420 };
421 
422 } /* namespace sst */
423 
424 #include "detail/sst_impl.hpp"
_SSTField(const size_t field_len)
Definition: sst.hpp:45
volatile T * operator[](const size_t &idx) const
Definition: sst.hpp:107
unsigned int get_local_index() const
Gets the index of the local row in the table.
Definition: sst.hpp:318
void __attribute__((noinline)) debug_print(size_t row_num)
Definition: sst.hpp:112
char * get_base()
Definition: sst.hpp:52
void put_with_completion(const std::vector< uint32_t > receiver_ranks)
Definition: sst.hpp:338
volatile char * char_p
Definition: sst.hpp:389
const uint32_t poll_cq_timeout_ms
timeout settings for poll completion queue
Definition: sst.hpp:185
const unsigned int num_members
Equal to members.size()
Definition: sst.hpp:194
unsigned int get_num_rows() const
Returns the total number of rows in the table.
Definition: sst.hpp:315
std::vector< uint32_t > all_indices
Definition: sst.hpp:195
std::mutex freeze_mutex
Mutex for failure detection and row freezing.
Definition: sst.hpp:213
A public-facing version of the internal _resources class that extends it with more convenient functio...
Definition: lf.hpp:115
Predicates< DerivedSST > predicates
Definition: sst.hpp:180
DerivedSST * derived_this
Definition: sst.hpp:172
uint32_t my_node_id
ID of this node in the system.
Definition: sst.hpp:201
failure_upcall_t failure_upcall
The function to call when a remote node appears to have failed.
Definition: sst.hpp:211
size_t rowLen
Length of each row in this SST, in bytes.
Definition: sst.hpp:190
Internal helper class, never exposed to the client.
Definition: sst.hpp:39
const uint32_t getConfUInt32(const std::string &key)
Definition: conf.cpp:118
size_t field_len
Definition: sst.hpp:43
void operator()(const size_t row_idx, T const v)
Definition: sst.hpp:83
bool thread_start
Indicates whether the predicate evaluation thread should start after being forked in the constructor...
Definition: sst.hpp:220
void put(size_t offset, size_t size)
Writes a contiguous subset of the local row to all remote nodes.
Definition: sst.hpp:343
uint32_t node_rank
Definition: experiment.cpp:45
void compute_rowLen(size_t &)
Definition: sst.hpp:391
unsigned int my_index
Index (row number) of this node in the SST.
Definition: sst.hpp:197
std::vector< std::thread > background_threads
Definition: sst.hpp:174
#define CONF_DERECHO_SST_POLL_CQ_TIMEOUT_MS
Definition: conf.hpp:31
Clients should use instances of this class with the appropriate template parameter to declare fields ...
Definition: sst.hpp:65
std::mutex thread_start_mutex
Mutex for thread_start_cv.
Definition: sst.hpp:222
SST(DerivedSST *derived_class_pointer, const SSTParams &params)
Definition: sst.hpp:227
void put(SSTFieldVector< T > &vec_field)
Writes a specific local vector field to all remote nodes.
Definition: sst.hpp:355
Clients should use instances of this class to declare vector-like fields in their SST; the template p...
Definition: sst.hpp:93
volatile T const & operator()(const size_t row_idx) const
Definition: sst.hpp:78
void set_rowLen(const size_t &_rowLen)
Definition: sst.hpp:56
const std::vector< uint32_t > & members
List of nodes in the SST; indexes are row numbers, values are node IDs.
Definition: sst.hpp:192
size_t rowLen
Definition: sst.hpp:42
void set_bases_and_rowLens(char_p &, const size_t)
Definition: sst.hpp:399
std::function< void(uint32_t)> failure_upcall_t
Definition: sst.hpp:121
void put_with_completion()
Definition: sst.hpp:329
volatile char * base
Definition: sst.hpp:41
std::map< uint32_t, int, std::greater< uint32_t > > members_by_id
Maps node IDs to SST row indexes.
Definition: sst.hpp:199
size_t size() const
Just like std::vector::size(), returns the number of elements in this vector.
Definition: sst.hpp:110
const char * getBaseAddress()
Definition: sst.hpp:320
std::atomic< bool > thread_shutdown
Definition: sst.hpp:175
SSTFieldVector(size_t num_elements)
Definition: sst.hpp:103
volatile T & operator[](const size_t row_idx) const
Definition: sst.hpp:75
void set_bases_and_rowLens(char_p &base, const size_t rlen, Field &f, Fields &... rest)
Definition: sst.hpp:402
void detect()
This function is run in a detached background thread to detect predicate events.
Definition: sst_impl.hpp:61
void put(SSTField< T > &field)
Writes a specific local field to all remote nodes.
Definition: sst.hpp:349
std::condition_variable thread_start_cv
Notified when the predicate evaluation thread should start.
Definition: sst.hpp:224
const bool start_predicate_thread
Definition: sst.hpp:129
Constructor parameter pack for SST.
Definition: sst.hpp:124
void put(const std::vector< uint32_t > receiver_ranks, SSTFieldVector< T > &vec_field, std::size_t index)
Writes only a single element of a vector field to only some of the remote nodes.
Definition: sst.hpp:371
const failure_upcall_t failure_upcall
Definition: sst.hpp:127
const std::vector< char > already_failed
Definition: sst.hpp:128
SSTParams(const std::vector< uint32_t > &_members, const uint32_t my_node_id, const failure_upcall_t failure_upcall=nullptr, const std::vector< char > already_failed={}, const bool start_predicate_thread=true)
Definition: sst.hpp:147
void compute_rowLen(size_t &rowLen, Field &f, Fields &... rest)
Definition: sst.hpp:394
void SSTInit(Fields &... fields)
Definition: sst.hpp:268
void init_SSTFields(Fields &... fields)
Definition: sst.hpp:163
void put_with_completion(size_t offset, size_t size)
Definition: sst.hpp:379
volatile char * rows
Pointer to memory where the SST rows are stored.
Definition: sst.hpp:187
size_t set_base(volatile char *const base)
Definition: sst.hpp:47
void put(SSTFieldVector< T > &vec_field, std::size_t index)
Writes only a single element of a vector field to all remote nodes.
Definition: sst.hpp:362
void put()
Writes the entire local row to all remote nodes.
Definition: sst.hpp:325
std::vector< std::unique_ptr< resources > > res_vec
RDMA resources vector, one for each member.
Definition: sst.hpp:216
constexpr size_t padded_len(const size_t &len)
Definition: sst.hpp:34
void put(const std::vector< uint32_t > receiver_ranks)
Writes the entire local row to some of the remote nodes.
Definition: sst.hpp:334
const int alignTo
Definition: sst.hpp:32
const size_t length
Definition: sst.hpp:95
const uint32_t my_node_id
Definition: sst.hpp:126
SSTField()
Definition: sst.hpp:71
std::vector< bool > row_is_frozen
One entry per row index, recording whether that row has been frozen.
Definition: sst.hpp:207
const std::vector< uint32_t > & members
Definition: sst.hpp:125