Derecho  0.9
Distributed systems toolkit for RDMA
rpc_utils.hpp
Go to the documentation of this file.
1 
7 #pragma once
8 
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <new>
#include <set>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>
#include <typeindex>
#include <utility>
#include <vector>
23 
24 #include "../derecho_exception.hpp"
25 #include "../derecho_type_definitions.hpp"
26 #include "derecho_internal.hpp"
28 #include <derecho/utils/logger.hpp>
29 #include <mutils/macro_utils.hpp>
30 
31 namespace derecho {
32 
33 namespace rpc {
34 
/**
 * A "compile-time string": the characters are carried in the template
 * parameter pack, so each distinct character sequence is a distinct type
 * with its own constant hash code.
 */
template <char... str>
struct String {
    /**
     * Computes a base-31 polynomial hash over the characters in the pack,
     * stopping at the first NUL character.
     * @return The hash code; 0 if the pack is empty or begins with NUL.
     */
    static constexpr uint64_t hash() {
        // The explicit trailing NUL keeps the array non-empty (a zero-length
        // array would make String<> ill-formed) and also terminates the loop,
        // so hashes of non-empty packs are unchanged.
        const char string[] = {str..., '\0'};
        uint64_t hash_code = 0;
        for(const int c : string) {
            if(c == 0) break;  //NUL character terminates the string
            hash_code = hash_code * 31 + c;
        }
        return hash_code;
    }
};
53 
54 using FunctionTag = unsigned long long;
55 
62 struct Opcode {
66  bool is_reply;
67 };
68 inline bool operator<(const Opcode& lhs, const Opcode& rhs) {
69  return std::tie(lhs.class_id, lhs.subgroup_id, lhs.function_id, lhs.is_reply)
70  < std::tie(rhs.class_id, rhs.subgroup_id, rhs.function_id, rhs.is_reply);
71 }
72 inline bool operator==(const Opcode& lhs, const Opcode& rhs) {
73  return lhs.class_id == rhs.class_id && lhs.subgroup_id == rhs.subgroup_id
74  && lhs.function_id == rhs.function_id && lhs.is_reply == rhs.is_reply;
75 }
76 
77 using node_list_t = std::vector<node_id_t>;
78 
86  : derecho_exception(std::string("An exception occurred at node with ID ")
87  + std::to_string(who)),
88  who(who) {}
89 };
90 
99  : derecho_exception(std::string("Node with ID ")
100  + std::to_string(who)
101  + std::string(" has been removed from the group.")),
102  who(who) {}
103 };
104 
112  : derecho_exception("This node was removed from its subgroup or shard "
113  "and can no longer send the RPC message.") {}
114 };
115 
122 struct recv_ret {
124  std::size_t size;
125  char* payload;
126  std::exception_ptr possible_exception;
127 };
128 
134 using receive_fun_t = std::function<recv_ret(
135  // mutils::DeserializationManager* dsm,
136  mutils::RemoteDeserialization_v* rdv, const node_id_t&, const char* recv_buf,
137  const std::function<char*(int)>& out_alloc)>;
138 
143 template <typename T>
144 using reply_map = std::map<node_id_t, std::future<T>>;
145 
157 template <typename Ret>
159 public:
160  using map_fut = std::future<std::unique_ptr<reply_map<Ret>>>;
162  using type = Ret;
163 
164  class ReplyMap {
165  private:
167 
168  public:
170 
171  ReplyMap(QueryResults& qr) : parent(qr){};
172  ReplyMap(const ReplyMap&) = delete;
173  ReplyMap(ReplyMap&& rm) : parent(rm.parent), rmap(std::move(rm.rmap)) {}
174 
175  bool valid(const node_id_t& nid) {
176  assert(rmap.size() == 0 || rmap.count(nid) != 0);
177  return (rmap.size() > 0) && rmap.at(nid).valid();
178  }
179 
180  /*
181  returns true if we sent to this node,
182  regardless of whether this node has replied.
183  */
184  bool contains(const node_id_t& nid) { return rmap.count(nid); }
185 
186  auto begin() { return std::begin(rmap); }
187 
188  auto end() { return std::end(rmap); }
189 
190  Ret get(const node_id_t& nid) {
191  if(rmap.size() == 0) {
192  assert(parent.pending_rmap.valid());
193  rmap = std::move(*parent.pending_rmap.get());
194  }
195  assert(rmap.size() > 0);
196  assert(rmap.count(nid));
197  assert(rmap.at(nid).valid());
198  return rmap.at(nid).get();
199  }
200  };
201 
203 
204 private:
205  ReplyMap replies{*this};
206 
207 public:
208  QueryResults(map_fut pm) : pending_rmap(std::move(pm)) {}
210  : pending_rmap{std::move(o.pending_rmap)},
211  replies{std::move(o.replies)} {}
212  QueryResults(const QueryResults&) = delete;
213 
218  template <typename Time>
219  ReplyMap* wait(Time t) {
220  if(replies.rmap.size() == 0) {
221  if(pending_rmap.wait_for(t) == std::future_status::ready) {
222  replies.rmap = std::move(*pending_rmap.get());
223  return &replies;
224  } else
225  return nullptr;
226  } else
227  return &replies;
228  }
229 
235  ReplyMap& get() {
236  using namespace std::chrono;
237  while(true) {
238  if(auto rmap = wait(5min)) {
239  return *rmap;
240  }
241  }
242  }
243 };
244 
252 template <>
253 class QueryResults<void> {
254 public:
255  using map_fut = std::future<std::unique_ptr<std::set<node_id_t>>>;
256  using map = std::set<node_id_t>;
257  using type = void;
258 
259  class ReplyMap {
260  private:
262 
263  public:
265 
266  ReplyMap(QueryResults& qr) : parent(qr){};
267  ReplyMap(const ReplyMap&) = delete;
268  ReplyMap(ReplyMap&& rm) : parent(rm.parent), rmap(std::move(rm.rmap)) {}
269 
270  bool valid(const node_id_t& nid) {
271  assert(rmap.size() == 0 || rmap.count(nid) != 0);
272  return (rmap.size() > 0) && rmap.count(nid) > 0;
273  }
274 
275  /*
276  returns true if we sent to this node,
277  regardless of whether this node has replied.
278  */
279  bool contains(const node_id_t& nid) { return rmap.count(nid); }
280 
281  auto begin() { return std::begin(rmap); }
282 
283  auto end() { return std::end(rmap); }
284  };
285 
287 
288 private:
289  ReplyMap replies{*this};
290 
291 public:
292  QueryResults(map_fut pm) : pending_rmap(std::move(pm)) {}
294  : pending_rmap{std::move(o.pending_rmap)},
295  replies{std::move(o.replies)} {}
296  QueryResults(const QueryResults&) = delete;
297 
302  template <typename Time>
303  ReplyMap* wait(Time t) {
304  if(replies.rmap.size() == 0) {
305  if(pending_rmap.wait_for(t) == std::future_status::ready) {
306  replies.rmap = std::move(*pending_rmap.get());
307  return &replies;
308  } else
309  return nullptr;
310  } else
311  return &replies;
312  }
313 
319  ReplyMap& get() {
320  using namespace std::chrono;
321  while(true) {
322  if(auto rmap = wait(5min)) {
323  return *rmap;
324  }
325  }
326  }
327 };
328 
334 class PendingBase {
335 public:
336  virtual void fulfill_map(const node_list_t&) = 0;
337  virtual void set_exception_for_removed_node(const node_id_t&) = 0;
338  virtual void set_exception_for_caller_removed() = 0;
339  virtual bool all_responded() const = 0;
340  virtual void reset() = 0;
341  virtual ~PendingBase() {}
342 };
343 
352 template <typename Ret>
353 class PendingResults : public PendingBase {
354 private:
358  std::promise<std::unique_ptr<reply_map<Ret>>> promise_for_pending_map;
359 
360  std::promise<std::map<node_id_t, std::promise<Ret>>> promise_for_reply_promises;
364  std::future<std::map<node_id_t, std::promise<Ret>>> reply_promises_are_ready;
366  std::map<node_id_t, std::promise<Ret>> reply_promises;
367 
368  bool map_fulfilled = false;
369  std::set<node_id_t> dest_nodes, responded_nodes;
370 
371 public:
373  : reply_promises_are_ready(promise_for_reply_promises.get_future()) {}
374  virtual ~PendingResults() {}
375 
382  return QueryResults<Ret>{promise_for_pending_map.get_future()};
383  }
384 
390  void fulfill_map(const node_list_t& who) {
391  dbg_default_trace("Got a call to fulfill_map for PendingResults<{}>", typeid(Ret).name());
392  std::unique_ptr<reply_map<Ret>> futures_map = std::make_unique<reply_map<Ret>>();
393  std::map<node_id_t, std::promise<Ret>> promises_map;
394  for(const auto& e : who) {
395  futures_map->emplace(e, promises_map[e].get_future());
396  }
397  dest_nodes.insert(who.begin(), who.end());
398  dbg_default_trace("Setting a value for reply_promises_are_ready");
399  promise_for_reply_promises.set_value(std::move(promises_map));
400  promise_for_pending_map.set_value(std::move(futures_map));
401  map_fulfilled = true;
402  }
403 
409  if(!map_fulfilled) {
410  promise_for_pending_map.set_exception(
411  std::make_exception_ptr(sender_removed_from_group_exception{}));
412  } else {
413  if(reply_promises.size() == 0) {
414  reply_promises = std::move(reply_promises_are_ready.get());
415  }
416  //Set exceptions for any nodes that have not yet responded
417  for(auto& node_and_promise : reply_promises) {
418  if(responded_nodes.find(node_and_promise.first)
419  == responded_nodes.end()) {
420  node_and_promise.second.set_exception(
421  std::make_exception_ptr(sender_removed_from_group_exception{}));
422  }
423  }
424  }
425  }
426 
427  void set_exception_for_removed_node(const node_id_t& removed_nid) {
428  assert(map_fulfilled);
429  if(dest_nodes.find(removed_nid) != dest_nodes.end()
430  && responded_nodes.find(removed_nid) == responded_nodes.end()) {
431  set_exception(removed_nid,
432  std::make_exception_ptr(
433  node_removed_from_group_exception{removed_nid}));
434  }
435  }
436 
443  void set_value(const node_id_t& nid, const Ret& v) {
444  std::lock_guard<std::mutex> lock(reply_promises_are_ready_mutex);
445  responded_nodes.insert(nid);
446  if(reply_promises.size() == 0) {
447  dbg_default_trace("PendingResults<{}>::set_value about to wait on reply_promises_are_ready", typeid(Ret).name());
449  reply_promises = std::move(reply_promises_are_ready.get());
450  }
451  reply_promises.at(nid).set_value(v);
452  }
453 
460  void set_exception(const node_id_t& nid, const std::exception_ptr e) {
461  responded_nodes.insert(nid);
462  if(reply_promises.size() == 0) {
463  reply_promises = std::move(reply_promises_are_ready.get());
464  }
465  reply_promises.at(nid).set_exception(e);
466  }
467 
472  bool all_responded() const {
473  return map_fulfilled && (responded_nodes == dest_nodes);
474  }
475 
479  void reset() {
480  promise_for_pending_map = std::promise<std::unique_ptr<reply_map<Ret>>>();
481  promise_for_reply_promises = std::promise<std::map<node_id_t, std::promise<Ret>>>();
482  reply_promises_are_ready = promise_for_reply_promises.get_future();
483  // reply_promises_are_ready_mutex
484  reply_promises.clear();
485  map_fulfilled = false;
486  dest_nodes.clear();
487  responded_nodes.clear();
488  }
489 };
490 
497 template <>
498 class PendingResults<void> : public PendingBase {
499 private:
500  std::promise<std::unique_ptr<std::set<node_id_t>>> promise_for_pending_map;
501  bool map_fulfilled = false;
502 
503 public:
505  return QueryResults<void>(promise_for_pending_map.get_future());
506  }
507 
508  void fulfill_map(const node_list_t& sent_nodes) {
509  auto nodes_sent_set = std::make_unique<std::set<node_id_t>>();
510  for(const node_id_t& node : sent_nodes) {
511  nodes_sent_set->emplace(node);
512  }
513  promise_for_pending_map.set_value(std::move(nodes_sent_set));
514  map_fulfilled = true;
515  }
516 
518 
520  if(!map_fulfilled) {
521  promise_for_pending_map.set_exception(
522  std::make_exception_ptr(sender_removed_from_group_exception()));
523  }
524  }
525 
526  bool all_responded() const {
527  return map_fulfilled;
528  }
529 
530  void reset() {
531  promise_for_pending_map = std::promise<std::unique_ptr<std::set<node_id_t>>>();
532  map_fulfilled = false;
533  }
534 };
535 
539 namespace remote_invocation_utilities {
540 #define RPC_HEADER_FLAG_TST(f, name) \
541  ((f) & (((uint32_t)1L) << (_RPC_HEADER_FLAG_##name)))
542 #define RPC_HEADER_FLAG_SET(f, name) \
543  ((f) |= (((uint32_t)1L) << (_RPC_HEADER_FLAG_##name)))
544 #define RPC_HEADER_FLAG_CLR(f, name) \
545  ((f) &= ~(((uint32_t)1L) << (_RPC_HEADER_FLAG_##name)))
546 
547 // add new rpc header flags here.
548 #define _RPC_HEADER_FLAG_CASCADE (0)
549 #define _RPC_HEADER_FLAG_RESERVED (1)
550 
551 inline std::size_t header_space() {
552  return sizeof(std::size_t) + sizeof(Opcode) + sizeof(node_id_t) + sizeof(uint32_t);
553  // size operation from flags
554 }
555 
556 inline char* extra_alloc(int i) {
557  const auto hs = header_space();
558  return (char*)calloc(i + hs, sizeof(char)) + hs;
559 }
560 
561 inline void populate_header(char* reply_buf,
562  const std::size_t& payload_size,
563  const Opcode& op, const node_id_t& from,
564  const uint32_t& flags) {
565  std::size_t offset = 0;
566  static_assert(sizeof(op) == sizeof(Opcode), "Opcode& is not the same size as Opcode!");
567  reinterpret_cast<std::size_t*>(reply_buf + offset)[0] = payload_size; // size
568  offset += sizeof(payload_size);
569  reinterpret_cast<Opcode*>(reply_buf + offset)[0] = op; // what
570  offset += sizeof(op);
571  reinterpret_cast<node_id_t*>(reply_buf + offset)[0] = from; // from
572  offset += sizeof(from);
573  reinterpret_cast<uint32_t*>(reply_buf + offset)[0] = flags; // flags
574 }
575 
576 //inline void retrieve_header(mutils::DeserializationManager* dsm,
578  const char* reply_buf,
579  std::size_t& payload_size, Opcode& op,
580  node_id_t& from, uint32_t& flags) {
581  std::size_t offset = 0;
582  payload_size = reinterpret_cast<const std::size_t*>(reply_buf + offset)[0];
583  offset += sizeof(payload_size);
584  op = reinterpret_cast<const Opcode*>(reply_buf + offset)[0];
585  offset += sizeof(op);
586  from = reinterpret_cast<const node_id_t*>(reply_buf + offset)[0];
587  offset += sizeof(from);
588  flags = reinterpret_cast<const uint32_t*>(reply_buf + offset)[0];
589 }
590 } // namespace remote_invocation_utilities
591 
592 } // namespace rpc
593 } // namespace derecho
594 
595 #define CT_STRING(...) derecho::rpc::String<MACRO_GET_STR(#__VA_ARGS__)>
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
static constexpr uint64_t hash()
Definition: rpc_utils.hpp:43
QueryResults(QueryResults &&o)
Definition: rpc_utils.hpp:209
unsigned long long FunctionTag
Definition: rpc_utils.hpp:54
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
void fulfill_map(const node_list_t &sent_nodes)
Definition: rpc_utils.hpp:508
subgroup_id_t subgroup_id
Definition: rpc_utils.hpp:64
Data structure that holds a set of promises for a single RPC function call; the promises transmit one...
Definition: rpc_utils.hpp:353
An RPC function call can be uniquely identified by the tuple (class, subgroup ID, function ID...
Definition: rpc_utils.hpp:62
FunctionTag function_id
Definition: rpc_utils.hpp:65
std::map< node_id_t, std::promise< Ret > > reply_promises
Definition: rpc_utils.hpp:366
std::set< node_id_t > responded_nodes
Definition: rpc_utils.hpp:369
bool valid(const node_id_t &nid)
Definition: rpc_utils.hpp:175
void set_exception_for_caller_removed()
Sets exceptions to indicate to the sender of this RPC call that it has been removed from its subgroup...
Definition: rpc_utils.hpp:408
STL namespace.
subgroup_type_id_t class_id
Definition: rpc_utils.hpp:63
void set_exception_for_removed_node(const node_id_t &removed_nid)
Definition: rpc_utils.hpp:427
void reset()
reset this object.
Definition: rpc_utils.hpp:479
This "compile-time String" puts a short sequence of characters into a type's template parameter...
Definition: rpc_utils.hpp:42
std::future< std::map< node_id_t, std::promise< Ret > > > reply_promises_are_ready
A future for a map containing one promise for each reply to the RPC function call.
Definition: rpc_utils.hpp:364
Specialization of QueryResults for void functions, which do not generate replies. ...
Definition: rpc_utils.hpp:253
std::future< std::unique_ptr< reply_map< Ret > >> map_fut
Definition: rpc_utils.hpp:160
std::future< std::unique_ptr< std::set< node_id_t > >> map_fut
Definition: rpc_utils.hpp:255
std::promise< std::map< node_id_t, std::promise< Ret > > > promise_for_reply_promises
Definition: rpc_utils.hpp:360
bool operator==(const Opcode &lhs, const Opcode &rhs)
Definition: rpc_utils.hpp:72
void set_exception_for_removed_node(const node_id_t &)
Definition: rpc_utils.hpp:517
bool contains(const node_id_t &nid)
Definition: rpc_utils.hpp:184
bool operator<(const Opcode &lhs, const Opcode &rhs)
Definition: rpc_utils.hpp:68
std::vector< node_id_t > node_list_t
Definition: rpc_utils.hpp:77
void populate_header(char *reply_buf, const std::size_t &payload_size, const Opcode &op, const node_id_t &from, const uint32_t &flags)
Definition: rpc_utils.hpp:561
std::exception_ptr possible_exception
Definition: rpc_utils.hpp:126
void fulfill_map(const node_list_t &who)
Fill pending_map and reply_promises with one promise/future pair for each node that was contacted in ...
Definition: rpc_utils.hpp:390
Indicates that an RPC call from this node was aborted because this node was removed from its subgroup...
Definition: rpc_utils.hpp:110
#define dbg_default_trace(...)
Definition: logger.hpp:40
std::map< node_id_t, std::future< T > > reply_map
The type of map contained in a QueryResults::ReplyMap.
Definition: rpc_utils.hpp:144
void retrieve_header(mutils::RemoteDeserialization_v *rdv, const char *reply_buf, std::size_t &payload_size, Opcode &op, node_id_t &from, uint32_t &flags)
Definition: rpc_utils.hpp:577
#define dbg_default_flush()
Definition: logger.hpp:52
ReplyMap * wait(Time t)
Wait the specified duration; if a ReplyMap is available after that duration, return it...
Definition: rpc_utils.hpp:303
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
Return type of all the RemoteInvocable::receive_* methods.
Definition: rpc_utils.hpp:122
std::mutex reply_promises_are_ready_mutex
Definition: rpc_utils.hpp:365
QueryResults< Ret > get_future()
Constructs and returns a QueryResults representing the "future" end of the response promises in this ...
Definition: rpc_utils.hpp:381
void set_value(const node_id_t &nid, const Ret &v)
Fulfills a promise for a single node's reply by setting the value that the node returned for the RPC ...
Definition: rpc_utils.hpp:443
std::vector< RemoteDeserializationContext_p > RemoteDeserialization_v
void set_exception(const node_id_t &nid, const std::exception_ptr e)
Fulfills a promise for a single node's reply by setting an exception that was thrown by the RPC funct...
Definition: rpc_utils.hpp:460
Indicates that an RPC call to a node failed because the node was removed from the Replicated Object's...
Definition: rpc_utils.hpp:96
reply_map< Ret > map
Definition: rpc_utils.hpp:161
Abstract base type for PendingResults.
Definition: rpc_utils.hpp:334
ReplyMap * wait(Time t)
Wait the specified duration; if a ReplyMap is available after that duration, return it...
Definition: rpc_utils.hpp:219
std::function< recv_ret(mutils::RemoteDeserialization_v *rdv, const node_id_t &, const char *recv_buf, const std::function< char *(int)> &out_alloc)> receive_fun_t
Type signature for all the RemoteInvocable::receive_* methods.
Definition: rpc_utils.hpp:137
std::promise< std::unique_ptr< reply_map< Ret > > > promise_for_pending_map
A promise for a map containing one future for each reply to the RPC function call.
Definition: rpc_utils.hpp:358
Data structure that (indirectly) holds a set of futures for a single RPC function call; there is one ...
Definition: rpc_utils.hpp:158
Base exception class for all exceptions raised by Derecho.
QueryResults< void > get_future()
Definition: rpc_utils.hpp:504
std::promise< std::unique_ptr< std::set< node_id_t > > > promise_for_pending_map
Definition: rpc_utils.hpp:500
Indicates that an RPC call failed because executing the RPC function on the remote node resulted in a...
Definition: rpc_utils.hpp:83