38 for(
auto receivers_iterator =
receivers->begin();
39 receivers_iterator !=
receivers->end();) {
40 if(receivers_iterator->first.subgroup_id == instance_id) {
41 receivers_iterator =
receivers->erase(receivers_iterator);
68 const Opcode& indx,
const node_id_t& received_from,
char const*
const buf,
69 std::size_t payload_size,
const std::function<
char*(
int)>& out_alloc) {
70 using namespace remote_invocation_utilities;
72 auto receiver_function_entry =
receivers->find(indx);
73 if(receiver_function_entry ==
receivers->end()) {
74 dbg_default_error(
"Received an RPC message with an invalid RPC opcode! Opcode was ({}, {}, {}, {}).",
77 return std::exception_ptr{};
80 recv_ret reply_return = receiver_function_entry->second(
81 &
rdv, received_from, buf,
82 [&out_alloc, &reply_header_size](std::size_t size) {
83 return out_alloc(size + reply_header_size) + reply_header_size;
85 auto* reply_buf = reply_return.
payload;
87 reply_buf -= reply_header_size;
88 const auto id = reply_return.
opcode;
89 const auto size = reply_return.
size;
96 const std::function<
char*(
int)>& out_alloc) {
97 using namespace remote_invocation_utilities;
99 std::size_t payload_size = size;
105 payload_size, out_alloc);
109 char* msg_buf, uint32_t buffer_size) {
114 _in_rpc_handler =
true;
117 size_t reply_size = 0;
120 [
this, &reply_buf, &reply_size, &sender_id](
size_t size) ->
char* {
122 if(reply_size <= connections->get_max_p2p_reply_size()) {
123 reply_buf = (
char*)
connections->get_sendbuffer_ptr(
131 if(sender_id ==
nid) {
151 reply_buf, reply_size,
154 }
else if(reply_size > 0) {
160 _in_rpc_handler =
false;
164 using namespace remote_invocation_utilities;
166 std::size_t payload_size;
170 retrieve_header(
nullptr, msg_buf, payload_size, indx, received_from, flags);
171 size_t reply_size = 0;
174 receive_message(indx, received_from, msg_buf + header_size, payload_size,
175 [
this, &buffer_size, &reply_size, &sender_id](
size_t _size) ->
char* {
177 if(reply_size <= buffer_size) {
194 fifo_queue.emplace(sender_id, msg_buf, buffer_size);
208 const subgroup_id_t subgroup_id = fulfilled_pending_results_pair.first;
211 for(
auto pending_results_iter = fulfilled_pending_results_pair.second.begin();
212 pending_results_iter != fulfilled_pending_results_pair.second.end();) {
214 if(pending_results_iter->get().all_responded()) {
215 pending_results_iter = fulfilled_pending_results_pair.second.erase(pending_results_iter);
217 for(uint32_t shard_num = 0;
223 dbg_default_debug(
"Setting exception for removed node {} on PendingResults for subgroup {}, shard {}", removed_id, subgroup_id, shard_num);
224 pending_results_iter->get().set_exception_for_removed_node(removed_id);
227 pending_results_iter++;
243 uint32_t dest_rank = 0;
252 }
catch(std::out_of_range& map_error) {
257 buf =
connections->get_sendbuffer_ptr(dest_rank, type);
268 }
catch(std::out_of_range& map_error) {
277 pthread_setname_np(pthread_self(),
"fifo_thread");
278 using namespace remote_invocation_utilities;
280 std::size_t payload_size;
284 size_t reply_size = 0;
299 dbg_default_error(
"Invalid rpc message in fifo queue: is_reply={}, is_cascading={}",
304 [
this, &reply_size, &request](
size_t _size) ->
char* {
306 if(reply_size <= request.buffer_size) {
307 return (char*)connections->get_sendbuffer_ptr(
308 connections->get_node_rank(request.sender_id), sst::REQUEST_TYPE::P2P_REPLY);
324 pthread_setname_np(pthread_self(),
"rpc_thread");
328 _in_rpc_handler =
true;
338 struct timespec last_time, cur_time;
339 clock_gettime(CLOCK_REALTIME, &last_time);
344 auto optional_reply_pair =
connections->probe_all();
345 if(optional_reply_pair) {
346 auto reply_pair = optional_reply_pair.value();
351 clock_gettime(CLOCK_REALTIME, &last_time);
353 clock_gettime(CLOCK_REALTIME, &cur_time);
355 double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
356 + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
357 if(time_elapsed_in_ms > 1) {
358 connections_lock.unlock();
360 std::this_thread::sleep_for(1ms);
361 connections_lock.lock();
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
std::exception_ptr parse_and_receive(char *buf, std::size_t size, const std::function< char *(int)> &out_alloc)
Entry point for receiving a single RPC message for a function managed by this RPCManager.
virtual void fulfill_map(const node_list_t &)=0
std::vector< std::vector< SubView > > subgroup_shard_views
Maps subgroup ID -> shard number -> SubView for that subgroup/shard.
const std::vector< node_id_t > members
Node IDs of members in the current view, indexed by their SST rank.
#define CONF_SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE
std::condition_variable thread_start_cv
Notified when the P2P listening thread should start.
subgroup_id_t subgroup_id
An RPC function call can be uniquely identified by the tuple (class, subgroup ID, function ID...
void rpc_message_handler(subgroup_id_t subgroup_id, node_id_t sender_id, char *msg_buf, uint32_t buffer_size)
Handler to be called by MulticastGroup when it receives a message that appears to be a "cooked send" ...
std::size_t header_space()
uint64_t view_max_rpc_reply_payload_size
thread_local bool _in_rpc_handler
std::map< subgroup_id_t, std::queue< PendingBase_ref > > pending_results_to_fulfill
bool in_rpc_handler()
defined in rpc_manager.h
subgroup_type_id_t class_id
std::map< subgroup_id_t, std::list< PendingBase_ref > > fulfilled_pending_results
std::unique_ptr< sst::P2PConnections > connections
Contains an RDMA connection to each member of the group.
#define CONF_DERECHO_MAX_P2P_REPLY_PAYLOAD_SIZE
const uint32_t getConfUInt32(const std::string &key)
const node_id_t nid
The ID of the node this RPCManager is running on.
void start_listening()
Starts the thread that listens for incoming P2P RPC requests over the RDMA P2P connections.
void fifo_worker()
Handle Non-cascading P2P Send and P2P Queries in fifo.
#define dbg_default_debug(...)
#define RPC_HEADER_FLAG_TST(f, name)
#define dbg_default_error(...)
volatile char * get_sendbuffer_ptr(uint32_t dest_id, sst::REQUEST_TYPE type)
Retrieves a buffer for sending P2P messages from the RPCManager's pool of P2P RDMA connections...
std::mutex fifo_queue_mutex
std::mutex pending_results_mutex
This mutex guards both pending_results_to_fulfill and fulfilled_pending_results.
void p2p_message_handler(node_id_t sender_id, char *msg_buf, uint32_t buffer_size)
Handler to be called by rpc_process_loop each time it receives a peer-to-peer message over an RDMA P2...
uint32_t view_max_rpc_window_size
void p2p_receive_loop()
Listens for P2P RPC calls over the RDMA P2P connections and handles them.
void populate_header(char *reply_buf, const std::size_t &payload_size, const Opcode &op, const node_id_t &from, const uint32_t &flags)
void finish_p2p_send(node_id_t dest_node, subgroup_id_t dest_subgroup_id, PendingBase &pending_results_handle)
Sends the next P2P message buffer over an RDMA connection to the specified node, and registers the "p...
std::thread fifo_worker_thread
p2p send and queries are queued in fifo worker
std::exception_ptr possible_exception
#define CONF_DERECHO_P2P_WINDOW_SIZE
std::exception_ptr receive_message(const Opcode &indx, const node_id_t &received_from, char const *const buf, std::size_t payload_size, const std::function< char *(int)> &out_alloc)
Processes an RPC message for any of the functions managed by this RPCManager, using the opcode to for...
ViewManager & view_manager
void new_view_callback(const View &new_view)
Callback for new-view events that updates internal state in response to joins or leaves.
#define CONF_DERECHO_MAX_P2P_REQUEST_PAYLOAD_SIZE
void retrieve_header(mutils::RemoteDeserialization_v *rdv, const char *reply_buf, std::size_t &payload_size, Opcode &op, node_id_t &from, uint32_t &flags)
std::shared_timed_mutex view_mutex
Controls access to curr_view.
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
Return type of all the RemoteInvocable::receive_* methods.
bool finish_rpc_send(subgroup_id_t subgroup_id, PendingBase &pending_results_handle)
Sends the next message in the MulticastGroup's send buffer (which is assumed to be an RPC message pre...
std::mutex thread_start_mutex
Mutex for thread_start_cv.
std::mutex p2p_connections_mutex
This provides mutual exclusion between the P2P listening thread and the view-change thread...
Indicates that an RPC call to a node failed because the node was removed from the Replicated Object's...
void destroy_remote_invocable_class(uint32_t instance_id)
void create_connections()
Abstract base type for PendingResults.
#define assert_always(x...)
mutils::RemoteDeserialization_v rdv
An empty DeserializationManager, in case we need it later.
std::unique_ptr< View > curr_view
The current View, containing the state of the managed group.
Base exception class for all exceptions raised by Derecho.
std::condition_variable fifo_queue_cv
const uint64_t getConfUInt64(const std::string &key)
std::unique_ptr< std::map< Opcode, receive_fun_t > > receivers
A map from FunctionIDs to RPC functions, either the "server" stubs that receive remote calls to invok...
std::queue< fifo_req > fifo_queue
std::atomic< bool > thread_shutdown
std::condition_variable pending_results_cv
This condition variable is to resolve a race condition in using ToFulfillQueue and fulfilledList...