6 #include <condition_variable> 35 return (len < alignTo) ?
alignTo : (len +
alignTo) | (alignTo - 1);
45 _SSTField(
const size_t field_len) : base(nullptr), rowLen(0), field_len(field_len) {}
53 return const_cast<char*
>(
base);
56 void set_rowLen(
const size_t& _rowLen) { rowLen = _rowLen; }
78 volatile T
const&
operator()(
const size_t row_idx)
const {
110 size_t size()
const {
return length; }
113 volatile T* arr = (*this)[row_num];
114 for(
size_t j = 0; j < length; ++j) {
115 std::cout << arr[j] <<
" ";
117 std::cout << std::endl;
148 const uint32_t my_node_id,
149 const failure_upcall_t failure_upcall =
nullptr,
150 const std::vector<char> already_failed = {},
151 const bool start_predicate_thread =
true)
153 my_node_id(my_node_id),
154 failure_upcall(failure_upcall),
155 already_failed(already_failed),
156 start_predicate_thread(start_predicate_thread) {}
159 template <
class DerivedSST>
162 template <
typename... Fields>
165 compute_rowLen(
rowLen, fields...);
166 rows =
new char[
rowLen * num_members];
168 volatile char*
base = rows;
169 set_bases_and_rowLens(base,
rowLen, fields...);
216 std::vector<std::unique_ptr<resources>>
res_vec;
228 : derived_this(derived_class_pointer),
229 thread_shutdown(false),
231 members(params.members),
232 num_members(members.size()),
233 all_indices(num_members),
234 my_node_id(params.my_node_id),
235 row_is_frozen(num_members),
236 failure_upcall(params.failure_upcall),
237 res_vec(num_members),
238 thread_start(params.start_predicate_thread) {
241 for(uint32_t i = 0; i < num_members; ++i) {
242 if(members[i] == my_node_id) {
247 assert(my_index != (uint)-1);
249 std::iota(all_indices.begin(), all_indices.end(), 0);
253 for(
size_t index = 0; index < params.
already_failed.size(); ++index) {
255 row_is_frozen[index] =
true;
262 for(
unsigned int sst_index = 0; sst_index < num_members; ++sst_index) {
263 members_by_id[members[sst_index]] = sst_index;
267 template <
typename... Fields>
270 init_SSTFields(fields...);
274 for(
auto const& rank_index : members_by_id) {
275 std::tie(node_rank, sst_index) = rank_index;
276 char *write_addr, *read_addr;
277 write_addr =
const_cast<char*
>(rows) + rowLen * sst_index;
278 read_addr =
const_cast<char*
>(rows) + rowLen * my_index;
279 if(sst_index != my_index) {
280 if(row_is_frozen[sst_index]) {
284 res_vec[sst_index] = std::make_unique<resources>(
286 #else // use libfabric api by default 287 res_vec[sst_index] = std::make_unique<resources>(
296 background_threads.push_back(std::move(detector));
302 void start_predicate_evaluation();
305 void sync_with_members()
const;
308 void sync_with_members(std::vector<uint32_t> row_indices)
const;
312 void freeze(
int row_index);
321 return const_cast<char*
>(rows);
326 put(all_indices, 0, rowLen);
330 put_with_completion(all_indices, 0, rowLen);
334 void put(
const std::vector<uint32_t> receiver_ranks) {
335 put(receiver_ranks, 0, rowLen);
339 put_with_completion(receiver_ranks, 0, rowLen);
343 void put(
size_t offset,
size_t size) {
344 put(all_indices, offset, size);
348 template <
typename T>
350 put(all_indices, field.
get_base() - getBaseAddress(),
sizeof(field[0]));
354 template <
typename T>
356 put(all_indices, vec_field.
get_base() - getBaseAddress(),
357 sizeof(vec_field[0][0]) * vec_field.
size());
361 template <
typename T>
364 const_cast<char*>(reinterpret_cast<volatile char*>(std::addressof(vec_field[0][index])))
366 sizeof(vec_field[0][index]));
370 template <
typename T>
371 void put(
const std::vector<uint32_t> receiver_ranks,
374 const_cast<char*>(reinterpret_cast<volatile char*>(std::addressof(vec_field[0][index])))
376 sizeof(vec_field[0][index]));
380 put_with_completion(all_indices, offset, size);
384 void put(
const std::vector<uint32_t> receiver_ranks,
size_t offset,
size_t size);
386 void put_with_completion(
const std::vector<uint32_t> receiver_ranks,
size_t offset,
size_t size);
393 template <
typename Field,
typename... Fields>
396 compute_rowLen(rowLen, rest...);
401 template <
typename Field,
typename... Fields>
403 base += f.set_base(base);
405 set_bases_and_rowLens(base, rlen, rest...);
_SSTField(const size_t field_len)
volatile T * operator[](const size_t &idx) const
unsigned int get_local_index() const
Gets the index of the local row in the table.
void __attribute__((noinline)) debug_print(size_t row_num)
void put_with_completion(const std::vector< uint32_t > receiver_ranks)
const uint32_t poll_cq_timeout_ms
timeout settings for poll completion queue
const unsigned int num_members
Equal to members.size()
unsigned int get_num_rows() const
Returns the total number of rows in the table.
std::vector< uint32_t > all_indices
std::mutex freeze_mutex
Mutex for failure detection and row freezing.
A public-facing version of the internal _resources class that extends it with more convenient functio...
Predicates< DerivedSST > predicates
DerivedSST * derived_this
uint32_t my_node_id
ID of this node in the system.
failure_upcall_t failure_upcall
The function to call when a remote node appears to have failed.
size_t rowLen
Length of each row in this SST, in bytes.
Internal helper class, never exposed to the client.
const uint32_t getConfUInt32(const std::string &key)
void operator()(const size_t row_idx, T const v)
bool thread_start
Indicates whether the predicate evaluation thread should start after being forked in the constructor...
void put(size_t offset, size_t size)
Writes a contiguous subset of the local row to all remote nodes.
void compute_rowLen(size_t &)
unsigned int my_index
Index (row number) of this node in the SST.
std::vector< std::thread > background_threads
#define CONF_DERECHO_SST_POLL_CQ_TIMEOUT_MS
Clients should use instances of this class with the appropriate template parameter to declare fields ...
std::mutex thread_start_mutex
Mutex for thread_start_cv.
SST(DerivedSST *derived_class_pointer, const SSTParams &params)
void put(SSTFieldVector< T > &vec_field)
Writes a specific local vector field to all remote nodes.
Clients should use instances of this class to declare vector-like fields in their SST; the template p...
volatile T const & operator()(const size_t row_idx) const
void set_rowLen(const size_t &_rowLen)
const std::vector< uint32_t > & members
List of nodes in the SST; indexes are row numbers, values are node IDs.
void set_bases_and_rowLens(char_p &, const size_t)
std::function< void(uint32_t)> failure_upcall_t
void put_with_completion()
std::map< uint32_t, int, std::greater< uint32_t > > members_by_id
Maps node IDs to SST row indexes.
size_t size() const
Just like std::vector::size(), returns the number of elements in this vector.
const char * getBaseAddress()
std::atomic< bool > thread_shutdown
SSTFieldVector(size_t num_elements)
volatile T & operator[](const size_t row_idx) const
void set_bases_and_rowLens(char_p &base, const size_t rlen, Field &f, Fields &... rest)
void detect()
This function is run in a detached background thread to detect predicate events.
void put(SSTField< T > &field)
Writes a specific local field to all remote nodes.
std::condition_variable thread_start_cv
Notified when the predicate evaluation thread should start.
const bool start_predicate_thread
Constructor parameter pack for SST.
void put(const std::vector< uint32_t > receiver_ranks, SSTFieldVector< T > &vec_field, std::size_t index)
Writes only a single element of a vector field to only some of the remote nodes.
const failure_upcall_t failure_upcall
const std::vector< char > already_failed
SSTParams(const std::vector< uint32_t > &_members, const uint32_t my_node_id, const failure_upcall_t failure_upcall=nullptr, const std::vector< char > already_failed={}, const bool start_predicate_thread=true)
void compute_rowLen(size_t &rowLen, Field &f, Fields &... rest)
void SSTInit(Fields &... fields)
void init_SSTFields(Fields &... fields)
void put_with_completion(size_t offset, size_t size)
volatile char * rows
Pointer to memory where the SST rows are stored.
size_t set_base(volatile char *const base)
void put(SSTFieldVector< T > &vec_field, std::size_t index)
Writes only a single element of a vector field to all remote nodes.
void put()
Writes the entire local row to all remote nodes.
std::vector< std::unique_ptr< resources > > res_vec
RDMA resources vector, one for each member.
constexpr size_t padded_len(const size_t &len)
void put(const std::vector< uint32_t > receiver_ranks)
Writes the entire local row to some of the remote nodes.
const uint32_t my_node_id
std::vector< bool > row_is_frozen
One flag per row, indexed by row number; true once that row has been frozen.
const std::vector< uint32_t > & members