Derecho  0.9
Distributed systems toolkit for RDMA
multicast_group.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <assert.h>
4 #include <condition_variable>
5 #include <functional>
6 #include <list>
7 #include <map>
8 #include <memory>
9 #include <mutex>
10 #include <optional>
11 #include <ostream>
12 #include <queue>
13 #include <set>
14 #include <tuple>
15 #include <vector>
16 
17 #include "../derecho_modes.hpp"
18 #include "../subgroup_info.hpp"
19 #include "connection_manager.hpp"
20 #include "derecho_internal.hpp"
21 #include "derecho_sst.hpp"
22 #include <derecho/conf/conf.hpp>
25 #include <derecho/rdmc/rdmc.hpp>
27 #include <derecho/sst/sst.hpp>
28 #include <spdlog/spdlog.h>
29 
30 namespace derecho {
31 
/**
 * Bundles together a set of callback functions for message delivery events.
 * NOTE(review): no members are visible between the braces in this listing;
 * the member declarations need to be checked against the real header.
 */
39 struct CallbackSet {
43 };
44 
/**
 * The header for an individual multicast message.
 * Declared with __attribute__((__packed__)). sizeof(header) is added to the
 * payload sizes when DerechoParams computes its message sizes, so the field
 * list and order here must not be changed casually.
 */
49 struct __attribute__((__packed__)) header {
// presumably the size of this header in bytes -- TODO confirm against the sender code
50  uint32_t header_size;
// presumably the message's index relative to other messages from the same sender -- TODO confirm
51  int32_t index;
// NOTE(review): units and clock source are not visible here -- confirm
52  uint64_t timestamp;
// NOTE(review): send() and get_sendbuffer_ptr() take a flag of the same name; presumably it is recorded here -- confirm
53  bool cooked_send;
54 };
55 
63  uint64_t max_msg_size;
65  uint64_t sst_max_msg_size;
66  uint64_t block_size;
67  unsigned int window_size;
68  unsigned int heartbeat_ms;
70  uint32_t rpc_port;
71 
72  static uint64_t compute_max_msg_size(
73  const uint64_t max_payload_size,
74  const uint64_t block_size,
75  bool using_rdmc) {
76  auto max_msg_size = max_payload_size + sizeof(header);
77  if(using_rdmc) {
78  if(max_msg_size % block_size != 0) {
79  max_msg_size = (max_msg_size / block_size + 1) * block_size;
80  }
81  }
82  return max_msg_size;
83  }
84 
/**
 * Translates the name of an RDMC send algorithm, as a string, into an
 * rdmc::send_algorithm value.
 * The recognized names are "binomial_send", "chain_send", "sequential_send"
 * and "tree_send". Any other name throws a std::string (not a std::exception)
 * describing the bad value.
 * NOTE(review): the statements inside each recognized branch are not present
 * in this listing, so the value returned for each name cannot be confirmed here.
 */
85  static rdmc::send_algorithm send_algorithm_from_string(const std::string& rdmc_send_algorithm_string) {
86  if(rdmc_send_algorithm_string == "binomial_send") {
88  } else if(rdmc_send_algorithm_string == "chain_send") {
90  } else if(rdmc_send_algorithm_string == "sequential_send") {
92  } else if(rdmc_send_algorithm_string == "tree_send") {
94  } else {
// The thrown expression is a std::string built by concatenation.
95  throw "wrong value for RDMC send algorithm: " + rdmc_send_algorithm_string + ". Check your config file.";
96  }
97  }
98 
/**
 * Constructs a DerechoParams from payload sizes and low-level settings.
 *
 * The three size members are derived from the payload sizes passed in:
 * max_reply_msg_size and sst_max_msg_size each add sizeof(header) to the
 * corresponding payload size, and max_msg_size comes from
 * compute_max_msg_size(). The remaining parameters are stored unchanged in
 * the members of the same name.
 *
 * @param max_payload_size Largest message payload, in bytes.
 * @param max_reply_payload_size Largest reply payload, in bytes.
 * @param max_smc_payload_size Largest payload stored in sst_max_msg_size (after adding the header size).
 * @param block_size Block size used when rounding max_msg_size.
 * @param window_size Stored in window_size.
 * @param heartbeat_ms Stored in heartbeat_ms.
 * @param rdmc_send_algorithm Stored in rdmc_send_algorithm.
 * @param rpc_port Stored in rpc_port.
 */
99  DerechoParams(uint64_t max_payload_size,
100  uint64_t max_reply_payload_size,
101  uint64_t max_smc_payload_size,
102  uint64_t block_size,
103  unsigned int window_size,
104  unsigned int heartbeat_ms,
105  rdmc::send_algorithm rdmc_send_algorithm,
106  uint32_t rpc_port)
107  : max_reply_msg_size(max_reply_payload_size + sizeof(header)),
108  sst_max_msg_size(max_smc_payload_size + sizeof(header)),
109  block_size(block_size),
110  window_size(window_size),
111  heartbeat_ms(heartbeat_ms),
112  rdmc_send_algorithm(rdmc_send_algorithm),
113  rpc_port(rpc_port) {
114  // max_msg_size is assigned in the body rather than in the initializer list: the original author found that initializing it above makes DerechoParams abstract (reason unknown).
// Rounding to block_size is requested only when max_payload_size exceeds max_smc_payload_size.
115  max_msg_size = compute_max_msg_size(max_payload_size, block_size,
116  max_payload_size > max_smc_payload_size);
117  }
118 
120 
/**
 * Constructs DerechoParams specifying subgroup metadata for the specified profile.
 *
 * Every key "SUBGROUP/<profile>/<field>", for each field in
 * Conf::subgroupProfileFields, must be present as a customized configuration
 * key. If any is missing, a message is printed to std::cout and a std::string
 * is thrown.
 *
 * @param profile Name of the profile inside the SUBGROUP configuration section.
 * @return A DerechoParams built from the profile's values, plus the heartbeat
 * and RPC port read from CONF_DERECHO_HEARTBEAT_MS and CONF_DERECHO_RPC_PORT.
 */
126  static DerechoParams from_profile(const std::string& profile) {
127  // Use the profile string to search the configuration file for the appropriate
128  // settings. NOTE(review): no defaults are applied here -- a missing key makes this function throw.
129  std::string prefix = "SUBGROUP/" + profile + "/";
130  for(auto& field : Conf::subgroupProfileFields) {
131  if(!hasCustomizedConfKey(prefix + field)) {
132  std::cout << "profile " << profile
133  << " not found in SUBGROUP section of derecho conf. "
134  "Look at derecho-sample.cfg for more information."
135  << std::endl;
// The thrown expression is a std::string built by concatenation.
136  throw profile + " derecho subgroup profile not found";
137  }
138  }
139 
// The profile fields are read by position in Conf::subgroupProfileFields (indices 0 to 5).
140  uint64_t max_payload_size = getConfUInt64(prefix + Conf::subgroupProfileFields[0]);
141  uint64_t max_reply_payload_size = getConfUInt64(prefix + Conf::subgroupProfileFields[1]);
142  uint64_t max_smc_payload_size = getConfUInt64(prefix + Conf::subgroupProfileFields[2]);
143  uint64_t block_size = getConfUInt64(prefix + Conf::subgroupProfileFields[3]);
144  uint32_t window_size = getConfUInt32(prefix + Conf::subgroupProfileFields[4]);
// The heartbeat and the RPC port are read from global keys, not from the profile.
145  uint32_t timeout_ms = getConfUInt32(CONF_DERECHO_HEARTBEAT_MS);
146  const std::string& algorithm = getConfString(prefix + Conf::subgroupProfileFields[5]);
147  uint32_t rpc_port = getConfUInt32(CONF_DERECHO_RPC_PORT);
148 
// NOTE(review): only seven values are listed below while the constructor takes eight, and
// `algorithm` is not visibly used -- the argument between timeout_ms and rpc_port appears
// to be missing from this listing.
149  return DerechoParams{
150  max_payload_size,
151  max_reply_payload_size,
152  max_smc_payload_size,
153  block_size,
154  window_size,
155  timeout_ms,
157  rpc_port,
158  };
159  }
160 
161  DEFAULT_SERIALIZATION_SUPPORT(DerechoParams, max_msg_size, max_reply_msg_size,
162  sst_max_msg_size, block_size, window_size,
163  heartbeat_ms, rdmc_send_algorithm, rpc_port);
164 };
165 
173  std::unique_ptr<char[]> buffer;
174  std::shared_ptr<rdma::memory_region> mr;
175 
177  MessageBuffer(size_t size) {
178  if(size != 0) {
179  buffer = std::unique_ptr<char[]>(new char[size]);
180  mr = std::make_shared<rdma::memory_region>(buffer.get(), size);
181  }
182  }
183  MessageBuffer(const MessageBuffer&) = delete;
184  MessageBuffer(MessageBuffer&&) = default;
185  MessageBuffer& operator=(const MessageBuffer&) = delete;
186  MessageBuffer& operator=(MessageBuffer&&) = default;
187 };
188 
/**
 * A structure containing an RDMC message (which consists of some bytes in a
 * registered memory region).
 * NOTE(review): some member declarations appear to be missing from this
 * listing; the cross-reference text on this page mentions a message_id_t
 * index and a MessageBuffer message_buffer, presumably belonging here -- confirm.
 */
195 struct RDMCMessage {
197  uint32_t sender_id;  // The unique node ID of the message's sender.
199  //long long int index;
202  long long unsigned int size;  // The message's size in bytes.
205 };
206 
/**
 * Describes a single SST message: who sent it, its index, its size and where
 * its bytes are.
 */
207 struct SSTMessage {
209  uint32_t sender_id;  // The unique node ID of the message's sender.
211  int32_t index;  // The message's index (relative to other messages sent by that sender).
213  long long unsigned int size;  // The message's size in bytes.
215  volatile char* buf;  // Pointer to the message.
216 };
217 
225  uint32_t shard_num;
227  uint32_t shard_rank;
229  std::vector<node_id_t> members;
231  std::vector<int> senders;
237  uint32_t slot_offset;
241 };
242 
248  friend class ViewManager;
249 
250 private:
252  std::vector<node_id_t> members;
254  std::map<node_id_t, uint32_t> node_id_to_sst_index;
256  const unsigned int num_members;
258  const int member_index;
264  const std::map<subgroup_id_t, SubgroupSettings> subgroup_settings_map;
266  std::vector<std::list<int32_t>> received_intervals;
269  std::map<subgroup_id_t, uint32_t> subgroup_to_rdmc_group;
272 
276  bool rdmc_sst_groups_created = false;
279  std::map<uint32_t, std::vector<MessageBuffer>> free_message_buffers;
280 
283  std::vector<message_id_t> future_message_indices;
284 
287  std::vector<std::optional<RDMCMessage>> next_sends;
288  std::map<uint32_t, bool> pending_sst_sends;
290  std::vector<std::queue<RDMCMessage>> pending_sends;
293  std::vector<std::optional<RDMCMessage>> current_sends;
294 
296  std::map<std::pair<subgroup_id_t, node_id_t>, RDMCMessage> current_receives;
298  std::map<subgroup_id_t, std::function<void(char*, size_t)>> singleton_shard_receive_handlers;
299 
302  std::map<subgroup_id_t, std::map<message_id_t, RDMCMessage>> locally_stable_rdmc_messages;
304  std::map<subgroup_id_t, std::map<message_id_t, SSTMessage>> locally_stable_sst_messages;
305  std::map<subgroup_id_t, std::set<uint64_t>> pending_message_timestamps;
306  std::map<subgroup_id_t, std::map<message_id_t, uint64_t>> pending_persistence;
308  std::map<subgroup_id_t, std::map<message_id_t, RDMCMessage>> non_persistent_messages;
310  std::map<subgroup_id_t, std::map<message_id_t, SSTMessage>> non_persistent_sst_messages;
311 
312  std::vector<message_id_t> next_message_to_deliver;
313  std::recursive_mutex msg_state_mtx;
314  std::condition_variable_any sender_cv;
315 
317  unsigned int sender_timeout;
318 
320  std::atomic<bool> thread_shutdown{false};
322  std::thread sender_thread;
323 
324  std::thread timeout_thread;
325 
327  std::shared_ptr<DerechoSST> sst;
328 
330  std::vector<std::unique_ptr<sst::multicast_group<DerechoSST>>> sst_multicast_group_ptrs;
331 
333  std::list<pred_handle> receiver_pred_handles;
334  std::list<pred_handle> stability_pred_handles;
335  std::list<pred_handle> delivery_pred_handles;
336  std::list<pred_handle> persistence_pred_handles;
337  std::list<pred_handle> sender_pred_handles;
338 
339  std::vector<bool> last_transfer_medium;
340 
344 
347 
350  void send_loop();
351 
352  uint64_t get_time();
353 
356  void check_failures_loop();
357 
358  bool create_rdmc_sst_groups();
359  void initialize_sst_row();
360  void register_predicates();
361 
370  void deliver_message(RDMCMessage& msg, const subgroup_id_t& subgroup_num,
371  const persistent::version_t& version, const uint64_t& msg_timestamp);
372 
380  void deliver_message(SSTMessage& msg, const subgroup_id_t& subgroup_num,
381  const persistent::version_t& version, const uint64_t& msg_timestamp);
382 
395  bool version_message(RDMCMessage& msg, const subgroup_id_t& subgroup_num,
396  const persistent::version_t& version, const uint64_t& msg_timestamp);
407  bool version_message(SSTMessage& msg, const subgroup_id_t& subgroup_num,
408  const persistent::version_t& version, const uint64_t& msg_timestamp);
409 
/**
 * Counts how many entries of a shard's sender-flag vector are set.
 *
 * @param shard_senders One flag per shard member; any non-zero value counts.
 * @return The number of non-zero entries in shard_senders.
 */
uint32_t get_num_senders(const std::vector<int>& shard_senders) {
    uint32_t sender_count = 0;
    for(std::size_t pos = 0; pos < shard_senders.size(); ++pos) {
        if(shard_senders[pos] != 0) {
            ++sender_count;
        }
    }
    return sender_count;
}
419 
420  int32_t resolve_num_received(int32_t index, uint32_t num_received_entry);
421 
422  /* Predicate functions for receiving and delivering messages, parameterized by subgroup.
423  * register_predicates will create and bind one of these for each subgroup. */
424 
425  void delivery_trigger(subgroup_id_t subgroup_num, const SubgroupSettings& subgroup_settings,
426  const uint32_t num_shard_members, DerechoSST& sst);
427 
428  void sst_receive_handler(subgroup_id_t subgroup_num, const SubgroupSettings& subgroup_settings,
429  const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
430  uint32_t num_shard_senders, uint32_t sender_rank,
431  volatile char* data, uint64_t size);
432 
433  bool receiver_predicate(const SubgroupSettings& subgroup_settings,
434  const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
435  uint32_t num_shard_senders, const DerechoSST& sst);
436 
437  void receiver_function(subgroup_id_t subgroup_num, const SubgroupSettings& subgroup_settings,
438  const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
439  uint32_t num_shard_senders, DerechoSST& sst, unsigned int batch_size,
440  const std::function<void(uint32_t, volatile char*, uint32_t)>& sst_receive_handler_lambda);
441 
442  // Internally used to automatically send a NULL message
443  void get_buffer_and_send_auto_null(subgroup_id_t subgroup_num);
444  /* Get a pointer into the current buffer, to write data into it before sending
445  * Now this is a private function, called by send internally */
446  char* get_sendbuffer_ptr(subgroup_id_t subgroup_num, long long unsigned int payload_size, bool cooked_send);
447 
448 public:
469  std::vector<node_id_t> members, node_id_t my_node_id,
470  std::shared_ptr<DerechoSST> sst,
471  CallbackSet callbacks,
472  uint32_t total_num_subgroups,
473  const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
474  unsigned int sender_timeout,
475  const subgroup_post_next_version_func_t& post_next_version_callback,
476  const persistence_manager_callbacks_t& persistence_manager_callbacks,
477  std::vector<char> already_failed = {});
481  std::vector<node_id_t> members, node_id_t my_node_id,
482  std::shared_ptr<DerechoSST> sst,
483  MulticastGroup&& old_group,
484  uint32_t total_num_subgroups,
485  const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
486  const subgroup_post_next_version_func_t& post_next_version_callback,
487  const persistence_manager_callbacks_t& persistence_manager_callbacks,
488  std::vector<char> already_failed = {});
489 
490  ~MulticastGroup();
491 
496  void register_rpc_callback(rpc_handler_t handler) { rpc_callback = std::move(handler); }
497 
498  void deliver_messages_upto(const std::vector<int32_t>& max_indices_for_senders, subgroup_id_t subgroup_num, uint32_t num_shard_senders);
501  bool send(subgroup_id_t subgroup_num, long long unsigned int payload_size,
502  const std::function<void(char* buf)>& msg_generator, bool cooked_send);
503  bool check_pending_sst_sends(subgroup_id_t subgroup_num);
504 
505  const uint64_t compute_global_stability_frontier(subgroup_id_t subgroup_num);
506 
508  void wedge();
510  void debug_print();
511 
516  const std::map<subgroup_id_t, SubgroupSettings>& get_subgroup_settings() {
517  return subgroup_settings_map;
518  }
519  std::vector<uint32_t> get_shard_sst_indices(subgroup_id_t subgroup_num);
520 };
521 } // namespace derecho
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::map< subgroup_id_t, uint32_t > subgroup_to_rdmc_group
Maps subgroup IDs for which this node is a sender to the RDMC group it should use to send...
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > non_persistent_sst_messages
Messages that are currently being written to persistent storage.
std::list< pred_handle > stability_pred_handles
const int member_index
index of the local node in the members vector, which should also be its row index in the SST ...
std::condition_variable_any sender_cv
long long unsigned int size
The message's size in bytes.
const std::map< subgroup_id_t, SubgroupSettings > & get_subgroup_settings()
std::vector< std::queue< RDMCMessage > > pending_sends
Messages that are ready to be sent, but must wait until the current send finishes.
std::function< void(subgroup_id_t, node_id_t, message_id_t, std::optional< std::pair< char *, long long int > >, persistent::version_t)> message_callback_t
Alias for the type of std::function that is used for message delivery event callbacks.
std::map< uint32_t, bool > pending_sst_sends
uint32_t shard_rank
This node's rank within its shard of the subgroup.
uint32_t sender_id
The unique node ID of the message's sender.
uint16_t rdmc_group_num_offset
Offset to add to member ranks to form RDMC group numbers.
uint32_t sender_id
The unique node ID of the message's sender.
std::vector< int > senders
The "is_sender" flags for members of the subgroup.
uint64_t get_time()
Definition: time.h:13
Bundles together a set of low-level parameters for configuring Derecho groups.
static const std::vector< std::string > subgroupProfileFields
Definition: conf.hpp:186
const std::map< subgroup_id_t, SubgroupSettings > subgroup_settings_map
Maps subgroup IDs (for subgroups this node is a member of) to an immutable set of configuration optio...
unsigned int sender_timeout
The time, in milliseconds, that a sender can wait to send a message before it is considered failed...
static uint64_t compute_max_msg_size(const uint64_t max_payload_size, const uint64_t block_size, bool using_rdmc)
subgroup_post_next_version_func_t post_next_version_callback
Posts the next version to a subgroup just before delivering a message, so that the user code knows the curr...
const std::string & getConfString(const std::string &key)
Definition: conf.cpp:110
A structure containing an RDMC message (which consists of some bytes in a registered memory region) a...
const unsigned int num_members
number of members
std::map< subgroup_id_t, std::function< void(char *, size_t)> > singleton_shard_receive_handlers
Receiver lambdas for shards that have only one member.
std::vector< std::unique_ptr< sst::multicast_group< DerechoSST > > > sst_multicast_group_ptrs
The SSTs for multicasts.
MessageBuffer message_buffer
The MessageBuffer that contains the message's body.
const uint32_t getConfUInt32(const std::string &key)
Definition: conf.cpp:118
std::function< void(subgroup_id_t, node_id_t, char *, uint32_t)> rpc_handler_t
std::list< pred_handle > persistence_pred_handles
uint32_t num_received_offset
The offset of this node's num_received counter within the subgroup's SST section. ...
persistence_callback_t local_persistence_callback
rdmc::send_algorithm rdmc_send_algorithm
#define DEFAULT_SERIALIZATION_SUPPORT(CLASS_NAME, CLASS_MEMBERS...)
THIS (below) is the only user-facing macro in this file.
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > non_persistent_messages
Messages that are currently being written to persistent storage.
std::recursive_mutex msg_state_mtx
A non-POD type which wishes to mark itself byte representable should extend this class.
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
std::vector< std::optional< RDMCMessage > > next_sends
next_message is the message that will be sent when send is called the next time.
std::vector< node_id_t > members
Vector of member IDs.
int32_t index
The message's index (relative to other messages sent by that sender).
message_id_t index
The message's index (relative to other messages sent by that sender).
static DerechoParams from_profile(const std::string &profile)
Constructs DerechoParams specifying subgroup metadata for specified profile.
rpc_handler_t rpc_callback
These two callbacks are internal, not exposed to clients, so they're not in CallbackSet.
DerechoParams(uint64_t max_payload_size, uint64_t max_reply_payload_size, uint64_t max_smc_payload_size, uint64_t block_size, unsigned int window_size, unsigned int heartbeat_ms, rdmc::send_algorithm rdmc_send_algorithm, uint32_t rpc_port)
uint32_t get_num_senders(const std::vector< int > &shard_senders)
uint32_t shard_num
This node's shard number within the subgroup.
#define CONF_DERECHO_HEARTBEAT_MS
Definition: conf.hpp:30
std::function< void(subgroup_id_t, persistent::version_t)> persistence_callback_t
std::vector< message_id_t > next_message_to_deliver
int32_t message_id_t
Type alias for a message's unique "sequence number" or index.
send_algorithm
Definition: rdmc.hpp:28
static rdmc::send_algorithm send_algorithm_from_string(const std::string &rdmc_send_algorithm_string)
std::map< subgroup_id_t, std::set< uint64_t > > pending_message_timestamps
#define CONF_DERECHO_RPC_PORT
Definition: conf.hpp:27
const CallbackSet callbacks
Message-delivery event callbacks, supplied by the client, for "raw" sends.
Bundles together a set of callback functions for message delivery events.
std::thread sender_thread
The background thread that sends messages with RDMC.
std::vector< node_id_t > members
The members of the subgroup.
Mode mode
The operation mode of the subgroup.
Represents a block of memory used to store a message.
uint32_t slot_offset
The offset of this node's slot within the subgroup's SST section.
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > locally_stable_sst_messages
Same map as locally_stable_rdmc_messages, but for SST messages.
struct __attribute__((__packed__)) header
The header for an individual multicast message, which will always be the first sizeof(header) bytes i...
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > locally_stable_rdmc_messages
Messages that have finished sending/receiving but aren't yet globally stable.
The GMS and derecho_group will share the same SST for efficiency.
Definition: derecho_sst.hpp:22
void register_rpc_callback(rpc_handler_t handler)
Registers a function to be called upon receipt of a multicast RPC message.
std::list< pred_handle > sender_pred_handles
std::map< node_id_t, uint32_t > node_id_to_sst_index
inverse map of node_ids to sst_row
std::shared_ptr< DerechoSST > sst
The SST, shared between this group and its GMS.
std::vector< message_id_t > future_message_indices
Index to be used the next time get_sendbuffer_ptr is called.
std::unique_ptr< char[]> buffer
std::vector< std::optional< RDMCMessage > > current_sends
Vector of messages that are currently being sent out using RDMC, or std::nullopt otherwise.
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
volatile char * buf
Pointer to the message.
std::shared_ptr< rdma::memory_region > mr
std::list< pred_handle > receiver_pred_handles
std::function< void(const subgroup_id_t &, const persistent::version_t &, const uint64_t &)> subgroup_post_next_version_func_t
int sender_rank
This node's sender rank within the subgroup (as defined by SubView::sender_rank_of) ...
std::vector< bool > last_transfer_medium
std::map< subgroup_id_t, std::map< message_id_t, uint64_t > > pending_persistence
message_callback_t global_stability_callback
long long unsigned int size
The message's size in bytes.
typename sst::Predicates< DerechoSST >::pred_handle pred_handle
std::list< pred_handle > delivery_pred_handles
Implements the low-level mechanics of tracking multicasts in a Derecho group, using RDMC to deliver m...
A collection of settings for a single subgroup that this node is a member of.
const uint64_t getConfUInt64(const std::string &key)
Definition: conf.cpp:134
persistence_manager_callbacks_t persistence_manager_callbacks
persistence manager callbacks
std::map< std::pair< subgroup_id_t, node_id_t >, RDMCMessage > current_receives
Messages that are currently being received.
persistence_callback_t global_persistence_callback
const bool hasCustomizedConfKey(const std::string &key)
Definition: conf.cpp:150
std::vector< std::list< int32_t > > received_intervals
Used for synchronizing receives by RDMC and SST.
std::map< uint32_t, std::vector< MessageBuffer > > free_message_buffers
Stores message buffers not currently in use.