Derecho 0.9
Distributed systems toolkit for RDMA
view_manager.cpp
1 
7 #include <arpa/inet.h>
8 #include <tuple>
9 
16 
18 #include <derecho/utils/logger.hpp>
19 
20 #include <mutils/macro_utils.hpp>
21 
22 namespace derecho {
23 
24 using lock_guard_t = std::lock_guard<std::mutex>;
25 using unique_lock_t = std::unique_lock<std::mutex>;
26 using shared_lock_t = std::shared_lock<std::shared_timed_mutex>;
27 
28 /* Leader/Restart Leader Constructor */
29 ViewManager::ViewManager(
30  const SubgroupInfo& subgroup_info,
31  const std::vector<std::type_index>& subgroup_type_order,
32  const bool any_persistent_objects,
33  const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
34  ReplicatedObjectReferenceMap& object_reference_map,
35  const persistence_manager_callbacks_t& _persistence_manager_callbacks,
36  std::vector<view_upcall_t> _view_upcalls)
37  : server_socket(getConfUInt16(CONF_DERECHO_GMS_PORT)),
38  thread_shutdown(false),
39  disable_partitioning_safety(getConfBoolean(CONF_DERECHO_DISABLE_PARTITIONING_SAFETY)),
40  view_upcalls(_view_upcalls),
41  subgroup_info(subgroup_info),
42  subgroup_type_order(subgroup_type_order),
43  tcp_sockets(group_tcp_sockets),
44  subgroup_objects(object_reference_map),
45  any_persistent_objects(any_persistent_objects),
46  active_leader(true),
47  persistence_manager_callbacks(_persistence_manager_callbacks) {
48  rls_default_info("Derecho library running version {}.{}.{} + {} commits",
51  if(any_persistent_objects) {
52  //Attempt to load a saved View from disk, to see if one is there
53  curr_view = persistent::loadObject<View>();
54  }
55  const uint32_t my_id = getConfUInt32(CONF_DERECHO_LOCAL_ID);
56  if(curr_view) {
57  in_total_restart = true;
58  dbg_default_debug("Found view {} on disk", curr_view->vid);
59  dbg_default_info("Logged View found on disk. Restarting in recovery mode.");
60  //The subgroup_type_order can't be serialized, but it's constant across restarts
61  curr_view->subgroup_type_order = subgroup_type_order;
62  restart_state = std::make_unique<RestartState>();
63  restart_state->load_ragged_trim(*curr_view);
64  restart_leader_state_machine = std::make_unique<RestartLeaderState>(
65  std::move(curr_view), *restart_state,
66  subgroup_info, my_id);
67  await_rejoining_nodes(my_id);
69  } else {
70  in_total_restart = false;
71  curr_view = std::make_unique<View>(
72  0, std::vector<node_id_t>{my_id},
73  std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>>{
79  std::vector<char>{0},
80  std::vector<node_id_t>{}, std::vector<node_id_t>{},
81  0, 0, subgroup_type_order);
82  await_first_view(my_id);
84  }
85 }
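/* The leader constructor takes one of two paths: if the group has persistent
 * objects and a logged View is found on disk, it enters total restart mode and
 * drives the restart protocol through RestartLeaderState and
 * await_rejoining_nodes(); otherwise it builds a one-member View containing only
 * this node and waits in await_first_view() until enough nodes have joined to
 * adequately provision the subgroups. */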
86 
87 /* Non-leader Constructor */
88 ViewManager::ViewManager(
89  tcp::socket& leader_connection,
90  const SubgroupInfo& subgroup_info,
91  const std::vector<std::type_index>& subgroup_type_order,
92  const bool any_persistent_objects,
93  const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
94  ReplicatedObjectReferenceMap& object_reference_map,
95  const persistence_manager_callbacks_t& _persistence_manager_callbacks,
96  std::vector<view_upcall_t> _view_upcalls)
97  : server_socket(getConfUInt16(CONF_DERECHO_GMS_PORT)),
98  thread_shutdown(false),
99  disable_partitioning_safety(getConfBoolean(CONF_DERECHO_DISABLE_PARTITIONING_SAFETY)),
100  view_upcalls(_view_upcalls),
101  subgroup_info(subgroup_info),
102  subgroup_type_order(subgroup_type_order),
103  tcp_sockets(group_tcp_sockets),
104  subgroup_objects(object_reference_map),
105  any_persistent_objects(any_persistent_objects),
106  active_leader(false),
107  persistence_manager_callbacks(_persistence_manager_callbacks) {
108  rls_default_info("Derecho library running version {}.{}.{} + {} commits",
111  const uint32_t my_id = getConfUInt32(CONF_DERECHO_LOCAL_ID);
112  receive_initial_view(my_id, leader_connection);
113  //As soon as we have a tentative initial view, set up the TCP connections
115 }
116 
117 ViewManager::~ViewManager() {
118  thread_shutdown = true;
119  // force accept to return.
121  if(client_listener_thread.joinable()) {
122  client_listener_thread.join();
123  }
124  old_views_cv.notify_all();
125  if(old_view_cleanup_thread.joinable()) {
126  old_view_cleanup_thread.join();
127  }
128 }
129 
130 /* ---------- 1. Constructor Components ------------- */
131 void ViewManager::receive_initial_view(node_id_t my_id, tcp::socket& leader_connection) {
132  JoinResponse leader_response;
133  bool leader_redirect;
134  do {
135  leader_redirect = false;
136  uint64_t leader_version_hashcode;
137  bool success;
138  dbg_default_debug("Socket connected to leader, exchanging version codes.");
139  success = leader_connection.exchange(my_version_hashcode, leader_version_hashcode);
140  if(!success) throw derecho_exception("Failed to exchange version hashcodes with the leader! Leader has crashed.");
141  if(leader_version_hashcode != my_version_hashcode) {
142  throw derecho_exception("Unable to connect to Derecho leader because the leader is running on an incompatible platform or used an incompatible compiler.");
143  }
144  success = leader_connection.write(my_id);
145  if(!success) throw derecho_exception("Failed to send ID to the leader! Leader has crashed.");
146  success = leader_connection.read(leader_response);
147  if(!success) throw derecho_exception("Failed to read initial response from leader! Leader has crashed.");
148  if(leader_response.code == JoinResponseCode::ID_IN_USE) {
149  dbg_default_error("Error! Leader refused connection because ID {} is already in use!", my_id);
151  throw derecho_exception("Leader rejected join, ID already in use.");
152  }
153  if(leader_response.code == JoinResponseCode::LEADER_REDIRECT) {
154  std::size_t ip_addr_size;
155  leader_connection.read(ip_addr_size);
156  char buffer[ip_addr_size];
157  leader_connection.read(buffer, ip_addr_size);
158  ip_addr_t leader_ip(buffer);
159  uint16_t leader_gms_port;
160  leader_connection.read(leader_gms_port);
161  dbg_default_info("That node was not the leader! Redirecting to {}", leader_ip);
162  //Use move-assignment to reconnect the socket to the given IP address, and try again
163  //(good thing that leader_connection reference is mutable)
164  leader_connection = tcp::socket(leader_ip, leader_gms_port);
165  leader_redirect = true;
166  }
167  } while(leader_redirect);
168 
170  if(in_total_restart) {
171  curr_view = persistent::loadObject<View>();
172  dbg_default_debug("In restart mode, sending view {} to leader", curr_view->vid);
173  bool success = leader_connection.write(mutils::bytes_size(*curr_view));
174  if(!success) throw derecho_exception("Restart leader crashed before sending a restart View!");
175  auto leader_socket_write = [&leader_connection](const char* bytes, std::size_t size) {
176  if(!leader_connection.write(bytes, size)) {
177  throw derecho_exception("Restart leader crashed before sending a restart View!");
178  }
179  };
180  mutils::post_object(leader_socket_write, *curr_view);
181  //Restore this non-serializable field to curr_view before using it
182  curr_view->subgroup_type_order = subgroup_type_order;
183  restart_state = std::make_unique<RestartState>();
184  restart_state->load_ragged_trim(*curr_view);
185  dbg_default_debug("In restart mode, sending {} ragged trims to leader", restart_state->logged_ragged_trim.size());
186  /* Protocol: Send the number of RaggedTrim objects, then serialize each RaggedTrim */
187  /* Since we know this node is only a member of one shard per subgroup,
188  * the size of the outer map (subgroup IDs) is the number of RaggedTrims. */
189  success = leader_connection.write(restart_state->logged_ragged_trim.size());
190  if(!success) throw derecho_exception("Restart leader crashed before sending a restart View!");
191  for(const auto& id_to_shard_map : restart_state->logged_ragged_trim) {
192  const std::unique_ptr<RaggedTrim>& ragged_trim = id_to_shard_map.second.begin()->second; //The inner map has one entry
193  success = leader_connection.write(mutils::bytes_size(*ragged_trim));
194  if(!success) throw derecho_exception("Restart leader crashed before sending a restart View!");
195  mutils::post_object(leader_socket_write, *ragged_trim);
196  }
197  }
198  leader_connection.write(getConfUInt16(CONF_DERECHO_GMS_PORT));
199  leader_connection.write(getConfUInt16(CONF_DERECHO_RPC_PORT));
200  leader_connection.write(getConfUInt16(CONF_DERECHO_SST_PORT));
201  leader_connection.write(getConfUInt16(CONF_DERECHO_RDMC_PORT));
202 
203  receive_view_and_leaders(my_id, leader_connection);
204  dbg_default_debug("Received initial view {} from leader.", curr_view->vid);
205 }
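/* The join handshake implemented above, from the joining node's side:
 *  1. Exchange version hashcodes with the leader and fail on a mismatch.
 *  2. Send this node's ID and read a JoinResponse, handling ID_IN_USE and
 *     LEADER_REDIRECT (on a redirect, reconnect to the supplied address and retry).
 *  3. In total restart mode only, send the logged View followed by the number of
 *     logged RaggedTrims and each serialized RaggedTrim.
 *  4. Send this node's GMS, RPC, SST, and RDMC ports.
 *  5. Receive the proposed initial View and the prior shard leaders via
 *     receive_view_and_leaders(). */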
206 
207 void ViewManager::receive_view_and_leaders(const node_id_t my_id, tcp::socket& leader_connection) {
208  //The leader will first send the size of the necessary buffer, then the serialized View
209  std::size_t size_of_view;
210  bool success = leader_connection.read(size_of_view);
211  if(!success) {
212  throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
213  }
214  char buffer[size_of_view];
215  success = leader_connection.read(buffer, size_of_view);
216  if(!success) {
217  throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
218  }
219  curr_view = mutils::from_bytes<View>(nullptr, buffer);
220  if(in_total_restart) {
221  //In total restart mode, the leader will also send the RaggedTrims it has collected
222  dbg_default_debug("In restart mode, receiving ragged trim from leader");
223  restart_state->logged_ragged_trim.clear();
224  std::size_t num_of_ragged_trims;
225  leader_connection.read(num_of_ragged_trims);
226  for(std::size_t i = 0; i < num_of_ragged_trims; ++i) {
227  std::size_t size_of_ragged_trim;
228  leader_connection.read(size_of_ragged_trim);
229  char buffer[size_of_ragged_trim];
230  leader_connection.read(buffer, size_of_ragged_trim);
231  std::unique_ptr<RaggedTrim> ragged_trim = mutils::from_bytes<RaggedTrim>(nullptr, buffer);
232  //operator[] is intentional: Create an empty inner map at subgroup_id if one does not exist
233  restart_state->logged_ragged_trim[ragged_trim->subgroup_id].emplace(
234  ragged_trim->shard_num, std::move(ragged_trim));
235  }
236  }
237  //Next, the leader will send the list of nodes to do state transfer from
238  prior_view_shard_leaders = *receive_vector2d<int64_t>(leader_connection);
239 
240  //Set up non-serialized fields of curr_view
241  curr_view->subgroup_type_order = subgroup_type_order;
242  curr_view->my_rank = curr_view->rank_of(my_id);
243 }
244 
246  CommitMessage commit_message;
247  //The leader will first send a Prepare message, then a Commit message if the
248  //new view was committed at all joining members. Either one of these could be Abort
249  //if the leader detected a failure.
250  bool success = leader_connection.read(commit_message);
251  if(!success) {
252  throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
253  }
254  if(commit_message == CommitMessage::PREPARE) {
255  dbg_default_debug("Leader sent PREPARE");
256  bool success = leader_connection.write(CommitMessage::ACK);
257  if(!success) {
258  throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
259  }
260  //After a successful Prepare, replace commit_message with the second message,
261  //which is either Commit or Abort
262  success = leader_connection.read(commit_message);
263  if(!success) {
264  throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
265  }
266  }
267  //This checks if either the first or the second message was Abort
268  if(commit_message == CommitMessage::ABORT) {
269  dbg_default_debug("Leader sent ABORT");
270  const uint32_t my_id = getConfUInt32(CONF_DERECHO_LOCAL_ID);
271  //Wait for a new initial view and ragged trim to be sent,
272  //so that when this method returns we can try state transfer again
273  receive_view_and_leaders(my_id, leader_connection);
274  //Update the TCP connections pool for any new/failed nodes,
275  //so we can run state transfer again.
277  }
278  //Unless the final message was Commit, we need to retry state transfer
279  return (commit_message == CommitMessage::COMMIT);
280 }
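/* This is the joining node's half of the two-phase commit on the initial View: a
 * PREPARE from the leader is answered with ACK, after which the leader sends either
 * COMMIT or ABORT. On ABORT the node receives a replacement View and shard-leader
 * list so that state transfer can be retried, and the function returns true only
 * when the final message was COMMIT. */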
281 
283  if(!in_total_restart) {
284  return;
285  }
286  for(const auto& subgroup_and_map : restart_state->logged_ragged_trim) {
287  for(const auto& shard_and_trim : subgroup_and_map.second) {
288  persistent::saveObject(*shard_and_trim.second,
289  ragged_trim_filename(subgroup_and_map.first, shard_and_trim.first).c_str());
290  }
291  }
292  dbg_default_debug("Truncating persistent logs to conform to leader's ragged trim");
293  truncate_persistent_logs(restart_state->logged_ragged_trim);
294 }
295 
298  std::map<subgroup_id_t, SubgroupSettings> subgroup_settings_map;
299  auto sizes = derive_subgroup_settings(*curr_view, subgroup_settings_map);
300  uint32_t num_received_size = sizes.first;
301  uint32_t slot_size = sizes.second;
302  dbg_default_trace("Initial view is: {}", curr_view->debug_string());
304  //Persist the initial View to disk as soon as possible, which is after my_subgroups has been initialized
306  }
307 
308  dbg_default_debug("Initializing SST and RDMC for the first time.");
309  construct_multicast_group(callbacks, subgroup_settings_map, num_received_size, slot_size);
310  curr_view->gmsSST->vid[curr_view->my_rank] = curr_view->vid;
311 }
312 
314  //At this point curr_view has been committed by the leader
315  if(in_total_restart) {
316  //If we were doing total restart, it has completed successfully
317  restart_state.reset();
318  in_total_restart = false;
319  //The restart leader now gives up its leader role to the "real" leader
320  active_leader = curr_view->i_am_leader();
321  }
322  last_suspected = std::vector<bool>(curr_view->members.size());
323  curr_view->gmsSST->put();
324  curr_view->gmsSST->sync_with_members();
325  dbg_default_debug("Done setting up initial SST and RDMC");
326 
327  if(curr_view->vid != 0 && curr_view->my_rank != curr_view->find_rank_of_leader()) {
328  // If this node is joining an existing group with a non-initial view, copy the leader's num_changes, num_acked, and num_committed
329  // Otherwise, you'll immediately think that there's a new proposed view change because gmsSST.num_changes[leader] > num_acked[my_rank]
330  curr_view->gmsSST->init_local_change_proposals(curr_view->find_rank_of_leader());
331  curr_view->gmsSST->put();
332  dbg_default_debug("Joining node initialized its SST row from the leader");
333  }
334  create_threads();
336 
338  for(auto& view_upcall : view_upcalls) {
339  view_upcall(*curr_view);
340  }
341 }
342 
344  if(!in_total_restart) {
345  return;
346  }
347  //On the restart leader, curr_view is not set yet; the proposed restart view is still in RestartLeaderState
348  const View& restart_view = curr_view ? *curr_view : restart_leader_state_machine->get_restart_view();
349  /* If we're in total restart mode, prior_view_shard_leaders is equal
350  * to restart_state->restart_shard_leaders */
352  for(subgroup_id_t subgroup_id = 0; subgroup_id < prior_view_shard_leaders.size(); ++subgroup_id) {
353  for(uint32_t shard = 0; shard < prior_view_shard_leaders[subgroup_id].size(); ++shard) {
354  if(my_id == prior_view_shard_leaders[subgroup_id][shard]) {
355  dbg_default_debug("This node is the restart leader for subgroup {}, shard {}. Sending object data to shard members.", subgroup_id, shard);
356  //Send object data to all shard members, since they will all be in receive_objects()
357  for(node_id_t shard_member : restart_view.subgroup_shard_views[subgroup_id][shard].members) {
358  if(shard_member != my_id) {
359  send_subgroup_object(subgroup_id, shard_member);
360  }
361  }
362  }
363  }
364  }
365 }
366 
367 void ViewManager::setup_initial_tcp_connections(const View& initial_view, node_id_t my_id) {
368  //Establish TCP connections to each other member of the view in ascending order
369  for(int i = 0; i < initial_view.num_members; ++i) {
370  if(initial_view.members[i] != my_id) {
371  tcp_sockets->add_node(initial_view.members[i],
372  {std::get<0>(initial_view.member_ips_and_ports[i]),
373  std::get<PORT_TYPE::RPC>(initial_view.member_ips_and_ports[i])});
374  dbg_default_debug("Established a TCP connection to node {}", initial_view.members[i]);
375  }
376  }
377 }
378 
379 void ViewManager::reinit_tcp_connections(const View& initial_view, node_id_t my_id) {
380  //Delete sockets for failed members no longer in the view
381  tcp_sockets->filter_to(initial_view.members);
382  //Recheck the members list and establish connections to any new members
383  for(int i = 0; i < initial_view.num_members; ++i) {
384  if(initial_view.members[i] != my_id
385  && !tcp_sockets->contains_node(initial_view.members[i])) {
386  tcp_sockets->add_node(initial_view.members[i],
387  {std::get<0>(initial_view.member_ips_and_ports[i]),
388  std::get<PORT_TYPE::RPC>(initial_view.member_ips_and_ports[i])});
389  dbg_default_debug("Established a TCP connection to node {}", initial_view.members[i]);
390  }
391  }
392 }
393 
395  dbg_default_debug("Starting predicate evaluation");
396  curr_view->gmsSST->start_predicate_evaluation();
397 }
398 
400  for(const auto& id_to_shard_map : logged_ragged_trim) {
401  subgroup_id_t subgroup_id = id_to_shard_map.first;
402  uint32_t my_shard_id;
403  //On the restart leader, the proposed view is still in RestartLeaderState
404  //On all the other nodes, it's been received and stored in curr_view
405  const View& restart_view = curr_view ? *curr_view : restart_leader_state_machine->get_restart_view();
406  const auto find_my_shard = restart_view.my_subgroups.find(subgroup_id);
407  if(find_my_shard == restart_view.my_subgroups.end()) {
408  continue;
409  }
410  my_shard_id = find_my_shard->second;
411  const auto& my_shard_ragged_trim = id_to_shard_map.second.at(my_shard_id);
413  my_shard_ragged_trim->vid, my_shard_ragged_trim->max_received_by_sender);
414  dbg_default_trace("Truncating persistent log for subgroup {} to version {}", subgroup_id, max_delivered_version);
416  subgroup_objects.at(subgroup_id).get().truncate(max_delivered_version);
417  }
418 }
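/* Only the shards this node belongs to matter here: for each subgroup in the logged
 * ragged trim, the trim recorded for this node's shard determines the last delivered
 * version (derived from the trim's vid and per-sender counts), and the matching
 * Replicated Object's persistent log is truncated to that version so the local log
 * does not extend past what the ragged trim says was delivered. */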
419 
421  std::map<node_id_t, tcp::socket> waiting_join_sockets;
422  std::set<node_id_t> members_sent_view;
423  curr_view->is_adequately_provisioned = false;
424  bool joiner_failed = false;
425  do {
426  while(!curr_view->is_adequately_provisioned) {
427  tcp::socket client_socket = server_socket.accept();
428  uint64_t joiner_version_code;
429  client_socket.exchange(my_version_hashcode, joiner_version_code);
430  if(joiner_version_code != my_version_hashcode) {
431  rls_default_warn("Rejected a connection from client at {}. Client was running on an incompatible platform or used an incompatible compiler.", client_socket.get_remote_ip());
432  continue;
433  }
434  node_id_t joiner_id = 0;
435  client_socket.read(joiner_id);
436  if(curr_view->rank_of(joiner_id) != -1) {
437  client_socket.write(JoinResponse{JoinResponseCode::ID_IN_USE, my_id});
438  continue;
439  }
440  client_socket.write(JoinResponse{JoinResponseCode::OK, my_id});
441  uint16_t joiner_gms_port = 0;
442  client_socket.read(joiner_gms_port);
443  uint16_t joiner_rpc_port = 0;
444  client_socket.read(joiner_rpc_port);
445  uint16_t joiner_sst_port = 0;
446  client_socket.read(joiner_sst_port);
447  uint16_t joiner_rdmc_port = 0;
448  client_socket.read(joiner_rdmc_port);
449  const ip_addr_t& joiner_ip = client_socket.get_remote_ip();
450  ip_addr_t my_ip = client_socket.get_self_ip();
451  //Construct a new view by appending this joiner to the previous view
452  //None of these views are ever installed, so we don't use curr_view/next_view like normal
453  curr_view = std::make_unique<View>(curr_view->vid,
454  functional_append(curr_view->members, joiner_id),
455  functional_append(curr_view->member_ips_and_ports,
456  {joiner_ip, joiner_gms_port, joiner_rpc_port, joiner_sst_port, joiner_rdmc_port}),
457  std::vector<char>(curr_view->num_members + 1, 0),
458  functional_append(curr_view->joined, joiner_id),
459  std::vector<node_id_t>{}, 0, 0,
461  make_subgroup_maps(subgroup_info, std::unique_ptr<View>(), *curr_view);
462  waiting_join_sockets.emplace(joiner_id, std::move(client_socket));
463  dbg_default_debug("Node {} connected from IP address {} and GMS port {}", joiner_id, joiner_ip, joiner_gms_port);
464  }
465  joiner_failed = false;
466  for(auto waiting_sockets_iter = waiting_join_sockets.begin();
467  waiting_sockets_iter != waiting_join_sockets.end();) {
468  std::size_t view_buffer_size = mutils::bytes_size(*curr_view);
469  char view_buffer[view_buffer_size];
470  bool send_success;
471  //Within this try block, any send that returns failure throws the ID of the node that failed
472  try {
473  //First send the View
474  send_success = waiting_sockets_iter->second.write(view_buffer_size);
475  if(!send_success) {
476  throw waiting_sockets_iter->first;
477  }
478  mutils::to_bytes(*curr_view, view_buffer);
479  send_success = waiting_sockets_iter->second.write(view_buffer, view_buffer_size);
480  if(!send_success) {
481  throw waiting_sockets_iter->first;
482  }
483  //Then send "0" as the size of the "old shard leaders" vector, since there are no old leaders
484  send_success = waiting_sockets_iter->second.write(std::size_t{0});
485  if(!send_success) {
486  throw waiting_sockets_iter->first;
487  }
488  members_sent_view.emplace(waiting_sockets_iter->first);
489  waiting_sockets_iter++;
490  } catch(node_id_t failed_joiner_id) {
491  dbg_default_warn("Node {} failed after contacting the leader! Removing it from the initial view.", failed_joiner_id);
492  //Remove the failed client and recompute the view
493  std::vector<node_id_t> filtered_members(curr_view->members.size() - 1);
494  std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> filtered_ips_and_ports(curr_view->member_ips_and_ports.size() - 1);
495  std::vector<node_id_t> filtered_joiners(curr_view->joined.size() - 1);
496  std::remove_copy(curr_view->members.begin(), curr_view->members.end(),
497  filtered_members.begin(), failed_joiner_id);
498  std::remove_copy(curr_view->member_ips_and_ports.begin(),
499  curr_view->member_ips_and_ports.end(),
500  filtered_ips_and_ports.begin(),
501  curr_view->member_ips_and_ports[curr_view->rank_of(failed_joiner_id)]);
502  std::remove_copy(curr_view->joined.begin(), curr_view->joined.end(),
503  filtered_joiners.begin(), failed_joiner_id);
504  curr_view = std::make_unique<View>(0, filtered_members, filtered_ips_and_ports,
505  std::vector<char>(curr_view->num_members - 1, 0), filtered_joiners,
506  std::vector<node_id_t>{}, 0, 0,
508  /* This will update curr_view->is_adequately_provisioned, so set joiner_failed to true
509  * to start over from the beginning and test if we need to wait for more joiners. */
510  make_subgroup_maps(subgroup_info, std::unique_ptr<View>(), *curr_view);
511  waiting_join_sockets.erase(waiting_sockets_iter);
512  joiner_failed = true;
513  break;
514  }
515  } //for (waiting_join_sockets)
516 
517  if(joiner_failed) {
518  for(const node_id_t& member_sent_view : members_sent_view) {
519  dbg_default_debug("Sending view abort message to node {}", member_sent_view);
520  waiting_join_sockets.at(member_sent_view).write(CommitMessage::ABORT);
521  }
522  members_sent_view.clear();
523  }
524  } while(joiner_failed);
525 
526  dbg_default_trace("Decided on initial view: {}", curr_view->debug_string());
527  //At this point, we have successfully sent an initial view to all joining nodes, so we can commit it
528  //There's no state-transfer step to do, so we don't have to wait for state transfer to complete
529  //before committing this view, and the Group constructor's state-transfer methods will do nothing
530  for(auto waiting_sockets_iter = waiting_join_sockets.begin();
531  waiting_sockets_iter != waiting_join_sockets.end();) {
532  dbg_default_debug("Sending prepare and commit messages to node {}", waiting_sockets_iter->first);
533  waiting_sockets_iter->second.write(CommitMessage::PREPARE);
534  waiting_sockets_iter->second.write(CommitMessage::COMMIT);
535  waiting_sockets_iter = waiting_join_sockets.erase(waiting_sockets_iter);
536  }
537 }
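/* await_first_view() is the leader-side counterpart of receive_initial_view(): the
 * leader keeps accepting joiners and appending them to a provisional View until
 * make_subgroup_maps() marks it adequately provisioned, then streams the View (plus
 * a zero-length "old shard leaders" list) to every waiting socket. If a joiner fails
 * during that send, it is filtered out of the View, ABORT is sent to the joiners
 * that already received it, and the loop starts over; once every send succeeds,
 * PREPARE and COMMIT are sent back-to-back because there is no state transfer to
 * wait for. */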
538 
540  bool quorum_achieved = false;
541  while(!quorum_achieved) {
543  dbg_default_debug("Reached a quorum of nodes from view {}, created view {}", restart_leader_state_machine->get_curr_view().vid, restart_leader_state_machine->get_restart_view().vid);
544  quorum_achieved = true;
545  //Compute the final ragged trim. There is nothing left to compute here, because
546  //only the latest ragged trim from each subgroup and shard was kept, so just
547  //mark all of the RaggedTrims with the "restart leader" value to stamp them
548  //with our approval
549  for(auto& subgroup_to_map : restart_state->logged_ragged_trim) {
550  for(auto& shard_trim_pair : subgroup_to_map.second) {
551  shard_trim_pair.second->leader_id = std::numeric_limits<node_id_t>::max();
552  }
553  }
554  whendebug(restart_leader_state_machine->print_longest_logs(););
555 
556  //Send the restart view, ragged trim, and restart shard leaders to all the members
557  int64_t failed_node_id = restart_leader_state_machine->send_restart_view();
558  //If a node failed while waiting for the quorum, abort this restart view and try again
559  if(failed_node_id != -1) {
560  dbg_default_warn("Node {} failed while waiting for restart leader to reach a quorum!", failed_node_id);
561  quorum_achieved = restart_leader_state_machine->resend_view_until_quorum_lost();
562  }
563  }
564  dbg_default_debug("Successfully sent restart view {} to all restarted nodes", restart_leader_state_machine->get_restart_view().vid);
565  prior_view_shard_leaders = restart_state->restart_shard_leaders;
566  //Now control will return to Group to do state transfer before confirming this view
567 }
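/* In the total-restart case the same structure is driven by RestartLeaderState: the
 * restart leader waits for a quorum of the last known view, stamps the collected
 * RaggedTrims with the "restart leader" value, and sends out the restart view. If a
 * node fails during the send, the view is recomputed and resent until it either
 * succeeds or the quorum is lost, in which case the outer loop goes back to waiting
 * for a quorum. */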
568 
569 bool ViewManager::leader_prepare_initial_view(bool& leader_has_quorum) {
571  dbg_default_trace("Sending prepare messages for restart View");
572  int64_t failed_node_id = restart_leader_state_machine->send_prepare();
573  if(failed_node_id != -1) {
574  dbg_default_warn("Node {} failed when sending Prepare messages for the restart view!", failed_node_id);
575  leader_has_quorum = restart_leader_state_machine->resend_view_until_quorum_lost();
576  //If there was at least one failure, we (may) need to do state transfer again, so return false
577  //The out-parameter will tell the leader if it also needs to wait for more joins
578  return false;
579  }
580  }
581  return true;
582 }
583 
586  dbg_default_trace("Decided on restart view: {}", restart_leader_state_machine->get_restart_view().debug_string());
587  //Commit the restart view at all joining clients
588  restart_leader_state_machine->send_commit();
589  curr_view = restart_leader_state_machine->take_restart_view();
590  //After sending the commit messages, it's safe to discard the restart state
592  }
593 }
594 
596  dbg_default_debug("Starting global initialization of RDMC and SST, including internal TCP connection setup");
597  // Construct the map of member IPs and RDMC ports
598  auto member_ips_and_rdmc_ports_map = make_member_ips_and_ports_map<PORT_TYPE::RDMC>(*curr_view);
599  if(!rdmc::initialize(member_ips_and_rdmc_ports_map,
600  curr_view->members[curr_view->my_rank])) {
601  std::cout << "Global setup failed" << std::endl;
602  exit(1);
603  }
604  auto member_ips_and_sst_ports_map = make_member_ips_and_ports_map<PORT_TYPE::SST>(*curr_view);
605 
606 #ifdef USE_VERBS_API
607  sst::verbs_initialize(member_ips_and_sst_ports_map,
608  curr_view->members[curr_view->my_rank]);
609 #else
610  sst::lf_initialize(member_ips_and_sst_ports_map,
611  curr_view->members[curr_view->my_rank]);
612 #endif
613 }
614 
616  client_listener_thread = std::thread{[this]() {
617  pthread_setname_np(pthread_self(), "client_thread");
618  while(!thread_shutdown) {
619  tcp::socket client_socket = server_socket.accept();
620  dbg_default_debug("Background thread got a client connection from {}", client_socket.get_remote_ip());
621  pending_join_sockets.locked().access.emplace_back(std::move(client_socket));
622  }
623  }};
624 
625  old_view_cleanup_thread = std::thread([this]() {
626  pthread_setname_np(pthread_self(), "old_view");
627  while(!thread_shutdown) {
628  unique_lock_t old_views_lock(old_views_mutex);
629  old_views_cv.wait(old_views_lock, [this]() {
630  return !old_views.empty() || thread_shutdown;
631  });
632  if(!thread_shutdown) {
633  old_views.front().reset();
634  old_views.pop();
635  }
636  }
637  });
638 }
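/* Two background threads are started here: a client listener that accepts TCP
 * connections on the GMS port and queues them in pending_join_sockets for the
 * predicate-driven join logic to pick up, and an old-view cleanup thread that
 * destroys superseded View objects off the critical path, sleeping on old_views_cv
 * until a retired view is pushed onto the old_views queue. */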
639 
641  /* Note that each trigger function must be wrapped in a lambda because it's
642  * a member function, and a lambda is the simplest way to bind "this" to a member
643  * function invocation. */
644  auto suspected_changed = [this](const DerechoSST& sst) {
646  };
647  auto suspected_changed_trig = [this](DerechoSST& sst) { new_suspicion(sst); };
648 
649  auto start_join_pred = [this](const DerechoSST& sst) {
650  return active_leader && has_pending_join();
651  };
652  auto start_join_trig = [this](DerechoSST& sst) { leader_start_join(sst); };
653 
654  auto reject_join_pred = [this](const DerechoSST& sst) {
655  return !active_leader && has_pending_join();
656  };
657  auto reject_join = [this](DerechoSST& sst) { redirect_join_attempt(sst); };
658 
659  auto change_commit_ready = [this](const DerechoSST& gmsSST) {
660  return active_leader
661  && min_acked(gmsSST, curr_view->failed) > gmsSST.num_committed[curr_view->my_rank];
662  };
663  auto commit_change = [this](DerechoSST& sst) { leader_commit_change(sst); };
664 
665  auto leader_proposed_change = [this](const DerechoSST& gmsSST) {
666  return gmsSST.num_changes[curr_view->find_rank_of_leader()]
667  > gmsSST.num_acked[curr_view->my_rank];
668  };
669  auto ack_proposed_change = [this](DerechoSST& sst) {
671  };
672 
673  auto leader_committed_changes = [this](const DerechoSST& gmsSST) {
674  return gmsSST.num_committed[curr_view->find_rank_of_leader()]
675  > gmsSST.num_installed[curr_view->my_rank];
676  };
677  auto view_change_trig = [this](DerechoSST& sst) { start_meta_wedge(sst); };
678 
680  suspected_changed_handle = curr_view->gmsSST->predicates.insert(
681  suspected_changed, suspected_changed_trig,
683  }
684  if(!start_join_handle.is_valid()) {
685  start_join_handle = curr_view->gmsSST->predicates.insert(
686  start_join_pred, start_join_trig, sst::PredicateType::RECURRENT);
687  }
689  reject_join_handle = curr_view->gmsSST->predicates.insert(reject_join_pred, reject_join,
691  }
693  change_commit_ready_handle = curr_view->gmsSST->predicates.insert(
694  change_commit_ready, commit_change, sst::PredicateType::RECURRENT);
695  }
697  leader_proposed_handle = curr_view->gmsSST->predicates.insert(
698  leader_proposed_change, ack_proposed_change,
700  }
702  leader_committed_handle = curr_view->gmsSST->predicates.insert(
703  leader_committed_changes, view_change_trig,
705  }
706 }
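/* Each step of the membership protocol above is a (predicate, trigger) pair over
 * the DerechoSST, registered as a recurrent predicate. As a rough sketch of the
 * pattern (my_pred, my_trig, my_handle, some_condition, and some_action are
 * placeholders, not names from this file):
 *
 *     auto my_pred = [this](const DerechoSST& sst) { return some_condition(sst); };
 *     auto my_trig = [this](DerechoSST& sst) { some_action(sst); };
 *     my_handle = curr_view->gmsSST->predicates.insert(
 *             my_pred, my_trig, sst::PredicateType::RECURRENT);
 *
 * The pairs registered here are: a suspicion change fires new_suspicion; a pending
 * join fires leader_start_join on the active leader and redirect_join_attempt
 * elsewhere; enough acks fire leader_commit_change; a leader proposal fires
 * acknowledge_proposed_change; and a committed change fires start_meta_wedge. */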
707 
708 /* ------------- 2. Predicate-Triggers That Implement View Management Logic ---------- */
709 
711  // If the silent flag is set, ignore suspicion changes
712  if(bSilent) {
713  return;
714  }
715 
716  dbg_default_debug("Suspected[] changed");
717  View& Vc = *curr_view;
718  const int my_rank = curr_view->my_rank;
719  //Cache this before changing failed[], so we can see if the leader changed
720  const int old_leader_rank = curr_view->find_rank_of_leader();
721  int num_left = 0;
722  // Aggregate suspicions into gmsSST[myRank].Suspected;
723  for(int r = 0; r < Vc.num_members; r++) {
724  for(int who = 0; who < Vc.num_members; who++) {
725  gmssst::set(gmsSST.suspected[my_rank][who],
726  gmsSST.suspected[my_rank][who] || gmsSST.suspected[r][who]);
727  }
728  if(gmsSST.rip[r]) {
729  num_left++;
730  }
731  }
732 
733  for(int rank = 0; rank < Vc.num_members; rank++) {
734  if(gmsSST.suspected[my_rank][rank] && !last_suspected[rank]) {
735  // This is safer than copy_suspected, since suspected[] might change during this loop
736  last_suspected[rank] = gmsSST.suspected[my_rank][rank];
737  dbg_default_debug("Marking {} failed", Vc.members[rank]);
738 
739  if(!gmsSST.rip[my_rank] && Vc.num_failed != 0
740  && (Vc.num_failed - num_left >= (Vc.num_members - num_left + 1) / 2)) {
742  dbg_default_warn("Potential partitioning event, but partitioning safety is disabled. num_failed - num_left = {} but num_members - num_left + 1 = {}",
743  Vc.num_failed - num_left, Vc.num_members - num_left + 1);
744  } else {
745  throw derecho_exception("Potential partitioning event: this node is no longer in the majority and must shut down!");
746  }
747  }
748 
749  dbg_default_debug("GMS telling SST to freeze row {}", rank);
750  gmsSST.freeze(rank);
751  //These two lines are the same as Vc.wedge()
752  Vc.multicast_group->wedge();
753  gmssst::set(gmsSST.wedged[my_rank], true);
754  //Synchronize Vc.failed with gmsSST.suspected
755  Vc.failed[rank] = true;
756  Vc.num_failed++;
757 
758  if(!gmsSST.rip[my_rank] && Vc.num_failed != 0
759  && (Vc.num_failed - num_left >= (Vc.num_members - num_left + 1) / 2)) {
761  dbg_default_warn("Potential partitioning event, but partitioning safety is disabled. num_failed - num_left = {} but num_members - num_left + 1 = {}",
762  Vc.num_failed - num_left, Vc.num_members - num_left + 1);
763  } else {
764  throw derecho_exception("Potential partitioning event: this node is no longer in the majority and must shut down!");
765  }
766  }
767 
768  // push change to gmsSST.suspected[myRank]
769  gmsSST.put(gmsSST.suspected);
770  // push change to gmsSST.wedged[myRank]
771  gmsSST.put(gmsSST.wedged);
772  const int new_leader_rank = Vc.find_rank_of_leader();
773  //Only propose the change if there was no change in leadership
774  if(my_rank == new_leader_rank && my_rank == old_leader_rank
775  && !changes_contains(gmsSST, Vc.members[rank])) {
776  const int next_change_index = gmsSST.num_changes[my_rank] - gmsSST.num_installed[my_rank];
777  if(next_change_index == (int)gmsSST.changes.size()) {
778  throw derecho_exception("Ran out of room in the pending changes list!");
779  }
780 
781  gmssst::set(gmsSST.changes[my_rank][next_change_index],
782  Vc.members[rank]); // Reports the failure
783  gmssst::increment(gmsSST.num_changes[my_rank]);
784  dbg_default_debug("Leader proposed a change to remove failed node {}", Vc.members[rank]);
785  gmsSST.put(gmsSST.changes, next_change_index);
786  gmsSST.put(gmsSST.num_changes);
787  }
788  }
789  }
790  //Determine if the detected failures made me the new leader, and register the takeover predicate
791  if(my_rank == Vc.find_rank_of_leader() && my_rank != old_leader_rank) {
792  dbg_default_debug("The current leader failed, so this node will take over as leader");
793  auto leader_change_finished = [this](const DerechoSST& sst) {
794  return curr_view->i_am_leader() && previous_leaders_suspected(sst, *curr_view);
795  };
796  auto leader_change_trigger = [this](DerechoSST& sst) {
798  };
799  gmsSST.predicates.insert(leader_change_finished, leader_change_trigger,
801  }
802 }
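/* The repeated majority check above is the partitioning-safety guard: with nodes
 * that have cleanly left (rip) counted separately from failures, the node throws and
 * shuts down once num_failed - num_left >= (num_members - num_left + 1) / 2, i.e.
 * once it can no longer be part of a majority of the surviving membership, unless
 * partitioning safety was disabled in the configuration. */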
803 
805  dbg_default_debug("GMS handling a new client connection");
806  if((gmsSST.num_changes[curr_view->my_rank] - gmsSST.num_committed[curr_view->my_rank])
807  == static_cast<int>(curr_view->members.size())) {
808  dbg_default_debug("Delaying handling the new client, there are already {} pending changes", curr_view->members.size());
809  return;
810  }
811  {
812  //Hold the lock on pending_join_sockets while moving a socket into proposed_join_sockets
813  auto pending_join_sockets_locked = pending_join_sockets.locked();
815  pending_join_sockets_locked.access,
816  pending_join_sockets_locked.access.begin());
817  }
818  bool success = receive_join(gmsSST, proposed_join_sockets.back());
819  //If the join failed, close the socket
820  if(!success) proposed_join_sockets.pop_back();
821 }
822 
824  tcp::socket client_socket;
825  {
826  auto pending_join_sockets_locked = pending_join_sockets.locked();
827  client_socket = std::move(pending_join_sockets_locked.access.front());
828  pending_join_sockets_locked.access.pop_front();
829  }
830  node_id_t joiner_id;
831  client_socket.read(joiner_id);
833  curr_view->members[curr_view->my_rank]});
834  //Send the client the IP address of the current leader
835  const int rank_of_leader = curr_view->find_rank_of_leader();
836  client_socket.write(mutils::bytes_size(std::get<0>(
837  curr_view->member_ips_and_ports[rank_of_leader])));
838  auto bind_socket_write = [&client_socket](const char* bytes, std::size_t size) {
839  client_socket.write(bytes, size);
840  };
841  mutils::post_object(bind_socket_write,
842  std::get<0>(curr_view->member_ips_and_ports[rank_of_leader]));
843  client_socket.write(std::get<PORT_TYPE::GMS>(
844  curr_view->member_ips_and_ports[rank_of_leader]));
845 }
846 
848  bool changes_copied = copy_prior_leader_proposals(gmsSST);
849  dbg_default_debug("Taking over as the new leader; everyone suspects prior leaders.");
850  //For each node that I suspect, make sure a change is proposed to remove it
851  const unsigned int my_rank = gmsSST.get_local_index();
852  for(int rank = 0; rank < curr_view->num_members; ++rank) {
853  if(gmsSST.suspected[my_rank][rank]
854  && !changes_contains(gmsSST, curr_view->members[rank])) {
855  const int next_change_index = gmsSST.num_changes[my_rank] - gmsSST.num_installed[my_rank];
856  if(next_change_index == (int)gmsSST.changes.size()) {
857  throw derecho_exception("Ran out of room in the pending changes list!");
858  }
859 
860  gmssst::set(gmsSST.changes[my_rank][next_change_index],
861  curr_view->members[rank]);
862  gmssst::increment(gmsSST.num_changes[my_rank]);
863  dbg_default_debug("Leader proposed a change to remove failed node {}", curr_view->members[rank]);
864  //If changes were copied, we'll have to push the whole vector
865  //otherwise we can just push the new element
866  if(!changes_copied) {
867  gmsSST.put(gmsSST.changes, next_change_index);
868  gmsSST.put(gmsSST.num_changes);
869  }
870  }
871  }
872  if(changes_copied) {
873  gmsSST.put(gmsSST.changes);
874  gmsSST.put(gmsSST.num_changes);
875  }
876  //I am now "awake" as the leader and can take new actions
877  active_leader = true;
878 }
879 
881  gmssst::set(gmsSST.num_committed[gmsSST.get_local_index()],
882  min_acked(gmsSST, curr_view->failed)); // Leader commits a new request
883  dbg_default_debug("Leader committing change proposal #{}", gmsSST.num_committed[gmsSST.get_local_index()]);
884  gmsSST.put(gmsSST.num_committed);
885 }
886 
888  const int myRank = gmsSST.get_local_index();
889  const int leader = curr_view->find_rank_of_leader();
890  dbg_default_debug("Detected that leader proposed change #{}. Acknowledging.", gmsSST.num_changes[leader]);
891  if(myRank != leader) {
892  // Echo the count
893  gmssst::set(gmsSST.num_changes[myRank], gmsSST.num_changes[leader]);
894 
895  // Echo (copy) the vector including the new changes
896  gmssst::set(gmsSST.changes[myRank], gmsSST.changes[leader],
897  gmsSST.changes.size());
898  // Echo the new member's IP and ports
899  gmssst::set(gmsSST.joiner_ips[myRank], gmsSST.joiner_ips[leader],
900  gmsSST.joiner_ips.size());
901  gmssst::set(gmsSST.joiner_gms_ports[myRank], gmsSST.joiner_gms_ports[leader],
902  gmsSST.joiner_gms_ports.size());
903  gmssst::set(gmsSST.joiner_rpc_ports[myRank], gmsSST.joiner_rpc_ports[leader],
904  gmsSST.joiner_rpc_ports.size());
905  gmssst::set(gmsSST.joiner_sst_ports[myRank], gmsSST.joiner_sst_ports[leader],
906  gmsSST.joiner_sst_ports.size());
907  gmssst::set(gmsSST.joiner_rdmc_ports[myRank], gmsSST.joiner_rdmc_ports[leader],
908  gmsSST.joiner_rdmc_ports.size());
909  gmssst::set(gmsSST.num_committed[myRank], gmsSST.num_committed[leader]);
910  }
911 
912  // Notice a new request, acknowledge it
913  gmssst::set(gmsSST.num_acked[myRank], gmsSST.num_changes[myRank]);
914  /* The put is broken into individual put calls so that, if we were relying on
915  * any ordering guarantees, we won't run into issues when those guarantees
916  * do not hold */
917  gmsSST.put(gmsSST.changes);
918  //This pushes the contiguous set of joiner_xxx_ports fields all at once
919  gmsSST.put(gmsSST.joiner_ips.get_base() - gmsSST.getBaseAddress(),
920  gmsSST.num_changes.get_base() - gmsSST.joiner_ips.get_base());
921  gmsSST.put(gmsSST.num_changes);
922  gmsSST.put(gmsSST.num_committed);
923  gmsSST.put(gmsSST.num_acked);
924  gmsSST.put(gmsSST.num_installed);
925  dbg_default_debug("Wedging current view.");
926  curr_view->wedge();
927  dbg_default_debug("Done wedging current view.");
928 }
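/* A non-leader acknowledges a proposal by copying the leader's entire proposal state
 * (the changes list, the joiner IP and port columns, num_changes, and num_committed)
 * into its own row and then setting num_acked equal to num_changes, which lets the
 * leader's min_acked() computation advance; the current view is wedged immediately
 * so no new multicasts are sent in the old epoch. */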
929 
931  dbg_default_debug("Meta-wedging view {}", curr_view->vid);
932  // Disable all the other SST predicates, except suspected_changed and the
933  // one I'm about to register
938 
939  curr_view->wedge();
940 
941  /* We now need to wait for all other nodes to wedge the current view,
942  * which is called "meta-wedged." To do that, this predicate trigger
943  * creates a new predicate that will fire when meta-wedged is true, and
944  * registers the next epoch termination method as its trigger.
945  */
946  auto is_meta_wedged = [this](const DerechoSST& gmsSST) {
947  for(unsigned int n = 0; n < gmsSST.get_num_rows(); ++n) {
948  if(!curr_view->failed[n] && !gmsSST.wedged[n]) {
949  return false;
950  }
951  }
952  return true;
953  };
954  auto meta_wedged_continuation = [this](DerechoSST& gmsSST) {
955  terminate_epoch(gmsSST);
956  };
957  gmsSST.predicates.insert(is_meta_wedged, meta_wedged_continuation,
959 }
960 
962  dbg_default_debug("MetaWedged is true; continuing epoch termination");
963  // If this is the first time terminate_epoch() was called, next_view will still be null
964  bool first_call = false;
965  if(!next_view) {
966  first_call = true;
967  }
968  std::unique_lock<std::shared_timed_mutex> write_lock(view_mutex);
970  dbg_default_debug("Checking provisioning of view {}", next_view->vid);
972  if(!next_view->is_adequately_provisioned) {
973  dbg_default_debug("Next view would not be adequately provisioned, waiting for more joins.");
974  if(first_call) {
975  // Re-register the predicates for accepting and acknowledging joins
977  // But remove the one for start_meta_wedge
979  }
980  // Construct a predicate that watches for any new committed change that is a join
981  int curr_num_committed = gmsSST.num_committed[curr_view->find_rank_of_leader()];
982  auto leader_committed_change = [this, curr_num_committed](const DerechoSST& gmsSST) {
983  return gmsSST.num_committed[curr_view->find_rank_of_leader()] > curr_num_committed;
984  };
985  // Construct a trigger that will re-call terminate_epoch()
986  auto retry_next_view = [this](DerechoSST& sst) {
988  };
989  gmsSST.predicates.insert(leader_committed_change, retry_next_view,
991  return;
992  }
993  // If execution reached here, we have a valid next view
994 
995  // go through all subgroups first and acknowledge all messages received through SST
996  for(const auto& shard_settings_pair :
997  curr_view->multicast_group->get_subgroup_settings()) {
998  const subgroup_id_t subgroup_id = shard_settings_pair.first;
999  const auto& curr_subgroup_settings = shard_settings_pair.second;
1000  auto num_shard_members = curr_subgroup_settings.members.size();
1001  std::vector<int> shard_senders = curr_subgroup_settings.senders;
1002  auto num_shard_senders = curr_view->multicast_group->get_num_senders(shard_senders);
1003  std::map<uint32_t, uint32_t> shard_ranks_by_sender_rank;
1004  for(uint j = 0, l = 0; j < num_shard_members; ++j) {
1005  if(shard_senders[j]) {
1006  shard_ranks_by_sender_rank[l] = j;
1007  l++;
1008  }
1009  }
1010  // wait for all pending sst sends to finish
1011  dbg_default_debug("Waiting for pending SST sends to finish");
1012  while(curr_view->multicast_group->check_pending_sst_sends(subgroup_id)) {
1013  }
1014  gmsSST.put_with_completion();
1015  gmsSST.sync_with_members(
1016  curr_view->multicast_group->get_shard_sst_indices(subgroup_id));
1017  while(curr_view->multicast_group->receiver_predicate(
1018  curr_subgroup_settings, shard_ranks_by_sender_rank,
1019  num_shard_senders, gmsSST)) {
1020  auto sst_receive_handler_lambda =
1021  [this, subgroup_id, curr_subgroup_settings,
1022  shard_ranks_by_sender_rank, num_shard_senders](
1023  uint32_t sender_rank, volatile char* data, uint32_t size) {
1024  curr_view->multicast_group->sst_receive_handler(
1025  subgroup_id, curr_subgroup_settings, shard_ranks_by_sender_rank,
1026  num_shard_senders, sender_rank, data, size);
1027  };
1028  curr_view->multicast_group->receiver_function(
1029  subgroup_id, curr_subgroup_settings, shard_ranks_by_sender_rank,
1030  num_shard_senders, gmsSST,
1031  curr_subgroup_settings.profile.window_size, sst_receive_handler_lambda);
1032  }
1033  }
1034 
1035  gmsSST.put_with_completion();
1036  dbg_default_debug("Doing an SST sync_with_members");
1037  gmsSST.sync_with_members();
1038 
1039  // For subgroups in which I'm the shard leader, do RaggedEdgeCleanup for the leader
1040  auto follower_subgroups_and_shards = std::make_shared<std::map<subgroup_id_t, uint32_t>>();
1041  for(const auto& shard_settings_pair : curr_view->multicast_group->get_subgroup_settings()) {
1042  const subgroup_id_t subgroup_id = shard_settings_pair.first;
1043  const uint32_t shard_num = shard_settings_pair.second.shard_num;
1044  const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num);
1045  uint num_shard_senders = 0;
1046  for(auto v : shard_view.is_sender) {
1047  if(v) num_shard_senders++;
1048  }
1049  if(num_shard_senders) {
1050  if(shard_view.my_rank == curr_view->subview_rank_of_shard_leader(subgroup_id, shard_num)) {
1052  subgroup_id,
1053  shard_settings_pair.second.num_received_offset, shard_view.members,
1054  num_shard_senders);
1055  } else {
1056  // Keep track of which subgroups I'm a non-leader in, and what my
1057  // corresponding shard ID is
1058  follower_subgroups_and_shards->emplace(subgroup_id, shard_num);
1059  }
1060  }
1061  }
1062 
1063  // Wait for the shard leaders of subgroups I'm not a leader in to post
1064  // global_min_ready before continuing.
1065  auto leader_global_mins_are_ready = [this, follower_subgroups_and_shards](const DerechoSST& gmsSST) {
1066  for(const auto& subgroup_shard_pair : *follower_subgroups_and_shards) {
1067  const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_shard_pair.first)
1068  .at(subgroup_shard_pair.second);
1069  node_id_t shard_leader = shard_view.members.at(
1070  curr_view->subview_rank_of_shard_leader(subgroup_shard_pair.first,
1071  subgroup_shard_pair.second));
1072  if(!gmsSST.global_min_ready[curr_view->rank_of(shard_leader)][subgroup_shard_pair.first]) {
1073  return false;
1074  }
1075  }
1076  return true;
1077  };
1078 
1079  auto global_min_ready_continuation = [this, follower_subgroups_and_shards](DerechoSST& gmsSST) {
1080  echo_ragged_trim(follower_subgroups_and_shards, gmsSST);
1081  };
1082 
1083  gmsSST.predicates.insert(leader_global_mins_are_ready, global_min_ready_continuation,
1085 }
1086 
1088  std::shared_ptr<std::map<subgroup_id_t, uint32_t>> follower_subgroups_and_shards,
1089  DerechoSST& gmsSST) {
1090  dbg_default_debug("GlobalMins are ready for all {} subgroup leaders this node is waiting on", follower_subgroups_and_shards->size());
1091  // Call RaggedEdgeCleanup for subgroups in which I'm not the leader
1092  for(const auto& subgroup_shard_pair : *follower_subgroups_and_shards) {
1093  const subgroup_id_t subgroup_id = subgroup_shard_pair.first;
1094  const uint32_t shard_num = subgroup_shard_pair.second;
1095  SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num);
1096  uint num_shard_senders = 0;
1097  for(auto v : shard_view.is_sender) {
1098  if(v) num_shard_senders++;
1099  }
1100  node_id_t shard_leader = shard_view.members[curr_view->subview_rank_of_shard_leader(
1101  subgroup_id, shard_num)];
1103  subgroup_id,
1104  curr_view->rank_of(shard_leader),
1105  curr_view->multicast_group->get_subgroup_settings().at(subgroup_id).num_received_offset,
1106  num_shard_senders);
1107  }
1108 
1109  //Now, for all subgroups I'm in (leader or not), wait for everyone to have echoed the leader's
1110  //global_min_ready before delivering any messages; this means they have seen and logged the ragged trim
1111  auto everyone_echoed_pred = [this](const DerechoSST& gmsSST) {
1112  for(const auto& subgroup_shard_pair : curr_view->my_subgroups) {
1113  const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_shard_pair.first)
1114  .at(subgroup_shard_pair.second);
1115  for(const node_id_t shard_member : shard_view.members) {
1116  int shard_member_rank = curr_view->rank_of(shard_member);
1117  //Always check failed before reading an SST row
1118  if(!curr_view->failed[shard_member_rank]
1119  && !gmsSST.global_min_ready[shard_member_rank][subgroup_shard_pair.first]) {
1120  return false;
1121  }
1122  }
1123  }
1124  return true;
1125  };
1126 
1127  auto deliver_ragged_trim_trig = [this](DerechoSST& gmsSST) {
1128  deliver_ragged_trim(gmsSST);
1129  };
1130 
1131  gmsSST.predicates.insert(everyone_echoed_pred, deliver_ragged_trim_trig,
1133 }
1134 
1136  dbg_default_debug("GlobalMin has been echoed by everyone for all {} subgroups this node is in", curr_view->my_subgroups.size());
1137  for(const auto& subgroup_shard_pair : curr_view->my_subgroups) {
1138  const subgroup_id_t subgroup_id = subgroup_shard_pair.first;
1139  const uint32_t shard_num = subgroup_shard_pair.second;
1140  const SubView& shard_view = curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num);
1141  node_id_t shard_leader = shard_view.members.at(
1142  curr_view->subview_rank_of_shard_leader(subgroup_id, shard_num));
1143  uint num_shard_senders = 0;
1144  for(auto v : shard_view.is_sender) {
1145  if(v) num_shard_senders++;
1146  }
1147  deliver_in_order(curr_view->rank_of(shard_leader), subgroup_id,
1148  curr_view->multicast_group->get_subgroup_settings()
1149  .at(subgroup_id)
1150  .num_received_offset,
1151  shard_view.members, num_shard_senders);
1152  }
1153 
1154  // Wait for persistence to finish for messages delivered in RaggedEdgeCleanup before continuing
1155  auto persistence_finished_pred = [this](const DerechoSST& gmsSST) {
1156  // For each subgroup/shard that this node is a member of...
1157  for(const auto& subgroup_shard_pair : curr_view->my_subgroups) {
1158  const subgroup_id_t subgroup_id = subgroup_shard_pair.first;
1159  const uint32_t shard_num = subgroup_shard_pair.second;
1160  if(curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num).mode == Mode::UNORDERED) {
1161  // Skip non-ordered subgroups, they never do persistence
1162  continue;
1163  }
1164  message_id_t last_delivered_seq_num = gmsSST.delivered_num[curr_view->my_rank][subgroup_id];
1165  // For each member of that shard...
1166  for(const node_id_t& shard_member :
1167  curr_view->subgroup_shard_views.at(subgroup_id).at(shard_num).members) {
1168  uint member_row = curr_view->rank_of(shard_member);
1169  // Check to see if the member persisted up to the ragged edge trim
1170  if(!curr_view->failed[member_row]
1171  && persistent::unpack_version<int32_t>(
1172  gmsSST.persisted_num[member_row][subgroup_id])
1173  .second
1174  < last_delivered_seq_num) {
1175  return false;
1176  }
1177  }
1178  }
1179  return true;
1180  };
1181 
1182  auto finish_view_change_trig = [this](DerechoSST& gmsSST) {
1183  finish_view_change(gmsSST);
1184  };
1185 
1186  gmsSST.predicates.insert(persistence_finished_pred, finish_view_change_trig,
1188 }
1189 
1191  std::unique_lock<std::shared_timed_mutex> write_lock(view_mutex);
1192 
1193  // Disable all the other SST predicates, except suspected_changed
1198 
1199  // Now that the next_view won't change any more, calculate its subgroup settings
1200  std::map<subgroup_id_t, SubgroupSettings> next_subgroup_settings;
1201  auto sizes = derive_subgroup_settings(*next_view, next_subgroup_settings);
1202  uint32_t new_num_received_size = sizes.first;
1203  uint32_t new_slot_size = sizes.second;
1204 
1205  dbg_default_debug("Ready to transition to the next View: {}", next_view->debug_string());
1206  // Determine the shard leaders in the old view and re-index them by new subgroup IDs
1207  vector_int64_2d old_shard_leaders_by_id = old_shard_leaders_by_new_ids(
1208  *curr_view, *next_view);
1209 
1210  std::list<tcp::socket> joiner_sockets;
1211  if(active_leader && next_view->joined.size() > 0) {
1212  // If j joins have been committed, pop the next j sockets off
1213  // proposed_join_sockets and send them the new View and old shard
1214  // leaders list
1215  for(std::size_t c = 0; c < next_view->joined.size(); ++c) {
1217  std::size_t size_of_vector = mutils::bytes_size(old_shard_leaders_by_id);
1218  proposed_join_sockets.front().write(size_of_vector);
1219  mutils::post_object([this](const char* bytes, std::size_t size) {
1220  proposed_join_sockets.front().write(bytes, size);
1221  },
1222  old_shard_leaders_by_id);
1223  // save the socket for the commit step
1224  joiner_sockets.emplace_back(std::move(proposed_join_sockets.front()));
1225  proposed_join_sockets.pop_front();
1226  }
1227  }
1228 
1229  node_id_t my_id = next_view->members[next_view->my_rank];
1230 
1231  // Set up TCP connections to the joined nodes
1233  // After doing that, shard leaders can send them RPC objects
1234  send_objects_to_new_members(*next_view, old_shard_leaders_by_id);
1235 
1236  // Re-initialize this node's RPC objects, which includes receiving them
1237  // from shard leaders if it is newly a member of a subgroup
1238  dbg_default_debug("Receiving state for local Replicated Objects");
1239  initialize_subgroup_objects(my_id, *next_view, old_shard_leaders_by_id);
1240 
1241  // Once state transfer completes, we can tell joining clients to commit the view
1242  if(active_leader) {
1243  for(auto& joiner_socket : joiner_sockets) {
1244  //Eventually, we could check for success here and abort the view if a node failed
1245  joiner_socket.write(CommitMessage::PREPARE);
1246  }
1247  for(auto& joiner_socket : joiner_sockets) {
1248  joiner_socket.write(CommitMessage::COMMIT);
1249  }
1250  joiner_sockets.clear();
1251  }
1252 
1253  // Delete the last two GMS predicates from the old SST in preparation for deleting it
1256 
1257  dbg_default_debug("Starting creation of new SST and DerechoGroup for view {}", next_view->vid);
1258  for(const node_id_t failed_node_id : next_view->departed) {
1259  dbg_default_debug("Removing global TCP connections for failed node {} from RDMC and SST", failed_node_id);
1260 #ifdef USE_VERBS_API
1261  rdma::impl::verbs_remove_connection(failed_node_id);
1262 #else
1263  rdma::impl::lf_remove_connection(failed_node_id);
1264 #endif
1265  sst::remove_node(failed_node_id);
1266  }
1267  // if new members have joined, add their RDMA connections to SST and RDMC
1268  for(std::size_t i = 0; i < next_view->joined.size(); ++i) {
1269  // The new members will be the last joined.size() elements of the members lists
1270  int joiner_rank = next_view->num_members - next_view->joined.size() + i;
1271  dbg_default_debug("Adding RDMC connection to node {}, at IP {} and port {}", next_view->members[joiner_rank], std::get<0>(next_view->member_ips_and_ports[joiner_rank]), std::get<PORT_TYPE::RDMC>(next_view->member_ips_and_ports[joiner_rank]));
1272 
1273 #ifdef USE_VERBS_API
1274  rdma::impl::verbs_add_connection(next_view->members[joiner_rank],
1275  next_view->member_ips_and_ports[joiner_rank], my_id);
1276 #else
1278  next_view->members[joiner_rank],
1279  std::pair<ip_addr_t, uint16_t>{
1280  std::get<0>(next_view->member_ips_and_ports[joiner_rank]),
1281  std::get<PORT_TYPE::RDMC>(
1282  next_view->member_ips_and_ports[joiner_rank])});
1283 #endif
1284  }
1285  for(std::size_t i = 0; i < next_view->joined.size(); ++i) {
1286  int joiner_rank = next_view->num_members - next_view->joined.size() + i;
1287  sst::add_node(next_view->members[joiner_rank],
1288  std::pair<ip_addr_t, uint16_t>{
1289  std::get<0>(next_view->member_ips_and_ports[joiner_rank]),
1290  std::get<PORT_TYPE::SST>(
1291  next_view->member_ips_and_ports[joiner_rank])});
1292  }
1293 
1294  // This will block until everyone responds to SST/RDMC initial handshakes
1295  transition_multicast_group(next_subgroup_settings, new_num_received_size, new_slot_size);
1296 
1297  // New members can now proceed to view_manager.start(), which will call sync()
1298  next_view->gmsSST->put();
1299  next_view->gmsSST->sync_with_members();
1300  dbg_default_debug("Done setting up SST and MulticastGroup for view {}", next_view->vid);
1301  {
1302  lock_guard_t old_views_lock(old_views_mutex);
1303  old_views.push(std::move(curr_view));
1304  old_views_cv.notify_all();
1305  }
1306  curr_view = std::move(next_view);
1307 
1309  // Write the new view to disk before using it
1311  }
1312 
1313  // Re-initialize last_suspected (suspected[] has been reset to all false in the new view)
1314  last_suspected.assign(curr_view->members.size(), false);
1315 
1316  // Register predicates in the new view
1317  register_predicates();
1318 
1319  // First task with my new view...
1320  if(curr_view->i_am_new_leader()) {
1321  dbg_default_debug("i_am_new_leader() was true, calling merge_changes()");
1322  curr_view->merge_changes(); // Create a combined list of Changes
1323  active_leader = true;
1324  }
1325 
1326  // Announce the new view to the application
1327  for(auto& view_upcall : view_upcalls) {
1328  view_upcall(*curr_view);
1329  }
1330 
1331  curr_view->gmsSST->start_predicate_evaluation();
1332  view_change_cv.notify_all();
1333  dbg_default_debug("Done with view change to view {}", curr_view->vid);
1334 }
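/* Note on the code above: once state transfer finishes, the view change completes
 * in a fixed order: commit the joiners over TCP, rebuild the RDMA/SST connection
 * sets, construct the new SST and MulticastGroup, install next_view as curr_view
 * (archiving the old view), re-register the GMS predicates, run the application's
 * view upcalls, and finally restart predicate evaluation. */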
1335 
1336 /* ------------- 3. Helper Functions for Predicates and Triggers ------------- */
1337 
1338 void ViewManager::construct_multicast_group(CallbackSet callbacks,
1339  const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings,
1340  const uint32_t num_received_size,
1341  const uint32_t slot_size) {
1342  const auto num_subgroups = curr_view->subgroup_shard_views.size();
1343 
1344  curr_view->gmsSST = std::make_shared<DerechoSST>(
1345  sst::SSTParams(
1346  curr_view->members, curr_view->members[curr_view->my_rank],
1347  [this](const uint32_t node_id) { report_failure(node_id); },
1348  curr_view->failed, false),
1349  num_subgroups, num_received_size, slot_size);
1350 
1351  curr_view->multicast_group = std::make_unique<MulticastGroup>(
1352  curr_view->members, curr_view->members[curr_view->my_rank],
1353  curr_view->gmsSST, callbacks, num_subgroups, subgroup_settings,
1355  [this](const subgroup_id_t& subgroup_id, const persistent::version_t& ver, const uint64_t& msg_ts) {
1356  assert(subgroup_objects.find(subgroup_id) != subgroup_objects.end());
1357  subgroup_objects.at(subgroup_id).get().post_next_version(ver, msg_ts);
1358  },
1360 }
1361 
1362 void ViewManager::transition_multicast_group(
1363  const std::map<subgroup_id_t, SubgroupSettings>& new_subgroup_settings,
1364  const uint32_t new_num_received_size, const uint32_t new_slot_size) {
1365  const auto num_subgroups = next_view->subgroup_shard_views.size();
1366 
1367  next_view->gmsSST = std::make_shared<DerechoSST>(
1368  sst::SSTParams(
1369  next_view->members, next_view->members[next_view->my_rank],
1370  [this](const uint32_t node_id) { report_failure(node_id); },
1371  next_view->failed, false),
1372  num_subgroups, new_num_received_size, new_slot_size);
1373 
1374  next_view->multicast_group = std::make_unique<MulticastGroup>(
1375  next_view->members, next_view->members[next_view->my_rank],
1376  next_view->gmsSST, std::move(*curr_view->multicast_group), num_subgroups,
1377  new_subgroup_settings,
1378  [this](const subgroup_id_t& subgroup_id, const persistent::version_t& ver, const uint64_t& msg_ts) {
1379  assert(subgroup_objects.find(subgroup_id) != subgroup_objects.end());
1380  subgroup_objects.at(subgroup_id).get().post_next_version(ver, msg_ts);
1381  },
1383 
1384  curr_view->multicast_group.reset();
1385 
1386  // Initialize this node's row in the new SST
1387  int changes_installed = next_view->joined.size() + next_view->departed.size();
1388  next_view->gmsSST->init_local_row_from_previous(
1389  (*curr_view->gmsSST), curr_view->my_rank, changes_installed);
1390  gmssst::set(next_view->gmsSST->vid[next_view->my_rank], next_view->vid);
1391 }
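/* Note: transition_multicast_group() moves the old MulticastGroup's resources into
 * the new one (std::move above), then seeds this node's row in the new SST from its
 * old row via init_local_row_from_previous() and sets the row's vid to the new
 * view's ID, so counters carried across the view change stay consistent. */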
1392 
1393 bool ViewManager::receive_join(DerechoSST& gmsSST, tcp::socket& client_socket) {
1394  struct in_addr joiner_ip_packed;
1395  inet_aton(client_socket.get_remote_ip().c_str(), &joiner_ip_packed);
1396 
1397  uint64_t joiner_version_code;
1398  client_socket.exchange(my_version_hashcode, joiner_version_code);
1399  if(joiner_version_code != my_version_hashcode) {
1400  rls_default_warn("Rejected a connection from client at {}. Client was running on an incompatible platform or used an incompatible compiler.",
1401  client_socket.get_remote_ip());
1402  return false;
1403  }
1404  node_id_t joining_client_id = 0;
1405  client_socket.read(joining_client_id);
1406 
1407  if(curr_view->rank_of(joining_client_id) != -1) {
1408  dbg_default_warn("Joining node at IP {} announced it has ID {}, which is already in the View!", client_socket.get_remote_ip(), joining_client_id);
1409  client_socket.write(JoinResponse{JoinResponseCode::ID_IN_USE,
1410  curr_view->members[curr_view->my_rank]});
1411  return false;
1412  }
1413  client_socket.write(JoinResponse{JoinResponseCode::OK, curr_view->members[curr_view->my_rank]});
1414 
1415  uint16_t joiner_gms_port = 0;
1416  client_socket.read(joiner_gms_port);
1417  uint16_t joiner_rpc_port = 0;
1418  client_socket.read(joiner_rpc_port);
1419  uint16_t joiner_sst_port = 0;
1420  client_socket.read(joiner_sst_port);
1421  uint16_t joiner_rdmc_port = 0;
1422  client_socket.read(joiner_rdmc_port);
1423 
1424  dbg_default_debug("Proposing change to add node {}", joining_client_id);
1425  size_t next_change = gmsSST.num_changes[curr_view->my_rank] - gmsSST.num_installed[curr_view->my_rank];
1426  gmssst::set(gmsSST.changes[curr_view->my_rank][next_change],
1427  joining_client_id);
1428  gmssst::set(gmsSST.joiner_ips[curr_view->my_rank][next_change],
1429  joiner_ip_packed.s_addr);
1430  gmssst::set(gmsSST.joiner_gms_ports[curr_view->my_rank][next_change],
1431  joiner_gms_port);
1432  gmssst::set(gmsSST.joiner_rpc_ports[curr_view->my_rank][next_change],
1433  joiner_rpc_port);
1434  gmssst::set(gmsSST.joiner_sst_ports[curr_view->my_rank][next_change],
1435  joiner_sst_port);
1436  gmssst::set(gmsSST.joiner_rdmc_ports[curr_view->my_rank][next_change],
1437  joiner_rdmc_port);
1438 
1439  gmssst::increment(gmsSST.num_changes[curr_view->my_rank]);
1440 
1441  dbg_default_debug("Wedging view {}", curr_view->vid);
1442  curr_view->wedge();
1443  // gmsSST.put(gmsSST.changes.get_base() - gmsSST.getBaseAddress(),
1444  // gmsSST.num_committed.get_base() - gmsSST.changes.get_base());
1445  /* The commented-out put above is broken into individual put calls so that,
1446  * if we were relying on any ordering guarantees, we won't run into
1447  * issues when those guarantees do not hold. */
1448  gmsSST.put(gmsSST.changes);
1449  gmsSST.put(gmsSST.joiner_ips.get_base() - gmsSST.getBaseAddress(),
1450  gmsSST.num_changes.get_base() - gmsSST.joiner_ips.get_base());
1451  gmsSST.put(gmsSST.num_changes);
1452  return true;
1453 }
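/* Note: a join is proposed entirely through the leader's SST row: the joiner's ID,
 * IP, and ports are written into the changes/joiner_* columns at the next free slot,
 * num_changes is incremented, the current view is wedged, and the row is pushed with
 * separate put() calls, with num_changes pushed last. */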
1454 
1455 void ViewManager::send_view(const View& new_view, tcp::socket& client_socket) {
1456  dbg_default_debug("Sending client the new view");
1457  auto bind_socket_write = [&client_socket](const char* bytes, std::size_t size) {
1458  client_socket.write(bytes, size);
1459  };
1460  std::size_t size_of_view = mutils::bytes_size(new_view);
1461  client_socket.write(size_of_view);
1462  mutils::post_object(bind_socket_write, new_view);
1463 }
1464 
1465 void ViewManager::send_objects_to_new_members(const View& new_view, const vector_int64_2d& old_shard_leaders) {
1466  node_id_t my_id = new_view.members[new_view.my_rank];
1467  for(subgroup_id_t subgroup_id = 0; subgroup_id < old_shard_leaders.size(); ++subgroup_id) {
1468  for(uint32_t shard = 0; shard < old_shard_leaders[subgroup_id].size(); ++shard) {
1469  //if I was the leader of the shard in the old view...
1470  if(my_id == old_shard_leaders[subgroup_id][shard]) {
1471  //send its object state to the new members
1472  for(node_id_t shard_joiner : new_view.subgroup_shard_views[subgroup_id][shard].joined) {
1473  if(shard_joiner != my_id) {
1474  send_subgroup_object(subgroup_id, shard_joiner);
1475  }
1476  }
1477  }
1478  }
1479  }
1480 }
1481 
1482 /* Note for the future: Since this "send" requires first receiving the log tail length,
1483  * it's really a blocking receive-then-send. Since all nodes call send_subgroup_object
1484  * before initialize_subgroup_objects, there's a small chance of a deadlock: node A could
1485  * be attempting to send an object to node B at the same time as B is attempting to send a
1486  * different object to A, and neither node will be able to send the log tail length that
1487  * the other one is waiting on. */
1488 void ViewManager::send_subgroup_object(subgroup_id_t subgroup_id, node_id_t new_node_id) {
1489  LockedReference<std::unique_lock<std::mutex>, tcp::socket> joiner_socket = tcp_sockets->get_socket(new_node_id);
1490  ReplicatedObject& subgroup_object = subgroup_objects.at(subgroup_id);
1491  if(subgroup_object.is_persistent()) {
1492  //First, read the log tail length sent by the joining node
1493  int64_t persistent_log_length = 0;
1494  joiner_socket.get().read(persistent_log_length);
1496  dbg_default_debug("Got log tail length {}", persistent_log_length);
1497  }
1498  dbg_default_debug("Sending Replicated Object state for subgroup {} to node {}", subgroup_id, new_node_id);
1499  subgroup_object.send_object(joiner_socket.get());
1500 }
1501 
1502 void ViewManager::update_tcp_connections(const View& new_view) {
1503  for(const node_id_t& removed_id : new_view.departed) {
1504  dbg_default_debug("Removing TCP connection for failed node {}", removed_id);
1505  tcp_sockets->delete_node(removed_id);
1506  }
1507  for(const node_id_t& joiner_id : new_view.joined) {
1508  tcp_sockets->add_node(joiner_id,
1509  {std::get<0>(new_view.member_ips_and_ports[new_view.rank_of(joiner_id)]),
1510  std::get<PORT_TYPE::RPC>(new_view.member_ips_and_ports[new_view.rank_of(joiner_id)])});
1511  dbg_default_debug("Established a TCP connection to node {}", joiner_id);
1512  }
1513 }
1514 
1515 uint32_t ViewManager::compute_num_received_size(const View& view) {
1516  uint32_t num_received_size = 0;
1517  for(subgroup_id_t subgroup_num = 0; subgroup_num < view.subgroup_shard_views.size(); ++subgroup_num) {
1518  uint32_t max_shard_senders = 0;
1519  for(uint32_t shard_num = 0; shard_num < view.subgroup_shard_views[subgroup_num].size(); ++shard_num) {
1520  std::size_t shard_size = view.subgroup_shard_views[subgroup_num][shard_num].members.size();
1521  uint32_t num_shard_senders = view.subgroup_shard_views[subgroup_num][shard_num].num_senders();
1522  if(num_shard_senders > max_shard_senders) {
1523  max_shard_senders = shard_size;
1524  }
1525  }
1526  num_received_size += max_shard_senders;
1527  }
1528  return num_received_size;
1529 }
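/* Note: when a shard has more senders than the running maximum, the code above
 * widens the column by the full shard size rather than by num_shard_senders.
 * Since shard_size >= num_shard_senders, this can only over-allocate the
 * num_received columns, never under-allocate them. */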
1530 
1531 void ViewManager::make_subgroup_maps(const SubgroupInfo& subgroup_info,
1532  const std::unique_ptr<View>& prev_view, View& curr_view) {
1533  int32_t initial_next_unassigned_rank = curr_view.next_unassigned_rank;
1534  curr_view.subgroup_shard_views.clear();
1535  curr_view.subgroup_ids_by_type_id.clear();
1536  subgroup_allocation_map_t subgroup_allocations;
1537  try {
1538  auto temp = subgroup_info.subgroup_membership_function(curr_view.subgroup_type_order,
1539  prev_view, curr_view);
1540  //Hack to ensure RVO works even though subgroup_allocations had to be declared outside this scope
1541  subgroup_allocations = std::move(temp);
1542  } catch(subgroup_provisioning_exception& ex) {
1543  // Mark the view as inadequate and roll back everything done by allocation functions
1544  curr_view.is_adequately_provisioned = false;
1545  curr_view.next_unassigned_rank = initial_next_unassigned_rank;
1546  curr_view.subgroup_shard_views.clear();
1547  curr_view.subgroup_ids_by_type_id.clear();
1548  return;
1549  }
1550  /* Now that all the subgroups are fully provisioned, use subgroup_allocations to initialize
1551  * curr_view's subgroup_ids_by_type_id, my_subgroups, and subgroup_shard_views
1552  */
1553  for(subgroup_type_id_t subgroup_type_id = 0;
1554  subgroup_type_id < curr_view.subgroup_type_order.size();
1555  ++subgroup_type_id) {
1556  const std::type_index& subgroup_type = curr_view.subgroup_type_order[subgroup_type_id];
1557  subgroup_shard_layout_t& curr_type_subviews = subgroup_allocations[subgroup_type];
1558  std::size_t num_subgroups = curr_type_subviews.size();
1559  curr_view.subgroup_ids_by_type_id.emplace(subgroup_type_id, std::vector<subgroup_id_t>(num_subgroups));
1560  for(uint32_t subgroup_index = 0; subgroup_index < num_subgroups; ++subgroup_index) {
1561  // Assign this (type, index) pair a new unique subgroup ID
1562  subgroup_id_t curr_subgroup_id = curr_view.subgroup_shard_views.size();
1563  curr_view.subgroup_ids_by_type_id[subgroup_type_id][subgroup_index] = curr_subgroup_id;
1564  uint32_t num_shards = curr_type_subviews[subgroup_index].size();
1565  for(uint shard_num = 0; shard_num < num_shards; ++shard_num) {
1566  SubView& shard_view = curr_type_subviews[subgroup_index][shard_num];
1567  shard_view.my_rank = shard_view.rank_of(curr_view.members[curr_view.my_rank]);
1568  if(shard_view.my_rank != -1) {
1569  // Initialize my_subgroups
1570  curr_view.my_subgroups[curr_subgroup_id] = shard_num;
1571  }
1572  if(prev_view) {
1573  // Initialize this shard's SubView.joined and SubView.departed
1574  subgroup_id_t prev_subgroup_id = prev_view->subgroup_ids_by_type_id.at(subgroup_type_id)
1575  .at(subgroup_index);
1576  SubView& prev_shard_view = prev_view->subgroup_shard_views[prev_subgroup_id][shard_num];
1577  shard_view.init_joined_departed(prev_shard_view);
1578  }
1579  } // for(shard_num)
1580  /* Pull this shard->SubView mapping out of the subgroup allocation
1581  * and save it under its subgroup ID (which was subgroup_shard_views.size()).
1582  * This deletes it from the subgroup_shard_layout_t's outer vector. */
1583  curr_view.subgroup_shard_views.emplace_back(std::move(
1584  subgroup_allocations[subgroup_type][subgroup_index]));
1585  } //for(subgroup_index)
1586  }
1587 }
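/* Note: subgroup IDs are assigned in provisioning order: each (type, index) pair
 * gets the next value of subgroup_shard_views.size(), so IDs are contiguous and,
 * assuming the membership functions are deterministic, every node derives the same
 * (type, index) -> subgroup ID mapping for the same View. */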
1588 
1589 std::pair<uint32_t, uint32_t> ViewManager::derive_subgroup_settings(View& view,
1590  std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings) {
1591  uint32_t num_received_offset = 0;
1592  uint32_t slot_offset = 0;
1593  view.my_subgroups.clear();
1594  for(subgroup_id_t subgroup_id = 0; subgroup_id < view.subgroup_shard_views.size(); ++subgroup_id) {
1595  uint32_t num_shards = view.subgroup_shard_views.at(subgroup_id).size();
1596  uint32_t max_shard_senders = 0;
1597  uint32_t slot_size_for_subgroup = 0;
1598  uint64_t max_payload_size = 0;
1599 
1600  for(uint32_t shard_num = 0; shard_num < num_shards; ++shard_num) {
1601  SubView& shard_view = view.subgroup_shard_views.at(subgroup_id).at(shard_num);
1602  max_shard_senders = std::max(shard_view.num_senders(), max_shard_senders);
1603 
1604  const DerechoParams& profile = DerechoParams::from_profile(shard_view.profile);
1605  uint32_t slot_size_for_shard = profile.window_size * (profile.sst_max_msg_size + 2 * sizeof(uint64_t));
1606  uint64_t payload_size = profile.max_msg_size - sizeof(header);
1607  max_payload_size = std::max(payload_size, max_payload_size);
1608  view_max_rpc_reply_payload_size = std::max(
1609  profile.max_reply_msg_size - sizeof(header),
1610  view_max_rpc_reply_payload_size);
1611  slot_size_for_subgroup = std::max(slot_size_for_shard, slot_size_for_subgroup);
1612  view_max_rpc_window_size = std::max(profile.window_size, view_max_rpc_window_size);
1613 
1614  //Initialize my_rank in the SubView for this node's ID
1615  shard_view.my_rank = shard_view.rank_of(view.members[view.my_rank]);
1616  if(shard_view.my_rank != -1) {
1617  //Initialize my_subgroups
1618  view.my_subgroups[subgroup_id] = shard_num;
1619  //Save the settings for MulticastGroup
1620  subgroup_settings[subgroup_id] = {
1621  shard_num,
1622  (uint32_t)shard_view.my_rank,
1623  shard_view.members,
1624  shard_view.is_sender,
1625  shard_view.sender_rank_of(shard_view.my_rank),
1626  num_received_offset,
1627  slot_offset,
1628  shard_view.mode,
1629  profile,
1630  };
1631  }
1632  } // for(shard_num)
1633  num_received_offset += max_shard_senders;
1634  slot_offset += slot_size_for_subgroup;
1635  max_payload_sizes[subgroup_id] = max_payload_size;
1636  } // for(subgroup_id)
1637 
1638  return {num_received_offset, slot_offset};
1639 }
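/* Note: the pair returned above is the total width of the num_received columns
 * (one slot per sender, summed over subgroups) and the total SST slot space in
 * bytes; both are accumulated as running offsets, so each subgroup's settings also
 * record where its own slice of the SST begins. */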
1640 
1641 std::map<subgroup_id_t, uint64_t> ViewManager::get_max_payload_sizes() {
1642  return max_payload_sizes;
1643 }
1644 
1645 std::unique_ptr<View> ViewManager::make_next_view(const std::unique_ptr<View>& curr_view,
1646  const DerechoSST& gmsSST) {
1647  int myRank = curr_view->my_rank;
1648  std::set<int> leave_ranks;
1649  std::vector<int> join_indexes;
1650  // Look through pending changes up to num_committed and filter the joins and leaves
1651  const int committed_count = gmsSST.num_committed[curr_view->find_rank_of_leader()]
1652  - gmsSST.num_installed[curr_view->find_rank_of_leader()];
1653  for(int change_index = 0; change_index < committed_count; change_index++) {
1654  node_id_t change_id = gmsSST.changes[myRank][change_index];
1655  int change_rank = curr_view->rank_of(change_id);
1656  if(change_rank != -1) {
1657  // Might as well save the rank, since we'll need it again
1658  leave_ranks.emplace(change_rank);
1659  } else {
1660  join_indexes.emplace_back(change_index);
1661  }
1662  }
1663 
1664  int next_num_members = curr_view->num_members - leave_ranks.size() + join_indexes.size();
1665  // Initialize the next view
1666  std::vector<node_id_t> joined, members(next_num_members), departed;
1667  std::vector<char> failed(next_num_members);
1668  std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> member_ips_and_ports(next_num_members);
1669  int next_unassigned_rank = curr_view->next_unassigned_rank;
1670  for(std::size_t i = 0; i < join_indexes.size(); ++i) {
1671  const int join_index = join_indexes[i];
1672  node_id_t joiner_id = gmsSST.changes[myRank][join_index];
1673  struct in_addr joiner_ip_packed;
1674  joiner_ip_packed.s_addr = gmsSST.joiner_ips[myRank][join_index];
1675  char* joiner_ip_cstr = inet_ntoa(joiner_ip_packed);
1676  std::string joiner_ip(joiner_ip_cstr);
1677 
1678  joined.emplace_back(joiner_id);
1679  // New members go at the end of the members list, but it may shrink in the new view
1680  int new_member_rank = curr_view->num_members - leave_ranks.size() + i;
1681  members[new_member_rank] = joiner_id;
1682  member_ips_and_ports[new_member_rank] = std::tuple{joiner_ip,
1683  gmsSST.joiner_gms_ports[myRank][join_index],
1684  gmsSST.joiner_rpc_ports[myRank][join_index],
1685  gmsSST.joiner_sst_ports[myRank][join_index],
1686  gmsSST.joiner_rdmc_ports[myRank][join_index]};
1687  dbg_default_debug("Next view will add new member with ID {}", joiner_id);
1688  }
1689  for(const auto& leaver_rank : leave_ranks) {
1690  departed.emplace_back(curr_view->members[leaver_rank]);
1691  // Decrement next_unassigned_rank for every failure,
1692  // unless the failed node wasn't assigned to a subgroup anyway
1693  if(leaver_rank <= curr_view->next_unassigned_rank) {
1694  next_unassigned_rank--;
1695  }
1696  }
1697  dbg_default_debug("Next view will exclude {} failed members.", leave_ranks.size());
1698 
1699  // Copy member information, excluding the members that have failed
1700  int new_rank = 0;
1701  for(int old_rank = 0; old_rank < curr_view->num_members; old_rank++) {
1702  // This is why leave_ranks needs to be a set
1703  if(leave_ranks.find(old_rank) == leave_ranks.end()) {
1704  members[new_rank] = curr_view->members[old_rank];
1705  member_ips_and_ports[new_rank] = curr_view->member_ips_and_ports[old_rank];
1706  failed[new_rank] = curr_view->failed[old_rank];
1707  ++new_rank;
1708  }
1709  }
1710 
1711  // Initialize my_rank in next_view
1712  int32_t my_new_rank = -1;
1713  node_id_t myID = curr_view->members[myRank];
1714  for(int i = 0; i < next_num_members; ++i) {
1715  if(members[i] == myID) {
1716  my_new_rank = i;
1717  break;
1718  }
1719  }
1720  if(my_new_rank == -1) {
1721  throw derecho_exception("Some other node reported that I failed. Node " + std::to_string(myID) + " terminating.");
1722  }
1723 
1724  auto next_view = std::make_unique<View>(
1725  curr_view->vid + 1, members, member_ips_and_ports, failed, joined,
1726  departed, my_new_rank, next_unassigned_rank,
1727  curr_view->subgroup_type_order);
1728  next_view->i_know_i_am_leader = curr_view->i_know_i_am_leader;
1729  return next_view;
1730 }
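/* Note: make_next_view() folds the committed changes into the membership:
 * departed members are removed while surviving members keep their relative order,
 * joiners are appended at the end, and a node that finds itself missing from the
 * new membership throws, since some other node must have reported it as failed. */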
1731 
1732 std::vector<std::vector<int64_t>> ViewManager::old_shard_leaders_by_new_ids(const View& curr_view,
1733  const View& next_view) {
1734  std::vector<std::vector<int64_t>> old_shard_leaders_by_new_id(next_view.subgroup_shard_views.size());
1735  for(const auto& type_to_old_ids : curr_view.subgroup_ids_by_type_id) {
1736  for(uint32_t subgroup_index = 0; subgroup_index < type_to_old_ids.second.size(); ++subgroup_index) {
1737  subgroup_id_t old_subgroup_id = type_to_old_ids.second[subgroup_index];
1738  //The subgroup is uniquely identified by (type ID, subgroup index) in both old and new views
1739  subgroup_id_t new_subgroup_id = next_view.subgroup_ids_by_type_id.at(type_to_old_ids.first)
1740  .at(subgroup_index);
1741  std::size_t new_num_shards = next_view.subgroup_shard_views[new_subgroup_id].size();
1742  old_shard_leaders_by_new_id[new_subgroup_id].resize(new_num_shards, -1);
1743  for(uint32_t shard_num = 0; shard_num < new_num_shards; ++shard_num) {
1744  int64_t old_shard_leader = -1;
1745  //Raw subgroups don't have any state to send to new members, so they have no leaders
1746  if(curr_view.subgroup_type_order.at(type_to_old_ids.first)
1747  != std::type_index(typeid(RawObject))) {
1748  int old_shard_leader_rank = curr_view.subview_rank_of_shard_leader(old_subgroup_id, shard_num);
1749  if(old_shard_leader_rank >= 0) {
1750  old_shard_leader = curr_view.subgroup_shard_views[old_subgroup_id][shard_num]
1751  .members[old_shard_leader_rank];
1752  }
1753  }
1754  old_shard_leaders_by_new_id[new_subgroup_id][shard_num] = old_shard_leader;
1755  } // for(shard_num)
1756  } // for(subgroup_index)
1757  } // for(type_to_old_ids)
1758  return old_shard_leaders_by_new_id;
1759 }
1760 
1761 bool ViewManager::suspected_not_equal(const DerechoSST& gmsSST, const std::vector<bool>& old) {
1762  for(unsigned int r = 0; r < gmsSST.get_num_rows(); r++) {
1763  for(size_t who = 0; who < gmsSST.suspected.size(); who++) {
1764  if(gmsSST.suspected[r][who] && !old[who]) {
1765  return true;
1766  }
1767  }
1768  }
1769  return false;
1770 }
1771 
1772 void ViewManager::copy_suspected(const DerechoSST& gmsSST, std::vector<bool>& old) {
1773  for(size_t who = 0; who < gmsSST.suspected.size(); ++who) {
1774  old[who] = gmsSST.suspected[gmsSST.get_local_index()][who];
1775  }
1776 }
1777 
1778 bool ViewManager::changes_contains(const DerechoSST& gmsSST, const node_id_t q) {
1779  int myRow = gmsSST.get_local_index();
1780  for(int p_index = 0;
1781  p_index < gmsSST.num_changes[myRow] - gmsSST.num_installed[myRow];
1782  p_index++) {
1783  const node_id_t p(const_cast<node_id_t&>(gmsSST.changes[myRow][p_index]));
1784  if(p == q) {
1785  return true;
1786  }
1787  }
1788  return false;
1789 }
1790 
1791 int ViewManager::min_acked(const DerechoSST& gmsSST, const std::vector<char>& failed) {
1792  int myRank = gmsSST.get_local_index();
1793  int min_num_acked = gmsSST.num_acked[myRank];
1794  for(size_t n = 0; n < failed.size(); n++) {
1795  if(!failed[n]) {
1796  // copy to avoid race condition and non-volatile based optimizations
1797  int num_acked_copy = gmsSST.num_acked[n];
1798  min_num_acked = std::min(min_num_acked, num_acked_copy);
1799  }
1800  }
1801 
1802  return min_num_acked;
1803 }
1804 
1805 bool ViewManager::previous_leaders_suspected(const DerechoSST& gmsSST, const View& curr_view) {
1806  const int rank_of_leader = curr_view.find_rank_of_leader();
1807  //For each non-failed member, check if that node suspects all ranks lower than the current leader
1808  for(uint row = 0; row < gmsSST.get_num_rows(); ++row) {
1809  if(!curr_view.failed[row]) {
1810  for(int previous_leader_rank = 0;
1811  previous_leader_rank < rank_of_leader;
1812  ++previous_leader_rank) {
1813  if(!gmsSST.suspected[row][previous_leader_rank]) {
1814  return false;
1815  }
1816  }
1817  }
1818  }
1819  return true;
1820 }
1821 
1822 bool ViewManager::copy_prior_leader_proposals(DerechoSST& gmsSST) {
1823  const int my_rank = gmsSST.get_local_index();
1824  const int my_changes_length = gmsSST.num_changes[my_rank] - gmsSST.num_installed[my_rank];
1825  bool prior_changes_found = false;
1826  int prior_leader_rank = my_rank;
1827  while(!prior_changes_found && prior_leader_rank > 0) {
1828  prior_leader_rank--;
1829  const int changes_length = gmsSST.num_changes[prior_leader_rank]
1830  - gmsSST.num_installed[prior_leader_rank];
1831  if(changes_length > my_changes_length) {
1832  prior_changes_found = true;
1833  } else {
1834  //Check each element of changes, in case this node acknowledged a different leader's change
1835  for(int i = 0; i < changes_length; ++i) {
1836  if(gmsSST.changes[prior_leader_rank][i] != gmsSST.changes[my_rank][i]) {
1837  prior_changes_found = true;
1838  break;
1839  }
1840  }
1841  }
1842  }
1843  //Copy the prior leader's changes over mine;
1844  //this function is called before proposing any changes as the new leader
1845  if(prior_changes_found) {
1846  dbg_default_debug("Re-proposing changes from prior leader at rank {}. Num_changes is now {}", prior_leader_rank, gmsSST.num_changes[prior_leader_rank]);
1847  gmssst::set(gmsSST.changes[my_rank], gmsSST.changes[prior_leader_rank],
1848  gmsSST.changes.size());
1849  gmssst::set(gmsSST.num_changes[my_rank], gmsSST.num_changes[prior_leader_rank]);
1850  gmssst::set(gmsSST.joiner_ips[my_rank], gmsSST.joiner_ips[prior_leader_rank],
1851  gmsSST.joiner_ips.size());
1852  gmssst::set(gmsSST.joiner_gms_ports[my_rank], gmsSST.joiner_gms_ports[prior_leader_rank],
1853  gmsSST.joiner_gms_ports.size());
1854  gmssst::set(gmsSST.joiner_rpc_ports[my_rank], gmsSST.joiner_rpc_ports[prior_leader_rank],
1855  gmsSST.joiner_rpc_ports.size());
1856  gmssst::set(gmsSST.joiner_sst_ports[my_rank], gmsSST.joiner_sst_ports[prior_leader_rank],
1857  gmsSST.joiner_sst_ports.size());
1858  gmssst::set(gmsSST.joiner_rdmc_ports[my_rank], gmsSST.joiner_rdmc_ports[prior_leader_rank],
1859  gmsSST.joiner_rdmc_ports.size());
1860  }
1861  return prior_changes_found;
1862 }
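/* Note: a node taking over as leader first scans lower-ranked rows for change
 * proposals it has not yet seen (a longer changes list, or a differing entry) and,
 * if any are found, copies that prior leader's proposal columns over its own before
 * proposing anything new, so earlier proposals are not lost. */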
1863 
1864 void ViewManager::leader_ragged_edge_cleanup(const subgroup_id_t subgroup_num,
1865  const uint32_t num_received_offset,
1866  const std::vector<node_id_t>& shard_members,
1867  uint num_shard_senders) {
1868  dbg_default_debug("Running leader RaggedEdgeCleanup for subgroup {}", subgroup_num);
1869  View& Vc = *curr_view;
1870  int myRank = Vc.my_rank;
1871  bool found = false;
1872  //Look to see if another node (i.e. a previous leader) has already set global_min_ready
1873  for(uint n = 0; n < shard_members.size() && !found; n++) {
1874  const auto node_id = shard_members[n];
1875  const auto node_rank = Vc.rank_of(node_id);
1876  if(Vc.gmsSST->global_min_ready[node_rank][subgroup_num]) {
1877  //Copy this shard's slice of global_min, starting at num_received_offset
1878  gmssst::set(&Vc.gmsSST->global_min[myRank][num_received_offset],
1879  &Vc.gmsSST->global_min[node_rank][num_received_offset], num_shard_senders);
1880  found = true;
1881  }
1882  }
1883 
1884  if(!found) {
1885  //Compute the global_min for this shard
1886  for(uint n = 0; n < num_shard_senders; n++) {
1887  int min_num_received = Vc.gmsSST->num_received[myRank][num_received_offset + n];
1888  for(uint r = 0; r < shard_members.size(); r++) {
1889  auto node_rank = Vc.rank_of(shard_members[r]);
1890  if(!Vc.failed[node_rank]) {
1891  int num_received_copy = Vc.gmsSST->num_received[node_rank][num_received_offset + n];
1892  min_num_received = std::min(min_num_received, num_received_copy);
1893  }
1894  }
1895 
1896  gmssst::set(Vc.gmsSST->global_min[myRank][num_received_offset + n], min_num_received);
1897  }
1898  }
1899 
1900  dbg_default_debug("Shard leader for subgroup {} finished computing global_min", subgroup_num);
1901  gmssst::set(Vc.gmsSST->global_min_ready[myRank][subgroup_num], true);
1902  Vc.gmsSST->put(
1903  Vc.multicast_group->get_shard_sst_indices(subgroup_num),
1904  (char*)std::addressof(Vc.gmsSST->global_min[0][num_received_offset]) - Vc.gmsSST->getBaseAddress(),
1905  sizeof(Vc.gmsSST->global_min[0][num_received_offset]) * num_shard_senders);
1906  Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_num),
1907  Vc.gmsSST->global_min_ready, subgroup_num);
1908 
1909  if(any_persistent_objects) {
1910  log_ragged_trim(myRank, subgroup_num, num_received_offset, num_shard_senders);
1911  }
1912 }
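/* Note: the shard leader either adopts a global_min that a previous leader already
 * published (global_min_ready set) or computes it as the minimum num_received over
 * all non-failed shard members, then publishes its own global_min slice and sets
 * global_min_ready so followers can copy it. */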
1913 
1914 void ViewManager::follower_ragged_edge_cleanup(
1915  const subgroup_id_t subgroup_num, uint shard_leader_rank,
1916  const uint32_t num_received_offset,
1917  uint num_shard_senders) {
1918  const View& Vc = *curr_view;
1919  int myRank = Vc.my_rank;
1920  dbg_default_debug("Running follower RaggedEdgeCleanup for subgroup {}", subgroup_num);
1921  // Learn the leader's ragged trim, log it, and echo it before acting upon it
1922  if(any_persistent_objects) {
1923  log_ragged_trim(shard_leader_rank, subgroup_num, num_received_offset, num_shard_senders);
1924  }
1925  //Copy this shard's slice of global_min, starting at num_received_offset
1926  gmssst::set(&Vc.gmsSST->global_min[myRank][num_received_offset],
1927  &Vc.gmsSST->global_min[shard_leader_rank][num_received_offset],
1928  num_shard_senders);
1929  gmssst::set(Vc.gmsSST->global_min_ready[myRank][subgroup_num], true);
1930  Vc.gmsSST->put(
1931  Vc.multicast_group->get_shard_sst_indices(subgroup_num),
1932  (char*)std::addressof(Vc.gmsSST->global_min[0][num_received_offset]) - Vc.gmsSST->getBaseAddress(),
1933  sizeof(Vc.gmsSST->global_min[0][num_received_offset]) * num_shard_senders);
1934  Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_num),
1935  Vc.gmsSST->global_min_ready, subgroup_num);
1936 }
1937 
1938 void ViewManager::deliver_in_order(const int shard_leader_rank,
1939  const uint32_t subgroup_num, const uint32_t num_received_offset,
1940  const std::vector<node_id_t>& shard_members, uint num_shard_senders) {
1941  // Ragged cleanup is finished, deliver in the implied order
1942  const View& Vc = *curr_view;
1943  std::vector<int32_t> max_received_indices(num_shard_senders);
1944  whenlog(std::stringstream delivery_order);
1945  whenlog(if(LoggerFactory::getDefaultLogger()->should_log(spdlog::level::debug)) {
1946  delivery_order << "Subgroup " << subgroup_num
1947  << ", shard " << Vc.my_subgroups.at(subgroup_num)
1948  << " ";
1949  });
1950  for(uint sender_rank = 0; sender_rank < num_shard_senders; sender_rank++) {
1951  whenlog(if(LoggerFactory::getDefaultLogger()->should_log(spdlog::level::debug)) {
1952  //This only works if every member is a sender, otherwise the rank will be wrong
1953  delivery_order << shard_members[sender_rank]
1954  << ":0..."
1955  << Vc.gmsSST->global_min[shard_leader_rank][num_received_offset + sender_rank]
1956  << " ";
1957  });
1958  max_received_indices[sender_rank]
1959  = Vc.gmsSST->global_min[shard_leader_rank][num_received_offset + sender_rank];
1960  }
1961  dbg_default_debug("Delivering ragged-edge messages in order: {}", delivery_order.str());
1962  Vc.multicast_group->deliver_messages_upto(max_received_indices, subgroup_num, num_shard_senders);
1963 }
1964 
1965 void ViewManager::log_ragged_trim(const int shard_leader_rank,
1966  const subgroup_id_t subgroup_num,
1967  const uint32_t num_received_offset,
1968  const uint num_shard_senders) {
1969  //Copy this shard's slice of global_min into a new vector
1970  std::vector<int32_t> max_received_indices(num_shard_senders);
1971  for(uint sender_rank = 0; sender_rank < num_shard_senders; sender_rank++) {
1972  max_received_indices[sender_rank]
1973  = curr_view->gmsSST->global_min[shard_leader_rank][num_received_offset + sender_rank];
1974  }
1975  uint32_t shard_num = curr_view->my_subgroups.at(subgroup_num);
1976  RaggedTrim trim_log{subgroup_num, shard_num, curr_view->vid,
1977  static_cast<int32_t>(curr_view->members[curr_view->find_rank_of_leader()]),
1978  max_received_indices};
1979  persistent::saveObject(trim_log, ragged_trim_filename(subgroup_num, shard_num).c_str());
1980  dbg_default_debug("Done logging ragged trim to disk for subgroup {}", subgroup_num);
1981 }
1982 
1983 /* ------------- 4. Public-Interface methods of ViewManager ------------- */
1984 
1985 void ViewManager::report_failure(const node_id_t who) {
1986  // keep calm
1987  if(bSilent) {
1988  return;
1989  }
1990  const int failed_rank = curr_view->rank_of(who);
1991  dbg_default_debug("Node ID {} failure reported; marking suspected[{}]", who, failed_rank);
1992  gmssst::set(curr_view->gmsSST->suspected[curr_view->my_rank][failed_rank], true);
1993  int failed_cnt = 0;
1994  int rip_cnt = 0;
1995  for(std::size_t r = 0; r < curr_view->gmsSST->suspected.size(); r++) {
1996  if(curr_view->gmsSST->rip[r]) {
1997  ++rip_cnt;
1998  } else if(curr_view->gmsSST->suspected[curr_view->my_rank][r]) {
1999  ++failed_cnt;
2000  }
2001  }
2002 
2003  if(!curr_view->gmsSST->rip[curr_view->my_rank]
2004  && failed_cnt != 0 && (failed_cnt >= (curr_view->num_members - rip_cnt + 1) / 2)) {
2005  if(disable_partitioning_safety) {
2006  dbg_default_warn("Potential partitioning event, but partitioning safety is disabled. failed_cnt = {} but num_members - rip_cnt + 1 = {}",
2007  failed_cnt, curr_view->num_members - rip_cnt + 1);
2008  } else {
2009  throw derecho_exception("Potential partitioning event: this node is no longer in the majority and must shut down!");
2010  }
2011  }
2012  curr_view->gmsSST->put(curr_view->gmsSST->suspected, failed_rank);
2013 }
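/* Note: the check above treats nodes with rip set as having left gracefully; if the
 * locally-suspected failures reach at least half of the remaining membership
 * (failed_cnt >= (num_members - rip_cnt + 1) / 2), this node may be in a minority
 * partition and shuts down, unless partitioning safety is disabled in the
 * configuration. */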
2014 
2015 void ViewManager::silence() {
2016  bSilent = true;
2017 }
2018 
2019 void ViewManager::leave() {
2020  shared_lock_t lock(view_mutex);
2021  dbg_default_debug("Cleanly leaving the group.");
2022  curr_view->multicast_group->wedge();
2023  curr_view->gmsSST->predicates.clear();
2024  gmssst::set(curr_view->gmsSST->suspected[curr_view->my_rank][curr_view->my_rank], true);
2025  curr_view->gmsSST->put(curr_view->gmsSST->suspected, curr_view->my_rank);
2026  curr_view->gmsSST->rip[curr_view->my_rank] = true;
2027  curr_view->gmsSST->put_with_completion(curr_view->gmsSST->rip.get_base() - curr_view->gmsSST->getBaseAddress(),
2028  sizeof(curr_view->gmsSST->rip[0]));
2029  thread_shutdown = true;
2030 }
2031 
2032 void ViewManager::send(subgroup_id_t subgroup_num, long long unsigned int payload_size,
2033  const std::function<void(char* buf)>& msg_generator, bool cooked_send) {
2034  shared_lock_t lock(view_mutex);
2035  view_change_cv.wait(lock, [&]() {
2036  return curr_view->multicast_group->send(subgroup_num, payload_size,
2037  msg_generator, cooked_send);
2038  });
2039 }
2040 
2041 const uint64_t ViewManager::compute_global_stability_frontier(subgroup_id_t subgroup_num) {
2042  shared_lock_t lock(view_mutex);
2043  return curr_view->multicast_group->compute_global_stability_frontier(subgroup_num);
2044 }
2045 
2046 void ViewManager::add_view_upcall(const view_upcall_t& upcall) {
2047  view_upcalls.emplace_back(upcall);
2048 }
2049 
2050 std::vector<node_id_t> ViewManager::get_members() {
2051  shared_lock_t read_lock(view_mutex);
2052  return curr_view->members;
2053 }
2054 
2055 int32_t ViewManager::get_my_rank() {
2056  shared_lock_t read_lock(view_mutex);
2057  return curr_view->my_rank;
2058 }
2059 
2060 std::vector<std::vector<node_id_t>> ViewManager::get_subgroup_members(subgroup_type_id_t subgroup_type, uint32_t subgroup_index) {
2061  shared_lock_t read_lock(view_mutex);
2062  subgroup_id_t subgroup_id = curr_view->subgroup_ids_by_type_id.at(subgroup_type).at(subgroup_index);
2063  std::vector<std::vector<node_id_t>> subgroup_members;
2064  for(const auto& shard_view : curr_view->subgroup_shard_views.at(subgroup_id)) {
2065  subgroup_members.push_back(shard_view.members);
2066  }
2067  return subgroup_members;
2068 }
2069 
2070 std::size_t ViewManager::get_number_of_shards_in_subgroup(subgroup_type_id_t subgroup_type, uint32_t subgroup_index) {
2071  shared_lock_t read_lock(view_mutex);
2072  subgroup_id_t subgroup_id = curr_view->subgroup_ids_by_type_id.at(subgroup_type).at(subgroup_index);
2073  return curr_view->subgroup_shard_views.at(subgroup_id).size();
2074 }
2075 
2076 int32_t ViewManager::get_my_shard(subgroup_type_id_t subgroup_type, uint32_t subgroup_index) {
2077  shared_lock_t read_lock(view_mutex);
2078  subgroup_id_t subgroup_id = curr_view->subgroup_ids_by_type_id.at(subgroup_type).at(subgroup_index);
2079  auto find_id_result = curr_view->my_subgroups.find(subgroup_id);
2080  if(find_id_result == curr_view->my_subgroups.end()) {
2081  return -1;
2082  } else {
2083  return find_id_result->second;
2084  }
2085 }
2086 
2087 void ViewManager::barrier_sync() {
2088  shared_lock_t read_lock(view_mutex);
2089  curr_view->gmsSST->sync_with_members();
2090 }
2091 
2092 SharedLockedReference<View> ViewManager::get_current_view() {
2093  assert(curr_view);
2094  return SharedLockedReference<View>(*curr_view, view_mutex);
2095 }
2096 
2100  } else {
2102  }
2103 }
2104 
2105 void ViewManager::debug_print_status() const {
2106  std::cout << "curr_view = " << curr_view->debug_string() << std::endl;
2107 }
2108 } /* namespace derecho */
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
void send_logs()
An extra setup method only needed during total restart.
void send_view(const View &new_view, tcp::socket &client_socket)
Sends a joining node the new view that has been constructed to include it.
std::shared_ptr< tcp::tcp_connections > tcp_sockets
The same set of TCP sockets used by Group and RPCManager.
bool initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &addresses, uint32_t node_rank) __attribute__((warn_unused_result))
Definition: rdmc.cpp:37
socket accept()
Blocks until a remote client makes a connection to this connection listener, then returns a new socke...
Definition: tcp.cpp:236
bool active_leader
True if this node is the current leader and is fully active (i.e.
unsigned int get_local_index() const
Gets the index of the local row in the table.
Definition: sst.hpp:318
std::vector< std::vector< int64_t > > vector_int64_2d
Type of a 2-dimensional vector used to store potential node IDs, or -1.
void follower_ragged_edge_cleanup(const subgroup_id_t subgroup_num, uint shard_leader_rank, const uint32_t num_received_offset, uint num_shard_senders)
Implements the Ragged Edge Cleanup algorithm for a non-leader node in a subgroup. ...
std::condition_variable old_views_cv
std::vector< std::vector< SubView > > subgroup_shard_views
Maps subgroup ID -> shard number -> SubView for that subgroup/shard.
Definition: view.hpp:143
const std::vector< node_id_t > members
Node IDs of members in the current view, indexed by their SST rank.
Definition: view.hpp:99
SSTFieldVector< bool > global_min_ready
Array indicating whether each shard leader (indexed by subgroup number) has published a global_min fo...
CommitMessage
A type-safe set of messages that can be sent during two-phase commit.
std::size_t get_number_of_shards_in_subgroup(subgroup_type_id_t subgroup_type, uint32_t subgroup_index)
std::vector< int > is_sender
vector selecting the senders, 0 for non-sender, non-0 for sender
Definition: view.hpp:39
One-time predicates only fire once; they are deleted once they become true.
std::string get_self_ip()
Definition: tcp.cpp:192
std::atomic< bool > thread_shutdown
A flag to signal background threads to shut down; set to true when the group is destroyed.
SSTField< int > num_changes
How many changes to the view have been proposed.
Definition: derecho_sst.hpp:86
char * get_base()
Definition: sst.hpp:52
bool check_view_committed(tcp::socket &leader_connection)
Setup method for non-leader nodes: checks whether the initial View received in the constructor gets c...
SSTField< bool > wedged
Set after calling rdmc::wedged(), reports that this member is wedged.
const uint16_t getConfUInt16(const std::string &key)
Definition: conf.cpp:126
std::mutex old_views_mutex
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
SharedLockedReference< const View > get_current_view_const()
This function is a dirty workaround for the fact that Group might need read-only access to curr_view ...
SSTFieldVector< uint16_t > joiner_gms_ports
joiner_xxx_ports are the port numbers for the joining nodes.
Definition: derecho_sst.hpp:76
Bundles together a JoinResponseCode and the leader&#39;s node ID, which it also needs to send to the new ...
std::map< subgroup_id_t, uint64_t > max_payload_sizes
int32_t next_unassigned_rank
The rank of the lowest-ranked member that is not assigned to a subgroup in this View.
Definition: view.hpp:124
std::map< subgroup_id_t, std::reference_wrapper< ReplicatedObject > > ReplicatedObjectReferenceMap
SSTFieldVector< uint16_t > joiner_sst_ports
Definition: derecho_sst.hpp:78
bool add_node(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a new node to the SST TPC connections set.
Definition: lf.cpp:557
int rank_of(const node_id_t &who) const
Looks up the sub-view rank of a node ID.
Definition: view.cpp:44
std::vector< std::type_index > subgroup_type_order
Indicates the order that the subgroups should be provisioned; set by Group to be the same order as it...
pred_handle insert(pred predicate, trig trigger, PredicateType type=PredicateType::ONE_TIME)
Inserts a single (predicate, trigger) pair to the appropriate predicate list.
Definition: predicates.hpp:107
Bundles together a set of low-level parameters for configuring Derecho groups.
unsigned int get_num_rows() const
Returns the total number of rows in the table.
Definition: sst.hpp:315
A little helper class that wraps together a reference and a lock on a related mutex.
void silence()
stop complaining about node failures.
int32_t my_rank
The rank of this node within the subgroup/shard, or -1 if this node is not a member of the subgroup/s...
Definition: view.hpp:49
#define rls_default_info(...)
Definition: logger.hpp:73
std::vector< std::vector< SubView > > subgroup_shard_layout_t
The data structure used to store a subgroups-and-shards layout for a single subgroup type (i...
Predicates< DerivedSST > predicates
Definition: sst.hpp:180
std::unique_lock< std::mutex > unique_lock_t
void send_subgroup_object(subgroup_id_t subgroup_id, node_id_t new_node_id)
Sends a single subgroup&#39;s replicated object to a new member after a view change.
const std::string & getConfString(const std::string &key)
Definition: conf.cpp:110
void finish_setup()
Completes first-time setup of the ViewManager, including synchronizing the initial SST and delivering...
uint64_t view_max_rpc_reply_payload_size
int find_rank_of_leader() const
Returns the rank of this View&#39;s leader, based on failed[].
Definition: view.cpp:120
static bool previous_leaders_suspected(const DerechoSST &gmsSST, const View &curr_view)
bool verbs_add_connection(uint32_t index, const std::string &address, uint32_t node_rank)
std::map< subgroup_id_t, uint32_t > my_subgroups
Lists the (subgroup ID, shard num) pairs that this node is a member of.
Definition: view.hpp:145
pred_handle leader_proposed_handle
const int COMMITS_AHEAD_OF_VERSION
If the currently-compiled version of the Derecho library is more recent than the last "release" versi...
Definition: git_version.cpp:35
const int PATCH_VERSION
The current "patch" (more-minor) version number of the Derecho library, as defined by Git...
Definition: git_version.cpp:34
std::unique_ptr< MulticastGroup > multicast_group
RDMC manager object used for sending multicasts.
Definition: view.hpp:126
const std::string profile
Settings for the subview.
Definition: view.hpp:51
void increment(volatile int &member)
Thread-safe increment of an integer member of GMSTableRow; ensures there is a std::atomic_signal_fenc...
std::map< subgroup_id_t, uint64_t > get_max_payload_sizes()
const shard_view_generator_t subgroup_membership_function
The function that generates all the SubViews for a View.
void receive_view_and_leaders(const node_id_t my_id, tcp::socket &leader_connection)
Constructor helper for non-leader nodes; encapsulates receiving and deserializing a View...
void remove(pred_handle &pred)
Removes a (predicate, trigger) pair previously registered with insert().
Definition: predicates.hpp:126
void transition_multicast_group(const std::map< subgroup_id_t, SubgroupSettings > &new_subgroup_settings, const uint32_t new_num_received_size, const uint32_t new_slot_size)
Sets up the SST and MulticastGroup for a new view, based on the settings in the current view...
bool remove_node(uint32_t node_id)
Removes a node from the SST TCP connections set.
Definition: lf.cpp:561
LEADER_REDIRECT This node is not actually the leader and can&#39;t accept a join.
const uint32_t getConfUInt32(const std::string &key)
Definition: conf.cpp:118
SSTFieldVector< persistent::version_t > persisted_num
This represents the highest persistent version number that has been persisted to disk at this node...
Definition: derecho_sst.hpp:49
SSTField< int > num_acked
How many proposed changes have been seen.
Definition: derecho_sst.hpp:93
#define dbg_default_debug(...)
Definition: logger.hpp:42
int rank_of(const std::tuple< ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t > &who) const
Looks up the SST rank of an IP address.
Definition: view.cpp:157
static vector_int64_2d old_shard_leaders_by_new_ids(const View &curr_view, const View &next_view)
Constructs a vector mapping subgroup ID in the new view -> shard number -> node ID of that shard&#39;s le...
uint32_t node_rank
Definition: experiment.cpp:45
static std::unique_ptr< View > make_next_view(const std::unique_ptr< View > &curr_view, const DerechoSST &gmsSST)
Constructs the next view from the current view and the set of committed changes in the SST...
#define dbg_default_error(...)
Definition: logger.hpp:48
std::vector< node_id_t > departed
List of IDs of nodes that left since the previous view, if any.
Definition: view.hpp:112
pred_handle reject_join_handle
std::unique_ptr< RestartLeaderState > restart_leader_state_machine
If this node is the restart leader and currently doing a total restart, this object contains state re...
void send_objects_to_new_members(const View &new_view, const vector_int64_2d &old_shard_leaders)
Helper method for completing view changes; determines whether this node needs to send Replicated Obje...
bool in_total_restart
Set to true in the constructor if this node must do a total restart before completing group setup; fa...
std::vector< char > failed
failed[i] is true if members[i] is considered to have failed.
Definition: view.hpp:104
static void copy_suspected(const DerechoSST &gmsSST, std::vector< bool > &old)
void set(volatile Elem &e, const Elem &value)
Thread-safe setter for DerechoSST members; ensures there is a std::atomic_signal_fence after writing ...
std::shared_ptr< DerechoSST > gmsSST
Pointer to the SST instance used by the GMS in this View.
Definition: view.hpp:128
auto bytes_size(const T &)
Just calls sizeof(T)
void await_first_view(const node_id_t my_id)
Constructor helper for the leader when it first starts; waits for enough new nodes to join to make th...
bool is_adequately_provisioned
Set to false during MulticastGroup setup if a subgroup membership function throws a subgroup_provisio...
Definition: view.hpp:120
#define CONF_DERECHO_SST_PORT
Definition: conf.hpp:28
void send(subgroup_id_t subgroup_num, long long unsigned int payload_size, const std::function< void(char *buf)> &msg_generator, bool cooked_send=false)
Instructs the managed DerechoGroup&#39;s to send the next message.
void lf_initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
Initializes the global libfabric resources.
Definition: lf.cpp:708
void leader_start_join(DerechoSST &gmsSST)
Runs only on the group leader; proposes new views to include new members.
void start()
Starts predicate evaluation in the current view&#39;s SST.
An exception that indicates that a subgroup membership function was unable to finish executing becaus...
void barrier_sync()
Waits until all members of the group have called this function.
std::map< std::type_index, subgroup_shard_layout_t > subgroup_allocation_map_t
The data structure used to store the subgroups-and-shards layouts for all subgroup types in a Group (...
std::map< subgroup_type_id_t, std::vector< subgroup_id_t > > subgroup_ids_by_type_id
Maps the (type, index) pairs used by users to identify subgroups to the internal subgroup IDs generat...
Definition: view.hpp:139
int32_t num_failed
Number of current outstanding failures in this view.
Definition: view.hpp:108
const bool disable_partitioning_safety
A user-configurable option that disables the checks for partitioning events.
void construct_multicast_group(CallbackSet callbacks, const std::map< subgroup_id_t, SubgroupSettings > &subgroup_settings, const uint32_t num_received_size, const uint32_t slot_size)
Creates the SST and MulticastGroup for the first time, using the current view&#39;s member list...
SSTFieldVector< message_id_t > delivered_num
This represents the highest sequence number that has been delivered at this node. ...
Definition: derecho_sst.hpp:43
#define CONF_DERECHO_DISABLE_PARTITIONING_SAFETY
Definition: conf.hpp:32
void deliver_ragged_trim(DerechoSST &gmsSST)
Delivers messages that were marked deliverable by the ragged trim and proceeds to finish_view_change(...
static DerechoParams from_profile(const std::string &profile)
Constructs DerechoParams specifying subgroup metadata for specified profile.
void leader_commit_initial_view()
Setup method for the leader node: sends a commit message to all non-leader nodes indicating that it i...
bool leader_prepare_initial_view(bool &leader_has_quorum)
Setup method for the leader node in total restart mode: sends a Prepare message to all non-leader nod...
void initialize_multicast_groups(CallbackSet callbacks)
Sets up RDMA sessions for the multicast groups within this group.
void leader_ragged_edge_cleanup(const subgroup_id_t subgroup_num, const uint32_t num_received_offset, const std::vector< node_id_t > &shard_members, uint num_shard_senders)
Implements the Ragged Edge Cleanup algorithm for a subgroup/shard leader, operating on the shard that...
uint64_t my_version_hashcode
A runtime constant (computed once during static initialization) that represents the current running v...
void add_view_upcall(const view_upcall_t &upcall)
Adds another function to the set of "view upcalls," which are called when the view changes to notify ...
void verbs_initialize(const std::map< uint32_t, std::string > &ip_addrs, uint32_t node_rank)
Initializes the global verbs resources.
const bool any_persistent_objects
True if any of the Replicated<T> objects in this group have a Persistent<T> field, false if none of them do.
std::thread client_listener_thread
The background thread that listens for clients connecting on our server socket.
void new_suspicion(DerechoSST &gmsSST)
Called when there is a new failure suspicion.
void new_leader_takeover(DerechoSST &gmsSST)
Runs once on a node that becomes a leader due to a failure.
void receive_initial_view(node_id_t my_id, tcp::socket &leader_connection)
Helper for joining an existing group; receives the View and parameters from the leader.
bool lf_remove_connection(uint32_t node_id)
Removes a node&#39;s TCP connection, presumably because it has failed.
Definition: lf_helper.cpp:578
static std::shared_ptr< spdlog::logger > & getDefaultLogger()
Definition: logger.cpp:68
void debug_print_status() const
static void make_subgroup_maps(const SubgroupInfo &subgroup_info, const std::unique_ptr< View > &prev_view, View &curr_view)
Initializes curr_view with subgroup information based on the membership functions in subgroup_info...
uint32_t view_max_rpc_window_size
std::string ip_addr_t
Type alias for IP addresses, currently stored as strings.
void finish_view_change(DerechoSST &gmsSST)
Finishes installing the new view, assuming it is adequately provisioned.
#define CONF_DERECHO_LOCAL_IP
Definition: conf.hpp:25
bool receive_join(DerechoSST &gmsSST, tcp::socket &client_socket)
Assuming this node is the leader, handles a join request from a client.
static void setEarliestVersionToSerialize(const int64_t &ver) noexcept(true)
Set the earliest version for serialization, exclusive.
Definition: Persistent.cpp:42
bool read(char *buffer, size_t size)
Reads size bytes from the socket and writes them to the given buffer.
Definition: tcp.cpp:138
#define CONF_DERECHO_HEARTBEAT_MS
Definition: conf.hpp:30
Common interface for all types of Replicated<T>, specifying some methods for state transfer and persi...
std::unique_ptr< RestartState > restart_state
If this node is currently doing a total restart, this object contains state related to restarting...
SSTField< int > num_installed
How many previously proposed view changes have been installed in the current view.
Definition: derecho_sst.hpp:98
SSTFieldVector< uint16_t > joiner_rdmc_ports
Definition: derecho_sst.hpp:79
int32_t message_id_t
Type alias for a message&#39;s unique "sequence number" or index.
pred_handle start_join_handle
void echo_ragged_trim(std::shared_ptr< std::map< subgroup_id_t, uint32_t >> follower_subgroups_and_shards, DerechoSST &gmsSST)
Runs when the leader nodes of each subgroup have finished ragged edge cleanup.
#define rls_default_warn(...)
Definition: logger.hpp:75
static int min_acked(const DerechoSST &gmsSST, const std::vector< char > &failed)
pred_handle leader_committed_handle
#define CONF_DERECHO_RPC_PORT
Definition: conf.hpp:27
const SubgroupInfo subgroup_info
The subgroup membership function, which will be called whenever the view changes. ...
Bundles together a set of callback functions for message delivery events.
std::vector< std::vector< node_id_t > > get_subgroup_members(subgroup_type_id_t subgroup_type, uint32_t subgroup_index)
Returns a vector of vectors listing the members of a single subgroup (identified by type and index)...
std::map< subgroup_id_t, std::map< uint32_t, std::unique_ptr< RaggedTrim > >> ragged_trim_map_t
List of logged ragged trim states, indexed by (subgroup ID, shard num), stored by pointer...
void sync_with_members() const
Does a TCP sync with each member of the SST.
Definition: sst_impl.hpp:272
const uint64_t compute_global_stability_frontier(subgroup_id_t subgroup_num)
ViewManager(const SubgroupInfo &subgroup_info, const std::vector< std::type_index > &subgroup_type_order, const bool any_persistent_objects, const std::shared_ptr< tcp::tcp_connections > &group_tcp_sockets, ReplicatedObjectReferenceMap &object_reference_map, const persistence_manager_callbacks_t &_persistence_manager_callbacks, std::vector< view_upcall_t > _view_upcalls={})
Constructor for a new group where this node is the GMS leader.
std::unique_ptr< View > next_view
May hold a pointer to the partially-constructed next view, if we are in the process of transitioning ...
const std::vector< std::tuple< ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t > > member_ips_and_ports
IP addresses and ports (gms, rpc, sst, rdmc in order) of members in the current view, indexed by their SST rank.
Definition: view.hpp:101
SharedLockedReference< View > get_current_view()
void initialize_rdmc_sst()
Performs one-time global initialization of RDMC and SST, using the current view&#39;s membership...
static persistent::version_t ragged_trim_to_latest_version(const int32_t view_id, const std::vector< int32_t > &max_received_by_sender)
Computes the persistent version corresponding to a ragged trim proposal, i.e.
ID_IN_USE The node&#39;s ID is already listed as a member of the current view, so it can&#39;t join...
void put_with_completion()
Definition: sst.hpp:329
OK The new member can proceed to join as normal.
#define CONF_DERECHO_GMS_PORT
Definition: conf.hpp:26
pred_handle suspected_changed_handle
void acknowledge_proposed_change(DerechoSST &gmsSST)
Updates num_acked to acknowledge a proposed change when the leader increments num_changes.
int sender_rank_of(uint32_t rank) const
Looks up the sender rank of a given member.
Definition: view.cpp:53
#define dbg_default_trace(...)
Definition: logger.hpp:40
void create_threads()
Constructor helper method to encapsulate spawning the background threads.
std::function< void(const View &)> view_upcall_t
void freeze(int row_index)
Marks a row as frozen, so it will no longer update, and its corresponding node will not receive write...
Definition: sst_impl.hpp:247
std::vector< bool > last_suspected
A cached copy of the last known value of this node&#39;s suspected[] array.
size_t size() const
Just like std::vector::size(), returns the number of elements in this vector.
Definition: sst.hpp:110
bool lf_add_connection(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a node to the group via tcp.
Definition: lf_helper.cpp:569
const int MAJOR_VERSION
The current major version number of the Derecho library, as defined by Git.
Definition: git_version.cpp:32
static bool copy_prior_leader_proposals(DerechoSST &gmsSST)
Searches backwards from this node&#39;s row in the SST to lower-ranked rows, looking for proposed changes...
#define dbg_default_flush()
Definition: logger.hpp:52
void leader_commit_change(DerechoSST &gmsSST)
Runs only on the group leader and updates num_committed when all non-failed members have acked a prop...
const char * getBaseAddress()
Definition: sst.hpp:320
std::string get_remote_ip() const
Definition: tcp.hpp:47
static uint32_t compute_num_received_size(const View &view)
Recomputes num_received_size (the length of the num_received column in the SST) for an existing provi...
std::shared_timed_mutex view_mutex
Controls access to curr_view.
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
JoinResponseCode code
void deliver_in_order(const int shard_leader_rank, const subgroup_id_t subgroup_num, const uint32_t num_received_offset, const std::vector< node_id_t > &shard_members, uint num_shard_senders)
Reads the global_min for the specified subgroup from the SST (assuming it has been computed already) ...
SSTField< bool > rip
to signal a graceful exit
The GMS and derecho_group will share the same SST for efficiency.
Definition: derecho_sst.hpp:22
const bool getConfBoolean(const std::string &key)
Definition: conf.cpp:146
void setup_initial_tcp_connections(const View &initial_view, node_id_t my_id)
Constructor helper that initializes TCP connections (for state transfer) to the members of initial_vi...
Constructor parameter pack for SST.
Definition: sst.hpp:124
tcp::connection_listener server_socket
The TCP socket the leader uses to listen for joining clients.
static bool changes_contains(const DerechoSST &gmsSST, const node_id_t q)
std::condition_variable_any view_change_cv
Notified when curr_view changes (i.e.
std::pair< uint32_t, uint32_t > derive_subgroup_settings(View &curr_view, std::map< subgroup_id_t, SubgroupSettings > &subgroup_settings)
Creates the subgroup-settings map that MulticastGroup's constructor needs (and the num_received_size ...
void saveObject(ObjectType &obj, const char *object_name) noexcept(false)
saveObject() saves a serializable object
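A hedged sketch of the call shape implied by this signature; the persistent:: namespace is assumed by analogy with loadObject<View>() used in the constructor, and the object name is hypothetical:
// Hypothetical call: persist a serializable object to disk under a chosen name.
// May throw on I/O failure, per the noexcept(false) specification above.
persistent::saveObject(*curr_view, "view_backup");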
#define CONF_DERECHO_RDMC_PORT
Definition: conf.hpp:29
void leave()
Causes this node to cleanly leave the group by setting itself to "failed".
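For orientation only, a graceful shutdown could be sketched as below; view_manager is a hypothetical reference to this class:
// Hypothetical shutdown fragment: mark this node as "failed" so that the other
// members install a new view without it before the process exits.
view_manager.leave();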
SSTFieldVector< uint32_t > joiner_ips
If changes[i] is a Join, joiner_ips[i] is the IP address of the joining node, packed into an unsigned...
Definition: derecho_sst.hpp:74
#define dbg_default_info(...)
Definition: logger.hpp:44
int32_t get_my_rank()
Returns the order of this node in the sequence of members of the group.
std::thread old_view_cleanup_thread
std::lock_guard< std::mutex > lock_guard_t
std::vector< T > functional_append(const std::vector< T > &original, const T &item)
Base case for functional_append, which appends a single item to a copy of the original vector.
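One plausible shape for this base case, shown purely as a sketch rather than the library's actual implementation: copy the original vector and push the single item onto the copy.
// Sketch only (requires <vector>): a non-mutating append returning a new vector.
template <typename T>
std::vector<T> functional_append(const std::vector<T>& original, const T& item) {
    std::vector<T> result(original);
    result.push_back(item);
    return result;
}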
LockedListAccess locked()
An empty class to be used as the "replicated type" for a subgroup that doesn't implement a Replicated...
Definition: replicated.hpp:51
The subset of a View associated with a single shard, or a single subgroup if the subgroup is non-shar...
Definition: view.hpp:31
std::vector< node_id_t > members
Node IDs of members in this subgroup/shard, indexed by their order in the SST.
Definition: view.hpp:36
void truncate_persistent_logs(const ragged_trim_map_t &logged_ragged_trim)
Helper function for total restart mode: Uses the RaggedTrim values in logged_ragged_trim to truncate ...
void terminate_epoch(DerechoSST &gmsSST)
Runs when all live nodes have reported they have wedged the current view (meta-wedged), and starts ragged edge cleanup to finalize the terminated epoch.
void start_meta_wedge(DerechoSST &gmsSST)
Runs when at least one membership change has been committed by the leader, and wedges the current vie...
int subview_rank_of_shard_leader(subgroup_id_t subgroup_id, uint32_t shard_index) const
Computes the within-shard rank of a particular shard's leader, based on failed[]. ...
Definition: view.cpp:193
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
Container for whatever information is needed to describe a Group&#39;s subgroups and shards.
const int MINOR_VERSION
The current minor version number of the Derecho library, as defined by Git.
Definition: git_version.cpp:33
std::vector< std::vector< int64_t > > prior_view_shard_leaders
A 2-dimensional vector, indexed by (subgroup ID -> shard number), containing the ID of the node in ea...
persistence_manager_callbacks_t persistence_manager_callbacks
The persistence request functions provided by the persistence manager.
TOTAL_RESTART The group is currently restarting from a total failure, so the new member should send i...
std::vector< view_upcall_t > view_upcalls
Functions to be called whenever the view changes, to report the new view to some other component...
SSTFieldVector< uint16_t > joiner_rpc_ports
Definition: derecho_sst.hpp:77
uint32_t num_senders() const
returns the number of senders in the subview
Definition: view.cpp:66
#define CONF_DERECHO_LOCAL_ID
Definition: conf.hpp:24
void report_failure(const node_id_t who)
Reports to the GMS that the given node has failed.
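As a usage sketch only; the failure-detection context, the node ID, and the view_manager handle are hypothetical:
// Hypothetical: a failure detector that finds node 7 unresponsive reports it to
// the GMS, which will propose removing the node in the next view change.
const derecho::node_id_t suspect = 7;
view_manager.report_failure(suspect);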
void log_ragged_trim(const int shard_leader_rank, const subgroup_id_t subgroup_num, const uint32_t num_received_offset, const uint num_shard_senders)
Reads the global_min values for the specified subgroup (and the shard that this node belongs to) from...
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
Mode mode
Operation mode, raw mode does not do stability and delivery.
Definition: view.hpp:34
void init_joined_departed(const SubView &previous_subview)
Initialization helper method that initializes the joined and departed lists given the previous View's...
Definition: view.cpp:76
std::vector< std::type_index > subgroup_type_order
The order of subgroup types as they were declared in the Group's template parameters.
Definition: view.hpp:133
Represents the data needed to log a "ragged trim" decision to disk.
int32_t my_rank
The rank of this node (as returned by rank_of())
Definition: view.hpp:116
SSTFieldVector< node_id_t > changes
An array of the same length as View::members, containing a list of proposed changes to the view that ...
Definition: derecho_sst.hpp:67
void await_rejoining_nodes(const node_id_t my_id)
Setup method for the leader when it is restarting from complete failure: waits for a restart quorum o...
int32_t get_my_shard(subgroup_type_id_t subgroup_type, uint32_t subgroup_index)
If this node is a member of the given subgroup (identified by its type and index), returns the number of the shard this node belongs to.
bool write(const char *buffer, size_t size)
Writes size bytes from the given buffer to the socket.
Definition: tcp.cpp:171
std::size_t to_bytes(const ByteRepresentable &b, char *v)
calls b.to_bytes(v) when b is a ByteRepresentable; calls std::memcpy() when b is POD.
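Read together with the socket write() above, this suggests the usual serialize-then-send pattern. The sketch below assumes a ByteRepresentable object obj, a connected tcp::socket sock, and a companion bytes_size() helper from the same serialization library for sizing the buffer:
// Sketch: serialize obj into a heap buffer, then push the raw bytes over the socket.
const std::size_t size = mutils::bytes_size(obj);   // assumed size helper
std::vector<char> buffer(size);
mutils::to_bytes(obj, buffer.data());                // fills the buffer
if(!sock.write(buffer.data(), buffer.size())) {
    // the peer disconnected; handle the failure here
}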
void redirect_join_attempt(DerechoSST &gmsSST)
Runs on non-leaders to redirect confused new members to the current leader.
Recurrent predicates persist for as long as the SST instance does, and fire their triggers every time they are ...
std::vector< node_id_t > joined
List of IDs of nodes that joined since the previous view, if any.
Definition: view.hpp:110
std::unique_ptr< View > curr_view
The current View, containing the state of the managed group.
initialize_rpc_objects_t initialize_subgroup_objects
A function that will be called to initialize replicated objects after transitioning to a new view...
std::vector< node_id_t > get_members()
Returns a vector listing the nodes that are currently members of the group.
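A small usage sketch; the view_manager handle and the node ID being checked are hypothetical, and <algorithm> is assumed:
// Hypothetical: check whether node 42 currently appears in the membership.
const std::vector<derecho::node_id_t> members = view_manager.get_members();
const bool node_42_present
        = std::find(members.begin(), members.end(), 42u) != members.end();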
SSTField< int > num_committed
How many proposed view changes have reached the commit point.
Definition: derecho_sst.hpp:88
std::queue< std::unique_ptr< View > > old_views
Contains old Views that need to be cleaned up.
std::shared_lock< std::shared_timed_mutex > shared_lock_t
Base exception class for all exceptions raised by Derecho.
void put()
Writes the entire local row to all remote nodes.
Definition: sst.hpp:325
virtual void send_object(tcp::socket &receiver_socket) const =0
static bool suspected_not_equal(const DerechoSST &gmsSST, const std::vector< bool > &old)
ReplicatedObjectReferenceMap & subgroup_objects
A type-erased list of references to the Replicated<T> objects in this group, indexed by their subgrou...
LockedQueue< tcp::socket > pending_join_sockets
On the leader node, contains client sockets for pending joins that have not yet been handled...
SSTFieldVector< bool > suspected
Array of same length as View::members, where each bool represents whether the corresponding member is...
Definition: derecho_sst.hpp:58
virtual bool is_persistent() const =0
void truncate_logs()
An extra setup step only needed during total restart; truncates the persistent logs of this node to c...
bool verbs_remove_connection(uint32_t index)
bool exchange(T local, T &remote)
Definition: tcp.hpp:120
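A hedged sketch of how this symmetric exchange might be used during a join handshake; the socket name follows the constructor parameter above, but the surrounding wiring is hypothetical:
// Hypothetical handshake fragment: trade node IDs with the peer in one round trip.
derecho::node_id_t my_id = getConfUInt32(CONF_DERECHO_LOCAL_ID);
derecho::node_id_t remote_id = 0;
if(!leader_connection.exchange(my_id, remote_id)) {
    // the exchange failed; treat the remote node as unreachable
}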
void reinit_tcp_connections(const View &initial_view, node_id_t my_id)
Another setup helper for joining nodes; re-initializes the TCP connections list to reflect the curren...
void update_tcp_connections(const View &new_view)
Updates the TCP connections pool to reflect the joined and departed members in a new view...
pred_handle change_commit_ready_handle
std::list< tcp::socket > proposed_join_sockets
The sockets connected to clients that will join in the next view, if any.
std::atomic< bool > bSilent
On a graceful exit, nodes will agree to leave at some point, after which the view manager should stop th...
void register_predicates()
Constructor helper method to encapsulate creating all the predicates.
const int32_t num_members
Number of members in this view.
Definition: view.hpp:114
#define dbg_default_warn(...)
Definition: logger.hpp:46
std::string ragged_trim_filename(subgroup_id_t subgroup_num, uint32_t shard_num)
Builds a filename to use for a RaggedTrim logged to disk using its subgroup and shard IDs...