11 #include <shared_mutex> 16 #include "../subgroup_info.hpp" 17 #include "../view.hpp" 25 #include <spdlog/spdlog.h> 34 class ReplicatedObject;
61 return LockedListAccess{mutex, underlying_list};
101 template <
typename T>
103 template <
typename T>
231 std::atomic<bool> bSilent =
false;
244 void redirect_join_attempt(
DerechoSST& gmsSST);
255 void leader_commit_change(
DerechoSST& gmsSST);
260 void acknowledge_proposed_change(
DerechoSST& gmsSST);
282 void echo_ragged_trim(std::shared_ptr<std::map<subgroup_id_t, uint32_t>> follower_subgroups_and_shards,
313 void update_tcp_connections(
const View& new_view);
318 void send_objects_to_new_members(
const View& new_view,
const vector_int64_2d& old_shard_leaders);
338 void log_ragged_trim(
const int shard_leader_rank,
340 const uint32_t num_received_offset,
341 const uint num_shard_senders);
356 void deliver_in_order(
const int shard_leader_rank,
357 const subgroup_id_t subgroup_num,
const uint32_t num_received_offset,
358 const std::vector<node_id_t>& shard_members, uint num_shard_senders);
372 void leader_ragged_edge_cleanup(
const subgroup_id_t subgroup_num,
373 const uint32_t num_received_offset,
374 const std::vector<node_id_t>& shard_members,
375 uint num_shard_senders);
388 void follower_ragged_edge_cleanup(
const subgroup_id_t subgroup_num,
389 uint shard_leader_rank,
390 const uint32_t num_received_offset,
391 uint num_shard_senders);
394 static bool suspected_not_equal(
const DerechoSST& gmsSST,
const std::vector<bool>& old);
395 static void copy_suspected(
const DerechoSST& gmsSST, std::vector<bool>& old);
397 static int min_acked(
const DerechoSST& gmsSST,
const std::vector<char>& failed);
398 static bool previous_leaders_suspected(
const DerechoSST& gmsSST,
const View& curr_view);
410 static bool copy_prior_leader_proposals(
DerechoSST& gmsSST);
420 static std::unique_ptr<View> make_next_view(
const std::unique_ptr<View>& curr_view,
428 void create_threads();
430 void register_predicates();
433 void load_ragged_trim();
436 void await_first_view(
const node_id_t my_id);
449 void initialize_rdmc_sst();
461 void setup_initial_tcp_connections(
const View& initial_view,
node_id_t my_id);
470 void reinit_tcp_connections(
const View& initial_view,
node_id_t my_id);
477 void construct_multicast_group(
CallbackSet callbacks,
478 const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings,
479 const uint32_t num_received_size,
480 const uint32_t slot_size);
489 void transition_multicast_group(
const std::map<subgroup_id_t, SubgroupSettings>& new_subgroup_settings,
490 const uint32_t new_num_received_size,
491 const uint32_t new_slot_size);
503 static void make_subgroup_maps(
const SubgroupInfo& subgroup_info,
504 const std::unique_ptr<View>& prev_view,
518 std::pair<uint32_t, uint32_t> derive_subgroup_settings(
View& curr_view,
519 std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings);
529 static uint32_t compute_num_received_size(
const View& view);
534 template <PORT_TYPE port_index>
535 static std::map<node_id_t, std::pair<ip_addr_t, uint16_t>>
537 std::map<node_id_t, std::pair<ip_addr_t, uint16_t>> member_ips_and_ports_map;
538 size_t num_members = view.
members.size();
539 for(uint i = 0; i < num_members; ++i) {
541 member_ips_and_ports_map[view.
members[i]] = std::pair<ip_addr_t, uint16_t>{
546 return member_ips_and_ports_map;
565 template <
typename ValueType>
567 std::size_t buffer_size;
568 socket.
read(buffer_size);
569 if(buffer_size == 0) {
570 return std::make_unique<std::vector<std::vector<ValueType>>>();
572 char buffer[buffer_size];
573 socket.
read(buffer, buffer_size);
574 return mutils::from_bytes<std::vector<std::vector<ValueType>>>(
nullptr, buffer);
593 const std::vector<std::type_index>& subgroup_type_order,
594 const bool any_persistent_objects,
595 const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
598 std::vector<view_upcall_t> _view_upcalls = {});
620 const std::vector<std::type_index>& subgroup_type_order,
621 const bool any_persistent_objects,
622 const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
625 std::vector<view_upcall_t> _view_upcalls = {});
633 void await_rejoining_nodes(
const node_id_t my_id);
643 bool check_view_committed(
tcp::socket& leader_connection);
656 bool leader_prepare_initial_view(
bool& leader_has_quorum);
662 void leader_commit_initial_view();
670 void truncate_logs();
686 void initialize_multicast_groups(
CallbackSet callbacks);
712 std::vector<node_id_t> get_members();
714 int32_t get_my_rank();
717 std::vector<std::vector<node_id_t>> get_subgroup_members(
subgroup_type_id_t subgroup_type, uint32_t subgroup_index);
718 std::size_t get_number_of_shards_in_subgroup(
subgroup_type_id_t subgroup_type, uint32_t subgroup_index);
729 const std::function<
void(
char* buf)>& msg_generator,
bool cooked_send =
false);
731 const uint64_t compute_global_stability_frontier(
subgroup_id_t subgroup_num);
754 void report_failure(
const node_id_t who);
765 initialize_subgroup_objects = std::move(upcall);
773 void debug_print_status()
const;
777 std::map<subgroup_id_t, uint64_t> get_max_payload_sizes();
779 uint64_t view_max_rpc_reply_payload_size = 0;
780 uint32_t view_max_rpc_window_size = 0;
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::shared_ptr< tcp::tcp_connections > tcp_sockets
The same set of TCP sockets used by Group and RPCManager.
bool active_leader
True if this node is the current leader and is fully active.
std::vector< std::vector< int64_t > > vector_int64_2d
Type of a 2-dimensional vector used to store potential node IDs, or -1.
std::condition_variable old_views_cv
const std::vector< node_id_t > members
Node IDs of members in the current view, indexed by their SST rank.
std::atomic< bool > thread_shutdown
A flag to signal background threads to shut down; set to true when the group is destroyed.
std::mutex old_views_mutex
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
Bundles together a JoinResponseCode and the leader's node ID, which it also needs to send to the new ...
std::map< subgroup_id_t, uint64_t > max_payload_sizes
const vector_int64_2d & get_old_shard_leaders() const
std::map< subgroup_id_t, std::reference_wrapper< ReplicatedObject > > ReplicatedObjectReferenceMap
LockedListAccess(std::mutex &m, std::list< T > &a)
std::vector< std::type_index > subgroup_type_order
Indicates the order that the subgroups should be provisioned; set by Group to be the same order as it...
A little helper class that wraps together a reference and a lock on a related mutex.
pred_handle leader_proposed_handle
LEADER_REDIRECT This node is not actually the leader and can't accept a join.
static std::unique_ptr< std::vector< std::vector< ValueType > > > receive_vector2d(tcp::socket &socket)
A little convenience method that receives a 2-dimensional vector using our standard network protocol...
pred_handle reject_join_handle
std::unique_ptr< RestartLeaderState > restart_leader_state_machine
If this node is the restart leader and currently doing a total restart, this object contains state re...
bool in_total_restart
Set to true in the constructor if this node must do a total restart before completing group setup; fa...
std::vector< char > failed
failed[i] is true if members[i] is considered to have failed.
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
const bool disable_partitioning_safety
A user-configurable option that disables the checks for partitioning events.
PersistenceManager is responsible for persisting all the data in a group.
std::list< T > underlying_list
const bool any_persistent_objects
True if any of the Replicated<T> objects in this group have a Persistent<T> field, false if none of them do.
std::thread client_listener_thread
The background thread that listens for clients connecting on our server socket.
bool read(char *buffer, size_t size)
Reads size bytes from the socket and writes them to the given buffer.
std::unique_ptr< RestartState > restart_state
If this node is currently doing a total restart, this object contains state related to restarting...
pred_handle start_join_handle
pred_handle leader_committed_handle
const SubgroupInfo subgroup_info
The subgroup membership function, which will be called whenever the view changes. ...
Bundles together a set of callback functions for message delivery events.
std::map< subgroup_id_t, std::map< uint32_t, std::unique_ptr< RaggedTrim > >> ragged_trim_map_t
List of logged ragged trim states, indexed by (subgroup ID, shard num), stored by pointer...
std::unique_ptr< View > next_view
May hold a pointer to the partially-constructed next view, if we are in the process of transitioning ...
const std::vector< std::tuple< ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t > > member_ips_and_ports
IP addresses and ports (gms, rpc, sst, rdmc in order) of members in the current view, indexed by their SST rank.
std::unique_lock< std::mutex > unique_lock_t
ID_IN_USE The node's ID is already listed as a member of the current view, so it can't join...
OK The new member can proceed to join as normal.
pred_handle suspected_changed_handle
std::function< void(const View &)> view_upcall_t
std::vector< bool > last_suspected
A cached copy of the last known value of this node's suspected[] array.
std::shared_timed_mutex view_mutex
Controls access to curr_view.
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
The GMS and derecho_group will share the same SST for efficiency.
tcp::connection_listener server_socket
The TCP socket the leader uses to listen for joining clients.
std::condition_variable_any view_change_cv
Notified when curr_view changes.
JoinResponseCode
A set of status codes the group leader can respond with upon initially receiving a connection request...
std::thread old_view_cleanup_thread
LockedListAccess locked()
static std::map< node_id_t, std::pair< ip_addr_t, uint16_t > > make_member_ips_and_ports_map(const View &view)
Constructs a map from node ID -> IP address from the parallel vectors in the given View...
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
Container for whatever information is needed to describe a Group's subgroups and shards.
std::vector< std::vector< int64_t > > prior_view_shard_leaders
A 2-dimensional vector, indexed by (subgroup ID -> shard number), containing the ID of the node in ea...
persistence_manager_callbacks_t persistence_manager_callbacks
The persistence request func is from persistence manager.
TOTAL_RESTART The group is currently restarting from a total failure, so the new member should send i...
std::vector< view_upcall_t > view_upcalls
Functions to be called whenever the view changes, to report the new view to some other component...
std::unique_ptr< View > curr_view
The current View, containing the state of the managed group.
std::function< void(node_id_t, const View &, const std::vector< std::vector< int64_t > > &)> initialize_rpc_objects_t
initialize_rpc_objects_t initialize_subgroup_objects
A function that will be called to initialize replicated objects after transitioning to a new view...
std::queue< std::unique_ptr< View > > old_views
Contains old Views that need to be cleaned up.
void register_initialize_objects_upcall(initialize_rpc_objects_t upcall)
Registers a function that will initialize all the RPC objects at this node, given a new view and a li...
ReplicatedObjectReferenceMap & subgroup_objects
A type-erased list of references to the Replicated<T> objects in this group, indexed by their subgrou...
LockedQueue< tcp::socket > pending_join_sockets
On the leader node, contains client sockets for pending joins that have not yet been handled...
A little helper class that implements a threadsafe queue by requiring all clients to lock a mutex bef...
pred_handle change_commit_ready_handle
std::list< tcp::socket > proposed_join_sockets
The sockets connected to clients that will join in the next view, if any.