// ViewManager implementation (fragmentary listing; elided code is marked with "...").
#include <mutils/macro_utils.hpp>
// ...

// Constructor for a new group where this node is the GMS leader (fragment).
ViewManager::ViewManager(const SubgroupInfo& subgroup_info,
                         const std::vector<std::type_index>& subgroup_type_order,
                         const bool any_persistent_objects,
                         const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
                         ReplicatedObjectReferenceMap& object_reference_map,
                         const persistence_manager_callbacks_t& _persistence_manager_callbacks,
                         std::vector<view_upcall_t> _view_upcalls)
        : /* ... */
          thread_shutdown(false),
          view_upcalls(_view_upcalls),
          subgroup_info(subgroup_info),
          subgroup_type_order(subgroup_type_order),
          tcp_sockets(group_tcp_sockets),
          subgroup_objects(object_reference_map),
          any_persistent_objects(any_persistent_objects),
          /* ... */
          persistence_manager_callbacks(_persistence_manager_callbacks) {
    if(any_persistent_objects) {
        // ...
        curr_view = persistent::loadObject<View>();
        // ...
    }
    // ...
    // Construct the initial single-member View containing only this node:
    curr_view = std::make_unique<View>(
            0, std::vector<node_id_t>{my_id},
            std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>>{
                    /* ... */},
            /* ... */
            std::vector<node_id_t>{}, std::vector<node_id_t>{},
            /* ... */);
    // ...
// Constructor for joining an existing group (fragment).
ViewManager::ViewManager(/* ... */
                         const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
                         /* ... */
                         std::vector<view_upcall_t> _view_upcalls)
        : /* ... */
          subgroup_info(subgroup_info),
          subgroup_type_order(subgroup_type_order),
          /* ... */
          any_persistent_objects(any_persistent_objects),
          /* ... */ {
    // ...
    // Contact the known leader, following redirects until the actual leader is reached.
    bool leader_redirect;
    do {
        leader_redirect = false;
        uint64_t leader_version_hashcode;
        // ...
        if(!success)
            throw derecho_exception("Failed to exchange version hashcodes with the leader! Leader has crashed.");
        // ...
            throw derecho_exception("Unable to connect to Derecho leader because the leader is running on an incompatible platform or used an incompatible compiler.");
        // ...
        success = leader_connection.write(my_id);
        if(!success)
            throw derecho_exception("Failed to send ID to the leader! Leader has crashed.");
        success = leader_connection.read(leader_response);
        if(!success)
            throw derecho_exception("Failed to read initial response from leader! Leader has crashed.");
        // ...
            dbg_default_error("Error! Leader refused connection because ID {} is already in use!", my_id);
        // ...
            std::size_t ip_addr_size;
            leader_connection.read(ip_addr_size);
            char buffer[ip_addr_size];
            leader_connection.read(buffer, ip_addr_size);
            // ...
            uint16_t leader_gms_port;
            leader_connection.read(leader_gms_port);
            dbg_default_info("That node was not the leader! Redirecting to {}", leader_ip);
            // ...
            leader_connection = tcp::socket(leader_ip, leader_gms_port);
            leader_redirect = true;
        // ...
    } while(leader_redirect);
    // Total restart: reload the locally-saved View and send this node's logged state to
    // the restart leader (fragment).
    curr_view = persistent::loadObject<View>();
    // ...
    if(!success)
        throw derecho_exception("Restart leader crashed before sending a restart View!");
    auto leader_socket_write = [&leader_connection](const char* bytes, std::size_t size) {
        if(!leader_connection.write(bytes, size)) {
            throw derecho_exception("Restart leader crashed before sending a restart View!");
        }
    };
    // ...
    if(!success)
        throw derecho_exception("Restart leader crashed before sending a restart View!");
    for(const auto& id_to_shard_map : restart_state->logged_ragged_trim) {
        const std::unique_ptr<RaggedTrim>& ragged_trim = id_to_shard_map.second.begin()->second;
        // ...
        if(!success)
            throw derecho_exception("Restart leader crashed before sending a restart View!");
        // ...
    }
    // receive_initial_view: read the serialized View (and, during total restart, the
    // logged ragged trims) from the leader (fragment).
    std::size_t size_of_view;
    bool success = leader_connection.read(size_of_view);
    if(!success) {
        throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
    }
    char buffer[size_of_view];
    success = leader_connection.read(buffer, size_of_view);
    if(!success) {
        throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
    }
    curr_view = mutils::from_bytes<View>(nullptr, buffer);
    // ...
    std::size_t num_of_ragged_trims;
    leader_connection.read(num_of_ragged_trims);
    for(std::size_t i = 0; i < num_of_ragged_trims; ++i) {
        std::size_t size_of_ragged_trim;
        leader_connection.read(size_of_ragged_trim);
        char buffer[size_of_ragged_trim];
        leader_connection.read(buffer, size_of_ragged_trim);
        std::unique_ptr<RaggedTrim> ragged_trim = mutils::from_bytes<RaggedTrim>(nullptr, buffer);
        restart_state->logged_ragged_trim[ragged_trim->subgroup_id].emplace(
                ragged_trim->shard_num, std::move(ragged_trim));
    }
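The View and each RaggedTrim travel over the socket as a length-prefixed byte blob: the sender writes a std::size_t, then that many serialized bytes, and the receiver deserializes with mutils::from_bytes. The helper below is a minimal sketch of that receive pattern; read_length_prefixed is a hypothetical name, not part of the Derecho API, and it assumes the tcp::socket::read overloads and mutils::from_bytes exactly as they appear in this listing.

#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical helper illustrating the wire pattern used above.
// T must be deserializable by mutils::from_bytes.
template <typename T>
std::unique_ptr<T> read_length_prefixed(tcp::socket& sock) {
    std::size_t num_bytes = 0;
    if(!sock.read(num_bytes)) {                 // 1. read the length prefix
        throw derecho_exception("Sender crashed before sending the object size");
    }
    std::vector<char> buffer(num_bytes);
    if(!sock.read(buffer.data(), num_bytes)) {  // 2. read exactly that many bytes
        throw derecho_exception("Sender crashed before sending the object body");
    }
    return mutils::from_bytes<T>(nullptr, buffer.data());  // 3. deserialize
}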
    // check_view_committed: read the leader's two-phase commit decisions for the initial View (fragment).
    bool success = leader_connection.read(commit_message);
    if(!success) {
        throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
    }
    // ...
        throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
    // ...
    success = leader_connection.read(commit_message);
    if(!success) {
        throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
    }
    // Total restart: persist the leader's ragged trims, then truncate the local logs (fragment).
    for(const auto& subgroup_and_map : restart_state->logged_ragged_trim) {
        for(const auto& shard_and_trim : subgroup_and_map.second) {
            // ...
        }
    }
    dbg_default_debug("Truncating persistent logs to conform to leader's ragged trim");
    // ...
    // Derive the subgroup settings needed to build the initial SST and MulticastGroup (fragment).
    std::map<subgroup_id_t, SubgroupSettings> subgroup_settings_map;
    // ...
    uint32_t num_received_size = sizes.first;
    uint32_t slot_size = sizes.second;
    // ...
    dbg_default_debug("This node is the restart leader for subgroup {}, shard {}. Sending object data to shard members.", subgroup_id, shard);
    if(shard_member != my_id) {
        // ...
    }

    // setup_initial_tcp_connections: open a state-transfer TCP connection to every other
    // member of the initial view (fragment).
    for(int i = 0; i < initial_view.num_members; ++i) {
        if(initial_view.members[i] != my_id) {
            tcp_sockets->add_node(initial_view.members[i],
                                  {std::get<0>(initial_view.member_ips_and_ports[i]),
                                   std::get<PORT_TYPE::RPC>(initial_view.member_ips_and_ports[i])});
        }
    }

    // reinit_tcp_connections: the same loop, re-run when rejoining after a restart (fragment).
    for(int i = 0; i < initial_view.num_members; ++i) {
        if(initial_view.members[i] != my_id
           /* ... */) {
            tcp_sockets->add_node(initial_view.members[i],
                                  {std::get<0>(initial_view.member_ips_and_ports[i]),
                                   std::get<PORT_TYPE::RPC>(initial_view.member_ips_and_ports[i])});
        }
    }
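Throughout this file a member's addressing information is a 5-tuple: its IP address followed by four per-service ports, in the same order the joiners report them above (gms, rpc, sst, rdmc). std::get<0> selects the IP and the PORT_TYPE enumerators select one of the port fields. A small standalone illustration with made-up values; member_address_t is a local alias for this sketch, not a Derecho type:

#include <cstdint>
#include <string>
#include <tuple>

using ip_addr_t = std::string;
// One entry of member_ips_and_ports: IP, then gms/rpc/sst/rdmc ports.
using member_address_t = std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>;

int main() {
    member_address_t addr{"192.168.1.10", 23580, 28366, 37683, 31675};  // example values only
    ip_addr_t ip = std::get<0>(addr);       // "192.168.1.10"
    uint16_t rpc_port = std::get<2>(addr);  // the RPC port, i.e. std::get<PORT_TYPE::RPC> above
    (void)ip;
    (void)rpc_port;
}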
    curr_view->gmsSST->start_predicate_evaluation();
    // ...

    // truncate_persistent_logs: truncate each persistent log to the version implied by the
    // logged ragged trim for this node's shard (fragment).
    for(const auto& id_to_shard_map : logged_ragged_trim) {
        // ...
        uint32_t my_shard_id;
        // ...
        const auto find_my_shard = restart_view.my_subgroups.find(subgroup_id);
        // ...
        my_shard_id = find_my_shard->second;
        const auto& my_shard_ragged_trim = id_to_shard_map.second.at(my_shard_id);
        persistent::version_t max_delivered_version = ragged_trim_to_latest_version(
                my_shard_ragged_trim->vid, my_shard_ragged_trim->max_received_by_sender);
        dbg_default_trace("Truncating persistent log for subgroup {} to version {}", subgroup_id, max_delivered_version);
        // ...
    }
// await_first_view: the initial leader accepts joining clients until the View is adequately
// provisioned, and starts over if a joiner fails while the View is being sent (fragment).
std::map<node_id_t, tcp::socket> waiting_join_sockets;
std::set<node_id_t> members_sent_view;
curr_view->is_adequately_provisioned = false;
bool joiner_failed = false;
do {
    while(!curr_view->is_adequately_provisioned) {
        // ...
        uint64_t joiner_version_code;
        // ...
        rls_default_warn("Rejected a connection from client at {}. Client was running on an incompatible platform or used an incompatible compiler.",
                         client_socket.get_remote_ip());
        // ...
        client_socket.read(joiner_id);
        if(curr_view->rank_of(joiner_id) != -1) {
            // ...
        }
        uint16_t joiner_gms_port = 0;
        client_socket.read(joiner_gms_port);
        uint16_t joiner_rpc_port = 0;
        client_socket.read(joiner_rpc_port);
        uint16_t joiner_sst_port = 0;
        client_socket.read(joiner_sst_port);
        uint16_t joiner_rdmc_port = 0;
        client_socket.read(joiner_rdmc_port);
        // ...
        // Rebuild curr_view to include the new joiner; the constructor arguments include:
        //         {joiner_ip, joiner_gms_port, joiner_rpc_port, joiner_sst_port, joiner_rdmc_port}),
        //         std::vector<char>(curr_view->num_members + 1, 0),
        //         ...
        //         std::vector<node_id_t>{}, 0, 0,
        // ...
        waiting_join_sockets.emplace(joiner_id, std::move(client_socket));
        dbg_default_debug("Node {} connected from IP address {} and GMS port {}", joiner_id, joiner_ip, joiner_gms_port);
    }
    joiner_failed = false;
    try {
        for(auto waiting_sockets_iter = waiting_join_sockets.begin();
            waiting_sockets_iter != waiting_join_sockets.end();) {
            // ...
            char view_buffer[view_buffer_size];
            // ...
            send_success = waiting_sockets_iter->second.write(view_buffer_size);
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            // ...
            send_success = waiting_sockets_iter->second.write(view_buffer, view_buffer_size);
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            // ...
            send_success = waiting_sockets_iter->second.write(std::size_t{0});
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            // ...
            members_sent_view.emplace(waiting_sockets_iter->first);
            waiting_sockets_iter++;
        }
    } catch(const node_id_t& failed_joiner_id) {
        dbg_default_warn("Node {} failed after contacting the leader! Removing it from the initial view.", failed_joiner_id);
        // Rebuild curr_view without the failed joiner:
        std::vector<node_id_t> filtered_members(curr_view->members.size() - 1);
        std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> filtered_ips_and_ports(
                curr_view->member_ips_and_ports.size() - 1);
        std::vector<node_id_t> filtered_joiners(curr_view->joined.size() - 1);
        std::remove_copy(curr_view->members.begin(), curr_view->members.end(),
                         filtered_members.begin(), failed_joiner_id);
        std::remove_copy(curr_view->member_ips_and_ports.begin(),
                         /* ... */
                         filtered_ips_and_ports.begin(),
                         /* ... */);
        std::remove_copy(/* ... */
                         filtered_joiners.begin(), failed_joiner_id);
        curr_view = std::make_unique<View>(0, filtered_members, filtered_ips_and_ports,
                                           std::vector<char>(curr_view->num_members - 1, 0), filtered_joiners,
                                           std::vector<node_id_t>{}, 0, 0,
                                           /* ... */);
        // ...
        waiting_join_sockets.erase(waiting_sockets_iter);
        joiner_failed = true;
        // ...
    }
    // If a joiner failed, notify the nodes that already received the (now abandoned) view:
    for(const node_id_t& member_sent_view : members_sent_view) {
        // ...
    }
    members_sent_view.clear();
    // ...
} while(joiner_failed);
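The filtered_* vectors above are built with std::remove_copy, which copies every element except those equal to the given value into a destination that has already been sized one element smaller. A small standalone illustration:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

int main() {
    using node_id_t = uint32_t;
    std::vector<node_id_t> members{0, 1, 2, 3};
    node_id_t failed_joiner_id = 2;

    // Destination is pre-sized to members.size() - 1, exactly as in the fragment above.
    std::vector<node_id_t> filtered_members(members.size() - 1);
    std::remove_copy(members.begin(), members.end(),
                     filtered_members.begin(), failed_joiner_id);

    assert((filtered_members == std::vector<node_id_t>{0, 1, 3}));
}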
// leader_commit_initial_view: send Prepare and Commit to every waiting joiner (fragment).
for(auto waiting_sockets_iter = waiting_join_sockets.begin();
    waiting_sockets_iter != waiting_join_sockets.end();) {
    dbg_default_debug("Sending prepare and commit messages to node {}", waiting_sockets_iter->first);
    // ...
    waiting_sockets_iter = waiting_join_sockets.erase(waiting_sockets_iter);
}

// await_rejoining_nodes: the restart leader waits for a quorum of rejoining nodes (fragment).
bool quorum_achieved = false;
while(!quorum_achieved) {
    // ...
    quorum_achieved = true;
    // ...
    for(auto& subgroup_to_map : restart_state->logged_ragged_trim) {
        for(auto& shard_trim_pair : subgroup_to_map.second) {
            shard_trim_pair.second->leader_id = std::numeric_limits<node_id_t>::max();
        }
    }
    // ...
    if(failed_node_id != -1) {
        dbg_default_warn("Node {} failed while waiting for restart leader to reach a quorum!", failed_node_id);
        // ...
    }
    // ...
    if(failed_node_id != -1) {
        dbg_default_warn("Node {} failed when sending Prepare messages for the restart view!", failed_node_id);
        // ...
    }
}
// initialize_rdmc_sst: one-time global setup of RDMC and SST using the current membership (fragment).
dbg_default_debug("Starting global initialization of RDMC and SST, including internal TCP connection setup");
// ...
auto member_ips_and_rdmc_ports_map = make_member_ips_and_ports_map<PORT_TYPE::RDMC>(*curr_view);
if(!rdmc::initialize(member_ips_and_rdmc_ports_map,                 // reconstructed call
                     curr_view->members[curr_view->my_rank])) {
    std::cout << "Global setup failed" << std::endl;
    // ...
}
auto member_ips_and_sst_ports_map = make_member_ips_and_ports_map<PORT_TYPE::SST>(*curr_view);
// ... (SST/libfabric initialization, also keyed by this node's ID)
//         curr_view->members[curr_view->my_rank]);
// ...
//         curr_view->members[curr_view->my_rank]);

// create_threads: background client-listener and old-view-cleanup threads (fragment).
pthread_setname_np(pthread_self(), "client_thread");
// ...
pthread_setname_np(pthread_self(), "old_view");
// register_predicates: the SST predicates and triggers that drive the view-change protocol (fragment).
auto change_commit_ready = [this](const DerechoSST& gmsSST) {
    // ...
};
auto leader_proposed_change = [this](const DerechoSST& gmsSST) {
    return gmsSST.num_changes[curr_view->find_rank_of_leader()]
           /* ... */;
};
auto leader_committed_changes = [this](const DerechoSST& gmsSST) {
    return gmsSST.num_committed[curr_view->find_rank_of_leader()]
           > gmsSST.num_installed[curr_view->my_rank];
};
// ...
// Predicate/trigger pairs registered via gmsSST->predicates.insert(...):
//     suspected_changed, suspected_changed_trig, ...
//     leader_proposed_change, ack_proposed_change, ...
//     leader_committed_changes, view_change_trig, ...
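Each of these lambdas is paired with a trigger and registered on the SST, where predicates are re-evaluated continuously and the trigger fires when its predicate becomes true. The toy model below (plain standard C++, not Derecho's sst::Predicates class) illustrates the idiom with a simplified version of leader_committed_changes:

#include <functional>
#include <iostream>
#include <list>
#include <utility>

// Toy stand-in for a DerechoSST row pair, illustration only.
struct ToySST {
    int num_committed = 0;
    int num_installed = 0;
};

int main() {
    using Pred = std::function<bool(const ToySST&)>;
    using Trig = std::function<void(ToySST&)>;
    std::list<std::pair<Pred, Trig>> recurrent_predicates;

    // Mirrors leader_committed_changes above: fire when more changes are committed than installed.
    recurrent_predicates.emplace_back(
            [](const ToySST& sst) { return sst.num_committed > sst.num_installed; },
            [](ToySST& sst) {
                std::cout << "view change triggered\n";
                ++sst.num_installed;
            });

    ToySST sst;
    sst.num_committed = 1;  // simulate the leader committing a change
    for(auto& [pred, trig] : recurrent_predicates) {
        if(pred(sst)) trig(sst);
    }
}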
// Failure handling: a new suspicion is checked against the partitioning threshold and, on
// the leader, turned into a proposed change; a node that discovers all prior leaders are
// suspected takes over and re-proposes their pending changes (fragments).
const int old_leader_rank = curr_view->find_rank_of_leader();
// ...
dbg_default_warn("Potential partitioning event, but partitioning safety is disabled. num_failed - num_left = {} but num_members - num_left + 1 = {}",
                 /* ... */);
// ...
throw derecho_exception("Potential partitioning event: this node is no longer in the majority and must shut down!");
// ...
dbg_default_warn("Potential partitioning event, but partitioning safety is disabled. num_failed - num_left = {} but num_members - num_left + 1 = {}",
                 /* ... */);
// ...
throw derecho_exception("Potential partitioning event: this node is no longer in the majority and must shut down!");
// ...
if(my_rank == new_leader_rank && my_rank == old_leader_rank
   /* ... */) {
    // ...
    if(next_change_index == (int)gmsSST.changes.size()) {
        // ...
    }
    // ...
    gmsSST.put(gmsSST.changes, next_change_index);
}
// ...
dbg_default_debug("The current leader failed, so this node will take over as leader");
auto leader_change_finished = [this](const DerechoSST& sst) {
    // ...
};
// ...
   == static_cast<int>(curr_view->members.size())) {
// ...

// leader_start_join / redirect_join_attempt: the leader accepts pending join requests; a
// non-leader that receives one redirects the client to the current leader (fragments).
// ... (arguments transferring the first pending join socket)
//         pending_join_sockets_locked.access,
//         pending_join_sockets_locked.access.begin());
// ...
client_socket = std::move(pending_join_sockets_locked.access.front());
pending_join_sockets_locked.access.pop_front();
// ...
client_socket.read(joiner_id);
// ...
const int rank_of_leader = curr_view->find_rank_of_leader();
// ...
//         curr_view->member_ips_and_ports[rank_of_leader])));
auto bind_socket_write = [&client_socket](const char* bytes, std::size_t size) {
    client_socket.write(bytes, size);
};
// ... (send the leader's IP and GMS port to the redirected client)
//         std::get<0>(curr_view->member_ips_and_ports[rank_of_leader]));
client_socket.write(std::get<PORT_TYPE::GMS>(
        curr_view->member_ips_and_ports[rank_of_leader]));
// ...

// New-leader takeover: copy and re-propose changes from the suspected prior leaders (fragment).
dbg_default_debug("Taking over as the new leader; everyone suspects prior leaders.");
// ...
for(int rank = 0; rank < curr_view->num_members; ++rank) {
    // ...
    if(next_change_index == (int)gmsSST.changes.size()) {
        // ...
    }
    // ...
}
if(!changes_copied) {
    gmsSST.put(gmsSST.changes, next_change_index);
}

// acknowledge_proposed_change (fragment).
const int leader = curr_view->find_rank_of_leader();
// ...
if(myRank != leader) {
    // ...
}
// start_meta_wedge / terminate_epoch: wait until every live node reports wedged, then drain
// any SST multicasts still in flight before running ragged edge cleanup (fragments).
auto is_meta_wedged = [this](const DerechoSST& gmsSST) {
    for(unsigned int n = 0; n < gmsSST.get_num_rows(); ++n) {
        // ...
    }
    // ...
};
auto meta_wedged_continuation = [this](DerechoSST& gmsSST) {
    // ...
};
// ...
bool first_call = false;
// ...
std::unique_lock<std::shared_timed_mutex> write_lock(view_mutex);
// ...
if(!next_view->is_adequately_provisioned) {
    dbg_default_debug("Next view would not be adequately provisioned, waiting for more joins.");
    // ...
}
// ...
auto leader_committed_change = [this, curr_num_committed](const DerechoSST& gmsSST) {
    // ...
};
// ...
for(const auto& shard_settings_pair : curr_view->multicast_group->get_subgroup_settings()) {
    // ...
    const auto& curr_subgroup_settings = shard_settings_pair.second;
    auto num_shard_members = curr_subgroup_settings.members.size();
    std::vector<int> shard_senders = curr_subgroup_settings.senders;
    auto num_shard_senders = curr_view->multicast_group->get_num_senders(shard_senders);
    std::map<uint32_t, uint32_t> shard_ranks_by_sender_rank;
    for(uint j = 0, l = 0; j < num_shard_members; ++j) {
        if(shard_senders[j]) {
            shard_ranks_by_sender_rank[l] = j;
            // ...
        }
    }
    // ...
    while(curr_view->multicast_group->check_pending_sst_sends(subgroup_id)) {
        // ...
    }
    // ...
    //         curr_view->multicast_group->get_shard_sst_indices(subgroup_id));
    while(curr_view->multicast_group->receiver_predicate(
            curr_subgroup_settings, shard_ranks_by_sender_rank,
            num_shard_senders, gmsSST)) {
        auto sst_receive_handler_lambda =
                [this, subgroup_id, curr_subgroup_settings,
                 shard_ranks_by_sender_rank, num_shard_senders](
                        uint32_t sender_rank, volatile char* data, uint32_t size) {
                    curr_view->multicast_group->sst_receive_handler(
                            subgroup_id, curr_subgroup_settings, shard_ranks_by_sender_rank,
                            num_shard_senders, sender_rank, data, size);
                };
        curr_view->multicast_group->receiver_function(
                subgroup_id, curr_subgroup_settings, shard_ranks_by_sender_rank,
                num_shard_senders, gmsSST,
                curr_subgroup_settings.profile.window_size, sst_receive_handler_lambda);
    }
}
// For each subgroup/shard this node belongs to: if it is the shard leader, run leader
// ragged edge cleanup now; otherwise record the shard so this node can wait for its leader
// to publish a global_min (fragment).
auto follower_subgroups_and_shards = std::make_shared<std::map<subgroup_id_t, uint32_t>>();
for(const auto& shard_settings_pair : curr_view->multicast_group->get_subgroup_settings()) {
    const subgroup_id_t subgroup_id = shard_settings_pair.first;
    const uint32_t shard_num = shard_settings_pair.second.shard_num;
    const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num);
    uint num_shard_senders = 0;
    // ...
        if(v) num_shard_senders++;
    // ...
    if(num_shard_senders) {
        if(shard_view.my_rank
           == curr_view->subview_rank_of_shard_leader(subgroup_id, shard_num)) {
            // leader_ragged_edge_cleanup(subgroup_id, ...
            //         shard_settings_pair.second.num_received_offset, shard_view.members, ...);
            // ...
        } else {
            follower_subgroups_and_shards->emplace(subgroup_id, shard_num);
        }
    }
}
// ...
auto leader_global_mins_are_ready = [this, follower_subgroups_and_shards](const DerechoSST& gmsSST) {
    for(const auto& subgroup_shard_pair : *follower_subgroups_and_shards) {
        const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_shard_pair.first)
                                            .at(subgroup_shard_pair.second);
        // ...
        //         curr_view->subview_rank_of_shard_leader(subgroup_shard_pair.first,
        //                                                 subgroup_shard_pair.second));
        // ...
    }
    // ...
};
auto global_min_ready_continuation = [this, follower_subgroups_and_shards](DerechoSST& gmsSST) {
    // ...
};
// ...
gmsSST.predicates.insert(leader_global_mins_are_ready, global_min_ready_continuation,
                         /* ... */);

// echo_ragged_trim: runs when every shard leader this node is following has published its
// global_min (fragment).
void ViewManager::echo_ragged_trim(
        std::shared_ptr<std::map<subgroup_id_t, uint32_t>> follower_subgroups_and_shards,
        DerechoSST& gmsSST) {
    dbg_default_debug("GlobalMins are ready for all {} subgroup leaders this node is waiting on",
                      follower_subgroups_and_shards->size());
    for(const auto& subgroup_shard_pair : *follower_subgroups_and_shards) {
        const subgroup_id_t subgroup_id = subgroup_shard_pair.first;
        const uint32_t shard_num = subgroup_shard_pair.second;
        SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num);
        uint num_shard_senders = 0;
        // ...
            if(v) num_shard_senders++;
        // ... (copy the leader's global_min and re-publish it)
        //         subgroup_id, shard_num)];
        // ...
        //         curr_view->multicast_group->get_subgroup_settings().at(subgroup_id).num_received_offset,
        // ...
    }
}
// Once global_min values are published, wait for every live shard member to echo them,
// deliver the ragged trim, and then wait for persistence before finishing the view change
// (fragments).
auto everyone_echoed_pred = [this](const DerechoSST& gmsSST) {
    for(const auto& subgroup_shard_pair : curr_view->my_subgroups) {
        const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_shard_pair.first)
                                            .at(subgroup_shard_pair.second);
        // ...
        int shard_member_rank = curr_view->rank_of(shard_member);
        // ...
        if(!curr_view->failed[shard_member_rank]
           && !gmsSST.global_min_ready[shard_member_rank][subgroup_shard_pair.first]) {
            // ...
        }
    }
    // ...
};
auto deliver_ragged_trim_trig = [this](DerechoSST& gmsSST) {
    // ...
};
// ...

// deliver_ragged_trim (fragment).
dbg_default_debug("GlobalMin has been echoed by everyone for all {} subgroups this node is in",
                  curr_view->my_subgroups.size());
for(const auto& subgroup_shard_pair : curr_view->my_subgroups) {
    const subgroup_id_t subgroup_id = subgroup_shard_pair.first;
    const uint32_t shard_num = subgroup_shard_pair.second;
    const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num);
    // ...
    //         curr_view->subview_rank_of_shard_leader(subgroup_id, shard_num));
    uint num_shard_senders = 0;
    // ...
        if(v) num_shard_senders++;
    // ... (deliver in order up to the agreed trim)
    //         curr_view->multicast_group->get_subgroup_settings()
    //                 .at(subgroup_id).num_received_offset,
    //         shard_view.members, num_shard_senders);
}
// ...

auto persistence_finished_pred = [this](const DerechoSST& gmsSST) {
    for(const auto& subgroup_shard_pair : curr_view->my_subgroups) {
        const subgroup_id_t subgroup_id = subgroup_shard_pair.first;
        const uint32_t shard_num = subgroup_shard_pair.second;
        // ...
        for(const node_id_t& shard_member :
            curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num).members) {
            uint member_row = curr_view->rank_of(shard_member);
            // ... (not finished if any member's persisted version is still behind)
            //    && persistent::unpack_version<int32_t>( ...
            //       ... < last_delivered_seq_num) {
            // ...
        }
    }
    // ...
};
auto finish_view_change_trig = [this](DerechoSST& gmsSST) {
    // ...
};
// ...
gmsSST.predicates.insert(persistence_finished_pred, finish_view_change_trig,
                         /* ... */);
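Both persistence_finished_pred above and ragged_trim_to_latest_version (used by truncate_persistent_logs earlier in this listing) relate a persistent version number to a view ID and a message sequence number. The sketch below illustrates that relationship under two explicit assumptions that are not shown in this listing and may not match Derecho's exact encoding: that a version packs the view ID into the upper 32 bits and the sequence number into the lower 32 bits, and that sequence numbers are assigned round-robin across senders.

#include <algorithm>
#include <cstdint>
#include <vector>

// Sketch only, not Derecho's implementation of ragged_trim_to_latest_version.
int64_t sketch_ragged_trim_to_latest_version(int32_t vid,
                                             const std::vector<int32_t>& max_received_by_sender) {
    int32_t max_seq_num = 0;
    const int32_t num_senders = static_cast<int32_t>(max_received_by_sender.size());
    for(int32_t sender = 0; sender < num_senders; ++sender) {
        // Assumed round-robin numbering: sender s's k-th message has sequence number
        // k * num_senders + s.
        if(max_received_by_sender[sender] >= 0) {
            max_seq_num = std::max(max_seq_num,
                                   max_received_by_sender[sender] * num_senders + sender);
        }
    }
    // Assumed packing: view ID in the upper 32 bits, sequence number in the lower 32 bits.
    return (static_cast<int64_t>(vid) << 32) | static_cast<uint32_t>(max_seq_num);
}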
// finish_view_change: install the next view, update the TCP connection pools for joined and
// failed nodes, and restart predicate evaluation (fragment).
std::unique_lock<std::shared_timed_mutex> write_lock(view_mutex);
// ...
std::map<subgroup_id_t, SubgroupSettings> next_subgroup_settings;
// ...
uint32_t new_num_received_size = sizes.first;
uint32_t new_slot_size = sizes.second;
// ...
std::list<tcp::socket> joiner_sockets;
// ...
for(std::size_t c = 0; c < next_view->joined.size(); ++c) {
    // ... (send the new view and the old shard leaders to each joiner)
    //         old_shard_leaders_by_id);
}
// ...
for(auto& joiner_socket : joiner_sockets) {
    // ...
}
for(auto& joiner_socket : joiner_sockets) {
    // ...
}
joiner_sockets.clear();
// ...
dbg_default_debug("Removing global TCP connections for failed node {} from RDMC and SST", failed_node_id);
#ifdef USE_VERBS_API
// ...
#endif
for(std::size_t i = 0; i < next_view->joined.size(); ++i) {
    // ...
    dbg_default_debug("Adding RDMC connection to node {}, at IP {} and port {}",
                      next_view->members[joiner_rank],
                      std::get<0>(next_view->member_ips_and_ports[joiner_rank]),
                      std::get<PORT_TYPE::RDMC>(next_view->member_ips_and_ports[joiner_rank]));
#ifdef USE_VERBS_API
    // ... next_view->member_ips_and_ports[joiner_rank], my_id);
#else
    // ...
            std::pair<ip_addr_t, uint16_t>{
                    std::get<0>(next_view->member_ips_and_ports[joiner_rank]),
                    std::get<PORT_TYPE::RDMC>(next_view->member_ips_and_ports[joiner_rank])});
#endif
}
for(std::size_t i = 0; i < next_view->joined.size(); ++i) {
    // ...
            std::pair<ip_addr_t, uint16_t>{
                    std::get<0>(next_view->member_ips_and_ports[joiner_rank]),
                    std::get<PORT_TYPE::SST>(next_view->member_ips_and_ports[joiner_rank])});
}
// ...
curr_view->gmsSST->start_predicate_evaluation();
// construct_multicast_group: create the SST and MulticastGroup for the first time (fragment).
void ViewManager::construct_multicast_group(CallbackSet callbacks,
                                            const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings,
                                            const uint32_t num_received_size,
                                            const uint32_t slot_size) {
    const auto num_subgroups = curr_view->subgroup_shard_views.size();
    // ...
    curr_view->gmsSST = std::make_shared<DerechoSST>(
            /* ... */
            [this](const uint32_t node_id) { report_failure(node_id); },
            /* ... */
            num_subgroups, num_received_size, slot_size);
    // ...
    curr_view->multicast_group = std::make_unique<MulticastGroup>(
            /* ... */
            curr_view->gmsSST, callbacks, num_subgroups, subgroup_settings,
            /* ... */);
}

// transition_multicast_group: create the SST and MulticastGroup for the next view, carrying
// local state over from the current one (fragment).
void ViewManager::transition_multicast_group(
        const std::map<subgroup_id_t, SubgroupSettings>& new_subgroup_settings,
        const uint32_t new_num_received_size, const uint32_t new_slot_size) {
    const auto num_subgroups = next_view->subgroup_shard_views.size();
    // ...
    next_view->gmsSST = std::make_shared<DerechoSST>(
            /* ... */
            [this](const uint32_t node_id) { report_failure(node_id); },
            /* ... */
            num_subgroups, new_num_received_size, new_slot_size);
    // ...
    next_view->multicast_group = std::make_unique<MulticastGroup>(
            /* ... */
            new_subgroup_settings,
            /* ... */);
    // ...
    next_view->gmsSST->init_local_row_from_previous(
            /* ... */);
}
// receive_join: the leader validates a join request and proposes the joiner in the SST (fragment).
struct in_addr joiner_ip_packed;
inet_aton(client_socket.get_remote_ip().c_str(), &joiner_ip_packed);
// ...
uint64_t joiner_version_code;
// ...
rls_default_warn("Rejected a connection from client at {}. Client was running on an incompatible platform or used an incompatible compiler.",
                 /* ... */);
// ...
client_socket.read(joining_client_id);
if(curr_view->rank_of(joining_client_id) != -1) {
    dbg_default_warn("Joining node at IP {} announced it has ID {}, which is already in the View!",
                     client_socket.get_remote_ip(), joining_client_id);
    // ...
}
uint16_t joiner_gms_port = 0;
client_socket.read(joiner_gms_port);
uint16_t joiner_rpc_port = 0;
client_socket.read(joiner_rpc_port);
uint16_t joiner_sst_port = 0;
client_socket.read(joiner_sst_port);
uint16_t joiner_rdmc_port = 0;
client_socket.read(joiner_rdmc_port);
// ... (record the joiner's packed IP in the SST)
//         joiner_ip_packed.s_addr);
// ...

// send_view: serialize the new View over the joiner's socket (fragment).
auto bind_socket_write = [&client_socket](const char* bytes, std::size_t size) {
    client_socket.write(bytes, size);
};
// ...
client_socket.write(size_of_view);
// ...

// send_objects_to_new_members / send_subgroup_object: old shard leaders push Replicated
// Object state to the nodes that just joined their shard (fragments).
for(subgroup_id_t subgroup_id = 0; subgroup_id < old_shard_leaders.size(); ++subgroup_id) {
    for(uint32_t shard = 0; shard < old_shard_leaders[subgroup_id].size(); ++shard) {
        // ...
        if(my_id == old_shard_leaders[subgroup_id][shard]) {
            // ...
            if(shard_joiner != my_id) {
                // ...
            }
        }
    }
}
// ...
int64_t persistent_log_length = 0;
joiner_socket.get().read(persistent_log_length);
// ...
dbg_default_debug("Sending Replicated Object state for subgroup {} to node {}", subgroup_id, new_node_id);
// compute_num_received_size: the length of the SST's num_received column is the sum, over
// all subgroups, of the largest sender count of any shard in that subgroup (fragment).
uint32_t ViewManager::compute_num_received_size(const View& view) {
    uint32_t num_received_size = 0;
    for(subgroup_id_t subgroup_num = 0; subgroup_num < view.subgroup_shard_views.size(); ++subgroup_num) {
        uint32_t max_shard_senders = 0;
        for(uint32_t shard_num = 0; shard_num < view.subgroup_shard_views[subgroup_num].size(); ++shard_num) {
            // ...
            if(num_shard_senders > max_shard_senders) {
                max_shard_senders = shard_size;
            }
            // ...
        }
        num_received_size += max_shard_senders;
    }
    return num_received_size;
}
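In other words, each subgroup contributes one num_received slot per sender in its largest shard, and successive subgroups' slots are laid out back to back (the num_received_offset values assigned in derive_subgroup_settings below index into this combined column). A small worked example with hypothetical shard layouts:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

int main() {
    // Hypothetical layout: subgroup 0 has shards with 3 and 2 senders, subgroup 1 has one
    // shard with 4 senders. Mirrors the accumulation in compute_num_received_size above.
    std::vector<std::vector<uint32_t>> senders_per_shard{{3, 2}, {4}};

    uint32_t num_received_size = 0;
    for(const auto& subgroup : senders_per_shard) {
        uint32_t max_shard_senders = 0;
        for(uint32_t num_shard_senders : subgroup) {
            max_shard_senders = std::max(max_shard_senders, num_shard_senders);
        }
        num_received_size += max_shard_senders;
    }
    assert(num_received_size == 3 + 4);  // subgroup 0 contributes 3 slots, subgroup 1 contributes 4
}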
// make_subgroup_maps: run the subgroup membership function and copy the resulting SubViews
// into the View, assigning subgroup IDs in declaration order (fragment).
// ... (membership function invoked with prev_view, curr_view);
subgroup_allocations = std::move(temp);
// ...
for(subgroup_type_id_t subgroup_type_id = 0; /* ... */; ++subgroup_type_id) {
    // ...
    std::size_t num_subgroups = curr_type_subviews.size();
    // ...
    for(uint32_t subgroup_index = 0; subgroup_index < num_subgroups; ++subgroup_index) {
        // ...
        uint32_t num_shards = curr_type_subviews[subgroup_index].size();
        for(uint shard_num = 0; shard_num < num_shards; ++shard_num) {
            SubView& shard_view = curr_type_subviews[subgroup_index][shard_num];
            // ...
            if(shard_view.my_rank != -1) {
                // ...
            }
            // ...
            subgroup_id_t prev_subgroup_id = prev_view->subgroup_ids_by_type_id.at(subgroup_type_id)
                                                     .at(subgroup_index);
            SubView& prev_shard_view = prev_view->subgroup_shard_views[prev_subgroup_id][shard_num];
            // ...
        }
        // ... (move this subgroup's SubViews into curr_view)
        //         subgroup_allocations[subgroup_type][subgroup_index]));
    }
}
// derive_subgroup_settings: build the SubgroupSettings map for MulticastGroup and return the
// total num_received column size and SST slot size (fragment).
std::pair<uint32_t, uint32_t> ViewManager::derive_subgroup_settings(
        View& curr_view, std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings) {
    uint32_t num_received_offset = 0;
    uint32_t slot_offset = 0;
    // ...
    uint32_t max_shard_senders = 0;
    uint32_t slot_size_for_subgroup = 0;
    uint64_t max_payload_size = 0;
    // ...
    for(uint32_t shard_num = 0; shard_num < num_shards; ++shard_num) {
        // ...
        max_shard_senders = std::max(shard_view.num_senders(), max_shard_senders);
        // ...
        uint32_t slot_size_for_shard = profile.window_size * (profile.sst_max_msg_size + 2 * sizeof(uint64_t));
        uint64_t payload_size = profile.max_msg_size - sizeof(header);
        max_payload_size = std::max(payload_size, max_payload_size);
        // ...
        //         profile.max_reply_msg_size - sizeof(header),
        // ...
        slot_size_for_subgroup = std::max(slot_size_for_shard, slot_size_for_subgroup);
        // ...
        if(shard_view.my_rank != -1) {
            // ...
        }
    }
    // ...
    subgroup_settings[subgroup_id] = {
            // ...
            num_received_offset,
            // ...
    };
    // ...
    num_received_offset += max_shard_senders;
    slot_offset += slot_size_for_subgroup;
    // ...
    return {num_received_offset, slot_offset};
}
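The per-shard slot size above is the window size times the maximum SST message size plus two uint64_t fields per slot; each subgroup takes the largest such value among its shards, and slot_offset accumulates these across subgroups. A worked check of that arithmetic with made-up profile values:

#include <cassert>
#include <cstdint>

int main() {
    // Hypothetical profile values (in a real group these come from the subgroup's config profile).
    uint32_t window_size = 3;
    uint32_t sst_max_msg_size = 1024;

    // Same arithmetic as slot_size_for_shard above.
    uint32_t slot_size_for_shard = window_size * (sst_max_msg_size + 2 * sizeof(uint64_t));
    assert(slot_size_for_shard == 3 * (1024 + 16));  // 3120 bytes for this shard
}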
// make_next_view: apply the committed changes in the SST to produce the next View's
// membership (fragment).
int myRank = curr_view->my_rank;
std::set<int> leave_ranks;
std::vector<int> join_indexes;
// ...
const int committed_count = gmsSST.num_committed[curr_view->find_rank_of_leader()]
                            /* ... */;
for(int change_index = 0; change_index < committed_count; change_index++) {
    // ...
    int change_rank = curr_view->rank_of(change_id);
    if(change_rank != -1) {
        // A committed change naming a current member is a departure/failure.
        leave_ranks.emplace(change_rank);
    } else {
        // Otherwise it is a join.
        join_indexes.emplace_back(change_index);
    }
}

int next_num_members = curr_view->num_members - leave_ranks.size() + join_indexes.size();
// ...
std::vector<node_id_t> joined, members(next_num_members), departed;
std::vector<char> failed(next_num_members);
std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> member_ips_and_ports(next_num_members);
int next_unassigned_rank = curr_view->next_unassigned_rank;
for(std::size_t i = 0; i < join_indexes.size(); ++i) {
    const int join_index = join_indexes[i];
    // ...
    struct in_addr joiner_ip_packed;
    joiner_ip_packed.s_addr = gmsSST.joiner_ips[myRank][join_index];
    char* joiner_ip_cstr = inet_ntoa(joiner_ip_packed);
    std::string joiner_ip(joiner_ip_cstr);
    // ...
    joined.emplace_back(joiner_id);
    // New members are appended at the end of the members list, in join order.
    int new_member_rank = curr_view->num_members - leave_ranks.size() + i;
    members[new_member_rank] = joiner_id;
    member_ips_and_ports[new_member_rank] = std::tuple{joiner_ip,
                                                       /* ... */};
    // ...
}
for(const auto& leaver_rank : leave_ranks) {
    departed.emplace_back(curr_view->members[leaver_rank]);
    // ...
    if(leaver_rank <= curr_view->next_unassigned_rank) {
        next_unassigned_rank--;
    }
}
dbg_default_debug("Next view will exclude {} failed members.", leave_ranks.size());
// ...
// Surviving members keep their relative order at the front of the list.
for(int old_rank = 0; old_rank < curr_view->num_members; old_rank++) {
    // ...
    if(leave_ranks.find(old_rank) == leave_ranks.end()) {
        members[new_rank] = curr_view->members[old_rank];
        member_ips_and_ports[new_rank] = curr_view->member_ips_and_ports[old_rank];
        failed[new_rank] = curr_view->failed[old_rank];
        // ...
    }
}
// ...
int32_t my_new_rank = -1;
node_id_t myID = curr_view->members[myRank];
for(int i = 0; i < next_num_members; ++i) {
    if(members[i] == myID) {
        // ...
    }
}
if(my_new_rank == -1) {
    throw derecho_exception("Some other node reported that I failed. Node " + std::to_string(myID) + " terminating.");
}
// ...
auto next_view = std::make_unique<View>(
        curr_view->vid + 1, members, member_ips_and_ports, failed, joined,
        departed, my_new_rank, next_unassigned_rank,
        curr_view->subgroup_type_order);
next_view->i_know_i_am_leader = curr_view->i_know_i_am_leader;
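Putting the pieces together: survivors keep their relative order at the front of the new members list and joiners are appended at the end in the order their joins were committed. A compact worked example of that layout, independent of the SST plumbing:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

int main() {
    using node_id_t = uint32_t;
    // Current view has members {10, 11, 12, 13}; node 11 failed and node 14 is joining.
    std::vector<node_id_t> old_members{10, 11, 12, 13};
    std::set<int> leave_ranks{1};  // rank of node 11
    std::vector<node_id_t> join_ids{14};

    const std::size_t next_num_members = old_members.size() - leave_ranks.size() + join_ids.size();
    std::vector<node_id_t> members(next_num_members);

    // Joiners go at the end, as in make_next_view above.
    for(std::size_t i = 0; i < join_ids.size(); ++i) {
        members[old_members.size() - leave_ranks.size() + i] = join_ids[i];
    }
    // Survivors keep their relative order at the front.
    int new_rank = 0;
    for(int old_rank = 0; old_rank < (int)old_members.size(); ++old_rank) {
        if(leave_ranks.find(old_rank) == leave_ranks.end()) {
            members[new_rank++] = old_members[old_rank];
        }
    }
    assert((members == std::vector<node_id_t>{10, 12, 13, 14}));
}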
// old_shard_leaders_by_new_ids: for each shard in the new view, record the node ID of that
// shard's leader in the old view (or -1), so joiners know whom to fetch state from (fragment).
std::vector<std::vector<int64_t>> old_shard_leaders_by_new_id(next_view.subgroup_shard_views.size());
// ...
for(uint32_t subgroup_index = 0; subgroup_index < type_to_old_ids.second.size(); ++subgroup_index) {
    subgroup_id_t old_subgroup_id = type_to_old_ids.second[subgroup_index];
    // ... (look up the matching new subgroup ID)
    //         .at(subgroup_index);
    // ...
    old_shard_leaders_by_new_id[new_subgroup_id].resize(new_num_shards, -1);
    for(uint32_t shard_num = 0; shard_num < new_num_shards; ++shard_num) {
        int64_t old_shard_leader = -1;
        // Only subgroup types other than RawObject carry transferable state:
        // ...
        //         != std::type_index(typeid(RawObject))) {
        // ...
        if(old_shard_leader_rank >= 0) {
            // ...
            //         .members[old_shard_leader_rank];
        }
        // ...
        old_shard_leaders_by_new_id[new_subgroup_id][shard_num] = old_shard_leader;
    }
}
// ...
return old_shard_leaders_by_new_id;
// suspected_not_equal: has any row's suspected[] changed relative to the cached copy? (fragment)
for(unsigned int r = 0; r < gmsSST.get_num_rows(); r++) {
    // ...
    if(gmsSST.suspected[r][who] && !old[who]) {
        // ...
    }
}
// ...
for(int p_index = 0;
    /* ... */) {
    // ...
}

// min_acked: the smallest num_acked among non-failed members (fragment).
int min_num_acked = gmsSST.num_acked[myRank];
for(size_t n = 0; n < failed.size(); n++) {
    // ...
    int num_acked_copy = gmsSST.num_acked[n];
    min_num_acked = std::min(min_num_acked, num_acked_copy);
    // ...
}
return min_num_acked;

// previous_leaders_suspected: true only if every live row suspects every lower-ranked leader (fragment).
for(uint row = 0; row < gmsSST.get_num_rows(); ++row) {
    if(!curr_view.failed[row]) {
        for(int previous_leader_rank = 0;
            previous_leader_rank < rank_of_leader;
            ++previous_leader_rank) {
            if(!gmsSST.suspected[row][previous_leader_rank]) {
                // ...
            }
        }
    }
}

// copy_prior_leader_proposals: a new leader scans lower-ranked rows for proposed changes it
// does not yet have and re-proposes them (fragment).
bool prior_changes_found = false;
int prior_leader_rank = my_rank;
while(!prior_changes_found && prior_leader_rank > 0) {
    prior_leader_rank--;
    const int changes_length = gmsSST.num_changes[prior_leader_rank]
                               /* ... */;
    if(changes_length > my_changes_length) {
        prior_changes_found = true;
        // ...
    }
    // ...
    for(int i = 0; i < changes_length; ++i) {
        if(gmsSST.changes[prior_leader_rank][i] != gmsSST.changes[my_rank][i]) {
            prior_changes_found = true;
            // ...
        }
    }
    // ...
}
if(prior_changes_found) {
    dbg_default_debug("Re-proposing changes from prior leader at rank {}. Num_changes is now {}",
                      prior_leader_rank, gmsSST.num_changes[prior_leader_rank]);
    // ...
}
// ...
return prior_changes_found;
// leader_ragged_edge_cleanup: the shard leader either copies an existing global_min or
// computes one as the per-sender minimum of num_received over the shard, then publishes it (fragment).
void ViewManager::leader_ragged_edge_cleanup(const subgroup_id_t subgroup_num,
                                             const uint32_t num_received_offset,
                                             const std::vector<node_id_t>& shard_members,
                                             uint num_shard_senders) {
    dbg_default_debug("Running leader RaggedEdgeCleanup for subgroup {}", subgroup_num);
    View& Vc = *curr_view;  // alias used throughout this function
    // ...
    // If some shard member has already published a global_min for this subgroup, copy it:
    for(uint n = 0; n < shard_members.size() && !found; n++) {
        const auto node_id = shard_members[n];
        // ...
        //         &Vc.gmsSST->global_min[node_rank][num_received_offset], num_shard_senders);
        // ...
    }
    // Otherwise, compute global_min as the minimum num_received over all shard members, per sender:
    for(uint n = 0; n < num_shard_senders; n++) {
        int min_num_received = Vc.gmsSST->num_received[myRank][num_received_offset + n];
        for(uint r = 0; r < shard_members.size(); r++) {
            // ...
            int num_received_copy = Vc.gmsSST->num_received[node_rank][num_received_offset + n];
            min_num_received = std::min(min_num_received, num_received_copy);
            // ...
        }
        gmssst::set(Vc.gmsSST->global_min[myRank][num_received_offset + n], min_num_received);
    }
    // ...
    dbg_default_debug("Shard leader for subgroup {} finished computing global_min", subgroup_num);
    // Push the global_min row segment and the global_min_ready flag to the rest of the shard:
    Vc.gmsSST->put(/* ... */,
                   (char*)std::addressof(Vc.gmsSST->global_min[0][num_received_offset]) - Vc.gmsSST->getBaseAddress(),
                   sizeof(Vc.gmsSST->global_min[0][num_received_offset]) * num_shard_senders);
    // ...
    Vc.gmsSST->put(/* ... */, Vc.gmsSST->global_min_ready, subgroup_num);
    // ...
    log_ragged_trim(myRank, subgroup_num, num_received_offset, num_shard_senders);
}
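Concretely, the shard leader's global_min is a per-sender column minimum of num_received across the shard's members: every message a sender multicast with index at or below that minimum has been received by all members, so it can safely be delivered before the view change. A small worked example with hypothetical counters:

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

int main() {
    // num_received[member][sender]: highest message index each of 3 shard members has
    // received from each of 2 senders (hypothetical values).
    std::vector<std::vector<int>> num_received{{5, 7},
                                               {4, 9},
                                               {5, 8}};
    std::vector<int> global_min(2);
    for(std::size_t sender = 0; sender < global_min.size(); ++sender) {
        int min_received = num_received[0][sender];
        for(const auto& member_row : num_received) {
            min_received = std::min(min_received, member_row[sender]);
        }
        global_min[sender] = min_received;
    }
    // Messages 0..4 from sender 0 and 0..7 from sender 1 can be delivered by everyone.
    assert((global_min == std::vector<int>{4, 7}));
}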
// follower_ragged_edge_cleanup: non-leader shard members copy the leader's global_min, log
// the ragged trim, and re-publish it (fragment).
void ViewManager::follower_ragged_edge_cleanup(const subgroup_id_t subgroup_num,
                                               uint shard_leader_rank,
                                               const uint32_t num_received_offset,
                                               uint num_shard_senders) {
    // ...
    dbg_default_debug("Running follower RaggedEdgeCleanup for subgroup {}", subgroup_num);
    // ...
    log_ragged_trim(shard_leader_rank, subgroup_num, num_received_offset, num_shard_senders);
    // Copy the leader's global_min into this node's row:
    // ... &Vc.gmsSST->global_min[shard_leader_rank][num_received_offset], ...
    // ...
    Vc.gmsSST->put(/* ... */,
                   (char*)std::addressof(Vc.gmsSST->global_min[0][num_received_offset]) - Vc.gmsSST->getBaseAddress(),
                   sizeof(Vc.gmsSST->global_min[0][num_received_offset]) * num_shard_senders);
    // ...
    Vc.gmsSST->put(/* ... */, Vc.gmsSST->global_min_ready, subgroup_num);
}
// deliver_in_order: deliver every message up to the agreed global_min, in sender order (fragment).
void ViewManager::deliver_in_order(const int shard_leader_rank,
                                   const uint32_t subgroup_num, const uint32_t num_received_offset,
                                   const std::vector<node_id_t>& shard_members, uint num_shard_senders) {
    // ...
    std::vector<int32_t> max_received_indices(num_shard_senders);
    whenlog(std::stringstream delivery_order);
    // ...
    delivery_order << "Subgroup " << subgroup_num
                   /* ... */;
    for(uint sender_rank = 0; sender_rank < num_shard_senders; sender_rank++) {
        // ...
        delivery_order << shard_members[sender_rank]
                       /* ... */
                       << Vc.gmsSST->global_min[shard_leader_rank][num_received_offset + sender_rank]
                       /* ... */;
        max_received_indices[sender_rank]
                = Vc.gmsSST->global_min[shard_leader_rank][num_received_offset + sender_rank];
    }
    dbg_default_debug("Delivering ragged-edge messages in order: {}", delivery_order.str());
    Vc.multicast_group->deliver_messages_upto(max_received_indices, subgroup_num, num_shard_senders);
}
// log_ragged_trim: persist the agreed ragged trim to disk before delivering it (fragment).
void ViewManager::log_ragged_trim(const int shard_leader_rank, const subgroup_id_t subgroup_num,
                                  const uint32_t num_received_offset,
                                  const uint num_shard_senders) {
    // ...
    std::vector<int32_t> max_received_indices(num_shard_senders);
    for(uint sender_rank = 0; sender_rank < num_shard_senders; sender_rank++) {
        max_received_indices[sender_rank]
                = curr_view->gmsSST->global_min[shard_leader_rank][num_received_offset + sender_rank];
    }
    uint32_t shard_num = curr_view->my_subgroups.at(subgroup_num);
    // Build and save the RaggedTrim record:
    //         ... max_received_indices};
    // ...
    dbg_default_debug("Done logging ragged trim to disk for subgroup {}", subgroup_num);
}
// report_failure: mark a node as suspected in the local SST row, refusing to proceed if
// doing so would leave this node in a minority partition (fragment).
void ViewManager::report_failure(const node_id_t who) {
    const int failed_rank = curr_view->rank_of(who);
    dbg_default_debug("Node ID {} failure reported; marking suspected[{}]", who, failed_rank);
    // ...
    for(std::size_t r = 0; r < curr_view->gmsSST->suspected.size(); r++) {
        // ...
    }
    // ...
    if(/* ... */
       && failed_cnt != 0 && (failed_cnt >= (curr_view->num_members - rip_cnt + 1) / 2)) {
        if(disable_partitioning_safety) {
            dbg_default_warn("Potential partitioning event, but partitioning safety is disabled. failed_cnt = {} but num_members - rip_cnt + 1 = {}",
                             failed_cnt, curr_view->num_members - rip_cnt + 1);
        } else {
            throw derecho_exception("Potential partitioning event: this node is no longer in the majority and must shut down!");
        }
    }
}
// Public API passthroughs to MulticastGroup and the current View (fragments).
void ViewManager::send(subgroup_id_t subgroup_num, long long unsigned int payload_size,
                       const std::function<void(char* buf)>& msg_generator,
                       bool cooked_send) {
    // ...
    return curr_view->multicast_group->send(subgroup_num, payload_size,
                                            msg_generator, cooked_send);
}

// compute_global_stability_frontier (fragment).
return curr_view->multicast_group->compute_global_stability_frontier(subgroup_num);

// get_subgroup_members (fragment).
std::vector<std::vector<node_id_t>> subgroup_members;
for(const auto& shard_view : curr_view->subgroup_shard_views.at(subgroup_id)) {
    subgroup_members.push_back(shard_view.members);
}
return subgroup_members;

// get_number_of_shards_in_subgroup (fragment).
return curr_view->subgroup_shard_views.at(subgroup_id).size();

// get_my_shard (fragment).
auto find_id_result = curr_view->my_subgroups.find(subgroup_id);
if(find_id_result == curr_view->my_subgroups.end()) {
    // ...
}
return find_id_result->second;

// debug_print_status (fragment).
std::cout << "curr_view = " << curr_view->debug_string() << std::endl;
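Both the failure-suspicion handling and report_failure above refuse to proceed when at least half of the remaining membership is suspected, unless partitioning safety is disabled, in which case they only log a warning. A quick check of that arithmetic for a 5-member view with no graceful exits:

#include <cassert>

int main() {
    int num_members = 5, rip_cnt = 0, failed_cnt = 3;
    // Same expression as in report_failure above: (5 - 0 + 1) / 2 == 3, so a third
    // reported failure trips the partitioning check.
    assert(failed_cnt != 0 && failed_cnt >= (num_members - rip_cnt + 1) / 2);
}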