Derecho 0.9 — a distributed-systems toolkit for RDMA.
Source listing: multicast_group.cpp
(Doxygen-generated source page; see the online documentation for this file.)
#include <algorithm>
#include <cassert>
#include <chrono>
#include <iterator>
#include <limits>
#include <thread>

#include <derecho/utils/logger.hpp>
12 
13 namespace derecho {
14 
/**
 * Returns the zero-based position of the first occurrence of elem in
 * container, or container.size() if elem is not present.
 *
 * Fixes: the container was previously taken by value, copying the entire
 * container (e.g. the members vector) on every call; it is now taken by
 * const reference. The hand-rolled counting loop is replaced by
 * std::find + std::distance, which yields exactly the same result
 * (distance from begin to end == container.size() when not found).
 */
template <class T, class U>
size_t index_of(const T& container, U elem) {
    return static_cast<size_t>(std::distance(
            begin(container),
            std::find(begin(container), end(container), elem)));
}
27 
// MulticastGroup constructor for a brand-new group (no predecessor view).
// Initializes per-member and per-subgroup state, creates the RDMC/SST groups
// when no member is already failed, and starts the background sender thread.
// NOTE(review): Doxygen-scraped excerpt -- the signature's first line
// (original line 28) and several interior lines are elided; elisions marked.
        std::vector<node_id_t> _members, node_id_t my_node_id,
        std::shared_ptr<DerechoSST> sst,
        CallbackSet callbacks,
        uint32_t total_num_subgroups,
        const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
        unsigned int sender_timeout,
        const subgroup_post_next_version_func_t& post_next_version_callback,
        const persistence_manager_callbacks_t& persistence_manager_callbacks,
        std::vector<char> already_failed)
        : members(_members),
          num_members(members.size()),
          // This node's rank within the flat members list (index_of above)
          member_index(index_of(members, my_node_id)),
          callbacks(callbacks),
          total_num_subgroups(total_num_subgroups),
          subgroup_settings_map(subgroup_settings_by_id),
          // One interval list per num_received column; {-1, -1} means
          // "nothing received yet" (see resolve_num_received)
          received_intervals(sst->num_received.size(), {-1, -1}),
          // (elided: member initializers on original lines 45-51)
          sst(sst),
    // (elided: remaining initializers and the opening brace, lines 53-56)
    for(uint i = 0; i < num_members; ++i) {
        // (elided: per-member initialization, original line 58)
    }

    // for(const auto p : subgroup_settings_by_id) {
    //     subgroup_id_t id = p.first;
    //     const SubgroupSettings& settings = p.second;
    //     auto num_shard_members = settings.members.size();
    //     while(free_message_buffers[id].size() < settings.profile.window_size * num_shard_members) {
    //         free_message_buffers[id].emplace_back(settings.profile.max_msg_size);
    //     }
    // }

    // RDMC/SST groups are only created when the initial membership is clean:
    // scan already_failed (if provided) for any failed member.
    bool no_member_failed = true;
    if(already_failed.size()) {
        for(uint i = 0; i < num_members; ++i) {
            if(already_failed[i]) {
                no_member_failed = false;
                break;
            }
        }
    }
    if(!already_failed.size() || no_member_failed) {
        // if groups are created successfully, rdmc_sst_groups_created will be set to true
        // (elided: the group-creation call, original line 82)
    }
    // Background thread that services pending_sends (see send_loop)
    sender_thread = std::thread(&MulticastGroup::send_loop, this);
}
88 
// MulticastGroup constructor used at a view change: builds the new group
// while salvaging state (free buffers, unsent and undelivered messages) from
// the wedged old_group.
// NOTE(review): Doxygen-scraped excerpt -- the signature's first line
// (original line 89) and several interior lines are elided; elisions marked.
        std::vector<node_id_t> _members, node_id_t my_node_id,
        std::shared_ptr<DerechoSST> sst,
        MulticastGroup&& old_group,
        uint32_t total_num_subgroups,
        const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
        // (elided: parameters on original lines 95-96)
        std::vector<char> already_failed)
        : members(_members),
          num_members(members.size()),
          member_index(index_of(members, my_node_id)),
          callbacks(old_group.callbacks),
          total_num_subgroups(total_num_subgroups),
          subgroup_settings_map(subgroup_settings_by_id),
          received_intervals(sst->num_received.size(), {-1, -1}),
          rpc_callback(old_group.rpc_callback),
          // Offset past the old group's RDMC group numbers so the two sets
          // of group IDs never collide during the transition
          rdmc_group_num_offset(old_group.rdmc_group_num_offset + old_group.num_members),
          // (elided: member initializers on original lines 107-111)
          sender_timeout(old_group.sender_timeout),
          sst(sst),
    // (elided: remaining initializers and the opening brace, lines 114-117)
    // Make sure rdmc_group_num_offset didn't overflow.
    assert(old_group.rdmc_group_num_offset <= std::numeric_limits<uint16_t>::max() - old_group.num_members - num_members);

    // Just in case
    old_group.wedge();

    for(uint i = 0; i < num_members; ++i) {
        // (elided: per-member initialization, original line 125)
    }

    // Convience function that takes a msg from the old group and
    // produces one suitable for this group.
    auto convert_msg = [this](RDMCMessage& msg, subgroup_id_t subgroup_num) {
        // (elided: original line 131)
        msg.index = future_message_indices[subgroup_num]++;
        return std::move(msg);
    };

    // Convience function that takes a msg from the old group and
    // produces one suitable for this group.
    auto convert_sst_msg = [this](SSTMessage& msg, subgroup_id_t subgroup_num) {
        // (elided: original line 139)
        msg.index = future_message_indices[subgroup_num]++;
        return std::move(msg);
    };

    // Reclaim RDMCMessageBuffers from the old group, and supplement them with
    // additional if the group has grown.
    std::lock_guard<std::recursive_mutex> lock(old_group.msg_state_mtx);
    for(const auto p : subgroup_settings_by_id) {
        const subgroup_id_t subgroup_num = p.first;
        // for later: don't move extra message buffers
        free_message_buffers[subgroup_num].swap(old_group.free_message_buffers[subgroup_num]);
    }

    // In-flight receives are abandoned; recycle their buffers
    for(auto& msg : old_group.current_receives) {
        free_message_buffers[msg.first.first].push_back(std::move(msg.second.message_buffer));
    }
    old_group.current_receives.clear();

    // Assume that any locally stable messages failed. If we were the sender
    // than re-attempt, otherwise discard. TODO: Presumably the ragged edge
    // cleanup will want the chance to deliver some of these.
    for(auto& p : old_group.locally_stable_rdmc_messages) {
        if(p.second.size() == 0) {
            continue;
        }

        for(auto& q : p.second) {
            if(q.second.sender_id == members[member_index]) {
                // Our own message: queue it for re-sending in the new view
                pending_sends[p.first].push(convert_msg(q.second, p.first));
            } else {
                free_message_buffers[p.first].push_back(std::move(q.second.message_buffer));
            }
        }
    }
    old_group.locally_stable_rdmc_messages.clear();

    // for(const auto p : subgroup_settings_by_id) {
    //     subgroup_id_t id = p.first;
    //     const SubgroupSettings& settings = p.second;
    //     auto num_shard_members = settings.members.size();
    //     while(free_message_buffers[id].size() < settings.profile.window_size * num_shard_members) {
    //         free_message_buffers[id].emplace_back(settings.profile.max_msg_size);
    //     }
    // }

    old_group.locally_stable_sst_messages.clear();

    // Any messages that were being sent should be re-attempted.
    for(const auto& p : subgroup_settings_by_id) {
        auto subgroup_num = p.first;
        if(old_group.current_sends.size() > subgroup_num && old_group.current_sends[subgroup_num]) {
            pending_sends[subgroup_num].push(convert_msg(*old_group.current_sends[subgroup_num], subgroup_num));
        }

        if(old_group.pending_sends.size() > subgroup_num) {
            while(!old_group.pending_sends[subgroup_num].empty()) {
                pending_sends[subgroup_num].push(convert_msg(old_group.pending_sends[subgroup_num].front(), subgroup_num));
                old_group.pending_sends[subgroup_num].pop();
            }
        }

        if(old_group.next_sends.size() > subgroup_num && old_group.next_sends[subgroup_num]) {
            next_sends[subgroup_num] = convert_msg(*old_group.next_sends[subgroup_num], subgroup_num);
        }

        for(auto& entry : old_group.non_persistent_messages[subgroup_num]) {
            non_persistent_messages[subgroup_num].emplace(entry.first,
                                                          convert_msg(entry.second, subgroup_num));
        }
        // NOTE(review): this clears the *entire* map while still inside the
        // per-subgroup loop, so subgroups processed in later iterations find
        // their non_persistent_messages already discarded -- looks like it
        // should run after the loop; confirm upstream.
        old_group.non_persistent_messages.clear();
        for(auto& entry : old_group.non_persistent_sst_messages[subgroup_num]) {
            non_persistent_sst_messages[subgroup_num].emplace(entry.first,
                                                              convert_sst_msg(entry.second, subgroup_num));
        }
        // NOTE(review): same early-clear concern as above.
        old_group.non_persistent_sst_messages.clear();
    }

    // RDMC/SST groups are only created when no member is already failed
    bool no_member_failed = true;
    if(already_failed.size()) {
        for(uint i = 0; i < num_members; ++i) {
            if(already_failed[i]) {
                no_member_failed = false;
                break;
            }
        }
    }
    if(!already_failed.size() || no_member_failed) {
        // if groups are created successfully, rdmc_sst_groups_created will be set to true
        // (elided: the group-creation call, original line 229)
    }
    // Background sender thread for the new group
    sender_thread = std::thread(&MulticastGroup::send_loop, this);
}
235 
// Creates the SST multicast group and the per-sender RDMC groups for every
// subgroup in subgroup_settings_map, wiring up the receive handlers that
// sequence incoming messages. Returns false if any rdmc::create_group fails.
// NOTE(review): the function signature (original line 236) and a few interior
// lines are elided from this Doxygen-scraped excerpt; elisions are marked.
    for(const auto& p : subgroup_settings_map) {
        uint32_t subgroup_num = p.first;
        const SubgroupSettings& subgroup_settings = p.second;
        const std::vector<node_id_t>& shard_members = subgroup_settings.members;
        std::size_t num_shard_members = shard_members.size();
        std::vector<int> shard_senders = subgroup_settings.senders;
        uint32_t num_shard_senders = get_num_senders(shard_senders);
        auto shard_sst_indices = get_shard_sst_indices(subgroup_num);

        sst_multicast_group_ptrs[subgroup_num] = std::make_unique<sst::multicast_group<DerechoSST>>(
                sst, shard_sst_indices, subgroup_settings.profile.window_size, subgroup_settings.profile.sst_max_msg_size, subgroup_settings.senders,
                subgroup_settings.num_received_offset, subgroup_settings.slot_offset);

        // sender_rank counts only shard members that actually send; it starts
        // at -1 (unsigned wrap) and is incremented before first use below.
        for(uint shard_rank = 0, sender_rank = -1; shard_rank < num_shard_members; ++shard_rank) {
            // don't create RDMC group if the shard member is never going to send
            if(!shard_senders[shard_rank]) {
                continue;
            }
            sender_rank++;
            node_id_t node_id = shard_members[shard_rank];
            // When RDMC receives a message, it should store it in
            // locally_stable_rdmc_messages and update the received count
            rdmc::completion_callback_t rdmc_receive_handler;
            rdmc_receive_handler = [this, subgroup_num, shard_rank, sender_rank,
                                    subgroup_settings, node_id,
                                    num_shard_senders,
                                    shard_sst_indices](char* data, size_t size) {
                assert(this->sst);
                std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
                header* h = (header*)data;
                const int32_t index = h->index;
                // Global sequence number = round-robin position of
                // (index, sender) among this shard's senders
                message_id_t sequence_number = index * num_shard_senders + sender_rank;

                dbg_default_trace("Locally received message in subgroup {}, sender rank {}, index {}",
                                  subgroup_num, shard_rank, index);
                // Move message from current_receives to locally_stable_rdmc_messages.
                if(node_id == members[member_index]) {
                    // Self-receive: the message we sent completes locally
                    assert(current_sends[subgroup_num]);
                    locally_stable_rdmc_messages[subgroup_num][sequence_number] = std::move(*current_sends[subgroup_num]);
                    current_sends[subgroup_num] = std::nullopt;
                } else {
                    auto it = current_receives.find({subgroup_num, node_id});
                    assert(it != current_receives.end());
                    auto& msg = it->second;
                    msg.index = index;
                    // We set the size in this receive handler instead of in the incoming_message_handler
                    msg.size = size;
                    locally_stable_rdmc_messages[subgroup_num].emplace(sequence_number, std::move(msg));
                    current_receives.erase(it);
                }

                auto new_num_received = resolve_num_received(index, subgroup_settings.num_received_offset + sender_rank);
                /* NULL Send Scheme */
                // only if I am a sender in the subgroup and the subgroup is not in UNORDERED mode
                if(subgroup_settings.sender_rank >= 0 && subgroup_settings.mode != Mode::UNORDERED) {
                    // Fill our own send slots with null messages so the
                    // round-robin global order can advance past this node
                    if(subgroup_settings.sender_rank < (int)sender_rank) {
                        while(future_message_indices[subgroup_num] <= new_num_received) {
                            get_buffer_and_send_auto_null(subgroup_num);
                        }
                    } else if(subgroup_settings.sender_rank > (int)sender_rank) {
                        while(future_message_indices[subgroup_num] < new_num_received) {
                            get_buffer_and_send_auto_null(subgroup_num);
                        }
                    }
                }

                // deliver immediately if in UNORDERED mode
                if(subgroup_settings.mode == Mode::UNORDERED) {
                    // issue stability upcalls for the recently sequenced messages
                    for(int i = sst->num_received[member_index][subgroup_settings.num_received_offset + sender_rank] + 1;
                        i <= new_num_received; ++i) {
                        message_id_t seq_num = i * num_shard_senders + sender_rank;
                        if(!locally_stable_sst_messages[subgroup_num].empty()
                           && locally_stable_sst_messages[subgroup_num].begin()->first == seq_num) {
                            auto& msg = locally_stable_sst_messages[subgroup_num].begin()->second;
                            char* buf = const_cast<char*>(msg.buf);
                            header* h = (header*)(buf);
                            // no delivery callback for a NULL message
                            if(msg.size > h->header_size && callbacks.global_stability_callback) {
                                callbacks.global_stability_callback(subgroup_num, msg.sender_id,
                                                                    msg.index,
                                                                    {{buf + h->header_size, msg.size - h->header_size}},
                                // (elided: final argument + closing paren, original line 319)
                            }
                            if(node_id == members[member_index]) {
                                pending_message_timestamps[subgroup_num].erase(h->timestamp);
                            }
                            locally_stable_sst_messages[subgroup_num].erase(locally_stable_sst_messages[subgroup_num].begin());
                        } else {
                            assert(!locally_stable_rdmc_messages[subgroup_num].empty());
                            auto it2 = locally_stable_rdmc_messages[subgroup_num].begin();
                            assert(it2->first == seq_num);
                            auto& msg = it2->second;
                            char* buf = msg.message_buffer.buffer.get();
                            header* h = (header*)(buf);
                            // no delivery for a NULL message
                            if(msg.size > h->header_size && callbacks.global_stability_callback) {
                                callbacks.global_stability_callback(subgroup_num, msg.sender_id,
                                                                    msg.index,
                                                                    {{buf + h->header_size, msg.size - h->header_size}},
                                // (elided: final argument + closing paren, original line 337)
                            }
                            // Return the RDMC buffer to the free pool
                            free_message_buffers[subgroup_num].push_back(std::move(msg.message_buffer));
                            if(node_id == members[member_index]) {
                                pending_message_timestamps[subgroup_num].erase(h->timestamp);
                            }
                            locally_stable_rdmc_messages[subgroup_num].erase(it2);
                        }
                    }
                }
                if(new_num_received > sst->num_received[member_index][subgroup_settings.num_received_offset + sender_rank]) {
                    sst->num_received[member_index][subgroup_settings.num_received_offset + sender_rank] = new_num_received;
                    // std::atomic_signal_fence(std::memory_order_acq_rel);
                    // The stable sequence number is bounded by the slowest
                    // sender: take the minimum num_received column
                    auto* min_ptr = std::min_element(&sst->num_received[member_index][subgroup_settings.num_received_offset],
                                                     &sst->num_received[member_index][subgroup_settings.num_received_offset + num_shard_senders]);
                    uint min_index = std::distance(&sst->num_received[member_index][subgroup_settings.num_received_offset], min_ptr);
                    auto new_seq_num = (*min_ptr + 1) * num_shard_senders + min_index - 1;
                    if(static_cast<message_id_t>(new_seq_num) > sst->seq_num[member_index][subgroup_num]) {
                        dbg_default_trace("Updating seq_num for subgroup {} to {}", subgroup_num, new_seq_num);
                        sst->seq_num[member_index][subgroup_num] = new_seq_num;
                        sst->put(shard_sst_indices,
                                 sst->seq_num, subgroup_num);
                    }
                    sst->put(shard_sst_indices,
                             sst->num_received,
                             subgroup_settings.num_received_offset + sender_rank);
                }
            };
            // Capture rdmc_receive_handler by copy! The reference to it won't be valid after this constructor ends!
            auto receive_handler_plus_notify =
                    [this, rdmc_receive_handler](char* data, size_t size) {
                        rdmc_receive_handler(data, size);
                        // signal background writer thread
                        sender_cv.notify_all();
                    };

            // Create a "rotated" vector of members in which the currently selected shard member (shard_rank) is first
            std::vector<uint32_t> rotated_shard_members(shard_members.size());
            for(uint k = 0; k < num_shard_members; ++k) {
                rotated_shard_members[k] = shard_members[(shard_rank + k) % num_shard_members];
            }

            // don't create rdmc group if there's only one member in the shard
            if(num_shard_members <= 1) {
                singleton_shard_receive_handlers[subgroup_num] = receive_handler_plus_notify;
                continue;
            }

            if(node_id == members[member_index]) {
                //Create a group in which this node is the sender, and only self-receives happen
                if(!rdmc::create_group(
                           rdmc_group_num_offset, rotated_shard_members, subgroup_settings.profile.block_size, subgroup_settings.profile.rdmc_send_algorithm,
                           [](size_t length) -> rdmc::receive_destination {
                               // A sender-only group must never be asked for
                               // a receive destination
                               assert_always(false);
                               return {nullptr, 0};
                           },
                           receive_handler_plus_notify,
                           [](std::optional<uint32_t>) {})) {
                    return false;
                }
                // (elided: original lines 397-398)
            } else {
                if(!rdmc::create_group(
                           rdmc_group_num_offset, rotated_shard_members, subgroup_settings.profile.block_size, subgroup_settings.profile.rdmc_send_algorithm,
                           [this, subgroup_num, node_id, max_msg_size=subgroup_settings.profile.max_msg_size](size_t length) {
                               std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
                               // assert(!free_message_buffers[subgroup_num].empty());
                               if(free_message_buffers[subgroup_num].empty()) {
                                   // Grow the pool on demand rather than failing
                                   free_message_buffers[subgroup_num].emplace_back(max_msg_size);
                               }
                               //Create a Message struct to receive the data into.
                               RDMCMessage msg;
                               msg.sender_id = node_id;
                               // The length variable is not the exact size of the msg,
                               // but it is the nearest multiple of the block size greater then the size
                               // so we will set the size in the receive handler
                               msg.message_buffer = std::move(free_message_buffers[subgroup_num].back());
                               free_message_buffers[subgroup_num].pop_back();

                               // (elided: construction of the receive_destination `ret`, original line 417)
                               current_receives[{subgroup_num, node_id}] = std::move(msg);

                               assert(ret.mr->buffer != nullptr);
                               return ret;
                           },
                           rdmc_receive_handler, [](std::optional<uint32_t>) {})) {
                    return false;
                }
                // (elided: original line 426)
            }
        }
    }
    return true;
}
432 
// Resets this SST's rows to their initial state: every num_received, seq_num,
// delivered_num, and persisted_num counter is set to -1 ("nothing yet") for
// all members, then the row is pushed and all members barrier.
// NOTE(review): the function signature (original line 433) is elided from
// this Doxygen-scraped excerpt; presumably the SST-row initialization routine.
    auto num_received_size = sst->num_received.size();
    auto seq_num_size = sst->seq_num.size();
    for(uint i = 0; i < num_members; ++i) {
        for(uint j = 0; j < num_received_size; ++j) {
            // -1 = no message received yet in this sender column
            sst->num_received[i][j] = -1;
        }
        for(uint j = 0; j < seq_num_size; ++j) {
            // -1 = no message sequenced / delivered / persisted yet
            sst->seq_num[i][j] = -1;
            sst->delivered_num[i][j] = -1;
            sst->persisted_num[i][j] = -1;
        }
    }
    sst->put();
    // Barrier so everyone starts from a consistent SST state
    sst->sync_with_members();
}
449 
// Delivers one RDMC message to the application: a cooked (RPC) message goes
// through post_next_version_callback + rpc_callback; a raw message goes to
// the global stability callback with the payload past the header. Null
// (header-only) messages get no payload upcall.
// NOTE(review): the first signature line (original line 450) and the line
// opening the guard block closed below (original line 461) are elided from
// this Doxygen-scraped excerpt.
        const persistent::version_t& version,
        const uint64_t& msg_ts_us) {
    char* buf = msg.message_buffer.buffer.get();
    header* h = (header*)(buf);
    // cooked send
    if(h->cooked_send) {
        buf += h->header_size;
        auto payload_size = msg.size - h->header_size;
        // Tell the persistence layer which version is about to be assigned
        post_next_version_callback(subgroup_num, version, msg_ts_us);
        rpc_callback(subgroup_num, msg.sender_id, buf, payload_size);
        // (elided: original line 461, which opens the block closed below --
        // presumably a null-check on callbacks.global_stability_callback)
            callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {},
                                                version);
        }
    } else if(msg.size > h->header_size && callbacks.global_stability_callback) {
        callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index,
                                            {{buf + h->header_size, msg.size - h->header_size}},
                                            version);
    }
}
471 
// SST-message variant of deliver_message: identical logic to the RDMC
// variant, but the payload lives in the (volatile) SST slot, accessed via a
// const_cast of msg.buf.
// NOTE(review): the first signature line (original line 472) and the line
// opening the guard block closed below (original line 483) are elided from
// this Doxygen-scraped excerpt.
        const persistent::version_t& version,
        const uint64_t& msg_ts_us) {
    char* buf = const_cast<char*>(msg.buf);
    header* h = (header*)(buf);
    // cooked send
    if(h->cooked_send) {
        buf += h->header_size;
        auto payload_size = msg.size - h->header_size;
        // Tell the persistence layer which version is about to be assigned
        post_next_version_callback(subgroup_num, version, msg_ts_us);
        rpc_callback(subgroup_num, msg.sender_id, buf, payload_size);
        // (elided: original line 483, which opens the block closed below --
        // presumably a null-check on callbacks.global_stability_callback)
            callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index, {},
                                                version);
        }
    } else if(msg.size > h->header_size && callbacks.global_stability_callback) {
        callbacks.global_stability_callback(subgroup_num, msg.sender_id, msg.index,
                                            {{buf + h->header_size, msg.size - h->header_size}},
                                            version);
    }
}
493 
// Assigns a persistent version to a delivered RDMC message by invoking the
// persistence manager's make-version callback. Returns false for null
// (header-only) messages, which consume no version.
// NOTE(review): the first signature line (original line 494) is elided from
// this Doxygen-scraped excerpt.
        const persistent::version_t& version, const uint64_t& msg_timestamp) {
    char* buf = msg.message_buffer.buffer.get();
    header* h = (header*)(buf);
    // null message filter
    if(msg.size == h->header_size) {
        return false;
    }
    if(msg.sender_id == members[member_index]) {
        // Record when our own message reached the persistence stage
        pending_persistence[subgroup_num][locally_stable_rdmc_messages[subgroup_num].begin()->first] = msg_timestamp;
    }
    // make a version for persistent<t>/volatile<t>
    uint64_t msg_ts_us = msg_timestamp / 1e3;  // ns -> us
    if(msg_ts_us == 0) {
        // Sender did not timestamp the message; fall back to the local clock
        struct timespec now;
        clock_gettime(CLOCK_REALTIME, &now);
        msg_ts_us = (uint64_t)now.tv_sec * 1e6 + now.tv_nsec / 1e3;
    }
    // Persistence manager "make version" callback (tuple element 0)
    std::get<0>(persistence_manager_callbacks)(subgroup_num, version, HLC{msg_ts_us, 0});
    return true;
}
515 
// SST-message variant of version_message: identical logic to the RDMC
// variant, but reads the header from the SST slot via msg.buf.
// NOTE(review): the first signature line (original line 516) is elided from
// this Doxygen-scraped excerpt.
        const persistent::version_t& version, const uint64_t& msg_timestamp) {
    char* buf = const_cast<char*>(msg.buf);
    header* h = (header*)(buf);
    // null message filter
    if(msg.size == h->header_size) {
        return false;
    }
    if(msg.sender_id == members[member_index]) {
        // Record when our own message reached the persistence stage
        pending_persistence[subgroup_num][locally_stable_sst_messages[subgroup_num].begin()->first] = msg_timestamp;
    }
    // make a version for persistent<t>/volatile<t>
    uint64_t msg_ts_us = msg_timestamp / 1e3;  // ns -> us
    if(msg_ts_us == 0) {
        // Sender did not timestamp the message; fall back to the local clock
        struct timespec now;
        clock_gettime(CLOCK_REALTIME, &now);
        msg_ts_us = (uint64_t)now.tv_sec * 1e6 + now.tv_nsec / 1e3;
    }
    // Persistence manager "make version" callback (tuple element 0)
    std::get<0>(persistence_manager_callbacks)(subgroup_num, version, HLC{msg_ts_us, 0});
    return true;
}
537 
// Delivers every locally stable message (RDMC or SST) whose sequence number
// is covered by the per-sender maxima in max_indices_for_senders, assigning
// persistent versions along the way, then advances and publishes
// delivered_num. Sequence numbers whose sender index exceeds that sender's
// maximum are skipped.
// NOTE(review): the first signature line (original line 538) is elided from
// this Doxygen-scraped excerpt.
        const std::vector<int32_t>& max_indices_for_senders,
        subgroup_id_t subgroup_num, uint32_t num_shard_senders) {
    bool non_null_msgs_delivered = false;
    assert(max_indices_for_senders.size() == (size_t)num_shard_senders);
    std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
    int32_t curr_seq_num = sst->delivered_num[member_index][subgroup_num];
    int32_t max_seq_num = curr_seq_num;
    // Highest candidate sequence number across all senders
    for(uint sender = 0; sender < num_shard_senders; sender++) {
        max_seq_num = std::max(max_seq_num,
                               static_cast<int32_t>(max_indices_for_senders[sender] * num_shard_senders + sender));
    }
    persistent::version_t assigned_version = INVALID_VERSION;
    for(int32_t seq_num = curr_seq_num + 1; seq_num <= max_seq_num; seq_num++) {
        //determine if this sequence number should actually be skipped
        int32_t index = seq_num / num_shard_senders;
        uint32_t sender_rank = seq_num % num_shard_senders;
        if(index > max_indices_for_senders[sender_rank]) {
            continue;
        }
        auto rdmc_msg_ptr = locally_stable_rdmc_messages[subgroup_num].find(seq_num);
        // Version = (view id, sequence number) packed into one int64
        assigned_version = persistent::combine_int32s(sst->vid[member_index], seq_num);
        if(rdmc_msg_ptr != locally_stable_rdmc_messages[subgroup_num].end()) {
            auto& msg = rdmc_msg_ptr->second;
            char* buf = msg.message_buffer.buffer.get();
            uint64_t msg_ts = ((header*)buf)->timestamp;
            //Note: deliver_message frees the RDMC buffer in msg, which is why the timestamp must be saved before calling this
            deliver_message(msg, subgroup_num, assigned_version, msg_ts/1000);
            non_null_msgs_delivered |= version_message(msg, subgroup_num, assigned_version, msg_ts);
            // free the message buffer only after it version_message has been called
            free_message_buffers[subgroup_num].push_back(std::move(msg.message_buffer));
            locally_stable_rdmc_messages[subgroup_num].erase(rdmc_msg_ptr);
        } else {
            dbg_default_trace("Subgroup {}, deliver_messages_upto delivering an SST message with seq_num = {}",
                              subgroup_num, seq_num);
            auto& msg = locally_stable_sst_messages[subgroup_num].at(seq_num);
            char* buf = (char*)msg.buf;
            uint64_t msg_ts = ((header*)buf)->timestamp;
            deliver_message(msg, subgroup_num, assigned_version, msg_ts/1000);
            non_null_msgs_delivered |= version_message(msg, subgroup_num, assigned_version, msg_ts);
            locally_stable_sst_messages[subgroup_num].erase(seq_num);
        }
    }
    // Publish the new delivery frontier to the rest of the shard
    gmssst::set(sst->delivered_num[member_index][subgroup_num], max_seq_num);
    sst->put(get_shard_sst_indices(subgroup_num),
             sst->delivered_num, subgroup_num);
    if(non_null_msgs_delivered) {
        //Call the persistence_manager_post_persist_func
        std::get<1>(persistence_manager_callbacks)(subgroup_num, assigned_version);
    }
}
589 
/**
 * Records that the message with the given index has been received on the
 * num_received column num_received_entry, and returns the new highest index
 * such that every message up to and including it has been received.
 *
 * received_intervals[num_received_entry] is a sorted flat list of interval
 * endpoints {start1, end1, start2, end2, ...} describing which indices have
 * been received so far; it is seeded with {-1, -1} (see the constructors).
 * The new index is spliced into this list, merging neighboring intervals
 * that become contiguous, and the end of the FIRST interval (the contiguous
 * received prefix) is returned.
 */
int32_t MulticastGroup::resolve_num_received(int32_t index, uint32_t num_received_entry) {
    // Walk backward from the last endpoint to the last endpoint <= index
    auto it = received_intervals[num_received_entry].end();
    it--;
    while(*it > index) {
        it--;
    }
    if(std::next(it) == received_intervals[num_received_entry].end()) {
        // index lies beyond the last interval
        if(*it == index - 1) {
            // Extends the last interval by one: bump its end endpoint
            *it = index;
        } else {
            // Gap before index: append a new singleton interval {index, index}
            received_intervals[num_received_entry].push_back(index);
            received_intervals[num_received_entry].push_back(index);
        }
    } else {
        auto next_it = std::next(it);
        if(*it != index - 1) {
            // index does not extend the interval ending at *it
            received_intervals[num_received_entry].insert(next_it, index);
            if(*next_it != index + 1) {
                // ...nor the following interval: insert {index, index}
                received_intervals[num_received_entry].insert(next_it, index);
            } else {
                // index abuts the following interval: extend it downward by
                // replacing its old start endpoint with index
                received_intervals[num_received_entry].erase(next_it);
            }
        } else {
            // index extends the interval ending at *it upward
            if(*next_it != index + 1) {
                received_intervals[num_received_entry].insert(next_it, index);
            } else {
                // index bridges two intervals: merge them by dropping both
                // inner endpoints
                received_intervals[num_received_entry].erase(next_it);
            }
            received_intervals[num_received_entry].erase(it);
        }
    }
    // End of the first interval = highest contiguously-received index
    return *std::next(received_intervals[num_received_entry].begin());
}
623 
// SST receiver predicate: returns true when at least one sender in this
// shard has published a message we have not yet consumed, detected via the
// per-slot sequence counter stored at the end of the sender's SST slot.
// NOTE(review): the first signature line (original line 624) is elided from
// this Doxygen-scraped excerpt.
        const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
        uint32_t num_shard_senders, const DerechoSST& sst) {
    for(uint sender_count = 0; sender_count < num_shard_senders; ++sender_count) {
        // Index of the next message expected from this sender
        int32_t num_received = sst.num_received_sst[member_index][subgroup_settings.num_received_offset + sender_count] + 1;
        uint32_t slot = num_received % subgroup_settings.profile.window_size;
        // The trailing uint64_t of a slot is its sequence counter; a fresh
        // message is present when it equals num_received/window_size + 1
        if(static_cast<long long int>((uint64_t&)sst.slots[node_id_to_sst_index.at(subgroup_settings.members[shard_ranks_by_sender_rank.at(sender_count)])]
                                                          [subgroup_settings.slot_offset + (subgroup_settings.profile.sst_max_msg_size + 2 * sizeof(uint64_t)) * (slot + 1) - sizeof(uint64_t)])
           == num_received / subgroup_settings.profile.window_size + 1) {
            return true;
        }
    }
    return false;
}
638 
// Handler invoked (by receiver_function) for each newly received SST-multicast
// message: records it as locally stable, updates the receive-interval
// bookkeeping, sends automatic null messages to keep the round-robin order
// advancing, and in UNORDERED mode delivers ready messages immediately.
// NOTE(review): two lines carrying the final argument of the stability
// callback (original lines 679 and 696) are elided from this excerpt.
void MulticastGroup::sst_receive_handler(subgroup_id_t subgroup_num, const SubgroupSettings& subgroup_settings,
                                         const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
                                         uint32_t num_shard_senders, uint32_t sender_rank,
                                         volatile char* data, uint64_t size) {
    header* h = (header*)data;
    const int32_t index = h->index;

    // Global sequence number of this (sender, index) pair in round-robin order
    message_id_t sequence_number = index * num_shard_senders + sender_rank;
    node_id_t node_id = subgroup_settings.members[shard_ranks_by_sender_rank.at(sender_rank)];

    // SST messages are recorded by pointer into the (volatile) slot memory
    locally_stable_sst_messages[subgroup_num][sequence_number] = {node_id, index, size, data};

    auto new_num_received = resolve_num_received(index, subgroup_settings.num_received_offset + sender_rank);
    /* NULL Send Scheme */
    // only if I am a sender in the subgroup and the subgroup is not in UNORDERED mode
    if(subgroup_settings.sender_rank >= 0 && subgroup_settings.mode != Mode::UNORDERED) {
        // Fill our own slots with null messages so the global order can
        // advance past this node
        if(subgroup_settings.sender_rank < (int)sender_rank) {
            while(future_message_indices[subgroup_num] <= new_num_received) {
                get_buffer_and_send_auto_null(subgroup_num);
            }
        } else if(subgroup_settings.sender_rank > (int)sender_rank) {
            while(future_message_indices[subgroup_num] < new_num_received) {
                get_buffer_and_send_auto_null(subgroup_num);
            }
        }
    }

    if(subgroup_settings.mode == Mode::UNORDERED) {
        // issue stability upcalls for the recently sequenced messages
        for(int i = sst->num_received[member_index][subgroup_settings.num_received_offset + sender_rank] + 1; i <= new_num_received; ++i) {
            message_id_t seq_num = i * num_shard_senders + sender_rank;
            if(!locally_stable_sst_messages[subgroup_num].empty()
               && locally_stable_sst_messages[subgroup_num].begin()->first == seq_num) {
                auto& msg = locally_stable_sst_messages[subgroup_num].begin()->second;
                char* buf = const_cast<char*>(msg.buf);
                header* h = (header*)(buf);
                // Skip the upcall for NULL (header-only) messages
                if(msg.size > h->header_size && callbacks.global_stability_callback) {
                    callbacks.global_stability_callback(subgroup_num, msg.sender_id,
                                                        msg.index,
                                                        {{buf + h->header_size, msg.size - h->header_size}},
                    // (elided: final argument + closing paren, original line 679)
                }
                if(node_id == members[member_index]) {
                    pending_message_timestamps[subgroup_num].erase(h->timestamp);
                }
                locally_stable_sst_messages[subgroup_num].erase(locally_stable_sst_messages[subgroup_num].begin());
            } else {
                assert(!locally_stable_rdmc_messages[subgroup_num].empty());
                auto it2 = locally_stable_rdmc_messages[subgroup_num].begin();
                assert(it2->first == seq_num);
                auto& msg = it2->second;
                char* buf = msg.message_buffer.buffer.get();
                header* h = (header*)(buf);
                if(msg.size > h->header_size && callbacks.global_stability_callback) {
                    callbacks.global_stability_callback(subgroup_num, msg.sender_id,
                                                        msg.index,
                                                        {{buf + h->header_size, msg.size - h->header_size}},
                    // (elided: final argument + closing paren, original line 696)
                }
                // Return the RDMC buffer to the free pool
                free_message_buffers[subgroup_num].push_back(std::move(msg.message_buffer));
                if(node_id == members[member_index]) {
                    pending_message_timestamps[subgroup_num].erase(h->timestamp);
                }
                locally_stable_rdmc_messages[subgroup_num].erase(it2);
            }
        }
    }
    sst->num_received[member_index][subgroup_settings.num_received_offset + sender_rank] = new_num_received;
}
708 
709 void MulticastGroup::receiver_function(subgroup_id_t subgroup_num, const SubgroupSettings& subgroup_settings,
710  const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
711  uint32_t num_shard_senders, DerechoSST& sst, unsigned int batch_size,
712  const std::function<void(uint32_t, volatile char*, uint32_t)>& sst_receive_handler_lambda) {
713  DerechoParams profile = subgroup_settings.profile;
714  const uint64_t slot_width = profile.sst_max_msg_size + 2 * sizeof(uint64_t);
715  std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
716  for(uint i = 0; i < batch_size; ++i) {
717  for(uint sender_count = 0; sender_count < num_shard_senders; ++sender_count) {
718  auto num_received = sst.num_received_sst[member_index][subgroup_settings.num_received_offset + sender_count] + 1;
719  const uint32_t slot = num_received % profile.window_size;
720  const uint32_t sender_sst_index = node_id_to_sst_index.at(
721  subgroup_settings.members[shard_ranks_by_sender_rank.at(sender_count)]);
722  const message_id_t next_seq = (uint64_t&)sst.slots[sender_sst_index]
723  [subgroup_settings.slot_offset + slot_width * (slot + 1) - sizeof(uint64_t)];
724  if(next_seq == num_received / static_cast<int32_t>(profile.window_size) + 1) {
725  dbg_default_trace("receiver_trig calling sst_receive_handler_lambda. next_seq = {}, num_received = {}, sender rank = {}. Reading from SST row {}, slot {}",
726  next_seq, num_received, sender_count, sender_sst_index, subgroup_settings.slot_offset + slot_width * slot);
727  sst_receive_handler_lambda(sender_count,
728  &sst.slots[sender_sst_index]
729  [subgroup_settings.slot_offset + slot_width * slot],
730  (uint64_t&)sst.slots[sender_sst_index]
731  [subgroup_settings.slot_offset + slot_width * (slot + 1) - 2 * sizeof(uint64_t)]);
732  sst.num_received_sst[member_index][subgroup_settings.num_received_offset + sender_count] = num_received;
733  }
734  }
735  }
736  sst.put((char*)std::addressof(sst.num_received_sst[0][subgroup_settings.num_received_offset]) - sst.getBaseAddress(),
737  sizeof(decltype(sst.num_received_sst)::value_type) * num_shard_senders);
738  // std::atomic_signal_fence(std::memory_order_acq_rel);
739  auto* min_ptr = std::min_element(&sst.num_received[member_index][subgroup_settings.num_received_offset],
740  &sst.num_received[member_index][subgroup_settings.num_received_offset + num_shard_senders]);
741  int min_index = std::distance(&sst.num_received[member_index][subgroup_settings.num_received_offset], min_ptr);
742  message_id_t new_seq_num = (*min_ptr + 1) * num_shard_senders + min_index - 1;
743  if(new_seq_num > sst.seq_num[member_index][subgroup_num]) {
744  dbg_default_trace("Updating seq_num for subgroup {} to {}", subgroup_num, new_seq_num);
745  sst.seq_num[member_index][subgroup_num] = new_seq_num;
746  sst.put(sst.seq_num, subgroup_num);
747  }
748  sst.put((char*)std::addressof(sst.num_received[0][subgroup_settings.num_received_offset]) - sst.getBaseAddress(),
749  sizeof(decltype(sst.num_received)::value_type) * num_shard_senders);
750 }
751 
752 void MulticastGroup::delivery_trigger(subgroup_id_t subgroup_num, const SubgroupSettings& subgroup_settings,
753  const uint32_t num_shard_members, DerechoSST& sst) {
754  std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
755  // compute the min of the seq_num
756  message_id_t min_stable_num
757  = sst.seq_num[node_id_to_sst_index.at(subgroup_settings.members[0])][subgroup_num];
758  for(uint i = 0; i < num_shard_members; ++i) {
759  // to avoid a race condition, do not read the same SST entry twice
760  message_id_t stable_num_copy = sst.seq_num[node_id_to_sst_index.at(subgroup_settings.members[i])][subgroup_num];
761  min_stable_num = std::min(min_stable_num, stable_num_copy);
762  }
763 
764  bool update_sst = false;
765  bool non_null_msgs_delivered = false;
766  persistent::version_t assigned_version = INVALID_VERSION;
767  while(true) {
768  if(locally_stable_rdmc_messages[subgroup_num].empty() && locally_stable_sst_messages[subgroup_num].empty()) {
769  break;
770  }
771  int32_t least_undelivered_rdmc_seq_num, least_undelivered_sst_seq_num;
772  least_undelivered_rdmc_seq_num = least_undelivered_sst_seq_num = std::numeric_limits<int32_t>::max();
773  if(!locally_stable_rdmc_messages[subgroup_num].empty()) {
774  least_undelivered_rdmc_seq_num = locally_stable_rdmc_messages[subgroup_num].begin()->first;
775  }
776  if(!locally_stable_sst_messages[subgroup_num].empty()) {
777  least_undelivered_sst_seq_num = locally_stable_sst_messages[subgroup_num].begin()->first;
778  }
779  if(least_undelivered_rdmc_seq_num < least_undelivered_sst_seq_num && least_undelivered_rdmc_seq_num <= min_stable_num) {
780  update_sst = true;
781  dbg_default_trace("Subgroup {}, can deliver a locally stable RDMC message: min_stable_num={} and least_undelivered_seq_num={}",
782  subgroup_num, min_stable_num, least_undelivered_rdmc_seq_num);
783  RDMCMessage& msg = locally_stable_rdmc_messages[subgroup_num].begin()->second;
784  char* buf = msg.message_buffer.buffer.get();
785  uint64_t msg_ts = ((header*)buf)->timestamp;
786  //Note: deliver_message frees the RDMC buffer in msg, which is why the timestamp must be saved before calling this
787  assigned_version = persistent::combine_int32s(sst.vid[member_index], least_undelivered_rdmc_seq_num);
788  deliver_message(msg, subgroup_num, assigned_version, msg_ts/1000);
789  non_null_msgs_delivered |= version_message(msg, subgroup_num, assigned_version, msg_ts);
790  // free the message buffer only after version_message has been called
791  free_message_buffers[subgroup_num].push_back(std::move(msg.message_buffer));
792  sst.delivered_num[member_index][subgroup_num] = least_undelivered_rdmc_seq_num;
793  locally_stable_rdmc_messages[subgroup_num].erase(locally_stable_rdmc_messages[subgroup_num].begin());
794  } else if(least_undelivered_sst_seq_num < least_undelivered_rdmc_seq_num && least_undelivered_sst_seq_num <= min_stable_num) {
795  update_sst = true;
796  dbg_default_trace("Subgroup {}, can deliver a locally stable SST message: min_stable_num={} and least_undelivered_seq_num={}",
797  subgroup_num, min_stable_num, least_undelivered_sst_seq_num);
798  SSTMessage& msg = locally_stable_sst_messages[subgroup_num].begin()->second;
799  char* buf = (char*)msg.buf;
800  uint64_t msg_ts = ((header*)buf)->timestamp;
801  assigned_version = persistent::combine_int32s(sst.vid[member_index], least_undelivered_sst_seq_num);
802  deliver_message(msg, subgroup_num, assigned_version, msg_ts/1000);
803  non_null_msgs_delivered |= version_message(msg, subgroup_num, assigned_version, msg_ts);
804  sst.delivered_num[member_index][subgroup_num] = least_undelivered_sst_seq_num;
805  locally_stable_sst_messages[subgroup_num].erase(locally_stable_sst_messages[subgroup_num].begin());
806  } else {
807  break;
808  }
809  }
810  if(update_sst) {
811  sst.put(get_shard_sst_indices(subgroup_num),
812  sst.delivered_num, subgroup_num);
813  // post persistence request for ordered mode.
814  if(non_null_msgs_delivered) {
815  std::get<1>(persistence_manager_callbacks)(subgroup_num, assigned_version);
816  }
817  }
818 }
820  for(const auto& p : subgroup_settings_map) {
821  subgroup_id_t subgroup_num = p.first;
822  const SubgroupSettings& subgroup_settings = p.second;
823  auto num_shard_members = subgroup_settings.members.size();
824  std::vector<int> shard_senders = subgroup_settings.senders;
825  auto num_shard_senders = get_num_senders(shard_senders);
826  std::map<uint32_t, uint32_t> shard_ranks_by_sender_rank;
827  for(uint j = 0, l = 0; j < num_shard_members; ++j) {
828  if(shard_senders[j]) {
829  shard_ranks_by_sender_rank[l] = j;
830  l++;
831  }
832  }
833 
834  auto receiver_pred = [this, subgroup_settings, shard_ranks_by_sender_rank, num_shard_senders](const DerechoSST& sst) {
835  return receiver_predicate(subgroup_settings,
836  shard_ranks_by_sender_rank, num_shard_senders, sst);
837  };
838  auto batch_size = subgroup_settings.profile.window_size / 2;
839  if(!batch_size) {
840  batch_size = 1;
841  }
842  auto sst_receive_handler_lambda = [this, subgroup_num, subgroup_settings, shard_ranks_by_sender_rank,
843  num_shard_senders](uint32_t sender_rank, volatile char* data, uint64_t size) {
844  sst_receive_handler(subgroup_num, subgroup_settings,
845  shard_ranks_by_sender_rank, num_shard_senders,
846  sender_rank, data, size);
847  };
848  auto receiver_trig = [this, subgroup_num, subgroup_settings, shard_ranks_by_sender_rank,
849  num_shard_senders, batch_size, sst_receive_handler_lambda](DerechoSST& sst) mutable {
850  receiver_function(subgroup_num, subgroup_settings,
851  shard_ranks_by_sender_rank, num_shard_senders, sst,
852  batch_size, sst_receive_handler_lambda);
853  };
854  receiver_pred_handles.emplace_back(sst->predicates.insert(receiver_pred, receiver_trig,
856 
857  if(subgroup_settings.mode != Mode::UNORDERED) {
858  auto delivery_pred = [this, subgroup_num, subgroup_settings, num_shard_members](const DerechoSST& sst) {
859  std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
860  // compute the min of the seq_num
861  message_id_t min_stable_num
862  = sst.seq_num[node_id_to_sst_index.at(subgroup_settings.members[0])][subgroup_num];
863  for(uint i = 0; i < num_shard_members; ++i) {
864  // to avoid a race condition, do not read the same SST entry twice
865  message_id_t stable_num_copy = sst.seq_num[node_id_to_sst_index.at(subgroup_settings.members[i])][subgroup_num];
866  min_stable_num = std::min(min_stable_num, stable_num_copy);
867  }
868 
869  return min_stable_num > sst.delivered_num[member_index][subgroup_num];
870  };
871  auto delivery_trig = [this, subgroup_num, subgroup_settings, num_shard_members](DerechoSST& sst) mutable {
872  delivery_trigger(subgroup_num, subgroup_settings, num_shard_members, sst);
873  };
874 
875  delivery_pred_handles.emplace_back(sst->predicates.insert(delivery_pred, delivery_trig,
877 
878  auto persistence_pred = [this, subgroup_num, subgroup_settings, num_shard_members,
879  version_seen = (persistent::version_t)INVALID_VERSION](const DerechoSST& sst) {
880  std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
881  // compute the min of the persisted_num
882  persistent::version_t min_persisted_num
883  = sst.persisted_num[node_id_to_sst_index.at(subgroup_settings.members[0])][subgroup_num];
884  for(uint i = 1; i < num_shard_members; ++i) {
885  persistent::version_t persisted_num_copy = sst.persisted_num[node_id_to_sst_index.at(subgroup_settings.members[i])][subgroup_num];
886  min_persisted_num = std::min(min_persisted_num, persisted_num_copy);
887  }
888  return (version_seen < min_persisted_num) && callbacks.global_persistence_callback;
889  };
890  auto persistence_trig = [this, subgroup_num, subgroup_settings, num_shard_members,
891  version_seen = (persistent::version_t)INVALID_VERSION](DerechoSST& sst) mutable {
892  std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
893  // compute the min of the persisted_num
894  persistent::version_t min_persisted_num
895  = sst.persisted_num[node_id_to_sst_index.at(subgroup_settings.members[0])][subgroup_num];
896  for(uint i = 1; i < num_shard_members; ++i) {
897  persistent::version_t persisted_num_copy = sst.persisted_num[node_id_to_sst_index.at(subgroup_settings.members[i])][subgroup_num];
898  min_persisted_num = std::min(min_persisted_num, persisted_num_copy);
899  }
900  // callbacks
901  if((version_seen < min_persisted_num) && callbacks.global_persistence_callback) {
902  callbacks.global_persistence_callback(subgroup_num, min_persisted_num);
903  version_seen = min_persisted_num;
904  }
905  };
906 
907  persistence_pred_handles.emplace_back(sst->predicates.insert(persistence_pred, persistence_trig, sst::PredicateType::RECURRENT));
908 
909  if(subgroup_settings.sender_rank >= 0) {
910  auto sender_pred = [this, subgroup_num, subgroup_settings, num_shard_members, num_shard_senders](const DerechoSST& sst) {
911  message_id_t seq_num = next_message_to_deliver[subgroup_num] * num_shard_senders + subgroup_settings.sender_rank;
912  for(uint i = 0; i < num_shard_members; ++i) {
913  if(sst.delivered_num[node_id_to_sst_index.at(subgroup_settings.members[i])][subgroup_num] < seq_num
914  || (sst.persisted_num[node_id_to_sst_index.at(subgroup_settings.members[i])][subgroup_num] < seq_num)) {
915  return false;
916  }
917  }
918  return true;
919  };
920  auto sender_trig = [this, subgroup_num](DerechoSST& sst) {
921  sender_cv.notify_all();
922  next_message_to_deliver[subgroup_num]++;
923  };
924  sender_pred_handles.emplace_back(sst->predicates.insert(sender_pred, sender_trig,
926  }
927  } else {
928  //This subgroup is in UNORDERED mode
929  if(subgroup_settings.sender_rank >= 0) {
930  auto sender_pred = [this, subgroup_num, subgroup_settings, num_shard_members](const DerechoSST& sst) {
931  for(uint i = 0; i < num_shard_members; ++i) {
932  uint32_t num_received_offset = subgroup_settings.num_received_offset;
933  if(sst.num_received[node_id_to_sst_index.at(subgroup_settings.members[i])][num_received_offset + subgroup_settings.sender_rank]
934  < static_cast<int32_t>(future_message_indices[subgroup_num] - 1 - subgroup_settings.profile.window_size)) {
935  return false;
936  }
937  }
938  return true;
939  };
940  auto sender_trig = [this](DerechoSST& sst) {
941  sender_cv.notify_all();
942  };
943  sender_pred_handles.emplace_back(sst->predicates.insert(sender_pred, sender_trig,
945  }
946  }
947  }
948 }
949 
951  wedge();
952  if(timeout_thread.joinable()) {
953  timeout_thread.join();
954  }
955 }
956 
958  bool thread_shutdown_existing = thread_shutdown.exchange(true);
959  if(thread_shutdown_existing) { // Wedge has already been called
960  return;
961  }
962 
963  //Consume and remove all the predicate handles
964  for(auto handle_iter = sender_pred_handles.begin(); handle_iter != sender_pred_handles.end();) {
965  sst->predicates.remove(*handle_iter);
966  handle_iter = sender_pred_handles.erase(handle_iter);
967  }
968  for(auto handle_iter = receiver_pred_handles.begin(); handle_iter != receiver_pred_handles.end();) {
969  sst->predicates.remove(*handle_iter);
970  handle_iter = receiver_pred_handles.erase(handle_iter);
971  }
972  for(auto handle_iter = delivery_pred_handles.begin(); handle_iter != delivery_pred_handles.end();) {
973  sst->predicates.remove(*handle_iter);
974  handle_iter = delivery_pred_handles.erase(handle_iter);
975  }
976  for(auto handle_iter = persistence_pred_handles.begin(); handle_iter != persistence_pred_handles.end();) {
977  sst->predicates.remove(*handle_iter);
978  handle_iter = persistence_pred_handles.erase(handle_iter);
979  }
980 
981  for(uint i = 0; i < num_members; ++i) {
983  }
984 
985  sender_cv.notify_all();
986  if(sender_thread.joinable()) {
987  sender_thread.join();
988  }
989 }
990 
992  pthread_setname_np(pthread_self(), "sender_thread");
993  subgroup_id_t subgroup_to_send = 0;
994  auto should_send_to_subgroup = [&](subgroup_id_t subgroup_num) {
996  return false;
997  }
998  if(pending_sends[subgroup_num].empty()) {
999  return false;
1000  }
1001  RDMCMessage& msg = pending_sends[subgroup_num].front();
1002  const SubgroupSettings& subgroup_settings = subgroup_settings_map.at(subgroup_num);
1003 
1004  int shard_sender_index = subgroup_settings.sender_rank;
1005  std::vector<int> shard_senders = subgroup_settings.senders;
1006  uint32_t num_shard_senders = get_num_senders(shard_senders);
1007  assert(shard_sender_index >= 0);
1008 
1009  if(sst->num_received[member_index][subgroup_settings.num_received_offset + shard_sender_index] < msg.index - 1) {
1010  return false;
1011  }
1012 
1013  std::vector<node_id_t> shard_members = subgroup_settings.members;
1014  auto num_shard_members = shard_members.size();
1015  assert(num_shard_members >= 1);
1016  if(subgroup_settings.mode != Mode::UNORDERED) {
1017  for(uint i = 0; i < num_shard_members; ++i) {
1018  if(sst->delivered_num[node_id_to_sst_index.at(shard_members[i])][subgroup_num] < static_cast<message_id_t>((msg.index - subgroup_settings.profile.window_size) * num_shard_senders + shard_sender_index)
1019  || (sst->persisted_num[node_id_to_sst_index.at(shard_members[i])][subgroup_num] < static_cast<message_id_t>((msg.index - subgroup_settings.profile.window_size) * num_shard_senders + shard_sender_index))) {
1020  return false;
1021  }
1022  }
1023  } else {
1024  for(uint i = 0; i < num_shard_members; ++i) {
1025  auto num_received_offset = subgroup_settings.num_received_offset;
1026  if(sst->num_received[node_id_to_sst_index.at(shard_members[i])][num_received_offset + shard_sender_index]
1027  < static_cast<int32_t>(future_message_indices[subgroup_num] - 1 - subgroup_settings.profile.window_size)) {
1028  return false;
1029  }
1030  }
1031  }
1032 
1033  return true;
1034  };
1035  auto should_send = [&]() {
1036  for(uint i = 1; i <= total_num_subgroups; ++i) {
1037  auto subgroup_num = (subgroup_to_send + i) % total_num_subgroups;
1038  if(should_send_to_subgroup(subgroup_num)) {
1039  subgroup_to_send = subgroup_num;
1040  return true;
1041  }
1042  }
1043  return false;
1044  };
1045  auto should_wake = [&]() { return thread_shutdown || should_send(); };
1046  std::unique_lock<std::recursive_mutex> lock(msg_state_mtx);
1047  while(!thread_shutdown) {
1048  sender_cv.wait(lock, should_wake);
1049  if(!thread_shutdown) {
1050  current_sends[subgroup_to_send] = std::move(pending_sends[subgroup_to_send].front());
1051  dbg_default_trace("Calling send in subgroup {} on message {} from sender {}",
1052  subgroup_to_send, current_sends[subgroup_to_send]->index, current_sends[subgroup_to_send]->sender_id);
1053  // make sure there are > 1 members before issuing RDMC send
1054  if(subgroup_settings_map.at(subgroup_to_send).members.size() > 1) {
1055  if(!rdmc::send(subgroup_to_rdmc_group.at(subgroup_to_send),
1056  current_sends[subgroup_to_send]->message_buffer.mr, 0,
1057  current_sends[subgroup_to_send]->size)) {
1058  throw std::runtime_error("rdmc::send returned false");
1059  }
1060  } else {
1061  // receive the message right here
1062  singleton_shard_receive_handlers.at(subgroup_to_send)(
1063 current_sends[subgroup_to_send]->message_buffer.buffer.get(), current_sends[subgroup_to_send]->size);
1064  }
1065  pending_sends[subgroup_to_send].pop();
1066  }
1067  }
1068 }
1069 
1071  struct timespec start_time;
1072  clock_gettime(CLOCK_REALTIME, &start_time);
1073  return start_time.tv_sec * 1e9 + start_time.tv_nsec;
1074 }
1075 
1076 const uint64_t MulticastGroup::compute_global_stability_frontier(uint32_t subgroup_num) {
1077  uint64_t global_stability_frontier = sst->local_stability_frontier[member_index][subgroup_num];
1078  auto shard_sst_indices = get_shard_sst_indices(subgroup_num);
1079  for(auto index : shard_sst_indices) {
1080  uint64_t local_stability_frontier_copy = sst->local_stability_frontier[index][subgroup_num];
1081  global_stability_frontier = std::min(global_stability_frontier, local_stability_frontier_copy);
1082  }
1083  return global_stability_frontier;
1084 }
1085 
1087  pthread_setname_np(pthread_self(), "timeout_thread");
1088  while(!thread_shutdown) {
1089  std::this_thread::sleep_for(std::chrono::milliseconds(sender_timeout));
1090  if(sst) {
1091  std::unique_lock<std::recursive_mutex> lock(msg_state_mtx);
1092  auto current_time = get_time();
1093  for(auto p : subgroup_settings_map) {
1094  auto subgroup_num = p.first;
1095  auto members = p.second.members;
1096  auto sst_indices = get_shard_sst_indices(subgroup_num);
1097  // clean up timestamps of persisted messages
1098  auto min_persisted_num = sst->persisted_num[member_index][subgroup_num];
1099  for(auto i : sst_indices) {
1100  persistent::version_t persisted_num_copy = sst->persisted_num[i][subgroup_num];
1101  min_persisted_num = std::min(min_persisted_num, persisted_num_copy);
1102  }
1103  while(!pending_persistence[subgroup_num].empty() && pending_persistence[subgroup_num].begin()->first <= min_persisted_num) {
1104  auto timestamp = pending_persistence[subgroup_num].begin()->second;
1105  pending_persistence[subgroup_num].erase(pending_persistence[subgroup_num].begin());
1106  pending_message_timestamps[subgroup_num].erase(timestamp);
1107  }
1108  if(pending_message_timestamps[subgroup_num].empty()) {
1109  sst->local_stability_frontier[member_index][subgroup_num] = current_time;
1110  } else {
1111  sst->local_stability_frontier[member_index][subgroup_num] = std::min(current_time,
1112  *pending_message_timestamps[subgroup_num].begin());
1113  }
1114  }
1115  sst->put_with_completion((char*)std::addressof(sst->local_stability_frontier[0][0]) - sst->getBaseAddress(),
1116  sizeof(sst->local_stability_frontier[0][0]) * sst->local_stability_frontier.size());
1117  }
1118  }
1119 }
1120 
1121 // we already hold the lock on msg_state_mtx when we call this
1123  // short-circuits most of the normal checks because
1124  // we know that we received a message and are sending a null
1125  long long unsigned int msg_size = sizeof(header);
1126  const DerechoParams& profile = subgroup_settings_map.at(subgroup_num).profile;
1127  // very unlikely that msg_size does not fit in the max_msg_size since we are sending a NULL
1128  // but the user might not be interested in using SSTMC at all, then sst::max_msg_size can be zero
1129  if(msg_size > profile.sst_max_msg_size) {
1130  // Create new Message
1131  RDMCMessage msg;
1133  msg.index = future_message_indices[subgroup_num];
1134  msg.size = msg_size;
1135  if(free_message_buffers[subgroup_num].empty()) {
1136  free_message_buffers[subgroup_num].emplace_back(profile.max_msg_size);
1137  }
1138  msg.message_buffer = std::move(free_message_buffers[subgroup_num].back());
1139  free_message_buffers[subgroup_num].pop_back();
1140 
1141  auto current_time = get_time();
1142  pending_message_timestamps[subgroup_num].insert(current_time);
1143 
1144  // Fill header
1145  char* buf = msg.message_buffer.buffer.get();
1146  ((header*)buf)->header_size = sizeof(header);
1147  ((header*)buf)->index = msg.index;
1148  ((header*)buf)->timestamp = current_time;
1149  ((header*)buf)->cooked_send = false;
1150 
1151  future_message_indices[subgroup_num]++;
1152  pending_sends[subgroup_num].push(std::move(msg));
1153  sender_cv.notify_all();
1154  } else {
1155  char* buf = (char*)sst_multicast_group_ptrs[subgroup_num]->get_buffer(msg_size);
1156 
1157  assert(buf);
1158 
1159  auto current_time = get_time();
1160  pending_message_timestamps[subgroup_num].insert(current_time);
1161 
1162  ((header*)buf)->header_size = sizeof(header);
1163  ((header*)buf)->index = future_message_indices[subgroup_num];
1164  ((header*)buf)->timestamp = current_time;
1165  ((header*)buf)->cooked_send = false;
1166 
1167  future_message_indices[subgroup_num]++;
1168  sst_multicast_group_ptrs[subgroup_num]->send();
1169  }
1170 }
1171 
1173  long long unsigned int payload_size,
1174  bool cooked_send) {
1175  long long unsigned int msg_size = payload_size + sizeof(header);
1176  const SubgroupSettings& subgroup_settings = subgroup_settings_map.at(subgroup_num);
1177  if(msg_size > subgroup_settings.profile.max_msg_size) {
1178  std::string exp_msg("Can't send messages of size larger than the maximum message size which is equal to ");
1179  exp_msg += subgroup_settings.profile.max_msg_size;
1180  throw derecho_exception(exp_msg);
1181  }
1182 
1183  std::vector<node_id_t> shard_members = subgroup_settings.members;
1184  auto num_shard_members = shard_members.size();
1185  // if the current node is not a sender, shard_sender_index will be -1
1186  uint32_t num_shard_senders;
1187  std::vector<int> shard_senders = subgroup_settings.senders;
1188  int shard_sender_index = subgroup_settings.sender_rank;
1189  num_shard_senders = get_num_senders(shard_senders);
1190  assert(shard_sender_index >= 0);
1191 
1192  if(subgroup_settings.mode != Mode::UNORDERED) {
1193  for(uint i = 0; i < num_shard_members; ++i) {
1194  if(sst->delivered_num[node_id_to_sst_index.at(shard_members[i])][subgroup_num]
1195  < static_cast<int32_t>((future_message_indices[subgroup_num] - subgroup_settings.profile.window_size) * num_shard_senders + shard_sender_index)) {
1196  return nullptr;
1197  }
1198  }
1199  } else {
1200  for(uint i = 0; i < num_shard_members; ++i) {
1201  auto num_received_offset = subgroup_settings.num_received_offset;
1202  if(sst->num_received[node_id_to_sst_index.at(shard_members[i])][num_received_offset + shard_sender_index]
1203  < static_cast<int32_t>(future_message_indices[subgroup_num] - subgroup_settings.profile.window_size)) {
1204  return nullptr;
1205  }
1206  }
1207  }
1208 
1209  if(msg_size > subgroup_settings.profile.sst_max_msg_size) {
1210  if(thread_shutdown) {
1211  return nullptr;
1212  }
1213 
1214  if(free_message_buffers[subgroup_num].empty()) {
1215  free_message_buffers[subgroup_num].emplace_back(
1216  subgroup_settings.profile.max_msg_size);
1217  }
1218 
1219  if(pending_sst_sends[subgroup_num] || next_sends[subgroup_num]) {
1220  return nullptr;
1221  }
1222 
1223  // Create new Message
1224  RDMCMessage msg;
1226  msg.index = future_message_indices[subgroup_num];
1227  msg.size = msg_size;
1228  msg.message_buffer = std::move(free_message_buffers[subgroup_num].back());
1229  free_message_buffers[subgroup_num].pop_back();
1230 
1231  auto current_time = get_time();
1232  pending_message_timestamps[subgroup_num].insert(current_time);
1233 
1234  // Fill header
1235  char* buf = msg.message_buffer.buffer.get();
1236  ((header*)buf)->header_size = sizeof(header);
1237  ((header*)buf)->index = msg.index;
1238  ((header*)buf)->timestamp = current_time;
1239  ((header*)buf)->cooked_send = cooked_send;
1240 
1241  next_sends[subgroup_num] = std::move(msg);
1242  future_message_indices[subgroup_num]++;
1243 
1244  last_transfer_medium[subgroup_num] = true;
1245  return buf + sizeof(header);
1246  } else {
1247  if(pending_sst_sends[subgroup_num] || next_sends[subgroup_num]) {
1248  return nullptr;
1249  }
1250 
1251  pending_sst_sends[subgroup_num] = true;
1252  if(thread_shutdown) {
1253  pending_sst_sends[subgroup_num] = false;
1254  return nullptr;
1255  }
1256  char* buf = (char*)sst_multicast_group_ptrs[subgroup_num]->get_buffer(msg_size);
1257  if(!buf) {
1258  pending_sst_sends[subgroup_num] = false;
1259  return nullptr;
1260  }
1261  auto current_time = get_time();
1262  pending_message_timestamps[subgroup_num].insert(current_time);
1263 
1264  ((header*)buf)->header_size = sizeof(header);
1265  ((header*)buf)->index = future_message_indices[subgroup_num];
1266  ((header*)buf)->timestamp = current_time;
1267  ((header*)buf)->cooked_send = cooked_send;
1268  future_message_indices[subgroup_num]++;
1269  dbg_default_trace("Subgroup {}: get_sendbuffer_ptr increased future_message_indices to {}",
1270  subgroup_num, future_message_indices[subgroup_num]);
1271 
1272  last_transfer_medium[subgroup_num] = false;
1273  return buf + sizeof(header);
1274  }
1275 }
1276 
1277 bool MulticastGroup::send(subgroup_id_t subgroup_num, long long unsigned int payload_size,
1278  const std::function<void(char* buf)>& msg_generator, bool cooked_send) {
1280  return false;
1281  }
1282  std::unique_lock<std::recursive_mutex> lock(msg_state_mtx);
1283 
1284  char* buf = get_sendbuffer_ptr(subgroup_num, payload_size, cooked_send);
1285  while(!buf) {
1286  // Don't want any deadlocks. For example, this thread cannot get a buffer because delivery is lagging
1287  // but the SST detect thread cannot proceed (and deliver) because it requires the same lock
1288  // do not use defer_lock in the unique_lock declaration above and move unlock to the end of the loop.
1289  // That will cause a bug. We want to unlock only when we are sure that buf is nullptr.
1290  lock.unlock();
1291  if(thread_shutdown) {
1292  return false;
1293  }
1294  lock.lock();
1295  buf = get_sendbuffer_ptr(subgroup_num, payload_size, cooked_send);
1296  }
1297 
1298  // call to the user supplied message generator
1299  msg_generator(buf);
1300 
1301  if(last_transfer_medium[subgroup_num]) {
1302  assert(next_sends[subgroup_num]);
1303  pending_sends[subgroup_num].push(std::move(*next_sends[subgroup_num]));
1304  next_sends[subgroup_num] = std::nullopt;
1305  sender_cv.notify_all();
1306  return true;
1307  } else {
1308  sst_multicast_group_ptrs[subgroup_num]->send();
1309  pending_sst_sends[subgroup_num] = false;
1310  return true;
1311  }
1312 }
1313 
1315  std::lock_guard<std::recursive_mutex> lock(msg_state_mtx);
1316  return pending_sst_sends[subgroup_num];
1317 }
1318 
1319 std::vector<uint32_t> MulticastGroup::get_shard_sst_indices(subgroup_id_t subgroup_num) {
1320  std::vector<node_id_t> shard_members = subgroup_settings_map.at(subgroup_num).members;
1321 
1322  std::vector<uint32_t> shard_sst_indices;
1323  for(auto m : shard_members) {
1324  shard_sst_indices.push_back(node_id_to_sst_index.at(m));
1325  }
1326  return shard_sst_indices;
1327 }
1328 
1330  using std::cout;
1331  using std::endl;
1332  cout << "SST has " << sst->get_num_rows()
1333  << " rows; member_index is " << member_index << endl;
1334  cout << "Printing SST" << endl;
1335  for(const auto& p : subgroup_settings_map) {
1336  uint32_t subgroup_num = p.first;
1337  auto subgroup_settings = p.second;
1338  cout << "Subgroup " << subgroup_num << endl;
1339  auto shard_sst_indices = get_shard_sst_indices(subgroup_num);
1340  cout << "Printing seq_num, delivered_num" << endl;
1341  for(auto i : shard_sst_indices) {
1342  cout << sst->seq_num[i][subgroup_num] << " " << sst->delivered_num[i][subgroup_num] << endl;
1343  }
1344  cout << "Printing last_received_messages" << endl;
1345  for(auto i : shard_sst_indices) {
1346  uint32_t num_shard_senders = subgroup_settings.senders.size();
1347  for(uint j = 0; j < num_shard_senders; ++j) {
1348  cout << sst->num_received[i][subgroup_settings.num_received_offset + j] << " ";
1349  }
1350  cout << endl;
1351  }
1352  cout << "Printing multicastSST fields" << endl;
1353  sst_multicast_group_ptrs[subgroup_num]->debug_print();
1354  cout << endl;
1355  }
1356 
1357  std::cout << "Printing memory usage of free_message_buffers" << std::endl;
1358  for(const auto& p : free_message_buffers) {
1359  std::cout << "Subgroup " << p.first << ", Number of free buffers " << p.second.size() << std::endl;
1360  }
1361 }
1362 
1363 } // namespace derecho
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::map< subgroup_id_t, uint32_t > subgroup_to_rdmc_group
Maps subgroup IDs for which this node is a sender to the RDMC group it should use to send...
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > non_persistent_sst_messages
Messages that are currently being written to persistent storage.
const int member_index
index of the local node in the members vector, which should also be its row index in the SST ...
MulticastGroup(std::vector< node_id_t > members, node_id_t my_node_id, std::shared_ptr< DerechoSST > sst, CallbackSet callbacks, uint32_t total_num_subgroups, const std::map< subgroup_id_t, SubgroupSettings > &subgroup_settings_by_id, unsigned int sender_timeout, const subgroup_post_next_version_func_t &post_next_version_callback, const persistence_manager_callbacks_t &persistence_manager_callbacks, std::vector< char > already_failed={})
Standard constructor for setting up a MulticastGroup for the first time.
std::condition_variable_any sender_cv
long long unsigned int size
The message's size in bytes.
std::vector< std::queue< RDMCMessage > > pending_sends
Messages that are ready to be sent, but must wait until the current send finishes.
std::map< uint32_t, bool > pending_sst_sends
std::vector< uint32_t > get_shard_sst_indices(subgroup_id_t subgroup_num)
uint32_t sender_id
The unique node ID of the message's sender.
uint16_t rdmc_group_num_offset
Offset to add to member ranks to form RDMC group numbers.
uint32_t sender_id
The unique node ID of the message's sender.
std::vector< int > senders
The "is_sender" flags for members of the subgroup.
pred_handle insert(pred predicate, trig trigger, PredicateType type=PredicateType::ONE_TIME)
Inserts a single (predicate, trigger) pair to the appropriate predicate list.
Definition: predicates.hpp:107
void get_buffer_and_send_auto_null(subgroup_id_t subgroup_num)
Bundles together a set of low-level parameters for configuring Derecho groups.
unsigned int get_num_rows() const
Returns the total number of rows in the table.
Definition: sst.hpp:315
SSTFieldVector< char > slots
for SST multicast
void check_failures_loop()
Checks for failures when a sender reaches its timeout.
Predicates< DerivedSST > predicates
Definition: sst.hpp:180
const std::map< subgroup_id_t, SubgroupSettings > subgroup_settings_map
Maps subgroup IDs (for subgroups this node is a member of) to an immutable set of configuration optio...
unsigned int sender_timeout
The time, in milliseconds, that a sender can wait to send a message before it is considered failed...
subgroup_post_next_version_func_t post_next_version_callback
post the next version to a subgroup just before deliver a message so that the user code know the curr...
SSTField< int32_t > vid
View ID associated with this SST.
Definition: derecho_sst.hpp:53
A structure containing an RDMC message (which consists of some bytes in a registered memory region) a...
const unsigned int num_members
number of members
std::map< subgroup_id_t, std::function< void(char *, size_t)> > singleton_shard_receive_handlers
Receiver lambdas for shards that have only one member.
SSTFieldVector< message_id_t > seq_num
Sequence numbers are interpreted like a row-major pair: (sender, index) becomes sender + num_members ...
Definition: derecho_sst.hpp:37
void wedge()
Stops all sending and receiving in this group, in preparation for shutting it down.
std::vector< std::unique_ptr< sst::multicast_group< DerechoSST > > > sst_multicast_group_ptrs
The SSTs for multicasts.
void remove(pred_handle &pred)
Removes a (predicate, trigger) pair previously registered with insert().
Definition: predicates.hpp:126
MessageBuffer message_buffer
The MessageBuffer that contains the message's body.
SSTFieldVector< persistent::version_t > persisted_num
This represents the highest persistent version number that has been persisted to disk at this node...
Definition: derecho_sst.hpp:49
std::list< pred_handle > persistence_pred_handles
uint32_t num_received_offset
The offset of this node's num_received counter within the subgroup's SST section. ...
std::size_t index_of(const Container &container, const typename Container::value_type &elem)
Finds a value in a STL container, and returns the index of that value in the container.
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > non_persistent_messages
Messages that are currently being written to persistent storage.
void destroy_group(uint16_t group_number)
Definition: rdmc.cpp:91
std::recursive_mutex msg_state_mtx
void set(volatile Elem &e, const Elem &value)
Thread-safe setter for DerechoSST members; ensures there is a std::atomic_signal_fence after writing ...
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
std::vector< std::optional< RDMCMessage > > next_sends
next_message is the message that will be sent when send is called the next time.
std::vector< node_id_t > members
vector of member id&#39;s
int32_t index
The message's index (relative to other messages sent by that sender).
void deliver_messages_upto(const std::vector< int32_t > &max_indices_for_senders, subgroup_id_t subgroup_num, uint32_t num_shard_senders)
message_id_t index
The message's index (relative to other messages sent by that sender).
void send_loop()
Continuously waits for a new pending send, then sends it.
SSTFieldVector< message_id_t > delivered_num
This represents the highest sequence number that has been delivered at this node. ...
Definition: derecho_sst.hpp:43
rpc_handler_t rpc_callback
These two callbacks are internal, not exposed to clients, so they&#39;re not in CallbackSet.
const uint64_t compute_global_stability_frontier(subgroup_id_t subgroup_num)
uint32_t get_num_senders(const std::vector< int > &shard_senders)
std::vector< message_id_t > next_message_to_deliver
bool receiver_predicate(const SubgroupSettings &subgroup_settings, const std::map< uint32_t, uint32_t > &shard_ranks_by_sender_rank, uint32_t num_shard_senders, const DerechoSST &sst)
int32_t message_id_t
Type alias for a message&#39;s unique "sequence number" or index.
bool rdmc_sst_groups_created
false if RDMC groups haven&#39;t been created successfully
char * get_sendbuffer_ptr(subgroup_id_t subgroup_num, long long unsigned int payload_size, bool cooked_send)
bool check_pending_sst_sends(subgroup_id_t subgroup_num)
#define INVALID_VERSION
Definition: PersistLog.hpp:28
std::map< subgroup_id_t, std::set< uint64_t > > pending_message_timestamps
const CallbackSet callbacks
Message-delivery event callbacks, supplied by the client, for "raw" sends.
Bundles together a set of callback functions for message delivery events.
std::thread sender_thread
The background thread that sends messages with RDMC.
std::vector< node_id_t > members
The members of the subgroup.
Mode mode
The operation mode of the subgroup.
void put_with_completion()
Definition: sst.hpp:329
SSTFieldVector< int32_t > num_received_sst
#define dbg_default_trace(...)
Definition: logger.hpp:40
SSTFieldVector< int32_t > num_received
Local count of number of received messages by sender.
uint32_t slot_offset
The offset of this node's slot within the subgroup's SST section.
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > locally_stable_sst_messages
Same map as locally_stable_rdmc_messages, but for SST messages.
size_t size() const
Just like std::vector::size(), returns the number of elements in this vector.
Definition: sst.hpp:110
void receiver_function(subgroup_id_t subgroup_num, const SubgroupSettings &subgroup_settings, const std::map< uint32_t, uint32_t > &shard_ranks_by_sender_rank, uint32_t num_shard_senders, DerechoSST &sst, unsigned int batch_size, const std::function< void(uint32_t, volatile char *, uint32_t)> &sst_receive_handler_lambda)
const char * getBaseAddress()
Definition: sst.hpp:320
Definition: HLC.hpp:7
SSTFieldVector< uint64_t > local_stability_frontier
to check for failures - used by the thread running check_failures_loop in derecho_group ...
void deliver_message(RDMCMessage &msg, const subgroup_id_t &subgroup_num, const persistent::version_t &version, const uint64_t &msg_timestamp)
Delivers a single message to the application layer, either by invoking an RPC function or by calling ...
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > locally_stable_rdmc_messages
Messages that have finished sending/receiving but aren&#39;t yet globally stable.
The GMS and derecho_group will share the same SST for efficiency.
Definition: derecho_sst.hpp:22
std::atomic< bool > thread_shutdown
Indicates that the group is being destroyed.
std::list< pred_handle > sender_pred_handles
std::map< node_id_t, uint32_t > node_id_to_sst_index
inverse map of node_ids to sst_row
std::shared_ptr< DerechoSST > sst
The SST, shared between this group and its GMS.
int32_t resolve_num_received(int32_t index, uint32_t num_received_entry)
std::function< void(char *buffer, size_t size)> completion_callback_t
Definition: rdmc.hpp:42
std::vector< message_id_t > future_message_indices
Index to be used the next time get_sendbuffer_ptr is called.
bool create_group(uint16_t group_number, std::vector< uint32_t > members, size_t block_size, send_algorithm algorithm, incoming_message_callback_t incoming_receive, completion_callback_t send_callback, failure_callback_t failure_callback) __attribute__((warn_unused_result))
Creates a new RDMC group.
Definition: rdmc.cpp:60
std::unique_ptr< char[]> buffer
#define assert_always(x...)
Definition: schedule.cpp:10
std::vector< std::optional< RDMCMessage > > current_sends
Vector of messages that are currently being sent out using RDMC, or boost::none otherwise.
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
volatile char * buf
Pointer to the message.
version_t combine_int32s(const int_type high_bits, const int_type low_bits)
Helper function for creating Persistent version numbers out of MulticastGroup sequence numbers and Vi...
std::shared_ptr< rdma::memory_region > mr
std::list< pred_handle > receiver_pred_handles
std::function< void(const subgroup_id_t &, const persistent::version_t &, const uint64_t &)> subgroup_post_next_version_func_t
int sender_rank
This node&#39;s sender rank within the subgroup (as defined by SubView::sender_rank_of) ...
std::vector< bool > last_transfer_medium
std::map< subgroup_id_t, std::map< message_id_t, uint64_t > > pending_persistence
message_callback_t global_stability_callback
bool send(subgroup_id_t subgroup_num, long long unsigned int payload_size, const std::function< void(char *buf)> &msg_generator, bool cooked_send)
Send now internally calls get_sendbuffer_ptr.
Recurrent predicates persist as long as the SST instance and fire their triggers every time they are ...
long long unsigned int size
The message's size in bytes.
void sst_receive_handler(subgroup_id_t subgroup_num, const SubgroupSettings &subgroup_settings, const std::map< uint32_t, uint32_t > &shard_ranks_by_sender_rank, uint32_t num_shard_senders, uint32_t sender_rank, volatile char *data, uint64_t size)
void debug_print()
Debugging function; prints the current state of the SST to stdout.
std::list< pred_handle > delivery_pred_handles
Implements the low-level mechanics of tracking multicasts in a Derecho group, using RDMC to deliver m...
Base exception class for all exceptions raised by Derecho.
void put()
Writes the entire local row to all remote nodes.
Definition: sst.hpp:325
void delivery_trigger(subgroup_id_t subgroup_num, const SubgroupSettings &subgroup_settings, const uint32_t num_shard_members, DerechoSST &sst)
A collection of settings for a single subgroup that this node is a member of.
persistence_manager_callbacks_t persistence_manager_callbacks
persistence manager callbacks
std::map< std::pair< subgroup_id_t, node_id_t >, RDMCMessage > current_receives
Messages that are currently being received.
persistence_callback_t global_persistence_callback
bool version_message(RDMCMessage &msg, const subgroup_id_t &subgroup_num, const persistent::version_t &version, const uint64_t &msg_timestamp)
Enqueues a single message for persistence with the persistence manager.
std::vector< std::list< int32_t > > received_intervals
Used for synchronizing receives by RDMC and SST.
std::map< uint32_t, std::vector< MessageBuffer > > free_message_buffers
Stores message buffers not currently in use.