Derecho 0.9
Distributed systems toolkit for RDMA
rpc_manager.cpp
#include <cassert>
#include <iostream>

#include <derecho/core/detail/rpc_manager.hpp>  // this file's own header (include path assumed; the line was elided in the original listing)

namespace derecho {

namespace rpc {

thread_local bool _in_rpc_handler = false;

RPCManager::~RPCManager() {
    thread_shutdown = true;
    if(rpc_thread.joinable()) {
        rpc_thread.join();
    }
}

//The function signature and the config-derived P2PParams fields below were elided in the
//original listing; the function is presumably RPCManager::create_connections().
void RPCManager::create_connections() {
    connections = std::make_unique<sst::P2PConnections>(sst::P2PParams{
            nid,
            {nid},
            /* ...remaining P2PParams fields (P2P request/reply buffer sizes and
               window sizes read from the Derecho config) elided in the listing... */});
}

void RPCManager::destroy_remote_invocable_class(uint32_t instance_id) {
    //Delete receiver functions that were added by this class/subgroup
    for(auto receivers_iterator = receivers->begin();
        receivers_iterator != receivers->end();) {
        if(receivers_iterator->first.subgroup_id == instance_id) {
            receivers_iterator = receivers->erase(receivers_iterator);
        } else {
            receivers_iterator++;
        }
    }
    //Deliver a node_removed_from_shard_exception to the QueryResults for this class
    //Important: This only works because the Replicated destructor runs before the
    //wrapped_this member is destroyed; otherwise the PendingResults we're referencing
    //would already have been deleted.
    std::lock_guard<std::mutex> lock(pending_results_mutex);
    while(!pending_results_to_fulfill[instance_id].empty()) {
        pending_results_to_fulfill[instance_id].front().get().set_exception_for_caller_removed();
        pending_results_to_fulfill[instance_id].pop();
    }
    while(!fulfilled_pending_results[instance_id].empty()) {
        fulfilled_pending_results[instance_id].front().get().set_exception_for_caller_removed();
        fulfilled_pending_results[instance_id].pop_front();
    }
}

void RPCManager::start_listening() {
    std::lock_guard<std::mutex> lock(thread_start_mutex);
    thread_start = true;
    thread_start_cv.notify_all();
}

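//Processes a single RPC message for one of the functions managed by this RPCManager,
//dispatching on the opcode; out_alloc supplies space for any reply the receiver generates.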
std::exception_ptr RPCManager::receive_message(
        const Opcode& indx, const node_id_t& received_from, char const* const buf,
        std::size_t payload_size, const std::function<char*(int)>& out_alloc) {
    using namespace remote_invocation_utilities;
    assert(payload_size);
    auto receiver_function_entry = receivers->find(indx);
    if(receiver_function_entry == receivers->end()) {
        dbg_default_error("Received an RPC message with an invalid RPC opcode! Opcode was ({}, {}, {}, {}).",
                          indx.class_id, indx.subgroup_id, indx.function_id, indx.is_reply);
        //TODO: We should reply with some kind of "no such method" error in this case
        return std::exception_ptr{};
    }
    std::size_t reply_header_size = header_space();
    recv_ret reply_return = receiver_function_entry->second(
            &rdv, received_from, buf,
            [&out_alloc, &reply_header_size](std::size_t size) {
                return out_alloc(size + reply_header_size) + reply_header_size;
            });
    auto* reply_buf = reply_return.payload;
    if(reply_buf) {
        reply_buf -= reply_header_size;
        const auto id = reply_return.opcode;
        const auto size = reply_return.size;
        populate_header(reply_buf, size, id, nid, 0);
    }
    return reply_return.possible_exception;
}

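//Entry point for receiving a single RPC message: strips the RPC header from buf
//and forwards the remaining payload to receive_message().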
std::exception_ptr RPCManager::parse_and_receive(char* buf, std::size_t size,
                                                 const std::function<char*(int)>& out_alloc) {
    using namespace remote_invocation_utilities;
    assert(size >= header_space());
    std::size_t payload_size = size;
    Opcode indx;
    node_id_t received_from;
    uint32_t flags;
    retrieve_header(&rdv, buf, payload_size, indx, received_from, flags);
    return receive_message(indx, received_from, buf + header_space(),
                           payload_size, out_alloc);
}

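//Handler called by MulticastGroup when it delivers a message identified as a "cooked send"
//(an ordered-multicast RPC): it invokes the target function, fulfills the reply map on a
//self-receive, and sends back any reply over the P2P connections.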
void RPCManager::rpc_message_handler(subgroup_id_t subgroup_id, node_id_t sender_id,
                                     char* msg_buf, uint32_t buffer_size) {
    // WARNING: This assumes the current view doesn't change during execution!
    // (It accesses curr_view without a lock).

    // set the thread local rpc_handler context
    _in_rpc_handler = true;

    //Use the reply-buffer allocation lambda to detect whether parse_and_receive generated a reply
    size_t reply_size = 0;
    char* reply_buf;
    parse_and_receive(msg_buf, buffer_size,
                      [this, &reply_buf, &reply_size, &sender_id](size_t size) -> char* {
                          reply_size = size;
                          if(reply_size <= connections->get_max_p2p_reply_size()) {
                              reply_buf = (char*)connections->get_sendbuffer_ptr(
                                      connections->get_node_rank(sender_id), sst::REQUEST_TYPE::RPC_REPLY);
                              return reply_buf;
                          } else {
                              // the reply size is too large - not part of the design to handle it
                              return nullptr;
                          }
                      });
    if(sender_id == nid) {
        //This is a self-receive of an RPC message I sent, so I have a reply-map that needs fulfilling
        const uint32_t my_shard = view_manager.curr_view->my_subgroups.at(subgroup_id);
        {
            std::unique_lock<std::mutex> lock(pending_results_mutex);
            // Because of a race condition, pending_results_to_fulfill can genuinely be empty,
            // so before accessing it we wait on a condition variable and let the thread that
            // called the ordered send signal us. The race is rare, but possible.
            pending_results_cv.wait(lock, [&]() { return !pending_results_to_fulfill[subgroup_id].empty(); });
            //We now know the membership of "all nodes in my shard of the subgroup" in the current view
            pending_results_to_fulfill[subgroup_id].front().get().fulfill_map(
                    view_manager.curr_view->subgroup_shard_views.at(subgroup_id).at(my_shard).members);
            fulfilled_pending_results[subgroup_id].push_back(
                    std::move(pending_results_to_fulfill[subgroup_id].front()));
            pending_results_to_fulfill[subgroup_id].pop();
        } //release pending_results_mutex
        if(reply_size > 0) {
            //Since this was a self-receive, the reply also goes to myself
            parse_and_receive(
                    reply_buf, reply_size,
                    [](size_t size) -> char* { assert_always(false); });
        }
    } else if(reply_size > 0) {
        //Otherwise, the only thing to do is send the reply (if there was one)
        connections->send(connections->get_node_rank(sender_id));
    }

    // clear the thread local rpc_handler context
    _in_rpc_handler = false;
}

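//Handler called by the P2P receive loop for each peer-to-peer message that arrives over an
//RDMA P2P connection: replies are processed inline, cascading requests are (currently)
//unimplemented, and all other requests are queued for the FIFO worker thread.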
void RPCManager::p2p_message_handler(node_id_t sender_id, char* msg_buf, uint32_t buffer_size) {
    using namespace remote_invocation_utilities;
    const std::size_t header_size = header_space();
    std::size_t payload_size;
    Opcode indx;
    node_id_t received_from;
    uint32_t flags;
    retrieve_header(nullptr, msg_buf, payload_size, indx, received_from, flags);
    size_t reply_size = 0;
    if(indx.is_reply) {
        // REPLYs can be handled here because they do not block.
        receive_message(indx, received_from, msg_buf + header_size, payload_size,
                        [this, &buffer_size, &reply_size, &sender_id](size_t _size) -> char* {
                            reply_size = _size;
                            if(reply_size <= buffer_size) {
                                return (char*)connections->get_sendbuffer_ptr(
                                        connections->get_node_rank(sender_id), sst::REQUEST_TYPE::P2P_REPLY);
                            }
                            return nullptr;
                        });
        if(reply_size > 0) {
            connections->send(connections->get_node_rank(sender_id));
        }
    } else if(RPC_HEADER_FLAG_TST(flags, CASCADE)) {
        // TODO: what is the lifetime of msg_buf? discuss with Sagar to make
        // sure the buffers are safely managed.
        // for cascading messages, we create a new thread.
        throw derecho::derecho_exception("Cascading P2P Send/Queries to be implemented!");
    } else {
        // send to fifo queue.
        std::unique_lock<std::mutex> lock(fifo_queue_mutex);
        fifo_queue.emplace(sender_id, msg_buf, buffer_size);
        fifo_queue_cv.notify_one();
    }
}

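//Callback for new-view events: rebuilds the P2P connection pool for the new membership and
//notifies any still-pending RPC results about nodes that departed in the view change.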
//This is always called while holding a write lock on view_manager.view_mutex
void RPCManager::new_view_callback(const View& new_view) {
    {
        std::lock_guard<std::mutex> connections_lock(p2p_connections_mutex);
        connections = std::make_unique<sst::P2PConnections>(std::move(*connections), new_view.members);
    }
    dbg_default_debug("Created new connections among the new view members");
    std::lock_guard<std::mutex> lock(pending_results_mutex);
    for(auto& fulfilled_pending_results_pair : fulfilled_pending_results) {
        const subgroup_id_t subgroup_id = fulfilled_pending_results_pair.first;
        //For each PendingResults in this subgroup, check the departed list of each shard
        //of the subgroup, and call set_exception_for_removed_node for the departed nodes
        for(auto pending_results_iter = fulfilled_pending_results_pair.second.begin();
            pending_results_iter != fulfilled_pending_results_pair.second.end();) {
            //Garbage-collect PendingResults references that are obsolete
            if(pending_results_iter->get().all_responded()) {
                pending_results_iter = fulfilled_pending_results_pair.second.erase(pending_results_iter);
            } else {
                for(uint32_t shard_num = 0;
                    shard_num < new_view.subgroup_shard_views[subgroup_id].size();
                    ++shard_num) {
                    for(auto removed_id : new_view.subgroup_shard_views[subgroup_id][shard_num].departed) {
                        //This will do nothing if removed_id was never in the
                        //shard this PendingResult corresponds to
                        dbg_default_debug("Setting exception for removed node {} on PendingResults for subgroup {}, shard {}", removed_id, subgroup_id, shard_num);
                        pending_results_iter->get().set_exception_for_removed_node(removed_id);
                    }
                }
                pending_results_iter++;
            }
        }
    }
}

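//Registers a PendingResults handle for an ordered-multicast RPC that has just been submitted,
//so that the self-receive of the message can fulfill its reply map.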
bool RPCManager::finish_rpc_send(subgroup_id_t subgroup_id, PendingBase& pending_results_handle) {
    std::lock_guard<std::mutex> lock(pending_results_mutex);
    pending_results_to_fulfill[subgroup_id].push(pending_results_handle);
    pending_results_cv.notify_all();
    return true;
}

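//Retrieves a buffer for sending a P2P message to dest_id, retrying until one is available
//and re-resolving the destination's rank if the view changes between attempts.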
volatile char* RPCManager::get_sendbuffer_ptr(uint32_t dest_id, sst::REQUEST_TYPE type) {
    volatile char* buf;
    int curr_vid = -1;
    uint32_t dest_rank = 0;
    do {
        //This lock also prevents connections from being reassigned (because that happens
        //in new_view_callback), so we don't need p2p_connections_mutex
        std::shared_lock<std::shared_timed_mutex> view_read_lock(view_manager.view_mutex);
        //Check to see if the view changed between iterations of the loop, and re-get the rank
        if(curr_vid != view_manager.curr_view->vid) {
            try {
                dest_rank = connections->get_node_rank(dest_id);
            } catch(std::out_of_range& map_error) {
                throw node_removed_from_group_exception(dest_id);
            }
            curr_vid = view_manager.curr_view->vid;
        }
        buf = connections->get_sendbuffer_ptr(dest_rank, type);
    } while(!buf);
    return buf;
}

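//Sends the current P2P message buffer to dest_id and registers the PendingResults handle;
//since a P2P send has exactly one recipient, its reply map is fulfilled immediately.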
void RPCManager::finish_p2p_send(node_id_t dest_id, subgroup_id_t dest_subgroup_id, PendingBase& pending_results_handle) {
    try {
        //This lock also prevents connections from being reassigned (because that happens
        //in new_view_callback), so we don't need p2p_connections_mutex
        std::shared_lock<std::shared_timed_mutex> view_read_lock(view_manager.view_mutex);
        connections->send(connections->get_node_rank(dest_id));
    } catch(std::out_of_range& map_error) {
        throw node_removed_from_group_exception(dest_id);
    }
    pending_results_handle.fulfill_map({dest_id});
    std::lock_guard<std::mutex> lock(pending_results_mutex);
    fulfilled_pending_results[dest_subgroup_id].push_back(pending_results_handle);
}

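//Worker thread that services non-cascading P2P sends and queries in FIFO order: it dequeues
//each request, dispatches it to receive_message(), and sends back a (possibly empty) reply.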
void RPCManager::fifo_worker() {
    pthread_setname_np(pthread_self(), "fifo_thread");
    using namespace remote_invocation_utilities;
    const std::size_t header_size = header_space();
    std::size_t payload_size;
    Opcode indx;
    node_id_t received_from;
    uint32_t flags;
    size_t reply_size = 0;
    fifo_req request;

    while(!thread_shutdown) {
        {
            std::unique_lock<std::mutex> lock(fifo_queue_mutex);
            fifo_queue_cv.wait(lock, [&]() { return !fifo_queue.empty() || thread_shutdown; });
            if(thread_shutdown) {
                break;
            }
            request = fifo_queue.front();
            fifo_queue.pop();
        }
        retrieve_header(nullptr, request.msg_buf, payload_size, indx, received_from, flags);
        if(indx.is_reply || RPC_HEADER_FLAG_TST(flags, CASCADE)) {
            dbg_default_error("Invalid rpc message in fifo queue: is_reply={}, is_cascading={}",
                              indx.is_reply, RPC_HEADER_FLAG_TST(flags, CASCADE));
            throw derecho::derecho_exception("invalid rpc message in fifo queue...crash.");
        }
        receive_message(indx, received_from, request.msg_buf + header_size, payload_size,
                        [this, &reply_size, &request](size_t _size) -> char* {
                            reply_size = _size;
                            if(reply_size <= request.buffer_size) {
                                return (char*)connections->get_sendbuffer_ptr(
                                        connections->get_node_rank(request.sender_id), sst::REQUEST_TYPE::P2P_REPLY);
                            }
                            return nullptr;
                        });
        if(reply_size > 0) {
            connections->send(connections->get_node_rank(request.sender_id));
        } else {
            // hack for now to "simulate" a reply for p2p_sends to functions that do not generate a reply
            char* buf = connections->get_sendbuffer_ptr(connections->get_node_rank(request.sender_id), sst::REQUEST_TYPE::P2P_REPLY);
            buf[0] = 0;
            connections->send(connections->get_node_rank(request.sender_id));
        }
    }
}

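//Main P2P listening loop: polls the P2P connections for incoming messages, hands each one to
//p2p_message_handler, and backs off with a short sleep when the connections have been idle.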
void RPCManager::p2p_receive_loop() {
    pthread_setname_np(pthread_self(), "rpc_thread");

    uint64_t max_payload_size = getConfUInt64(CONF_SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE);
    // set the thread local rpc_handler context
    _in_rpc_handler = true;

    while(!thread_start) {
        std::unique_lock<std::mutex> lock(thread_start_mutex);
        thread_start_cv.wait(lock, [this]() { return thread_start; });
    }
    dbg_default_debug("P2P listening thread started");
    // start the fifo worker thread
    fifo_worker_thread = std::thread(&RPCManager::fifo_worker, this);

    struct timespec last_time, cur_time;
    clock_gettime(CLOCK_REALTIME, &last_time);

    // event loop
    while(!thread_shutdown) {
        std::unique_lock<std::mutex> connections_lock(p2p_connections_mutex);
        auto optional_reply_pair = connections->probe_all();
        if(optional_reply_pair) {
            auto reply_pair = optional_reply_pair.value();
            p2p_message_handler(reply_pair.first, (char*)reply_pair.second, max_payload_size);
            connections->update_incoming_seq_num();

            // update last time
            clock_gettime(CLOCK_REALTIME, &last_time);
        } else {
            clock_gettime(CLOCK_REALTIME, &cur_time);
            // check if the system has been inactive for enough time to induce sleep
            double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
                                        + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
            if(time_elapsed_in_ms > 1) {
                connections_lock.unlock();
                using namespace std::chrono_literals;
                std::this_thread::sleep_for(1ms);
                connections_lock.lock();
            }
        }
    }
    // stop fifo worker.
    fifo_queue_cv.notify_one();
    fifo_worker_thread.join();
}

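//Returns true if the calling thread is currently one of the RPC-handling threads
//(the thread-local flag is set in rpc_message_handler and the P2P receive loop).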
bool in_rpc_handler() {
    return _in_rpc_handler;
}
}  // namespace rpc
}  // namespace derecho