18 template <
class T,
class U>
21 for(
auto it = begin(container); it != end(container); ++it) {
22 if(*it == elem)
return n;
25 return container.size();
29 std::vector<node_id_t> _members,
node_id_t my_node_id,
30 std::shared_ptr<DerechoSST>
sst,
32 uint32_t total_num_subgroups,
33 const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
34 unsigned int sender_timeout,
37 std::vector<char> already_failed)
39 num_members(members.size()),
40 member_index(
index_of(members, my_node_id)),
42 total_num_subgroups(total_num_subgroups),
43 subgroup_settings_map(subgroup_settings_by_id),
44 received_intervals(sst->num_received.size(), {-1, -1}),
71 bool no_member_failed =
true;
72 if(already_failed.size()) {
74 if(already_failed[i]) {
75 no_member_failed =
false;
80 if(!already_failed.size() || no_member_failed) {
90 std::vector<node_id_t> _members,
node_id_t my_node_id,
91 std::shared_ptr<DerechoSST>
sst,
94 const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
97 std::vector<char> already_failed)
102 total_num_subgroups(total_num_subgroups),
119 assert(old_group.rdmc_group_num_offset <= std::numeric_limits<uint16_t>::max() - old_group.num_members -
num_members);
133 return std::move(msg);
141 return std::move(msg);
146 std::lock_guard<std::recursive_mutex> lock(old_group.msg_state_mtx);
147 for(
const auto p : subgroup_settings_by_id) {
153 for(
auto& msg : old_group.current_receives) {
156 old_group.current_receives.clear();
161 for(
auto& p : old_group.locally_stable_rdmc_messages) {
162 if(p.second.size() == 0) {
166 for(
auto& q : p.second) {
174 old_group.locally_stable_rdmc_messages.clear();
185 old_group.locally_stable_sst_messages.clear();
188 for(
const auto& p : subgroup_settings_by_id) {
189 auto subgroup_num = p.first;
190 if(old_group.current_sends.size() > subgroup_num && old_group.current_sends[subgroup_num]) {
191 pending_sends[subgroup_num].push(convert_msg(*old_group.current_sends[subgroup_num], subgroup_num));
194 if(old_group.pending_sends.size() > subgroup_num) {
195 while(!old_group.pending_sends[subgroup_num].empty()) {
196 pending_sends[subgroup_num].push(convert_msg(old_group.pending_sends[subgroup_num].front(), subgroup_num));
197 old_group.pending_sends[subgroup_num].pop();
201 if(old_group.next_sends.size() > subgroup_num && old_group.next_sends[subgroup_num]) {
202 next_sends[subgroup_num] = convert_msg(*old_group.next_sends[subgroup_num], subgroup_num);
205 for(
auto& entry : old_group.non_persistent_messages[subgroup_num]) {
207 convert_msg(entry.second, subgroup_num));
209 old_group.non_persistent_messages.clear();
210 for(
auto& entry : old_group.non_persistent_sst_messages[subgroup_num]) {
212 convert_sst_msg(entry.second, subgroup_num));
214 old_group.non_persistent_sst_messages.clear();
218 bool no_member_failed =
true;
219 if(already_failed.size()) {
221 if(already_failed[i]) {
222 no_member_failed =
false;
227 if(!already_failed.size() || no_member_failed) {
238 uint32_t subgroup_num = p.first;
240 const std::vector<node_id_t>& shard_members = subgroup_settings.
members;
241 std::size_t num_shard_members = shard_members.size();
242 std::vector<int> shard_senders = subgroup_settings.
senders;
250 for(uint shard_rank = 0, sender_rank = -1; shard_rank < num_shard_members; ++shard_rank) {
252 if(!shard_senders[shard_rank]) {
256 node_id_t node_id = shard_members[shard_rank];
260 rdmc_receive_handler = [
this, subgroup_num, shard_rank, sender_rank,
261 subgroup_settings, node_id,
263 shard_sst_indices](
char* data,
size_t size) {
266 header* h = (header*)data;
267 const int32_t index = h->index;
268 message_id_t sequence_number = index * num_shard_senders + sender_rank;
270 dbg_default_trace(
"Locally received message in subgroup {}, sender rank {}, index {}",
271 subgroup_num, shard_rank, index);
280 auto& msg = it->second;
288 auto new_num_received =
resolve_num_received(index, subgroup_settings.num_received_offset + sender_rank);
291 if(subgroup_settings.sender_rank >= 0 && subgroup_settings.mode !=
Mode::UNORDERED) {
292 if(subgroup_settings.sender_rank < (
int)sender_rank) {
296 }
else if(subgroup_settings.sender_rank > (
int)sender_rank) {
306 for(
int i = sst->num_received[member_index][subgroup_settings.num_received_offset + sender_rank] + 1;
307 i <= new_num_received; ++i) {
308 message_id_t seq_num = i * num_shard_senders + sender_rank;
312 char* buf =
const_cast<char*
>(msg.buf);
313 header* h = (header*)(buf);
318 {{buf + h->header_size, msg.size - h->header_size}},
321 if(node_id ==
members[member_index]) {
328 assert(it2->first == seq_num);
329 auto& msg = it2->second;
330 char* buf = msg.message_buffer.buffer.get();
331 header* h = (header*)(buf);
336 {{buf + h->header_size, msg.size - h->header_size}},
340 if(node_id ==
members[member_index]) {
353 auto new_seq_num = (*min_ptr + 1) * num_shard_senders + min_index - 1;
354 if(static_cast<message_id_t>(new_seq_num) > sst->seq_num[
member_index][subgroup_num]) {
355 dbg_default_trace(
"Updating seq_num for subgroup {} to {}", subgroup_num, new_seq_num);
357 sst->put(shard_sst_indices,
358 sst->seq_num, subgroup_num);
360 sst->put(shard_sst_indices,
366 auto receive_handler_plus_notify =
367 [
this, rdmc_receive_handler](
char* data,
size_t size) {
368 rdmc_receive_handler(data, size);
374 std::vector<uint32_t> rotated_shard_members(shard_members.size());
375 for(uint k = 0; k < num_shard_members; ++k) {
376 rotated_shard_members[k] = shard_members[(shard_rank + k) % num_shard_members];
380 if(num_shard_members <= 1) {
388 rdmc_group_num_offset, rotated_shard_members, subgroup_settings.profile.block_size, subgroup_settings.profile.rdmc_send_algorithm,
393 receive_handler_plus_notify,
394 [](std::optional<uint32_t>) {})) {
401 rdmc_group_num_offset, rotated_shard_members, subgroup_settings.profile.block_size, subgroup_settings.profile.rdmc_send_algorithm,
402 [
this, subgroup_num, node_id, max_msg_size=subgroup_settings.profile.max_msg_size](
size_t length) {
420 assert(ret.mr->buffer !=
nullptr);
423 rdmc_receive_handler, [](std::optional<uint32_t>) {})) {
434 auto num_received_size =
sst->num_received.size();
435 auto seq_num_size =
sst->seq_num.size();
437 for(uint j = 0; j < num_received_size; ++j) {
438 sst->num_received[i][j] = -1;
440 for(uint j = 0; j < seq_num_size; ++j) {
441 sst->seq_num[i][j] = -1;
442 sst->delivered_num[i][j] = -1;
443 sst->persisted_num[i][j] = -1;
447 sst->sync_with_members();
452 const uint64_t& msg_ts_us) {
454 header* h = (header*)(buf);
457 buf += h->header_size;
458 auto payload_size = msg.
size - h->header_size;
467 {{buf + h->header_size, msg.size - h->header_size}},
474 const uint64_t& msg_ts_us) {
475 char* buf =
const_cast<char*
>(msg.
buf);
476 header* h = (header*)(buf);
479 buf += h->header_size;
480 auto payload_size = msg.
size - h->header_size;
489 {{buf + h->header_size, msg.size - h->header_size}},
497 header* h = (header*)(buf);
499 if(msg.
size == h->header_size) {
506 uint64_t msg_ts_us = msg_timestamp / 1e3;
509 clock_gettime(CLOCK_REALTIME, &now);
510 msg_ts_us = (uint64_t)now.tv_sec * 1e6 + now.tv_nsec / 1e3;
518 char* buf =
const_cast<char*
>(msg.
buf);
519 header* h = (header*)(buf);
521 if(msg.
size == h->header_size) {
528 uint64_t msg_ts_us = msg_timestamp / 1e3;
531 clock_gettime(CLOCK_REALTIME, &now);
532 msg_ts_us = (uint64_t)now.tv_sec * 1e6 + now.tv_nsec / 1e3;
539 const std::vector<int32_t>& max_indices_for_senders,
541 bool non_null_msgs_delivered =
false;
542 assert(max_indices_for_senders.size() == (size_t)num_shard_senders);
545 int32_t max_seq_num = curr_seq_num;
546 for(uint sender = 0; sender < num_shard_senders; sender++) {
547 max_seq_num = std::max(max_seq_num,
548 static_cast<int32_t>(max_indices_for_senders[sender] * num_shard_senders + sender));
551 for(int32_t seq_num = curr_seq_num + 1; seq_num <= max_seq_num; seq_num++) {
553 int32_t index = seq_num / num_shard_senders;
554 uint32_t sender_rank = seq_num % num_shard_senders;
555 if(index > max_indices_for_senders[sender_rank]) {
561 auto& msg = rdmc_msg_ptr->second;
562 char* buf = msg.message_buffer.buffer.get();
563 uint64_t msg_ts = ((header*)buf)->timestamp;
566 non_null_msgs_delivered |=
version_message(msg, subgroup_num, assigned_version, msg_ts);
571 dbg_default_trace(
"Subgroup {}, deliver_messages_upto delivering an SST message with seq_num = {}",
572 subgroup_num, seq_num);
574 char* buf = (
char*)msg.buf;
575 uint64_t msg_ts = ((header*)buf)->timestamp;
577 non_null_msgs_delivered |=
version_message(msg, subgroup_num, assigned_version, msg_ts);
583 sst->delivered_num, subgroup_num);
584 if(non_null_msgs_delivered) {
597 if(*it == index - 1) {
604 auto next_it = std::next(it);
605 if(*it != index - 1) {
607 if(*next_it != index + 1) {
613 if(*next_it != index + 1) {
625 const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
627 for(uint sender_count = 0; sender_count < num_shard_senders; ++sender_count) {
640 const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
641 uint32_t num_shard_senders, uint32_t sender_rank,
642 volatile char* data, uint64_t size) {
643 header* h = (header*)data;
644 const int32_t index = h->index;
646 message_id_t sequence_number = index * num_shard_senders + sender_rank;
647 node_id_t node_id = subgroup_settings.
members[shard_ranks_by_sender_rank.at(sender_rank)];
655 if(subgroup_settings.
sender_rank < (
int)sender_rank) {
659 }
else if(subgroup_settings.
sender_rank > (
int)sender_rank) {
669 message_id_t seq_num = i * num_shard_senders + sender_rank;
673 char* buf =
const_cast<char*
>(msg.buf);
674 header* h = (header*)(buf);
678 {{buf + h->header_size, msg.size - h->header_size}},
688 assert(it2->first == seq_num);
689 auto& msg = it2->second;
690 char* buf = msg.message_buffer.buffer.get();
691 header* h = (header*)(buf);
695 {{buf + h->header_size, msg.size - h->header_size}},
706 sst->num_received[
member_index][subgroup_settings.num_received_offset + sender_rank] = new_num_received;
710 const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
711 uint32_t num_shard_senders,
DerechoSST&
sst,
unsigned int batch_size,
712 const std::function<
void(uint32_t,
volatile char*, uint32_t)>& sst_receive_handler_lambda) {
714 const uint64_t slot_width = profile.
sst_max_msg_size + 2 *
sizeof(uint64_t);
716 for(uint i = 0; i < batch_size; ++i) {
717 for(uint sender_count = 0; sender_count < num_shard_senders; ++sender_count) {
719 const uint32_t slot = num_received % profile.
window_size;
721 subgroup_settings.
members[shard_ranks_by_sender_rank.at(sender_count)]);
723 [subgroup_settings.
slot_offset + slot_width * (slot + 1) -
sizeof(uint64_t)];
724 if(next_seq == num_received / static_cast<int32_t>(profile.
window_size) + 1) {
725 dbg_default_trace(
"receiver_trig calling sst_receive_handler_lambda. next_seq = {}, num_received = {}, sender rank = {}. Reading from SST row {}, slot {}",
726 next_seq, num_received, sender_count, sender_sst_index, subgroup_settings.
slot_offset + slot_width * slot);
727 sst_receive_handler_lambda(sender_count,
728 &sst.
slots[sender_sst_index]
729 [subgroup_settings.
slot_offset + slot_width * slot],
730 (uint64_t&)sst.
slots[sender_sst_index]
731 [subgroup_settings.
slot_offset + slot_width * (slot + 1) - 2 *
sizeof(uint64_t)]);
742 message_id_t new_seq_num = (*min_ptr + 1) * num_shard_senders + min_index - 1;
744 dbg_default_trace(
"Updating seq_num for subgroup {} to {}", subgroup_num, new_seq_num);
749 sizeof(decltype(sst.
num_received)::value_type) * num_shard_senders);
753 const uint32_t num_shard_members,
DerechoSST& sst) {
758 for(uint i = 0; i < num_shard_members; ++i) {
761 min_stable_num = std::min(min_stable_num, stable_num_copy);
764 bool update_sst =
false;
765 bool non_null_msgs_delivered =
false;
771 int32_t least_undelivered_rdmc_seq_num, least_undelivered_sst_seq_num;
772 least_undelivered_rdmc_seq_num = least_undelivered_sst_seq_num = std::numeric_limits<int32_t>::max();
779 if(least_undelivered_rdmc_seq_num < least_undelivered_sst_seq_num && least_undelivered_rdmc_seq_num <= min_stable_num) {
781 dbg_default_trace(
"Subgroup {}, can deliver a locally stable RDMC message: min_stable_num={} and least_undelivered_seq_num={}",
782 subgroup_num, min_stable_num, least_undelivered_rdmc_seq_num);
785 uint64_t msg_ts = ((header*)buf)->timestamp;
789 non_null_msgs_delivered |=
version_message(msg, subgroup_num, assigned_version, msg_ts);
794 }
else if(least_undelivered_sst_seq_num < least_undelivered_rdmc_seq_num && least_undelivered_sst_seq_num <= min_stable_num) {
796 dbg_default_trace(
"Subgroup {}, can deliver a locally stable SST message: min_stable_num={} and least_undelivered_seq_num={}",
797 subgroup_num, min_stable_num, least_undelivered_sst_seq_num);
799 char* buf = (
char*)msg.
buf;
800 uint64_t msg_ts = ((header*)buf)->timestamp;
803 non_null_msgs_delivered |=
version_message(msg, subgroup_num, assigned_version, msg_ts);
814 if(non_null_msgs_delivered) {
823 auto num_shard_members = subgroup_settings.
members.size();
824 std::vector<int> shard_senders = subgroup_settings.
senders;
826 std::map<uint32_t, uint32_t> shard_ranks_by_sender_rank;
827 for(uint j = 0, l = 0; j < num_shard_members; ++j) {
828 if(shard_senders[j]) {
829 shard_ranks_by_sender_rank[l] = j;
834 auto receiver_pred = [
this, subgroup_settings, shard_ranks_by_sender_rank, num_shard_senders](
const DerechoSST&
sst) {
836 shard_ranks_by_sender_rank, num_shard_senders, sst);
842 auto sst_receive_handler_lambda = [
this, subgroup_num, subgroup_settings, shard_ranks_by_sender_rank,
843 num_shard_senders](uint32_t sender_rank,
volatile char* data, uint64_t size) {
845 shard_ranks_by_sender_rank, num_shard_senders,
846 sender_rank, data, size);
848 auto receiver_trig = [
this, subgroup_num, subgroup_settings, shard_ranks_by_sender_rank,
849 num_shard_senders, batch_size, sst_receive_handler_lambda](
DerechoSST&
sst)
mutable {
851 shard_ranks_by_sender_rank, num_shard_senders, sst,
852 batch_size, sst_receive_handler_lambda);
858 auto delivery_pred = [
this, subgroup_num, subgroup_settings, num_shard_members](
const DerechoSST&
sst) {
863 for(uint i = 0; i < num_shard_members; ++i) {
866 min_stable_num = std::min(min_stable_num, stable_num_copy);
871 auto delivery_trig = [
this, subgroup_num, subgroup_settings, num_shard_members](
DerechoSST&
sst)
mutable {
878 auto persistence_pred = [
this, subgroup_num, subgroup_settings, num_shard_members,
884 for(uint i = 1; i < num_shard_members; ++i) {
886 min_persisted_num = std::min(min_persisted_num, persisted_num_copy);
890 auto persistence_trig = [
this, subgroup_num, subgroup_settings, num_shard_members,
896 for(uint i = 1; i < num_shard_members; ++i) {
898 min_persisted_num = std::min(min_persisted_num, persisted_num_copy);
903 version_seen = min_persisted_num;
909 if(subgroup_settings.sender_rank >= 0) {
910 auto sender_pred = [
this, subgroup_num, subgroup_settings, num_shard_members, num_shard_senders](
const DerechoSST&
sst) {
912 for(uint i = 0; i < num_shard_members; ++i) {
929 if(subgroup_settings.sender_rank >= 0) {
930 auto sender_pred = [
this, subgroup_num, subgroup_settings, num_shard_members](
const DerechoSST&
sst) {
931 for(uint i = 0; i < num_shard_members; ++i) {
932 uint32_t num_received_offset = subgroup_settings.num_received_offset;
934 <
static_cast<int32_t
>(
future_message_indices[subgroup_num] - 1 - subgroup_settings.profile.window_size)) {
959 if(thread_shutdown_existing) {
992 pthread_setname_np(pthread_self(),
"sender_thread");
994 auto should_send_to_subgroup = [&](
subgroup_id_t subgroup_num) {
1004 int shard_sender_index = subgroup_settings.
sender_rank;
1005 std::vector<int> shard_senders = subgroup_settings.
senders;
1007 assert(shard_sender_index >= 0);
1013 std::vector<node_id_t> shard_members = subgroup_settings.
members;
1014 auto num_shard_members = shard_members.size();
1015 assert(num_shard_members >= 1);
1017 for(uint i = 0; i < num_shard_members; ++i) {
1024 for(uint i = 0; i < num_shard_members; ++i) {
1035 auto should_send = [&]() {
1038 if(should_send_to_subgroup(subgroup_num)) {
1039 subgroup_to_send = subgroup_num;
1045 auto should_wake = [&]() {
return thread_shutdown || should_send(); };
1058 throw std::runtime_error(
"rdmc::send returned false");
1071 struct timespec start_time;
1072 clock_gettime(CLOCK_REALTIME, &start_time);
1073 return start_time.tv_sec * 1e9 + start_time.tv_nsec;
1079 for(
auto index : shard_sst_indices) {
1081 global_stability_frontier = std::min(global_stability_frontier, local_stability_frontier_copy);
1083 return global_stability_frontier;
1087 pthread_setname_np(pthread_self(),
"timeout_thread");
1089 std::this_thread::sleep_for(std::chrono::milliseconds(
sender_timeout));
1094 auto subgroup_num = p.first;
1095 auto members = p.second.members;
1099 for(
auto i : sst_indices) {
1101 min_persisted_num = std::min(min_persisted_num, persisted_num_copy);
1125 long long unsigned int msg_size =
sizeof(header);
1134 msg.
size = msg_size;
1146 ((header*)buf)->header_size =
sizeof(header);
1147 ((header*)buf)->index = msg.
index;
1148 ((header*)buf)->timestamp = current_time;
1149 ((header*)buf)->cooked_send =
false;
1162 ((header*)buf)->header_size =
sizeof(header);
1164 ((header*)buf)->timestamp = current_time;
1165 ((header*)buf)->cooked_send =
false;
1173 long long unsigned int payload_size,
1175 long long unsigned int msg_size = payload_size +
sizeof(header);
1177 if(msg_size > subgroup_settings.profile.max_msg_size) {
1178 std::string exp_msg(
"Can't send messages of size larger than the maximum message size which is equal to ");
1179 exp_msg += subgroup_settings.profile.max_msg_size;
1183 std::vector<node_id_t> shard_members = subgroup_settings.members;
1184 auto num_shard_members = shard_members.size();
1186 uint32_t num_shard_senders;
1187 std::vector<int> shard_senders = subgroup_settings.senders;
1188 int shard_sender_index = subgroup_settings.sender_rank;
1190 assert(shard_sender_index >= 0);
1193 for(uint i = 0; i < num_shard_members; ++i) {
1195 <
static_cast<int32_t
>((
future_message_indices[subgroup_num] - subgroup_settings.profile.window_size) * num_shard_senders + shard_sender_index)) {
1200 for(uint i = 0; i < num_shard_members; ++i) {
1201 auto num_received_offset = subgroup_settings.num_received_offset;
1209 if(msg_size > subgroup_settings.profile.sst_max_msg_size) {
1216 subgroup_settings.profile.max_msg_size);
1227 msg.
size = msg_size;
1236 ((header*)buf)->header_size =
sizeof(header);
1237 ((header*)buf)->index = msg.
index;
1238 ((header*)buf)->timestamp = current_time;
1239 ((header*)buf)->cooked_send = cooked_send;
1245 return buf +
sizeof(header);
1264 ((header*)buf)->header_size =
sizeof(header);
1266 ((header*)buf)->timestamp = current_time;
1267 ((header*)buf)->cooked_send = cooked_send;
1269 dbg_default_trace(
"Subgroup {}: get_sendbuffer_ptr increased future_message_indices to {}",
1273 return buf +
sizeof(header);
1278 const std::function<
void(
char* buf)>& msg_generator,
bool cooked_send) {
1322 std::vector<uint32_t> shard_sst_indices;
1323 for(
auto m : shard_members) {
1326 return shard_sst_indices;
1334 cout <<
"Printing SST" << endl;
1336 uint32_t subgroup_num = p.first;
1337 auto subgroup_settings = p.second;
1338 cout <<
"Subgroup " << subgroup_num << endl;
1340 cout <<
"Printing seq_num, delivered_num" << endl;
1341 for(
auto i : shard_sst_indices) {
1344 cout <<
"Printing last_received_messages" << endl;
1345 for(
auto i : shard_sst_indices) {
1346 uint32_t num_shard_senders = subgroup_settings.
senders.size();
1347 for(uint j = 0; j < num_shard_senders; ++j) {
1352 cout <<
"Printing multicastSST fields" << endl;
1357 std::cout <<
"Printing memory usage of free_message_buffers" << std::endl;
1359 std::cout <<
"Subgroup " << p.first <<
", Number of free buffers " << p.second.size() << std::endl;
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::map< subgroup_id_t, uint32_t > subgroup_to_rdmc_group
Maps subgroup IDs for which this node is a sender to the RDMC group it should use to send...
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > non_persistent_sst_messages
Messages that are currently being written to persistent storage.
const int member_index
index of the local node in the members vector, which should also be its row index in the SST ...
MulticastGroup(std::vector< node_id_t > members, node_id_t my_node_id, std::shared_ptr< DerechoSST > sst, CallbackSet callbacks, uint32_t total_num_subgroups, const std::map< subgroup_id_t, SubgroupSettings > &subgroup_settings_by_id, unsigned int sender_timeout, const subgroup_post_next_version_func_t &post_next_version_callback, const persistence_manager_callbacks_t &persistence_manager_callbacks, std::vector< char > already_failed={})
Standard constructor for setting up a MulticastGroup for the first time.
std::condition_variable_any sender_cv
long long unsigned int size
The message's size in bytes.
std::vector< std::queue< RDMCMessage > > pending_sends
Messages that are ready to be sent, but must wait until the current send finishes.
std::map< uint32_t, bool > pending_sst_sends
std::vector< uint32_t > get_shard_sst_indices(subgroup_id_t subgroup_num)
uint32_t sender_id
The unique node ID of the message's sender.
uint16_t rdmc_group_num_offset
Offset to add to member ranks to form RDMC group numbers.
uint32_t sender_id
The unique node ID of the message's sender.
std::vector< int > senders
The "is_sender" flags for members of the subgroup.
pred_handle insert(pred predicate, trig trigger, PredicateType type=PredicateType::ONE_TIME)
Inserts a single (predicate, trigger) pair to the appropriate predicate list.
void get_buffer_and_send_auto_null(subgroup_id_t subgroup_num)
Bundles together a set of low-level parameters for configuring Derecho groups.
unsigned int get_num_rows() const
Returns the total number of rows in the table.
SSTFieldVector< char > slots
for SST multicast
void check_failures_loop()
Checks for failures when a sender reaches its timeout.
Predicates< DerivedSST > predicates
const std::map< subgroup_id_t, SubgroupSettings > subgroup_settings_map
Maps subgroup IDs (for subgroups this node is a member of) to an immutable set of configuration optio...
unsigned int sender_timeout
The time, in milliseconds, that a sender can wait to send a message before it is considered failed...
subgroup_post_next_version_func_t post_next_version_callback
Posts the next version to a subgroup just before a message is delivered, so that user code knows the curr...
SSTField< int32_t > vid
View ID associated with this SST.
A structure containing an RDMC message (which consists of some bytes in a registered memory region) a...
const unsigned int num_members
number of members
void register_predicates()
std::map< subgroup_id_t, std::function< void(char *, size_t)> > singleton_shard_receive_handlers
Receiver lambdas for shards that have only one member.
SSTFieldVector< message_id_t > seq_num
Sequence numbers are interpreted like a row-major pair: (sender, index) becomes sender + num_members ...
void wedge()
Stops all sending and receiving in this group, in preparation for shutting it down.
std::vector< std::unique_ptr< sst::multicast_group< DerechoSST > > > sst_multicast_group_ptrs
The SSTs for multicasts.
void remove(pred_handle &pred)
Removes a (predicate, trigger) pair previously registered with insert().
uint64_t sst_max_msg_size
MessageBuffer message_buffer
The MessageBuffer that contains the message's body.
SSTFieldVector< persistent::version_t > persisted_num
This represents the highest persistent version number that has been persisted to disk at this node...
std::list< pred_handle > persistence_pred_handles
uint32_t num_received_offset
The offset of this node's num_received counter within the subgroup's SST section. ...
std::size_t index_of(const Container &container, const typename Container::value_type &elem)
Finds a value in a STL container, and returns the index of that value in the container.
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > non_persistent_messages
Messages that are currently being written to persistent storage.
void destroy_group(uint16_t group_number)
std::recursive_mutex msg_state_mtx
void set(volatile Elem &e, const Elem &value)
Thread-safe setter for DerechoSST members; ensures there is a std::atomic_signal_fence after writing ...
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
std::vector< std::optional< RDMCMessage > > next_sends
The message that will be sent the next time send is called.
std::vector< node_id_t > members
Vector of member IDs.
int32_t index
The message's index (relative to other messages sent by that sender).
void deliver_messages_upto(const std::vector< int32_t > &max_indices_for_senders, subgroup_id_t subgroup_num, uint32_t num_shard_senders)
message_id_t index
The message's index (relative to other messages sent by that sender).
void send_loop()
Continuously waits for a new pending send, then sends it.
SSTFieldVector< message_id_t > delivered_num
This represents the highest sequence number that has been delivered at this node. ...
rpc_handler_t rpc_callback
These two callbacks are internal, not exposed to clients, so they're not in CallbackSet.
const uint64_t compute_global_stability_frontier(subgroup_id_t subgroup_num)
uint32_t get_num_senders(const std::vector< int > &shard_senders)
std::vector< message_id_t > next_message_to_deliver
bool receiver_predicate(const SubgroupSettings &subgroup_settings, const std::map< uint32_t, uint32_t > &shard_ranks_by_sender_rank, uint32_t num_shard_senders, const DerechoSST &sst)
int32_t message_id_t
Type alias for a message's unique "sequence number" or index.
bool rdmc_sst_groups_created
false if RDMC groups haven't been created successfully
char * get_sendbuffer_ptr(subgroup_id_t subgroup_num, long long unsigned int payload_size, bool cooked_send)
bool check_pending_sst_sends(subgroup_id_t subgroup_num)
std::map< subgroup_id_t, std::set< uint64_t > > pending_message_timestamps
const CallbackSet callbacks
Message-delivery event callbacks, supplied by the client, for "raw" sends.
Bundles together a set of callback functions for message delivery events.
std::thread sender_thread
The background thread that sends messages with RDMC.
std::vector< node_id_t > members
The members of the subgroup.
Mode mode
The operation mode of the subgroup.
void put_with_completion()
SSTFieldVector< int32_t > num_received_sst
#define dbg_default_trace(...)
SSTFieldVector< int32_t > num_received
Local count of number of received messages by sender.
uint32_t slot_offset
The offset of this node's slot within the subgroup's SST section.
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > locally_stable_sst_messages
Same map as locally_stable_rdmc_messages, but for SST messages.
size_t size() const
Just like std::vector::size(), returns the number of elements in this vector.
void receiver_function(subgroup_id_t subgroup_num, const SubgroupSettings &subgroup_settings, const std::map< uint32_t, uint32_t > &shard_ranks_by_sender_rank, uint32_t num_shard_senders, DerechoSST &sst, unsigned int batch_size, const std::function< void(uint32_t, volatile char *, uint32_t)> &sst_receive_handler_lambda)
const char * getBaseAddress()
SSTFieldVector< uint64_t > local_stability_frontier
to check for failures - used by the thread running check_failures_loop in derecho_group ...
void deliver_message(RDMCMessage &msg, const subgroup_id_t &subgroup_num, const persistent::version_t &version, const uint64_t &msg_timestamp)
Delivers a single message to the application layer, either by invoking an RPC function or by calling ...
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
void initialize_sst_row()
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > locally_stable_rdmc_messages
Messages that have finished sending/receiving but aren't yet globally stable.
The GMS and derecho_group will share the same SST for efficiency.
std::atomic< bool > thread_shutdown
Indicates that the group is being destroyed.
std::thread timeout_thread
std::list< pred_handle > sender_pred_handles
std::map< node_id_t, uint32_t > node_id_to_sst_index
inverse map of node_ids to sst_row
std::shared_ptr< DerechoSST > sst
The SST, shared between this group and its GMS.
int32_t resolve_num_received(int32_t index, uint32_t num_received_entry)
std::function< void(char *buffer, size_t size)> completion_callback_t
std::vector< message_id_t > future_message_indices
Index to be used the next time get_sendbuffer_ptr is called.
bool create_group(uint16_t group_number, std::vector< uint32_t > members, size_t block_size, send_algorithm algorithm, incoming_message_callback_t incoming_receive, completion_callback_t send_callback, failure_callback_t failure_callback) __attribute__((warn_unused_result))
Creates a new RDMC group.
std::unique_ptr< char[]> buffer
#define assert_always(x...)
std::vector< std::optional< RDMCMessage > > current_sends
Vector of messages that are currently being sent out using RDMC, or std::nullopt otherwise.
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
volatile char * buf
Pointer to the message.
version_t combine_int32s(const int_type high_bits, const int_type low_bits)
Helper function for creating Persistent version numbers out of MulticastGroup sequence numbers and Vi...
std::shared_ptr< rdma::memory_region > mr
std::list< pred_handle > receiver_pred_handles
bool create_rdmc_sst_groups()
std::function< void(const subgroup_id_t &, const persistent::version_t &, const uint64_t &)> subgroup_post_next_version_func_t
uint32_t total_num_subgroups
int sender_rank
This node's sender rank within the subgroup (as defined by SubView::sender_rank_of) ...
std::vector< bool > last_transfer_medium
std::map< subgroup_id_t, std::map< message_id_t, uint64_t > > pending_persistence
message_callback_t global_stability_callback
bool send(subgroup_id_t subgroup_num, long long unsigned int payload_size, const std::function< void(char *buf)> &msg_generator, bool cooked_send)
Send now internally calls get_sendbuffer_ptr.
Recurrent predicates persist as long as the SST instance and fire their triggers every time they are ...
long long unsigned int size
The message's size in bytes.
void sst_receive_handler(subgroup_id_t subgroup_num, const SubgroupSettings &subgroup_settings, const std::map< uint32_t, uint32_t > &shard_ranks_by_sender_rank, uint32_t num_shard_senders, uint32_t sender_rank, volatile char *data, uint64_t size)
void debug_print()
Debugging function; prints the current state of the SST to stdout.
std::list< pred_handle > delivery_pred_handles
Implements the low-level mechanics of tracking multicasts in a Derecho group, using RDMC to deliver m...
Base exception class for all exceptions raised by Derecho.
void put()
Writes the entire local row to all remote nodes.
void delivery_trigger(subgroup_id_t subgroup_num, const SubgroupSettings &subgroup_settings, const uint32_t num_shard_members, DerechoSST &sst)
A collection of settings for a single subgroup that this node is a member of.
persistence_manager_callbacks_t persistence_manager_callbacks
persistence manager callbacks
std::map< std::pair< subgroup_id_t, node_id_t >, RDMCMessage > current_receives
Messages that are currently being received.
persistence_callback_t global_persistence_callback
bool version_message(RDMCMessage &msg, const subgroup_id_t &subgroup_num, const persistent::version_t &version, const uint64_t &msg_timestamp)
Enqueues a single message for persistence with the persistence manager.
std::vector< std::list< int32_t > > received_intervals
Used for synchronizing receives by RDMC and SST.
std::map< uint32_t, std::vector< MessageBuffer > > free_message_buffers
Stores message buffers not currently in use.