Derecho  0.9
Distributed systems toolkit for RDMA
view_manager.hpp
Go to the documentation of this file.
1 
6 #pragma once
7 
8 #include <map>
9 #include <memory>
10 #include <mutex>
11 #include <shared_mutex>
12 #include <string>
13 #include <thread>
14 #include <vector>
15 
16 #include "../subgroup_info.hpp"
17 #include "../view.hpp"
18 #include "derecho_internal.hpp"
19 #include "locked_reference.hpp"
20 #include "multicast_group.hpp"
21 #include "restart_state.hpp"
22 #include <derecho/conf/conf.hpp>
23 
25 #include <spdlog/spdlog.h>
26 
27 namespace derecho {
28 
29 template <typename T>
30 class Replicated;
31 template <typename T>
32 class ExternalCaller;
33 
34 class ReplicatedObject;
35 
36 namespace rpc {
37 class RPCManager;
38 }
39 
44 template <typename T>
45 class LockedQueue {
46 private:
47  using unique_lock_t = std::unique_lock<std::mutex>;
48  std::mutex mutex;
49  std::list<T> underlying_list;
50 
51 public:
53  private:
55 
56  public:
57  std::list<T>& access;
58  LockedListAccess(std::mutex& m, std::list<T>& a) : lock(m), access(a){};
59  };
60  LockedListAccess locked() {
61  return LockedListAccess{mutex, underlying_list};
62  }
63 };
64 
69 enum class JoinResponseCode {
70  OK,
72  ID_IN_USE,
74 };
75 
80 struct JoinResponse {
83 };
84 
85 template <typename T>
87 
88 using view_upcall_t = std::function<void(const View&)>;
89 
91 using vector_int64_2d = std::vector<std::vector<int64_t>>;
92 
93 class ViewManager {
94 private:
96 
97  using initialize_rpc_objects_t = std::function<void(node_id_t, const View&, const std::vector<std::vector<int64_t>>&)>;
98 
99  //Allow RPCManager and Replicated to access curr_view and view_mutex directly
100  friend class rpc::RPCManager;
101  template <typename T>
102  friend class Replicated;
103  template <typename T>
104  friend class ExternalCaller;
105 
106  friend class PersistenceManager;
107 
108  friend class RestartLeaderState;
109 
112  std::shared_timed_mutex view_mutex;
114  std::condition_variable_any view_change_cv;
115 
118  std::unique_ptr<View> curr_view;
121  std::unique_ptr<View> next_view;
122 
125 
127  std::queue<std::unique_ptr<View>> old_views;
128  std::mutex old_views_mutex;
129  std::condition_variable old_views_cv;
130 
132  std::list<tcp::socket> proposed_join_sockets;
135  std::vector<bool> last_suspected;
136 
140  std::atomic<bool> thread_shutdown;
144 
151 
152  //Handles for all the predicates the GMS registered with the current view's SST.
159 
162  std::vector<view_upcall_t> view_upcalls;
167  std::vector<std::type_index> subgroup_type_order;
168 
170  std::shared_ptr<tcp::tcp_connections> tcp_sockets;
171 
172  using ReplicatedObjectReferenceMap = std::map<subgroup_id_t, std::reference_wrapper<ReplicatedObject>>;
192 
196 
200  std::unique_ptr<RestartLeaderState> restart_leader_state_machine;
201 
205  std::unique_ptr<RestartState> restart_state;
206 
212 
215 
223  std::vector<std::vector<int64_t>> prior_view_shard_leaders;
224 
231  std::atomic<bool> bSilent = false;
232 
233  bool has_pending_join() { return pending_join_sockets.locked().access.size() > 0; }
234 
235  /* ---------------------------- View-management triggers ---------------------------- */
240  void new_suspicion(DerechoSST& gmsSST);
242  void leader_start_join(DerechoSST& gmsSST);
244  void redirect_join_attempt(DerechoSST& gmsSST);
250  void new_leader_takeover(DerechoSST& gmsSST);
255  void leader_commit_change(DerechoSST& gmsSST);
260  void acknowledge_proposed_change(DerechoSST& gmsSST);
267  void start_meta_wedge(DerechoSST& gmsSST);
274  void terminate_epoch(DerechoSST& gmsSST);
282  void echo_ragged_trim(std::shared_ptr<std::map<subgroup_id_t, uint32_t>> follower_subgroups_and_shards,
283  DerechoSST& gmsSST);
289  void deliver_ragged_trim(DerechoSST& gmsSST);
296  void finish_view_change(DerechoSST& gmsSST);
297 
298  /* ---------------------------------------------------------------------------------- */
299  /* ------------------- Helper methods for view-management triggers ------------------ */
300 
306  bool receive_join(DerechoSST& gmsSST, tcp::socket& client_socket);
313  void update_tcp_connections(const View& new_view);
314 
318  void send_objects_to_new_members(const View& new_view, const vector_int64_2d& old_shard_leaders);
319 
321  void send_subgroup_object(subgroup_id_t subgroup_id, node_id_t new_node_id);
322 
324  void send_view(const View& new_view, tcp::socket& client_socket);
325 
338  void log_ragged_trim(const int shard_leader_rank,
339  const subgroup_id_t subgroup_num,
340  const uint32_t num_received_offset,
341  const uint num_shard_senders);
356  void deliver_in_order(const int shard_leader_rank,
357  const subgroup_id_t subgroup_num, const uint32_t num_received_offset,
358  const std::vector<node_id_t>& shard_members, uint num_shard_senders);
372  void leader_ragged_edge_cleanup(const subgroup_id_t subgroup_num,
373  const uint32_t num_received_offset,
374  const std::vector<node_id_t>& shard_members,
375  uint num_shard_senders);
388  void follower_ragged_edge_cleanup(const subgroup_id_t subgroup_num,
389  uint shard_leader_rank,
390  const uint32_t num_received_offset,
391  uint num_shard_senders);
392 
393  /* -- Static helper methods that implement chunks of view-management functionality -- */
394  static bool suspected_not_equal(const DerechoSST& gmsSST, const std::vector<bool>& old);
395  static void copy_suspected(const DerechoSST& gmsSST, std::vector<bool>& old);
396  static bool changes_contains(const DerechoSST& gmsSST, const node_id_t q);
397  static int min_acked(const DerechoSST& gmsSST, const std::vector<char>& failed);
398  static bool previous_leaders_suspected(const DerechoSST& gmsSST, const View& curr_view);
399 
410  static bool copy_prior_leader_proposals(DerechoSST& gmsSST);
411 
420  static std::unique_ptr<View> make_next_view(const std::unique_ptr<View>& curr_view,
421  const DerechoSST& gmsSST);
422 
423  /* ---------------------------------------------------------------------------------- */
424 
425  /* ------------------------ Setup/constructor helpers ------------------------------- */
426 
428  void create_threads();
430  void register_predicates();
433  void load_ragged_trim();
436  void await_first_view(const node_id_t my_id);
437 
441  void receive_view_and_leaders(const node_id_t my_id, tcp::socket& leader_connection);
442 
446  void truncate_persistent_logs(const ragged_trim_map_t& logged_ragged_trim);
447 
449  void initialize_rdmc_sst();
453  void receive_initial_view(node_id_t my_id, tcp::socket& leader_connection);
454 
461  void setup_initial_tcp_connections(const View& initial_view, node_id_t my_id);
462 
470  void reinit_tcp_connections(const View& initial_view, node_id_t my_id);
477  void construct_multicast_group(CallbackSet callbacks,
478  const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings,
479  const uint32_t num_received_size,
480  const uint32_t slot_size);
481 
489  void transition_multicast_group(const std::map<subgroup_id_t, SubgroupSettings>& new_subgroup_settings,
490  const uint32_t new_num_received_size,
491  const uint32_t new_slot_size);
503  static void make_subgroup_maps(const SubgroupInfo& subgroup_info,
504  const std::unique_ptr<View>& prev_view,
505  View& curr_view);
506 
518  std::pair<uint32_t, uint32_t> derive_subgroup_settings(View& curr_view,
519  std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings);
520 
529  static uint32_t compute_num_received_size(const View& view);
530 
534  template <PORT_TYPE port_index>
535  static std::map<node_id_t, std::pair<ip_addr_t, uint16_t>>
537  std::map<node_id_t, std::pair<ip_addr_t, uint16_t>> member_ips_and_ports_map;
538  size_t num_members = view.members.size();
539  for(uint i = 0; i < num_members; ++i) {
540  if(!view.failed[i]) {
541  member_ips_and_ports_map[view.members[i]] = std::pair<ip_addr_t, uint16_t>{
542  std::get<0>(view.member_ips_and_ports[i]),
543  std::get<port_index>(view.member_ips_and_ports[i])};
544  }
545  }
546  return member_ips_and_ports_map;
547  }
554  static vector_int64_2d old_shard_leaders_by_new_ids(const View& curr_view, const View& next_view);
555 
565  template <typename ValueType>
566  static std::unique_ptr<std::vector<std::vector<ValueType>>> receive_vector2d(tcp::socket& socket) {
567  std::size_t buffer_size;
568  socket.read(buffer_size);
569  if(buffer_size == 0) {
570  return std::make_unique<std::vector<std::vector<ValueType>>>();
571  }
572  char buffer[buffer_size];
573  socket.read(buffer, buffer_size);
574  return mutils::from_bytes<std::vector<std::vector<ValueType>>>(nullptr, buffer);
575  }
576 
577 public:
592  ViewManager(const SubgroupInfo& subgroup_info,
593  const std::vector<std::type_index>& subgroup_type_order,
594  const bool any_persistent_objects,
595  const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
596  ReplicatedObjectReferenceMap& object_reference_map,
597  const persistence_manager_callbacks_t& _persistence_manager_callbacks,
598  std::vector<view_upcall_t> _view_upcalls = {});
599 
618  ViewManager(tcp::socket& leader_connection,
619  const SubgroupInfo& subgroup_info,
620  const std::vector<std::type_index>& subgroup_type_order,
621  const bool any_persistent_objects,
622  const std::shared_ptr<tcp::tcp_connections>& group_tcp_sockets,
623  ReplicatedObjectReferenceMap& object_reference_map,
624  const persistence_manager_callbacks_t& _persistence_manager_callbacks,
625  std::vector<view_upcall_t> _view_upcalls = {});
626 
627  ~ViewManager();
628 
633  void await_rejoining_nodes(const node_id_t my_id);
634 
643  bool check_view_committed(tcp::socket& leader_connection);
644 
656  bool leader_prepare_initial_view(bool& leader_has_quorum);
657 
662  void leader_commit_initial_view();
663 
670  void truncate_logs();
671 
678  void send_logs();
679 
686  void initialize_multicast_groups(CallbackSet callbacks);
687 
694  void finish_setup();
695 
700  void start();
701 
707  const vector_int64_2d& get_old_shard_leaders() const { return prior_view_shard_leaders; }
708 
710  void leave();
712  std::vector<node_id_t> get_members();
714  int32_t get_my_rank();
717  std::vector<std::vector<node_id_t>> get_subgroup_members(subgroup_type_id_t subgroup_type, uint32_t subgroup_index);
718  std::size_t get_number_of_shards_in_subgroup(subgroup_type_id_t subgroup_type, uint32_t subgroup_index);
724  int32_t get_my_shard(subgroup_type_id_t subgroup_type, uint32_t subgroup_index);
728  void send(subgroup_id_t subgroup_num, long long unsigned int payload_size,
729  const std::function<void(char* buf)>& msg_generator, bool cooked_send = false);
730 
731  const uint64_t compute_global_stability_frontier(subgroup_id_t subgroup_num);
732 
738  SharedLockedReference<View> get_current_view();
739 
747  SharedLockedReference<const View> get_current_view_const();
748 
751  void add_view_upcall(const view_upcall_t& upcall);
752 
754  void report_failure(const node_id_t who);
756  void barrier_sync();
757 
765  initialize_subgroup_objects = std::move(upcall);
766  }
767 
771  void silence();
772 
773  void debug_print_status() const;
774 
775  // UGLY - IMPROVE LATER
776  std::map<subgroup_id_t, uint64_t> max_payload_sizes;
777  std::map<subgroup_id_t, uint64_t> get_max_payload_sizes();
778  // max of max_payload_sizes
779  uint64_t view_max_rpc_reply_payload_size = 0;
780  uint32_t view_max_rpc_window_size = 0;
781 };
782 
783 } /* namespace derecho */
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::shared_ptr< tcp::tcp_connections > tcp_sockets
The same set of TCP sockets used by Group and RPCManager.
bool active_leader
True if this node is the current leader and is fully active (i.e.
std::vector< std::vector< int64_t > > vector_int64_2d
Type of a 2-dimensional vector used to store potential node IDs, or -1.
std::condition_variable old_views_cv
const std::vector< node_id_t > members
Node IDs of members in the current view, indexed by their SST rank.
Definition: view.hpp:99
std::atomic< bool > thread_shutdown
A flag to signal background threads to shut down; set to true when the group is destroyed.
std::mutex old_views_mutex
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
Bundles together a JoinResponseCode and the leader's node ID, which it also needs to send to the new ...
std::map< subgroup_id_t, uint64_t > max_payload_sizes
const vector_int64_2d & get_old_shard_leaders() const
std::map< subgroup_id_t, std::reference_wrapper< ReplicatedObject > > ReplicatedObjectReferenceMap
LockedListAccess(std::mutex &m, std::list< T > &a)
std::vector< std::type_index > subgroup_type_order
Indicates the order that the subgroups should be provisioned; set by Group to be the same order as it...
A little helper class that wraps together a reference and a lock on a related mutex.
pred_handle leader_proposed_handle
LEADER_REDIRECT This node is not actually the leader and can't accept a join.
static std::unique_ptr< std::vector< std::vector< ValueType > > > receive_vector2d(tcp::socket &socket)
A little convenience method that receives a 2-dimensional vector using our standard network protocol...
pred_handle reject_join_handle
std::unique_ptr< RestartLeaderState > restart_leader_state_machine
If this node is the restart leader and currently doing a total restart, this object contains state re...
bool in_total_restart
Set to true in the constructor if this node must do a total restart before completing group setup; fa...
std::vector< char > failed
failed[i] is true if members[i] is considered to have failed.
Definition: view.hpp:104
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
const bool disable_partitioning_safety
A user-configurable option that disables the checks for partitioning events.
PersistenceManager is responsible for persisting all the data in a group.
std::list< T > underlying_list
const bool any_persistent_objects
True if any of the Replicated<T> objects in this group have a Persistent<T> field, false if none of them do.
std::thread client_listener_thread
The background thread that listens for clients connecting on our server socket.
bool read(char *buffer, size_t size)
Reads size bytes from the socket and writes them to the given buffer.
Definition: tcp.cpp:138
std::unique_ptr< RestartState > restart_state
If this node is currently doing a total restart, this object contains state related to restarting...
pred_handle start_join_handle
pred_handle leader_committed_handle
const SubgroupInfo subgroup_info
The subgroup membership function, which will be called whenever the view changes. ...
Bundles together a set of callback functions for message delivery events.
std::map< subgroup_id_t, std::map< uint32_t, std::unique_ptr< RaggedTrim > >> ragged_trim_map_t
List of logged ragged trim states, indexed by (subgroup ID, shard num), stored by pointer...
std::unique_ptr< View > next_view
May hold a pointer to the partially-constructed next view, if we are in the process of transitioning ...
const std::vector< std::tuple< ip_addr_t, uint16_t, uint16_t, uint16_t, uint16_t > > member_ips_and_ports
IP addresses and ports (gms, rpc, sst, rdmc in order) of members in the current view, indexed by their SST rank.
Definition: view.hpp:101
std::unique_lock< std::mutex > unique_lock_t
ID_IN_USE The node's ID is already listed as a member of the current view, so it can't join...
OK The new member can proceed to join as normal.
pred_handle suspected_changed_handle
std::function< void(const View &)> view_upcall_t
std::vector< bool > last_suspected
A cached copy of the last known value of this node's suspected[] array.
std::shared_timed_mutex view_mutex
Controls access to curr_view.
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
JoinResponseCode code
The GMS and derecho_group will share the same SST for efficiency.
Definition: derecho_sst.hpp:22
tcp::connection_listener server_socket
The TCP socket the leader uses to listen for joining clients.
std::condition_variable_any view_change_cv
Notified when curr_view changes (i.e.
JoinResponseCode
A set of status codes the group leader can respond with upon initially receiving a connection request...
std::thread old_view_cleanup_thread
LockedListAccess locked()
static std::map< node_id_t, std::pair< ip_addr_t, uint16_t > > make_member_ips_and_ports_map(const View &view)
Constructs a map from node ID -> IP address from the parallel vectors in the given View...
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
Container for whatever information is needed to describe a Group's subgroups and shards.
std::vector< std::vector< int64_t > > prior_view_shard_leaders
A 2-dimensional vector, indexed by (subgroup ID -> shard number), containing the ID of the node in ea...
persistence_manager_callbacks_t persistence_manager_callbacks
The persistence request func is from persistence manager.
TOTAL_RESTART The group is currently restarting from a total failure, so the new member should send i...
std::vector< view_upcall_t > view_upcalls
Functions to be called whenever the view changes, to report the new view to some other component...
std::unique_ptr< View > curr_view
The current View, containing the state of the managed group.
std::function< void(node_id_t, const View &, const std::vector< std::vector< int64_t > > &)> initialize_rpc_objects_t
initialize_rpc_objects_t initialize_subgroup_objects
A function that will be called to initialize replicated objects after transitioning to a new view...
std::queue< std::unique_ptr< View > > old_views
Contains old Views that need to be cleaned up.
void register_initialize_objects_upcall(initialize_rpc_objects_t upcall)
Registers a function that will initialize all the RPC objects at this node, given a new view and a li...
ReplicatedObjectReferenceMap & subgroup_objects
A type-erased list of references to the Replicated<T> objects in this group, indexed by their subgrou...
LockedQueue< tcp::socket > pending_join_sockets
On the leader node, contains client sockets for pending joins that have not yet been handled...
A little helper class that implements a threadsafe queue by requiring all clients to lock a mutex bef...
pred_handle change_commit_ready_handle
std::list< tcp::socket > proposed_join_sockets
The sockets connected to clients that will join in the next view, if any.