Derecho 0.9
Distributed systems toolkit for RDMA
restart_state.cpp
#include <optional>

//This code needs access to ViewManager's static methods

namespace derecho {

void RestartState::load_ragged_trim(const View& curr_view) {
    /* Iterate through all subgroups by type, rather than iterating through my_subgroups,
     * so that I have access to the type ID. This wastes time, but I don't have a map
     * from subgroup ID to subgroup_type_id within curr_view. */
    for(const auto& type_id_and_indices : curr_view.subgroup_ids_by_type_id) {
        for(uint32_t subgroup_index = 0; subgroup_index < type_id_and_indices.second.size(); ++subgroup_index) {
            subgroup_id_t subgroup_id = type_id_and_indices.second.at(subgroup_index);
            //We only care if the subgroup's ID is in my_subgroups
            auto subgroup_shard_ptr = curr_view.my_subgroups.find(subgroup_id);
            if(subgroup_shard_ptr != curr_view.my_subgroups.end()) {
                //If the subgroup ID is in my_subgroups, its value is this node's shard number
                uint32_t shard_num = subgroup_shard_ptr->second;
                std::unique_ptr<RaggedTrim> ragged_trim = persistent::loadObject<RaggedTrim>(
                        ragged_trim_filename(subgroup_id, shard_num).c_str());
                //If there was a logged ragged trim from an obsolete View, it's the same as not having a logged ragged trim
                if(ragged_trim == nullptr || ragged_trim->vid < curr_view.vid) {
                    dbg_default_debug("No ragged trim information found for subgroup {}, synthesizing it from logs", subgroup_id);
                    //Get the latest persisted version number from this subgroup's object's log
                    //(this requires converting the type ID to a std::type_index)
                    persistent::version_t last_persisted_version = persistent::getMinimumLatestPersistedVersion(
                            curr_view.subgroup_type_order.at(type_id_and_indices.first),
                            subgroup_index, shard_num);
                    int32_t last_vid, last_seq_num;
                    std::tie(last_vid, last_seq_num) = persistent::unpack_version<int32_t>(last_persisted_version);
                    //Divide the sequence number into sender rank and message counter
                    uint32_t num_shard_senders = curr_view.subgroup_shard_views.at(subgroup_id).at(shard_num).num_senders();
                    int32_t last_message_counter = last_seq_num / num_shard_senders;
                    uint32_t last_sender = last_seq_num % num_shard_senders;
                    /* Fill max_received_by_sender: In round-robin order, all senders ranked at or below
                     * the last sender have delivered last_message_counter, while all senders ranked above
                     * the last sender have only delivered last_message_counter - 1. */
                    std::vector<int32_t> max_received_by_sender(num_shard_senders);
                    for(uint sender_rank = 0; sender_rank <= last_sender; ++sender_rank) {
                        max_received_by_sender[sender_rank] = last_message_counter;
                    }
                    for(uint sender_rank = last_sender + 1; sender_rank < num_shard_senders; ++sender_rank) {
                        max_received_by_sender[sender_rank] = last_message_counter - 1;
                    }
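                    /* Worked example (illustrative values, not taken from any log): with
                     * num_shard_senders = 3 and last_seq_num = 7, last_message_counter = 7 / 3 = 2
                     * and last_sender = 7 % 3 = 1, so max_received_by_sender becomes {2, 2, 1}:
                     * senders 0 and 1 have delivered message counter 2 in their round-robin slots,
                     * while sender 2 has only delivered message counter 1. */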
                    ragged_trim = std::make_unique<RaggedTrim>(subgroup_id, shard_num, last_vid, -1, max_received_by_sender);
                }
                //operator[] is intentional: default-construct an inner std::map at subgroup_id
                //Note that the inner map will have only one entry, except on the restart leader where it will have one for every shard
                logged_ragged_trim[subgroup_id].emplace(shard_num, std::move(ragged_trim));
            }  // if(subgroup_shard_ptr != curr_view.my_subgroups.end())
        }      // for(subgroup_index)
    }
}

persistent::version_t RestartState::ragged_trim_to_latest_version(const int32_t view_id,
                                                                   const std::vector<int32_t>& max_received_by_sender) {
    uint32_t num_shard_senders = max_received_by_sender.size();
    //Determine the last deliverable sequence number using the same logic as deliver_messages_upto
    int32_t max_seq_num = 0;
    for(uint sender = 0; sender < num_shard_senders; sender++) {
        max_seq_num = std::max(max_seq_num,
                               static_cast<int32_t>(max_received_by_sender[sender] * num_shard_senders + sender));
    }
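    /* Worked example (illustrative values): with max_received_by_sender = {2, 2, 1} and 3 senders,
     * max_seq_num = max(2*3+0, 2*3+1, 1*3+2) = 7, the inverse of the division performed in
     * load_ragged_trim. combine_int32s then packs view_id into the high 32 bits of the version
     * number and max_seq_num into the low 32 bits. */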
    //Make the corresponding version number using the same logic as version_message
    return persistent::combine_int32s(view_id, max_seq_num);
}

RestartLeaderState::RestartLeaderState(std::unique_ptr<View> _curr_view, RestartState& restart_state,
                                       const SubgroupInfo& subgroup_info,
                                       const node_id_t my_id)
        : curr_view(std::move(_curr_view)),
          restart_state(restart_state),
          subgroup_info(subgroup_info),
          last_known_view_members(curr_view->members.begin(), curr_view->members.end()),
          longest_log_versions(curr_view->subgroup_shard_views.size()),
          nodes_with_longest_log(curr_view->subgroup_shard_views.size()),
          my_id(my_id) {
    rejoined_node_ids.emplace(my_id);
    for(subgroup_id_t subgroup = 0; subgroup < curr_view->subgroup_shard_views.size(); ++subgroup) {
        longest_log_versions[subgroup].resize(curr_view->subgroup_shard_views[subgroup].size(), 0);
        nodes_with_longest_log[subgroup].resize(curr_view->subgroup_shard_views[subgroup].size(), -1);
    }
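    //An entry of -1 in nodes_with_longest_log means no node is yet known to have a log for that shard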
    //Initialize longest_logs with the RaggedTrims known locally -
    //this node will only have RaggedTrims for subgroups it belongs to
    for(const auto& subgroup_map_pair : restart_state.logged_ragged_trim) {
        for(const auto& shard_and_trim : subgroup_map_pair.second) {
            nodes_with_longest_log[subgroup_map_pair.first][shard_and_trim.first] = my_id;
            longest_log_versions[subgroup_map_pair.first][shard_and_trim.first]
                    = RestartState::ragged_trim_to_latest_version(shard_and_trim.second->vid,
                                                                  shard_and_trim.second->max_received_by_sender);
            dbg_default_trace("Latest logged persistent version for subgroup {}, shard {} initialized to {}",
                              subgroup_map_pair.first, shard_and_trim.first,
                              longest_log_versions[subgroup_map_pair.first][shard_and_trim.first]);
        }
    }
}

void RestartLeaderState::await_quorum(tcp::connection_listener& server_socket) {
    bool ready_to_restart = false;
    int time_remaining_ms = RESTART_LEADER_TIMEOUT;
    while(time_remaining_ms > 0) {
        using namespace std::chrono;
        auto start_time = high_resolution_clock::now();
        std::optional<tcp::socket> client_socket = server_socket.try_accept(time_remaining_ms);
        auto end_time = high_resolution_clock::now();
        milliseconds time_waited = duration_cast<milliseconds>(end_time - start_time);
        time_remaining_ms -= time_waited.count();
        if(client_socket) {
            uint64_t joiner_version_code;
            client_socket->exchange(my_version_hashcode, joiner_version_code);
            if(joiner_version_code != my_version_hashcode) {
                rls_default_warn("Rejected a connection from client at {}. Client was running on an incompatible platform or used an incompatible compiler.", client_socket->get_remote_ip());
                continue;
            }
            node_id_t joiner_id = 0;
            client_socket->read(joiner_id);
            client_socket->write(JoinResponse{JoinResponseCode::TOTAL_RESTART, my_id});
            dbg_default_debug("Node {} rejoined", joiner_id);
            rejoined_node_ids.emplace(joiner_id);

            //Receive and process the joining node's logs of the last known View and RaggedTrim
            receive_joiner_logs(joiner_id, *client_socket);

            //Receive the joining node's ports - this is part of the standard join logic
            uint16_t joiner_gms_port = 0;
            client_socket->read(joiner_gms_port);
            uint16_t joiner_rpc_port = 0;
            client_socket->read(joiner_rpc_port);
            uint16_t joiner_sst_port = 0;
            client_socket->read(joiner_sst_port);
            uint16_t joiner_rdmc_port = 0;
            client_socket->read(joiner_rdmc_port);
            const ip_addr_t& joiner_ip = client_socket->get_remote_ip();
            rejoined_node_ips_and_ports[joiner_id] = {joiner_ip, joiner_gms_port,
                                                      joiner_rpc_port, joiner_sst_port, joiner_rdmc_port};
            //Done receiving from this socket (for now), so store it in waiting_join_sockets for later
            waiting_join_sockets.emplace(joiner_id, std::move(*client_socket));
            //Check for quorum
            ready_to_restart = has_restart_quorum();
            //If all the members have rejoined, no need to keep waiting
            if(std::includes(rejoined_node_ids.begin(), rejoined_node_ids.end(),
                             last_known_view_members.begin(), last_known_view_members.end())) {
                return;
            }
        } else if(!ready_to_restart) {
            //Accept timed out, but we haven't heard from enough nodes yet, so reset the timer
            time_remaining_ms = RESTART_LEADER_TIMEOUT;
        }
    }
}

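/* A restart quorum requires a majority of the last known view's members, at least one member of
 * every subgroup/shard, and a recomputed restart view that is adequately provisioned. */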
bool RestartLeaderState::has_restart_quorum() {
    //Compute rejoined_node_ids.intersect(last_known_view_members)
    //but with a lot of unnecessary repetitive boilerplate because it's the STL
    std::set<node_id_t> intersection_of_ids;
    std::set_intersection(rejoined_node_ids.begin(), rejoined_node_ids.end(),
                          last_known_view_members.begin(), last_known_view_members.end(),
                          std::inserter(intersection_of_ids, intersection_of_ids.end()));
    if(intersection_of_ids.size() < (last_known_view_members.size() / 2) + 1
       || !contains_at_least_one_member_per_subgroup(rejoined_node_ids, *curr_view)) {
        return false;
    }
    //If we have a sufficient number of members, attempt to compute a restart view.
    //If that fails, we know we don't have a quorum, but if it succeeds, we
    //both have a quorum and know the restart view
    return compute_restart_view();
}

void RestartLeaderState::receive_joiner_logs(const node_id_t& joiner_id, tcp::socket& client_socket) {
    //Receive the joining node's saved View
    std::size_t size_of_view;
    client_socket.read(size_of_view);
    char view_buffer[size_of_view];
    client_socket.read(view_buffer, size_of_view);
    std::unique_ptr<View> client_view = mutils::from_bytes<View>(nullptr, view_buffer);

    if(client_view->vid > curr_view->vid) {
        dbg_default_trace("Node {} had newer view {}, replacing view {} and discarding ragged trim",
                          joiner_id, client_view->vid, curr_view->vid);
        //The joining node has a newer View, so discard any ragged trims that are not longest-log records
        for(auto& subgroup_to_map : restart_state.logged_ragged_trim) {
            auto trim_map_iterator = subgroup_to_map.second.begin();
            while(trim_map_iterator != subgroup_to_map.second.end()) {
                if(trim_map_iterator->second->leader_id != -1) {
                    trim_map_iterator = subgroup_to_map.second.erase(trim_map_iterator);
                } else {
                    ++trim_map_iterator;
                }
            }
        }
    }
    //Receive the joining node's RaggedTrims
    std::size_t num_of_ragged_trims;
    client_socket.read(num_of_ragged_trims);
    for(std::size_t i = 0; i < num_of_ragged_trims; ++i) {
        std::size_t size_of_ragged_trim;
        client_socket.read(size_of_ragged_trim);
        char buffer[size_of_ragged_trim];
        client_socket.read(buffer, size_of_ragged_trim);
        std::unique_ptr<RaggedTrim> ragged_trim = mutils::from_bytes<RaggedTrim>(nullptr, buffer);
        dbg_default_trace("Received ragged trim for subgroup {}, shard {} from node {}",
                          ragged_trim->subgroup_id, ragged_trim->shard_num, joiner_id);
        /* If the joining node has an obsolete View, we only care about the
         * "ragged trims" if they are actually longest-log records and from
         * a newer view than any ragged trims we have for this subgroup. */
        if(client_view->vid < curr_view->vid && ragged_trim->leader_id != -1) {  //-1 means the RaggedTrim is a log report
            continue;
        }
        /* Determine if this node might end up being the "restart leader" for its subgroup
         * because it has the longest log. Note that comparing log versions implicitly
         * compares VIDs, so a ragged trim from a newer View is always "longer" */
        persistent::version_t ragged_trim_log_version = RestartState::ragged_trim_to_latest_version(ragged_trim->vid, ragged_trim->max_received_by_sender);
        if(ragged_trim_log_version > longest_log_versions[ragged_trim->subgroup_id][ragged_trim->shard_num]) {
            dbg_default_trace("Latest logged persistent version for subgroup {}, shard {} is now {}, which is at node {}",
                              ragged_trim->subgroup_id, ragged_trim->shard_num, ragged_trim_log_version, joiner_id);
            longest_log_versions[ragged_trim->subgroup_id][ragged_trim->shard_num] = ragged_trim_log_version;
            nodes_with_longest_log[ragged_trim->subgroup_id][ragged_trim->shard_num] = joiner_id;
        }
        if(client_view->vid <= curr_view->vid) {
            //In both of these cases, only keep the ragged trim if it is newer than anything we have
            auto existing_ragged_trim = restart_state.logged_ragged_trim[ragged_trim->subgroup_id].find(ragged_trim->shard_num);
            if(existing_ragged_trim == restart_state.logged_ragged_trim[ragged_trim->subgroup_id].end()) {
                dbg_default_trace("Adding node {}'s ragged trim to map, because we don't have one for shard ({}, {})",
                                  joiner_id, ragged_trim->subgroup_id, ragged_trim->shard_num);
                //operator[] is intentional: Default-construct an inner std::map if one doesn't exist at this ID
                restart_state.logged_ragged_trim[ragged_trim->subgroup_id].emplace(ragged_trim->shard_num,
                                                                                   std::move(ragged_trim));
            } else if(existing_ragged_trim->second->vid <= ragged_trim->vid) {
                existing_ragged_trim->second = std::move(ragged_trim);
            }
        } else {
            //The client had a newer View, so accept everything it sends
            restart_state.logged_ragged_trim[ragged_trim->subgroup_id].emplace(ragged_trim->shard_num,
                                                                               std::move(ragged_trim));
        }
    }
    //Replace curr_view if the client's view was newer
    if(client_view->vid > curr_view->vid) {
        client_view->subgroup_type_order = curr_view->subgroup_type_order;
        curr_view.swap(client_view);
        //Remake the std::set version of curr_view->members
        last_known_view_members.clear();
        last_known_view_members.insert(curr_view->members.begin(), curr_view->members.end());
    }
}

bool RestartLeaderState::compute_restart_view() {
    restart_view = update_curr_and_next_restart_view();
    ViewManager::make_subgroup_maps(subgroup_info, curr_view, *restart_view);
    return restart_view->is_adequately_provisioned;
}

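/* Once a quorum is reached, the restart protocol proceeds in phases: the leader sends the computed
 * restart view, the ragged trims, and the longest-log locations to every waiting node
 * (send_restart_view), then sends Prepare and waits for each node's acknowledgment (send_prepare),
 * and finally sends Commit (send_commit). If a node fails partway through, the leader sends Abort
 * to the nodes already contacted, recomputes the view, and retries (resend_view_until_quorum_lost). */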
int64_t RestartLeaderState::send_restart_view() {
    for(auto waiting_sockets_iter = waiting_join_sockets.begin();
        waiting_sockets_iter != waiting_join_sockets.end();) {
        std::size_t view_buffer_size = mutils::bytes_size(*restart_view);
        std::size_t leaders_buffer_size = mutils::bytes_size(nodes_with_longest_log);
        char view_buffer[view_buffer_size];
        char leaders_buffer[leaders_buffer_size];
        bool send_success;
        //Within this try block, any send that returns failure throws the ID of the node that failed
        try {
            dbg_default_debug("Sending post-recovery view {} to node {}", restart_view->vid, waiting_sockets_iter->first);
            send_success = waiting_sockets_iter->second.write(view_buffer_size);
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            mutils::to_bytes(*restart_view, view_buffer);
            send_success = waiting_sockets_iter->second.write(view_buffer, view_buffer_size);
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            dbg_default_debug("Sending ragged-trim information to node {}", waiting_sockets_iter->first);
            std::size_t num_ragged_trims = multimap_size(restart_state.logged_ragged_trim);
            send_success = waiting_sockets_iter->second.write(num_ragged_trims);
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            //Unroll the maps and send each RaggedTrim individually, since it contains its subgroup_id and shard_num
            for(const auto& subgroup_to_shard_map : restart_state.logged_ragged_trim) {
                for(const auto& shard_trim_pair : subgroup_to_shard_map.second) {
                    std::size_t trim_buffer_size = mutils::bytes_size(*shard_trim_pair.second);
                    char trim_buffer[trim_buffer_size];
                    send_success = waiting_sockets_iter->second.write(trim_buffer_size);
                    if(!send_success) {
                        throw waiting_sockets_iter->first;
                    }
                    mutils::to_bytes(*shard_trim_pair.second, trim_buffer);
                    send_success = waiting_sockets_iter->second.write(trim_buffer, trim_buffer_size);
                    if(!send_success) {
                        throw waiting_sockets_iter->first;
                    }
                }
            }
            dbg_default_debug("Sending longest-log locations to node {}", waiting_sockets_iter->first);
            send_success = waiting_sockets_iter->second.write(leaders_buffer_size);
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            mutils::to_bytes(nodes_with_longest_log, leaders_buffer);
            send_success = waiting_sockets_iter->second.write(leaders_buffer, leaders_buffer_size);
            if(!send_success) {
                throw waiting_sockets_iter->first;
            }
            members_sent_restart_view.emplace(waiting_sockets_iter->first);
            waiting_sockets_iter++;
        } catch(node_id_t failed_node) {
            //All send failures will end up here.
            //Close the failed socket, delete the failed node from the restart state, and return its ID.
            waiting_join_sockets.erase(waiting_sockets_iter);
            rejoined_node_ips_and_ports.erase(failed_node);
            rejoined_node_ids.erase(failed_node);
            return failed_node;
        }
    }  //for(waiting_join_sockets)

    //Save this to a class member so that we still have it in send_objects_if_total_restart()
    restart_state.restart_shard_leaders = nodes_with_longest_log;
    //Return -1 to indicate success: no node failed.
    return -1;
}

bool RestartLeaderState::resend_view_until_quorum_lost() {
    bool success = false;
    //This method is only called after at least one failure, so first recompute the view
    bool can_retry = has_restart_quorum();
    while(can_retry) {
        int64_t failed_node_id = send_restart_view();
        if(failed_node_id != -1) {
            dbg_default_warn("Recomputed View would still have been adequate, but node {} failed while sending it!", failed_node_id);
            send_abort();
            //Recompute the restart view again, and try again if it's still adequate
            can_retry = has_restart_quorum();
        } else {
            //Successfully sent the recomputed View to all remaining nodes, so we can stop retrying
            success = true;
            break;
        }
    }
    //If we reached this point and success is still false, we lost the quorum
    return success;
}

void RestartLeaderState::send_abort() {
    for(const node_id_t& member_sent_view : members_sent_restart_view) {
        dbg_default_debug("Sending view abort message to node {}", member_sent_view);
        waiting_join_sockets.at(member_sent_view).write(CommitMessage::ABORT);
    }
}

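/* Second phase of the commit protocol: each node replies to Prepare with an Ack only after it has
 * finished state transfer, so a fully successful round here means every member is ready to commit. */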
int64_t RestartLeaderState::send_prepare() {
    for(auto waiting_sockets_iter = waiting_join_sockets.begin();
        waiting_sockets_iter != waiting_join_sockets.end();) {
        bool socket_success;
        try {
            dbg_default_debug("Sending view prepare message to node {}", waiting_sockets_iter->first);
            socket_success = waiting_sockets_iter->second.write(CommitMessage::PREPARE);
            if(!socket_success) {
                throw waiting_sockets_iter->first;
            }
            //Wait for an acknowledgment, to make sure the node has finished state transfer
            CommitMessage response;
            socket_success = waiting_sockets_iter->second.read(response);
            if(!socket_success) {
                throw waiting_sockets_iter->first;
            }
            if(response == CommitMessage::ACK) {
                dbg_default_debug("Node {} acknowledged Prepare", waiting_sockets_iter->first);
            } else {
                dbg_default_warn("Node {} responded to Prepare with something other than Ack!", waiting_sockets_iter->first);
                throw waiting_sockets_iter->first;
            }
        } catch(node_id_t failed_node) {
            waiting_join_sockets.erase(waiting_sockets_iter);
            rejoined_node_ips_and_ports.erase(failed_node);
            rejoined_node_ids.erase(failed_node);
            return failed_node;
        }
        waiting_sockets_iter++;
    }
    return -1;
}

void RestartLeaderState::send_commit() {
    for(auto waiting_sockets_iter = waiting_join_sockets.begin();
        waiting_sockets_iter != waiting_join_sockets.end();) {
        dbg_default_debug("Sending view commit message to node {}", waiting_sockets_iter->first);
        waiting_sockets_iter->second.write(CommitMessage::COMMIT);
        waiting_sockets_iter = waiting_join_sockets.erase(waiting_sockets_iter);
    }
}

void RestartLeaderState::print_longest_logs() const {
    std::ostringstream leader_list;
    for(subgroup_id_t subgroup = 0; subgroup < longest_log_versions.size(); ++subgroup) {
        for(uint32_t shard = 0; shard < longest_log_versions.at(subgroup).size(); ++shard) {
            leader_list << "Subgroup (" << subgroup << "," << shard << "): node "
                        << nodes_with_longest_log.at(subgroup).at(shard) << " with log length "
                        << longest_log_versions.at(subgroup).at(shard) << ". ";
        }
    }
    dbg_default_debug("Restart subgroup/shard leaders: {}", leader_list.str());
}

std::unique_ptr<View> RestartLeaderState::update_curr_and_next_restart_view() {
    //Nodes that were not in the last view but have restarted will immediately "join" in the new view
    std::vector<node_id_t> nodes_to_add_in_next_view;
    std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> ips_and_ports_to_add_in_next_view;
    for(const auto& id_socket_pair : waiting_join_sockets) {
        node_id_t joiner_id = id_socket_pair.first;
        int joiner_rank = curr_view->rank_of(joiner_id);
        if(joiner_rank == -1) {
            nodes_to_add_in_next_view.emplace_back(joiner_id);
            ips_and_ports_to_add_in_next_view.emplace_back(rejoined_node_ips_and_ports.at(joiner_id));
            //If this node had been marked as failed, but was still in the view, un-fail it
        } else if(curr_view->failed[joiner_rank] == true) {
            curr_view->failed[joiner_rank] = false;
            curr_view->num_failed--;
        }
    }
    //Mark any nodes from the last view that haven't yet responded as failed
    for(std::size_t rank = 0; rank < curr_view->members.size(); ++rank) {
        if(rejoined_node_ids.count(curr_view->members[rank]) == 0
           && !curr_view->failed[rank]) {
            curr_view->failed[rank] = true;
            curr_view->num_failed++;
        }
    }

    //Compute the next view, which will include all the members currently rejoining and remove the failed ones
    return make_next_view(curr_view, nodes_to_add_in_next_view, ips_and_ports_to_add_in_next_view);
}

std::unique_ptr<View> RestartLeaderState::make_next_view(const std::unique_ptr<View>& curr_view,
                                                         const std::vector<node_id_t>& joiner_ids,
                                                         const std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>>& joiner_ips_and_ports) {
    int next_num_members = curr_view->num_members - curr_view->num_failed + joiner_ids.size();
    std::vector<node_id_t> members(next_num_members), departed;
    std::vector<char> failed(next_num_members);
    std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> member_ips_and_ports(next_num_members);
    int next_unassigned_rank = curr_view->next_unassigned_rank;
    std::set<int> leave_ranks;
    for(std::size_t rank = 0; rank < curr_view->failed.size(); ++rank) {
        if(curr_view->failed[rank]) {
            leave_ranks.emplace(rank);
        }
    }
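    /* Layout of the new membership list: surviving members of curr_view keep their relative order
     * at the front, and the joiners are appended after them, starting at rank
     * curr_view->num_members - leave_ranks.size(). */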
    for(std::size_t i = 0; i < joiner_ids.size(); ++i) {
        int new_member_rank = curr_view->num_members - leave_ranks.size() + i;
        members[new_member_rank] = joiner_ids[i];
        member_ips_and_ports[new_member_rank] = joiner_ips_and_ports[i];
        dbg_default_debug("Restarted next view will add new member with id {}", joiner_ids[i]);
    }
    for(const auto& leaver_rank : leave_ranks) {
        departed.emplace_back(curr_view->members[leaver_rank]);
        //Decrement next_unassigned_rank for every failure, unless the failed node wasn't assigned to a subgroup anyway
        if(leaver_rank <= curr_view->next_unassigned_rank) {
            next_unassigned_rank--;
        }
    }
    dbg_default_debug("Next view will exclude {} failed members.", leave_ranks.size());
    //Copy member information, excluding the members that have failed
    int new_rank = 0;
    for(int old_rank = 0; old_rank < curr_view->num_members; ++old_rank) {
        //This is why leave_ranks needs to be a set
        if(leave_ranks.find(old_rank) == leave_ranks.end()) {
            members[new_rank] = curr_view->members[old_rank];
            member_ips_and_ports[new_rank] = curr_view->member_ips_and_ports[old_rank];
            failed[new_rank] = curr_view->failed[old_rank];
            ++new_rank;
        }
    }

    //Initialize my_rank in next_view
    int32_t my_new_rank = -1;
    node_id_t myID = curr_view->members[curr_view->my_rank];
    for(int i = 0; i < next_num_members; ++i) {
        if(members[i] == myID) {
            my_new_rank = i;
            break;
        }
    }
    if(my_new_rank == -1) {
        dbg_default_flush();
        throw derecho_exception("Recovery leader wasn't in the next view it computed?!?!");
    }

    auto next_view = std::make_unique<View>(curr_view->vid + 1, members, member_ips_and_ports, failed,
                                            joiner_ids, departed, my_new_rank, next_unassigned_rank,
                                            curr_view->subgroup_type_order);
    next_view->i_know_i_am_leader = curr_view->i_know_i_am_leader;
    return next_view;
}

bool RestartLeaderState::contains_at_least_one_member_per_subgroup(std::set<node_id_t> rejoined_node_ids,
                                                                   const View& last_view) {
    for(const auto& shard_view_vector : last_view.subgroup_shard_views) {
        for(const SubView& shard_view : shard_view_vector) {
            //If none of the former members of this shard are in the restart set, it is insufficient
            bool shard_member_restarted = false;
            for(const node_id_t member_node : shard_view.members) {
                if(rejoined_node_ids.find(member_node) != rejoined_node_ids.end()) {
                    shard_member_restarted = true;
                }
            }
            if(!shard_member_restarted) {
                return false;
            }
        }
    }
    return true;
}

} /* namespace derecho */