4 #include <condition_variable> 17 #include "../derecho_modes.hpp" 18 #include "../subgroup_info.hpp" 28 #include <spdlog/spdlog.h> 73 const uint64_t max_payload_size,
74 const uint64_t block_size,
76 auto max_msg_size = max_payload_size +
sizeof(header);
78 if(max_msg_size % block_size != 0) {
79 max_msg_size = (max_msg_size / block_size + 1) * block_size;
86 if(rdmc_send_algorithm_string ==
"binomial_send") {
88 }
else if(rdmc_send_algorithm_string ==
"chain_send") {
90 }
else if(rdmc_send_algorithm_string ==
"sequential_send") {
92 }
else if(rdmc_send_algorithm_string ==
"tree_send") {
95 throw "wrong value for RDMC send algorithm: " + rdmc_send_algorithm_string +
". Check your config file.";
100 uint64_t max_reply_payload_size,
101 uint64_t max_smc_payload_size,
103 unsigned int window_size,
104 unsigned int heartbeat_ms,
107 : max_reply_msg_size(max_reply_payload_size + sizeof(header)),
108 sst_max_msg_size(max_smc_payload_size + sizeof(header)),
109 block_size(block_size),
110 window_size(window_size),
111 heartbeat_ms(heartbeat_ms),
112 rdmc_send_algorithm(rdmc_send_algorithm),
115 max_msg_size = compute_max_msg_size(max_payload_size, block_size,
116 max_payload_size > max_smc_payload_size);
129 std::string prefix =
"SUBGROUP/" + profile +
"/";
132 std::cout <<
"profile " << profile
133 <<
" not found in SUBGROUP section of derecho conf. " 134 "Look at derecho-sample.cfg for more information." 136 throw profile +
" derecho subgroup profile not found";
140 uint64_t max_payload_size =
getConfUInt64(prefix + Conf::subgroupProfileFields[0]);
141 uint64_t max_reply_payload_size =
getConfUInt64(prefix + Conf::subgroupProfileFields[1]);
142 uint64_t max_smc_payload_size =
getConfUInt64(prefix + Conf::subgroupProfileFields[2]);
143 uint64_t block_size =
getConfUInt64(prefix + Conf::subgroupProfileFields[3]);
144 uint32_t window_size =
getConfUInt32(prefix + Conf::subgroupProfileFields[4]);
146 const std::string& algorithm =
getConfString(prefix + Conf::subgroupProfileFields[5]);
151 max_reply_payload_size,
152 max_smc_payload_size,
162 sst_max_msg_size, block_size, window_size,
163 heartbeat_ms, rdmc_send_algorithm, rpc_port);
174 std::shared_ptr<rdma::memory_region>
mr;
179 buffer = std::unique_ptr<char[]>(
new char[size]);
180 mr = std::make_shared<rdma::memory_region>(buffer.get(), size);
276 bool rdmc_sst_groups_created =
false;
320 std::atomic<bool> thread_shutdown{
false};
327 std::shared_ptr<DerechoSST>
sst;
356 void check_failures_loop();
358 bool create_rdmc_sst_groups();
359 void initialize_sst_row();
360 void register_predicates();
412 for(
const auto i : shard_senders) {
420 int32_t resolve_num_received(int32_t index, uint32_t num_received_entry);
426 const uint32_t num_shard_members,
DerechoSST& sst);
429 const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
430 uint32_t num_shard_senders, uint32_t sender_rank,
431 volatile char* data, uint64_t size);
434 const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
435 uint32_t num_shard_senders,
const DerechoSST& sst);
438 const std::map<uint32_t, uint32_t>& shard_ranks_by_sender_rank,
439 uint32_t num_shard_senders,
DerechoSST& sst,
unsigned int batch_size,
440 const std::function<
void(uint32_t,
volatile char*, uint32_t)>& sst_receive_handler_lambda);
443 void get_buffer_and_send_auto_null(
subgroup_id_t subgroup_num);
446 char* get_sendbuffer_ptr(
subgroup_id_t subgroup_num,
long long unsigned int payload_size,
bool cooked_send);
469 std::vector<node_id_t> members,
node_id_t my_node_id,
470 std::shared_ptr<DerechoSST> sst,
472 uint32_t total_num_subgroups,
473 const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
474 unsigned int sender_timeout,
477 std::vector<char> already_failed = {});
481 std::vector<node_id_t> members,
node_id_t my_node_id,
482 std::shared_ptr<DerechoSST> sst,
484 uint32_t total_num_subgroups,
485 const std::map<subgroup_id_t, SubgroupSettings>& subgroup_settings_by_id,
488 std::vector<char> already_failed = {});
498 void deliver_messages_upto(
const std::vector<int32_t>& max_indices_for_senders,
subgroup_id_t subgroup_num, uint32_t num_shard_senders);
502 const std::function<
void(
char* buf)>& msg_generator,
bool cooked_send);
505 const uint64_t compute_global_stability_frontier(
subgroup_id_t subgroup_num);
517 return subgroup_settings_map;
519 std::vector<uint32_t> get_shard_sst_indices(
subgroup_id_t subgroup_num);
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::map< subgroup_id_t, uint32_t > subgroup_to_rdmc_group
Maps subgroup IDs for which this node is a sender to the RDMC group it should use to send...
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > non_persistent_sst_messages
Messages that are currently being written to persistent storage.
std::list< pred_handle > stability_pred_handles
const int member_index
index of the local node in the members vector, which should also be its row index in the SST ...
std::condition_variable_any sender_cv
long long unsigned int size
The message's size in bytes.
const std::map< subgroup_id_t, SubgroupSettings > & get_subgroup_settings()
std::vector< std::queue< RDMCMessage > > pending_sends
Messages that are ready to be sent, but must wait until the current send finishes.
std::function< void(subgroup_id_t, node_id_t, message_id_t, std::optional< std::pair< char *, long long int > >, persistent::version_t)> message_callback_t
Alias for the type of std::function that is used for message delivery event callbacks.
std::map< uint32_t, bool > pending_sst_sends
uint32_t shard_rank
This node's rank within its shard of the subgroup.
uint32_t sender_id
The unique node ID of the message's sender.
uint16_t rdmc_group_num_offset
Offset to add to member ranks to form RDMC group numbers.
uint32_t sender_id
The unique node ID of the message's sender.
std::vector< int > senders
The "is_sender" flags for members of the subgroup.
Bundles together a set of low-level parameters for configuring Derecho groups.
static const std::vector< std::string > subgroupProfileFields
const std::map< subgroup_id_t, SubgroupSettings > subgroup_settings_map
Maps subgroup IDs (for subgroups this node is a member of) to an immutable set of configuration optio...
unsigned int sender_timeout
The time, in milliseconds, that a sender can wait to send a message before it is considered failed...
static uint64_t compute_max_msg_size(const uint64_t max_payload_size, const uint64_t block_size, bool using_rdmc)
subgroup_post_next_version_func_t post_next_version_callback
Post the next version to a subgroup just before delivering a message so that the user code knows the curr...
const std::string & getConfString(const std::string &key)
unsigned int heartbeat_ms
A structure containing an RDMC message (which consists of some bytes in a registered memory region) a...
const unsigned int num_members
number of members
std::map< subgroup_id_t, std::function< void(char *, size_t)> > singleton_shard_receive_handlers
Receiver lambdas for shards that have only one member.
std::vector< std::unique_ptr< sst::multicast_group< DerechoSST > > > sst_multicast_group_ptrs
The SSTs for multicasts.
uint64_t sst_max_msg_size
MessageBuffer message_buffer
The MessageBuffer that contains the message's body.
const uint32_t getConfUInt32(const std::string &key)
std::function< void(subgroup_id_t, node_id_t, char *, uint32_t)> rpc_handler_t
std::list< pred_handle > persistence_pred_handles
uint32_t num_received_offset
The offset of this node's num_received counter within the subgroup's SST section. ...
persistence_callback_t local_persistence_callback
rdmc::send_algorithm rdmc_send_algorithm
MessageBuffer(size_t size)
#define DEFAULT_SERIALIZATION_SUPPORT(CLASS_NAME, CLASS_MEMBERS...)
THIS (below) is the only user-facing macro in this file.
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > non_persistent_messages
Messages that are currently being written to persistent storage.
std::recursive_mutex msg_state_mtx
A non-POD type which wishes to mark itself byte representable should extend this class.
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
std::vector< std::optional< RDMCMessage > > next_sends
Each entry of next_sends is the message that will be sent the next time send is called, if any.
std::vector< node_id_t > members
Vector of member IDs.
int32_t index
The message's index (relative to other messages sent by that sender).
message_id_t index
The message's index (relative to other messages sent by that sender).
static DerechoParams from_profile(const std::string &profile)
Constructs DerechoParams specifying subgroup metadata for specified profile.
rpc_handler_t rpc_callback
These two callbacks are internal, not exposed to clients, so they're not in CallbackSet.
DerechoParams(uint64_t max_payload_size, uint64_t max_reply_payload_size, uint64_t max_smc_payload_size, uint64_t block_size, unsigned int window_size, unsigned int heartbeat_ms, rdmc::send_algorithm rdmc_send_algorithm, uint32_t rpc_port)
uint32_t get_num_senders(const std::vector< int > &shard_senders)
uint32_t shard_num
This node's shard number within the subgroup.
#define CONF_DERECHO_HEARTBEAT_MS
std::function< void(subgroup_id_t, persistent::version_t)> persistence_callback_t
std::vector< message_id_t > next_message_to_deliver
int32_t message_id_t
Type alias for a message's unique "sequence number" or index.
static rdmc::send_algorithm send_algorithm_from_string(const std::string &rdmc_send_algorithm_string)
std::map< subgroup_id_t, std::set< uint64_t > > pending_message_timestamps
#define CONF_DERECHO_RPC_PORT
const CallbackSet callbacks
Message-delivery event callbacks, supplied by the client, for "raw" sends.
Bundles together a set of callback functions for message delivery events.
uint64_t max_reply_msg_size
std::thread sender_thread
The background thread that sends messages with RDMC.
std::vector< node_id_t > members
The members of the subgroup.
Mode mode
The operation mode of the subgroup.
Represents a block of memory used to store a message.
uint32_t slot_offset
The offset of this node's slot within the subgroup's SST section.
std::map< subgroup_id_t, std::map< message_id_t, SSTMessage > > locally_stable_sst_messages
Same map as locally_stable_rdmc_messages, but for SST messages.
struct __attribute__((__packed__)) header
The header for an individual multicast message, which will always be the first sizeof(header) bytes i...
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
std::map< subgroup_id_t, std::map< message_id_t, RDMCMessage > > locally_stable_rdmc_messages
Messages that have finished sending/receiving but aren't yet globally stable.
The GMS and derecho_group will share the same SST for efficiency.
void register_rpc_callback(rpc_handler_t handler)
Registers a function to be called upon receipt of a multicast RPC message.
std::thread timeout_thread
std::list< pred_handle > sender_pred_handles
std::map< node_id_t, uint32_t > node_id_to_sst_index
Inverse map from node IDs to SST row indices.
std::shared_ptr< DerechoSST > sst
The SST, shared between this group and its GMS.
std::vector< message_id_t > future_message_indices
Index to be used the next time get_sendbuffer_ptr is called.
std::unique_ptr< char[]> buffer
std::vector< std::optional< RDMCMessage > > current_sends
Vector of messages that are currently being sent out using RDMC, or std::nullopt otherwise.
std::tuple< persistence_manager_make_version_func_t, persistence_manager_post_persist_func_t > persistence_manager_callbacks_t
volatile char * buf
Pointer to the message.
std::shared_ptr< rdma::memory_region > mr
std::list< pred_handle > receiver_pred_handles
std::function< void(const subgroup_id_t &, const persistent::version_t &, const uint64_t &)> subgroup_post_next_version_func_t
uint32_t total_num_subgroups
int sender_rank
This node's sender rank within the subgroup (as defined by SubView::sender_rank_of) ...
std::vector< bool > last_transfer_medium
std::map< subgroup_id_t, std::map< message_id_t, uint64_t > > pending_persistence
message_callback_t global_stability_callback
long long unsigned int size
The message's size in bytes.
typename sst::Predicates< DerechoSST >::pred_handle pred_handle
std::list< pred_handle > delivery_pred_handles
Implements the low-level mechanics of tracking multicasts in a Derecho group, using RDMC to deliver m...
A collection of settings for a single subgroup that this node is a member of.
const uint64_t getConfUInt64(const std::string &key)
persistence_manager_callbacks_t persistence_manager_callbacks
persistence manager callbacks
std::map< std::pair< subgroup_id_t, node_id_t >, RDMCMessage > current_receives
Messages that are currently being received.
persistence_callback_t global_persistence_callback
const bool hasCustomizedConfKey(const std::string &key)
std::vector< std::list< int32_t > > received_intervals
Used for synchronizing receives by RDMC and SST.
std::map< uint32_t, std::vector< MessageBuffer > > free_message_buffers
Stores message buffers not currently in use.