//In RestartState::load_ragged_trim(curr_view): for each subgroup this node belongs to,
//load the logged RaggedTrim from disk, or synthesize one from the persistent log
for(uint32_t subgroup_index = 0; subgroup_index < type_id_and_indices.second.size(); ++subgroup_index) {
    subgroup_id_t subgroup_id = type_id_and_indices.second.at(subgroup_index);
    auto subgroup_shard_ptr = curr_view.my_subgroups.find(subgroup_id);
    if(subgroup_shard_ptr != curr_view.my_subgroups.end()) {
        uint32_t shard_num = subgroup_shard_ptr->second;
        std::unique_ptr<RaggedTrim> ragged_trim = persistent::loadObject<RaggedTrim>(
                ragged_trim_filename(subgroup_id, shard_num).c_str());
        //If there is no logged ragged trim, or it is from an earlier View, synthesize one
        if(ragged_trim == nullptr || ragged_trim->vid < curr_view.vid) {
            dbg_default_debug("No ragged trim information found for subgroup {}, synthesizing it from logs", subgroup_id);
            persistent::version_t last_persisted_version = persistent::getMinimumLatestPersistedVersion(
                    curr_view.subgroup_type_order.at(type_id_and_indices.first),
                    subgroup_index, shard_num);
            int32_t last_vid, last_seq_num;
            std::tie(last_vid, last_seq_num) = persistent::unpack_version<int32_t>(last_persisted_version);
            //Split the last sequence number into a sender rank and a per-sender message
            //counter, using the round-robin sender order of the shard's multicast
            uint32_t num_shard_senders = curr_view.subgroup_shard_views.at(subgroup_id).at(shard_num).num_senders();
            int32_t last_message_counter = last_seq_num / num_shard_senders;
            uint32_t last_sender = last_seq_num % num_shard_senders;
            //Senders at or before last_sender have delivered last_message_counter messages;
            //later senders are one message behind in the round-robin order
            std::vector<int32_t> max_received_by_sender(num_shard_senders);
            for(uint sender_rank = 0; sender_rank <= last_sender; ++sender_rank) {
                max_received_by_sender[sender_rank] = last_message_counter;
            }
            for(uint sender_rank = last_sender + 1; sender_rank < num_shard_senders; ++sender_rank) {
                max_received_by_sender[sender_rank] = last_message_counter - 1;
            }
            //A leader ID of -1 marks this trim as synthesized rather than logged by a real leader
            ragged_trim = std::make_unique<RaggedTrim>(subgroup_id, shard_num, last_vid, -1, max_received_by_sender);
        }
    }
}
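The synthesis above exploits the round-robin sequence numbering of Derecho's multicast: seq_num = message_counter * num_senders + sender_rank. A minimal standalone sketch of that inverse mapping (counters_from_last_seq is a hypothetical helper, not part of the Derecho API):

#include <cstdint>
#include <vector>

//Hypothetical helper: given the last delivered sequence number in a shard with
//num_senders round-robin senders, recover each sender's last message counter.
std::vector<int32_t> counters_from_last_seq(int32_t last_seq_num, uint32_t num_senders) {
    int32_t counter = last_seq_num / num_senders;
    uint32_t last_sender = last_seq_num % num_senders;
    std::vector<int32_t> max_received(num_senders);
    for(uint32_t rank = 0; rank < num_senders; ++rank) {
        max_received[rank] = (rank <= last_sender) ? counter : counter - 1;
    }
    return max_received;
}
//Example: last_seq_num = 10 with 3 senders gives counter = 3, last_sender = 1,
//so max_received = {3, 3, 2}: sender 2's message with counter 3 (seq_num 11)
//was never delivered.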
//Computes the persistent version corresponding to a ragged trim proposal
persistent::version_t RestartState::ragged_trim_to_latest_version(const int32_t view_id,
                                                                  const std::vector<int32_t>& max_received_by_sender) {
    uint32_t num_shard_senders = max_received_by_sender.size();
    //The latest version is determined by the largest sequence number any sender reached
    int32_t max_seq_num = 0;
    for(uint sender = 0; sender < num_shard_senders; sender++) {
        max_seq_num = std::max(max_seq_num,
                               static_cast<int32_t>(max_received_by_sender[sender] * num_shard_senders + sender));
    }
    return combine_int32s(view_id, max_seq_num);
}
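Assuming combine_int32s simply concatenates its arguments into the high and low words of a 64-bit value, as its description suggests, the packing is equivalent to this sketch:

#include <cstdint>

//Sketch of the version layout, assuming combine_int32s(high, low) concatenates
//two 32-bit words: the view ID occupies the high word and the multicast
//sequence number the low word of a 64-bit persistent version.
int64_t pack_version(int32_t view_id, int32_t seq_num) {
    return (static_cast<int64_t>(view_id) << 32)
           | static_cast<int64_t>(static_cast<uint32_t>(seq_num));
}
//Example: view 5, seq_num 10 packs to 0x000000050000000A; unpack_version<int32_t>()
//performs the reverse split, which is how load_ragged_trim recovers (vid, seq_num).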
RestartLeaderState::RestartLeaderState(std::unique_ptr<View> _curr_view, RestartState& restart_state,
                                       const SubgroupInfo& subgroup_info, const node_id_t my_id)
        : curr_view(std::move(_curr_view)),
          restart_state(restart_state),
          subgroup_info(subgroup_info),
          last_known_view_members(curr_view->members.begin(), curr_view->members.end()),
          longest_log_versions(curr_view->subgroup_shard_views.size()),
          nodes_with_longest_log(curr_view->subgroup_shard_views.size()),
          my_id(my_id) {
    //(elided: the inner vectors of longest_log_versions and nodes_with_longest_log
    //are sized to each subgroup's shard count)
    //Initialize longest_log_versions from the locally logged ragged trims
    for(const auto& subgroup_map_pair : restart_state.logged_ragged_trim) {
        for(const auto& shard_and_trim : subgroup_map_pair.second) {
            longest_log_versions[subgroup_map_pair.first][shard_and_trim.first] = RestartState::ragged_trim_to_latest_version(
                    shard_and_trim.second->vid,
                    shard_and_trim.second->max_received_by_sender);
            dbg_default_trace("Latest logged persistent version for subgroup {}, shard {} initialized to {}",
                              subgroup_map_pair.first, shard_and_trim.first,
                              longest_log_versions[subgroup_map_pair.first][shard_and_trim.first]);
        }
    }
}
//In RestartLeaderState::await_quorum(server_socket): accept rejoining nodes
//until a quorum is reached or the timeout budget is exhausted
bool ready_to_restart = false;
int time_remaining_ms = RESTART_LEADER_TIMEOUT;
while(time_remaining_ms > 0) {
    using namespace std::chrono;
    auto start_time = high_resolution_clock::now();
    std::optional<tcp::socket> client_socket = server_socket.try_accept(time_remaining_ms);
    auto end_time = high_resolution_clock::now();
    milliseconds time_waited = duration_cast<milliseconds>(end_time - start_time);
    //Charge the time spent waiting against the remaining timeout budget
    time_remaining_ms -= time_waited.count();
    if(client_socket) {
        uint64_t joiner_version_code;
        //(elided: exchange version hashcodes with the client)
        if(joiner_version_code != my_version_hashcode) {
            rls_default_warn("Rejected a connection from client at {}. Client was running on an incompatible platform or used an incompatible compiler.", client_socket->get_remote_ip());
            continue;
        }
        node_id_t joiner_id = 0;
        client_socket->read(joiner_id);
        //Read the client's four service ports, then record its connection details
        uint16_t joiner_gms_port = 0;
        client_socket->read(joiner_gms_port);
        uint16_t joiner_rpc_port = 0;
        client_socket->read(joiner_rpc_port);
        uint16_t joiner_sst_port = 0;
        client_socket->read(joiner_sst_port);
        uint16_t joiner_rdmc_port = 0;
        client_socket->read(joiner_rdmc_port);
        const ip_addr_t& joiner_ip = client_socket->get_remote_ip();
        rejoined_node_ips_and_ports[joiner_id] = {joiner_ip, joiner_gms_port,
                                                  joiner_rpc_port, joiner_sst_port, joiner_rdmc_port};
        //(elided: receive the joiner's logs and re-check for a quorum)
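A rejoining node's side of this handshake mirrors the reads above with writes in the same order. A hypothetical sketch of that counterpart (send_join_handshake is not the actual Derecho client code, just an illustration of the wire order):

//Hypothetical client-side counterpart of the handshake above: after the
//(elided) version-code exchange, send our node ID followed by the four
//ports in exactly the order the restart leader reads them.
void send_join_handshake(tcp::socket& leader_socket, node_id_t my_id,
                         uint16_t gms_port, uint16_t rpc_port,
                         uint16_t sst_port, uint16_t rdmc_port) {
    leader_socket.write(my_id);
    leader_socket.write(gms_port);
    leader_socket.write(rpc_port);
    leader_socket.write(sst_port);
    leader_socket.write(rdmc_port);
}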
        } else if(!ready_to_restart) {
            //(elided: without a quorum, the client is left waiting for the restart view)
        }
        //Count how many members of the last known View have rejoined so far
        std::set<node_id_t> intersection_of_ids;
        std::set_intersection(rejoined_node_ids.begin(), rejoined_node_ids.end(),
                              last_known_view_members.begin(), last_known_view_members.end(),
                              std::inserter(intersection_of_ids, intersection_of_ids.end()));
//In receive_joiner_logs(joiner_id, client_socket): read the client's logged View,
//sent as a size-prefixed buffer, and deserialize it
std::size_t size_of_view;
client_socket.read(size_of_view);
char view_buffer[size_of_view];
client_socket.read(view_buffer, size_of_view);
std::unique_ptr<View> client_view = mutils::from_bytes<View>(nullptr, view_buffer);

if(client_view->vid > curr_view->vid) {
    dbg_default_trace("Node {} had newer view {}, replacing view {} and discarding ragged trim",
                      joiner_id, client_view->vid, curr_view->vid);
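Each object in this exchange travels as a size-prefixed blob: the sender writes bytes_size and then the to_bytes output, and the receiver mirrors this with two reads and a from_bytes. A hedged sketch of the receive side as a reusable helper (receive_object is hypothetical; it assumes only the tcp::socket and mutils calls already used in this file):

#include <memory>
#include <vector>

//Hypothetical helper generalizing the size-prefixed receive pattern used for
//Views and RaggedTrims: read the serialized size, read that many bytes, then
//deserialize with mutils::from_bytes. T must be mutils-serializable.
template <typename T>
std::unique_ptr<T> receive_object(tcp::socket& sock) {
    std::size_t size_of_object;
    sock.read(size_of_object);
    std::vector<char> buffer(size_of_object);  //avoids the VLA used in the excerpt
    sock.read(buffer.data(), size_of_object);
    return mutils::from_bytes<T>(nullptr, buffer.data());
}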
    //Drop logged ragged trims that carry a real leader ID, since the newer View
    //makes those committed trims obsolete
    for(auto& subgroup_to_map : restart_state.logged_ragged_trim) {
        auto trim_map_iterator = subgroup_to_map.second.begin();
        while(trim_map_iterator != subgroup_to_map.second.end()) {
            if(trim_map_iterator->second->leader_id != -1) {
                trim_map_iterator = subgroup_to_map.second.erase(trim_map_iterator);
            } else {
                ++trim_map_iterator;
            }
        }
    }
    //Read the client's logged RaggedTrims, prefixed by a count, each one sent
    //as a size-prefixed buffer
    std::size_t num_of_ragged_trims;
    client_socket.read(num_of_ragged_trims);
    for(std::size_t i = 0; i < num_of_ragged_trims; ++i) {
        std::size_t size_of_ragged_trim;
        client_socket.read(size_of_ragged_trim);
        char buffer[size_of_ragged_trim];
        client_socket.read(buffer, size_of_ragged_trim);
        std::unique_ptr<RaggedTrim> ragged_trim = mutils::from_bytes<RaggedTrim>(nullptr, buffer);
        //(trace message reconstructed; the original text is elided in this excerpt)
        dbg_default_trace("Received ragged trim for subgroup {}, shard {} from node {}",
                          ragged_trim->subgroup_id, ragged_trim->shard_num, joiner_id);
        if(client_view->vid < curr_view->vid && ragged_trim->leader_id != -1) {
            //(elided: special handling for trims that a leader committed in an older View)
        }
        //Compute the persistent version this trim corresponds to (reconstructed from elided lines)
        persistent::version_t ragged_trim_log_version = RestartState::ragged_trim_to_latest_version(
                ragged_trim->vid, ragged_trim->max_received_by_sender);
        if(ragged_trim_log_version > longest_log_versions[ragged_trim->subgroup_id][ragged_trim->shard_num]) {
            dbg_default_trace("Latest logged persistent version for subgroup {}, shard {} is now {}, which is at node {}",
                              ragged_trim->subgroup_id, ragged_trim->shard_num, ragged_trim_log_version, joiner_id);
            longest_log_versions[ragged_trim->subgroup_id][ragged_trim->shard_num] = ragged_trim_log_version;
            nodes_with_longest_log[ragged_trim->subgroup_id][ragged_trim->shard_num] = joiner_id;
        }
        //Merge this trim into the map, keeping the entry with the newest view ID
        auto existing_ragged_trim = restart_state.logged_ragged_trim[ragged_trim->subgroup_id].find(ragged_trim->shard_num);
        if(existing_ragged_trim == restart_state.logged_ragged_trim[ragged_trim->subgroup_id].end()) {
            dbg_default_trace("Adding node {}'s ragged trim to map, because we don't have one for shard ({}, {})",
                              joiner_id, ragged_trim->subgroup_id, ragged_trim->shard_num);
            restart_state.logged_ragged_trim[ragged_trim->subgroup_id].emplace(ragged_trim->shard_num,
                                                                               std::move(ragged_trim));
        } else if(existing_ragged_trim->second->vid <= ragged_trim->vid) {
            existing_ragged_trim->second = std::move(ragged_trim);
        }
        //(elided: when the client's View was newer, its ragged trim is emplaced unconditionally)
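The merge rule is last-writer-wins on view ID: for each (subgroup, shard) slot, a trim logged in a later View supersedes an earlier one, and a tie goes to the newer arrival. A condensed sketch of just that rule (merge_trim is a hypothetical free function; RaggedTrim is the type used in this file):

#include <map>
#include <memory>

//Hypothetical distillation of the merge logic above: keep the RaggedTrim with
//the highest view ID for each shard slot, letting ties favor the newer arrival
//(note the <=, matching the branch in receive_joiner_logs).
void merge_trim(std::map<uint32_t, std::unique_ptr<RaggedTrim>>& shard_map,
                uint32_t shard_num, std::unique_ptr<RaggedTrim> incoming) {
    auto existing = shard_map.find(shard_num);
    if(existing == shard_map.end() || existing->second->vid <= incoming->vid) {
        shard_map[shard_num] = std::move(incoming);
    }
}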
    //The deserialized View lacks the subgroup type order, so copy it from the current View
    client_view->subgroup_type_order = curr_view->subgroup_type_order;
//In send_restart_view(): the restart View, each logged RaggedTrim, and the
//longest-log locations are serialized into size-prefixed buffers and streamed
//to every waiting node; a failed write throws that node's ID for the caller
char view_buffer[view_buffer_size];
char leaders_buffer[leaders_buffer_size];
//(elided: mutils::to_bytes serializes the View and shard-leader list into the
//buffers, and num_ragged_trims is computed with multimap_size())
for(auto waiting_sockets_iter = waiting_join_sockets.begin();
        waiting_sockets_iter != waiting_join_sockets.end();) {
    bool send_success = waiting_sockets_iter->second.write(view_buffer_size);
    if(!send_success) throw waiting_sockets_iter->first;
    send_success = waiting_sockets_iter->second.write(view_buffer, view_buffer_size);
    if(!send_success) throw waiting_sockets_iter->first;
    dbg_default_debug("Sending ragged-trim information to node {}", waiting_sockets_iter->first);
    send_success = waiting_sockets_iter->second.write(num_ragged_trims);
    if(!send_success) throw waiting_sockets_iter->first;
    for(const auto& subgroup_to_shard_map : restart_state.logged_ragged_trim) {
        for(const auto& shard_trim_pair : subgroup_to_shard_map.second) {
            std::size_t trim_buffer_size = mutils::bytes_size(*shard_trim_pair.second);
            char trim_buffer[trim_buffer_size];
            mutils::to_bytes(*shard_trim_pair.second, trim_buffer);
            send_success = waiting_sockets_iter->second.write(trim_buffer_size);
            if(!send_success) throw waiting_sockets_iter->first;
            send_success = waiting_sockets_iter->second.write(trim_buffer, trim_buffer_size);
            if(!send_success) throw waiting_sockets_iter->first;
        }
    }
    dbg_default_debug("Sending longest-log locations to node {}", waiting_sockets_iter->first);
    send_success = waiting_sockets_iter->second.write(leaders_buffer_size);
    if(!send_success) throw waiting_sockets_iter->first;
    send_success = waiting_sockets_iter->second.write(leaders_buffer, leaders_buffer_size);
    if(!send_success) throw waiting_sockets_iter->first;
    waiting_sockets_iter++;
}
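The write side is the mirror image of the receive helper sketched earlier; a hypothetical send_object wrapper would collapse each size-prefix/payload pair above into one call:

#include <vector>

//Hypothetical write-side counterpart to receive_object: send the serialized
//size, then the bytes. Throws the recipient's node ID on failure, matching
//the error convention of send_restart_view.
template <typename T>
void send_object(tcp::socket& sock, const T& object, node_id_t recipient_id) {
    std::size_t size = mutils::bytes_size(object);
    std::vector<char> buffer(size);
    mutils::to_bytes(object, buffer.data());
    if(!sock.write(size) || !sock.write(buffer.data(), size)) {
        throw recipient_id;
    }
}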
//In resend_view_until_quorum_lost(): keep recomputing and resending the restart
//view after failures, until a send succeeds or the quorum is lost
bool success = false;
//(elided: recompute the restart view and check that it is still adequate)
int64_t failed_node_id = send_restart_view();
if(failed_node_id != -1) {
    dbg_default_warn("Recomputed View would still have been adequate, but node {} failed while sending it!", failed_node_id);
}
//In send_prepare(): phase one of the two-phase commit for the restart view
//(the CommitMessage member names below are reconstructed from the log messages)
dbg_default_debug("Sending view prepare message to node {}", waiting_sockets_iter->first);
bool socket_success = waiting_sockets_iter->second.write(CommitMessage::PREPARE);
if(!socket_success) {
    throw waiting_sockets_iter->first;
}
CommitMessage response;
socket_success = waiting_sockets_iter->second.read(response);
if(!socket_success) {
    throw waiting_sockets_iter->first;
}
if(response != CommitMessage::ACK) {
    dbg_default_warn("Node {} responded to Prepare with something other than Ack!", waiting_sockets_iter->first);
    throw waiting_sockets_iter->first;
}
waiting_sockets_iter++;
//In send_commit(): phase two; every prepared node receives the commit message
dbg_default_debug("Sending view commit message to node {}", waiting_sockets_iter->first);
//In print_longest_logs(): build a human-readable list of longest-log locations
std::ostringstream leader_list;
//(elided: loop over each subgroup and shard)
leader_list << "Subgroup (" << subgroup << "," << shard << "): node "
            << nodes_with_longest_log[subgroup][shard] << " ";

//In update_curr_and_next_restart_view(): sort the waiting nodes into genuinely
//new members and returning members of the last known View
std::vector<node_id_t> nodes_to_add_in_next_view;
std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> ips_and_ports_to_add_in_next_view;
for(const auto& id_socket_pair : waiting_join_sockets) {
    node_id_t joiner_id = id_socket_pair.first;
    int joiner_rank = curr_view->rank_of(joiner_id);
    //A rank of -1 means this node was not a member of the last known View
    if(joiner_rank == -1) {
        nodes_to_add_in_next_view.emplace_back(joiner_id);
        ips_and_ports_to_add_in_next_view.emplace_back(rejoined_node_ips_and_ports[joiner_id]);
    } else if(curr_view->failed[joiner_rank] == true) {
        //(elided: handle a member of the old View that was marked failed but has now rejoined)
    }
}
//Examine the last known View for members that did not rejoin
for(std::size_t rank = 0; rank < curr_view->members.size(); ++rank) {
    //(elided: any member absent from rejoined_node_ids is marked as failed)
}
std::unique_ptr<View> RestartLeaderState::make_next_view(const std::unique_ptr<View>& curr_view,
                                                         const std::vector<node_id_t>& joiner_ids,
                                                         const std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>>& joiner_ips_and_ports) {
    int next_num_members = curr_view->num_members - curr_view->num_failed + joiner_ids.size();
    std::vector<node_id_t> members(next_num_members), departed;
    std::vector<char> failed(next_num_members);
    std::vector<std::tuple<ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t>> member_ips_and_ports(next_num_members);
    int next_unassigned_rank = curr_view->next_unassigned_rank;
    //Collect the ranks of all failed members, who will leave the group
    std::set<int> leave_ranks;
    for(std::size_t rank = 0; rank < curr_view->failed.size(); ++rank) {
        if(curr_view->failed[rank]) {
            leave_ranks.emplace(rank);
        }
    }
    //Joiners are appended after the surviving members, in order
    for(std::size_t i = 0; i < joiner_ids.size(); ++i) {
        int new_member_rank = curr_view->num_members - leave_ranks.size() + i;
        members[new_member_rank] = joiner_ids[i];
        member_ips_and_ports[new_member_rank] = joiner_ips_and_ports[i];
        dbg_default_debug("Restarted next view will add new member with id {}", joiner_ids[i]);
    }
    for(const auto& leaver_rank : leave_ranks) {
        departed.emplace_back(curr_view->members[leaver_rank]);
        if(leaver_rank <= curr_view->next_unassigned_rank) {
            next_unassigned_rank--;
        }
    }
    dbg_default_debug("Next view will exclude {} failed members.", leave_ranks.size());
    //Copy the surviving members into the next view, preserving their relative order
    int new_rank = 0;
    for(int old_rank = 0; old_rank < curr_view->num_members; ++old_rank) {
        if(leave_ranks.find(old_rank) == leave_ranks.end()) {
            members[new_rank] = curr_view->members[old_rank];
            member_ips_and_ports[new_rank] = curr_view->member_ips_and_ports[old_rank];
            failed[new_rank] = curr_view->failed[old_rank];
            ++new_rank;
        }
    }
    //Find this node's rank in the new membership; the leader must be in its own view
    int32_t my_new_rank = -1;
    node_id_t myID = curr_view->members[curr_view->my_rank];
    for(int i = 0; i < next_num_members; ++i) {
        if(members[i] == myID) {
            my_new_rank = i;
            break;
        }
    }
    if(my_new_rank == -1) {
        throw derecho_exception("Recovery leader wasn't in the next view it computed?!?!");
    }
    auto next_view = std::make_unique<View>(curr_view->vid + 1, members, member_ips_and_ports, failed,
                                            joiner_ids, departed, my_new_rank, next_unassigned_rank,
                                            curr_view->subgroup_type_order);
    next_view->i_know_i_am_leader = curr_view->i_know_i_am_leader;
    return next_view;
}
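To make the rank bookkeeping concrete, here is a small worked instance (all node IDs hypothetical) of the reassignment make_next_view performs:

//Worked example with hypothetical IDs: the old view has members {10, 20, 30, 40}
//and ranks {1, 3} (nodes 20 and 40) marked failed, while node 50 is joining.
//Then next_num_members = 4 - 2 + 1 = 3, and:
//  - survivors keep their order: 10 -> rank 0, 30 -> rank 1;
//  - joiner 50 lands at rank num_members - leave_ranks.size() + 0 = 4 - 2 = 2;
//  - departed = {20, 40}, and next_unassigned_rank drops once for each departed
//    rank at or below its previous value.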
//In contains_at_least_one_member_per_subgroup(): the restart can only proceed
//if every shard of every subgroup in the last View has at least one member
//among the rejoined nodes
for(const auto& shard_view_vector : last_view.subgroup_shard_views) {
    for(const SubView& shard_view : shard_view_vector) {
        bool shard_member_restarted = false;
        for(const node_id_t member_node : shard_view.members) {
            if(rejoined_node_ids.find(member_node) != rejoined_node_ids.end()) {
                shard_member_restarted = true;
            }
        }
        if(!shard_member_restarted) {
            return false;
        }
    }
}
return true;