Derecho  0.9
Distributed systems toolkit for RDMA
rpc_manager.hpp
Go to the documentation of this file.
1 
7 #pragma once
8 
9 #include <exception>
10 #include <functional>
11 #include <map>
12 #include <memory>
13 #include <mutex>
14 #include <vector>
15 
16 #include "../derecho_type_definitions.hpp"
17 #include "../view.hpp"
18 #include "derecho_internal.hpp"
19 #include "p2p_connections.hpp"
20 #include "remote_invocable.hpp"
21 #include "rpc_utils.hpp"
22 #include "view_manager.hpp"
24 #include <derecho/utils/logger.hpp>
25 
26 namespace derecho {
27 
// Forward declaration of the Replicated class template, which RPCManager names as a friend.
28 template <typename T>
29 class Replicated;
// NOTE(review): the declaration introduced by the template header below is missing from
// this listing (the embedded numbering skips it) - presumably `class ExternalCaller;`,
// the other class template RPCManager names as a friend; confirm against the real header.
30 template <typename T>
32 
37 
38 namespace rpc {
39 
40 using PendingBase_ref = std::reference_wrapper<PendingBase>;
41 
// RPCManager holds the Opcode -> receive-function map, the P2P connections object, the
// pending-results bookkeeping, and the two worker threads declared below.
// NOTE(review): this listing has dropped some source lines (the embedded line numbers skip).
// view_manager and rdv are used in the constructor, and the page's cross-reference notes
// mention pending_results_mutex and p2p_connections_mutex, but none of those declarations
// are visible here - confirm against the real header.
42 class RPCManager {
43  static_assert(std::is_trivially_copyable<Opcode>::value, "Oh no! Opcode is not trivially copyable!");
// The ID of the node this RPCManager is running on.
45  const node_id_t nid;
// Map from Opcode to the receive function registered for it; handed by reference to
// build_remote_invocable_class / build_remote_invoker_for_class in the factory methods below.
50  std::unique_ptr<std::map<Opcode, receive_fun_t>> receivers;
52  // mutils::DeserializationManager dsm{{}};
53  // Weijia: I prefer the deserialization context vector.
55 
56  template <typename T>
57  friend class ::derecho::Replicated; //Give only Replicated access to view_manager
58  template <typename T>
59  friend class ::derecho::ExternalCaller;
61 
// Contains an RDMA connection to each member of the group.
63  std::unique_ptr<sst::P2PConnections> connections;
64 
// Condition variable used to resolve a race on the two pending-results maps below
// (per the generated documentation; the exact protocol lives in the implementation file).
73  std::condition_variable pending_results_cv;
74  //Both maps contain one list of PendingResults references per subgroup
75  std::map<subgroup_id_t, std::queue<PendingBase_ref>> pending_results_to_fulfill;
76  std::map<subgroup_id_t, std::list<PendingBase_ref>> fulfilled_pending_results;
77 
// Plain (non-atomic) flag declared next to thread_start_mutex / thread_start_cv -
// presumably the wait predicate guarded by that mutex; TODO confirm in the implementation.
78  bool thread_start = false;
// Mutex for thread_start_cv.
80  std::mutex thread_start_mutex;
// Notified when the P2P listening thread should start.
82  std::condition_variable thread_start_cv;
// Atomic flag, initially false - presumably set to ask the worker threads to exit; TODO confirm.
83  std::atomic<bool> thread_shutdown{false};
// Started by the constructor, running p2p_receive_loop().
84  std::thread rpc_thread;
// P2P sends and queries are queued for the FIFO worker.
86  std::thread fifo_worker_thread;
// One queued request: a raw message buffer pointer plus its size in bytes.
// NOTE(review): both constructors initialize `sender_id`, but its declaration is missing
// from this listing - presumably `node_id_t sender_id;`; confirm against the real header.
87  struct fifo_req {
89  char* msg_buf;
90  uint32_t buffer_size;
// Default constructor: zero sender, null buffer, zero size.
91  fifo_req() : sender_id(0),
92  msg_buf(nullptr),
93  buffer_size(0) {}
// Stores the given values as-is; the buffer pointer is copied, not the buffer contents.
94  fifo_req(node_id_t _sender_id,
95  char* _msg_buf,
96  uint32_t _buffer_size) : sender_id(_sender_id),
97  msg_buf(_msg_buf),
98  buffer_size(_buffer_size) {}
99  };
// Queue of fifo_req entries with its own mutex and condition variable -
// presumably filled by the P2P receive path and drained by fifo_worker(); TODO confirm.
100  std::queue<fifo_req> fifo_queue;
101  std::mutex fifo_queue_mutex;
102  std::condition_variable fifo_queue_cv;
103 
// Thread body of rpc_thread (see the constructor).
105  void p2p_receive_loop();
106 
// Presumably the thread body of fifo_worker_thread - TODO confirm in the implementation.
108  void fifo_worker();
109 
// Handler for a single P2P message: sender ID, message buffer, and buffer size in bytes.
117  void p2p_message_handler(node_id_t sender_id, char* msg_buf, uint32_t buffer_size);
118 
// Takes an Opcode, the sending node, a read-only payload buffer with its size, and an
// allocator callback for the output buffer. Returns a std::exception_ptr -
// NOTE(review): presumably null when the handler raised no exception; confirm.
132  std::exception_ptr receive_message(const Opcode& indx, const node_id_t& received_from,
133  char const* const buf, std::size_t payload_size,
134  const std::function<char*(int)>& out_alloc);
135 
// Takes a raw buffer with its size and an output-buffer allocator; returns a
// std::exception_ptr like receive_message() - presumably after decoding a header from buf; confirm.
147  std::exception_ptr parse_and_receive(char* buf, std::size_t size,
148  const std::function<char*(int)>& out_alloc);
149 
150 public:
// Constructs the manager: allocates an empty receivers map, binds view_manager to the
// given ViewManager, records deserialization_context_ptr in rdv when it is non-null,
// and finally starts rpc_thread on p2p_receive_loop().
// NOTE(review): the first line of the member-initializer list is missing from this listing;
// the page's cross-reference notes mention getConfUInt32 and CONF_DERECHO_LOCAL_ID, so it
// presumably initializes nid from the local-ID configuration value - confirm.
151  RPCManager(ViewManager& group_view_manager,
152  IDeserializationContext* deserialization_context_ptr)
153  // mutils::RemoteDeserializationContext_p deserialization_context_ptr = nullptr)
155  receivers(new std::decay_t<decltype(*receivers)>()),
156  view_manager(group_view_manager) {
157  if(deserialization_context_ptr != nullptr) {
158  rdv.push_back(deserialization_context_ptr);
159  }
160  rpc_thread = std::thread(&RPCManager::p2p_receive_loop, this);
161  }
162 
163  ~RPCManager();
164 
165  void create_connections();
166 
171  void start_listening();
// Given a pointer to an object and a list of its methods, constructs a RemoteInvocableClass
// for that object (summary from the generated documentation, which is truncated on this page).
// The registered receive functions are added to *receivers.
192  template <typename UserProvidedClass, typename FunctionTuple>
193  auto make_remote_invocable_class(std::unique_ptr<UserProvidedClass>* cls, uint32_t type_id, uint32_t instance_id, FunctionTuple funs) {
194  //FunctionTuple is a std::tuple of partial_wrapped<Tag, Ret, UserProvidedClass, Args>,
195  //which is the result of the user calling tag<Tag>(&UserProvidedClass::method) on each RPC method
196  //Use callFunc to unpack the tuple into a variadic parameter pack for build_remoteinvocableclass
197  return mutils::callFunc([&](const auto&... unpacked_functions) {
198  return build_remote_invocable_class<UserProvidedClass>(nid, type_id, instance_id, *receivers,
199  bind_to_instance(cls, unpacked_functions)...);
200  },
201  funs);
202  }
203 
204  void destroy_remote_invocable_class(uint32_t instance_id);
205 
// Given a subgroup ID and a list of functions, constructs a RemoteInvokerForClass for the
// given object type (summary from the generated documentation, which is truncated on this page).
// Unlike make_remote_invocable_class, no object instance is needed: bind_to_instance is used
// only inside decltype to name the wrapped<> types.
224  template <typename UserProvidedClass, typename FunctionTuple>
225  auto make_remote_invoker(uint32_t type_id, uint32_t instance_id, FunctionTuple funs) {
226  return mutils::callFunc([&](const auto&... unpacked_functions) {
227  //Supply the template parameters for build_remote_invoker_for_class by
228  //asking bind_to_instance for the type of the wrapped<> that corresponds to each partial_wrapped<>
229  return build_remote_invoker_for_class<UserProvidedClass,
230  decltype(bind_to_instance(std::declval<std::unique_ptr<UserProvidedClass>*>(),
231  unpacked_functions))...>(nid, type_id,
232  instance_id, *receivers);
233  },
234  funs);
235  }
236 
// Callback taking the newly installed View.
244  void new_view_callback(const View& new_view);
245 
// Handler for an RPC message delivered in the given subgroup: sender ID, message buffer,
// and buffer size in bytes.
257  void rpc_message_handler(subgroup_id_t subgroup_id, node_id_t sender_id,
258  char* msg_buf, uint32_t buffer_size);
259 
// Takes the subgroup ID and the PendingBase handle for an RPC send; returns a bool -
// NOTE(review): presumably success/failure of registering the pending results; confirm.
269  bool finish_rpc_send(subgroup_id_t subgroup_id, PendingBase& pending_results_handle);
270 
// Returns a volatile char* for the given destination node and request type -
// presumably a pointer into that destination's P2P send buffer; TODO confirm.
278  volatile char* get_sendbuffer_ptr(uint32_t dest_id, sst::REQUEST_TYPE type);
279 
// Takes the destination node, destination subgroup, and the PendingBase handle for a P2P send.
288  void finish_p2p_send(node_id_t dest_node, subgroup_id_t dest_subgroup_id, PendingBase& pending_results_handle);
289 };
290 
291 //Now that RPCManager is finished being declared, we can declare these convenience types
292 //(the declarations should really live in remote_invocable.hpp, but they depend on RPCManager existing)
// RemoteInvocableOf<T>: the decayed type obtained by dereferencing the result of
// RPCManager::make_remote_invocable_class when called with T::register_functions().
// The expression appears only inside decltype, so nothing is constructed or called at run time.
293 template <typename T>
294 using RemoteInvocableOf = std::decay_t<decltype(*std::declval<RPCManager>()
295  .make_remote_invocable_class(std::declval<std::unique_ptr<T>*>(),
296  std::declval<uint32_t>(),
297  std::declval<uint32_t>(),
298  T::register_functions()))>;
299 
// RemoteInvokerFor<T>: the decayed type obtained by dereferencing the result of
// RPCManager::make_remote_invoker<T> when called with T::register_functions().
// The expression appears only inside decltype, so nothing is constructed or called at run time.
300 template <typename T>
301 using RemoteInvokerFor = std::decay_t<decltype(*std::declval<RPCManager>()
302  .make_remote_invoker<T>(std::declval<uint32_t>(),
303  std::declval<uint32_t>(),
304  T::register_functions()))>;
305 
306 // Tests whether the current thread is inside an RPC handler, to tell if we are sending a cascading RPC message.
// Declaration only; the definition is not part of this header listing.
307 bool in_rpc_handler();
308 
309 } // namespace rpc
310 } // namespace derecho
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
The Deserialization Interface to be implemented by user applications.
Definition: rpc_manager.hpp:36
wrapped< Tag, std::function< Ret(Args...)> > bind_to_instance(std::unique_ptr< NewClass > *_this, const partial_wrapped< Tag, Ret, NewClass, Args... > &partial)
Converts a partial_wrapped<> containing a pointer-to-member-function to a wrapped<> containing the sa...
std::condition_variable thread_start_cv
Notified when the P2P listening thread should start.
Definition: rpc_manager.hpp:82
An RPC function call can be uniquely identified by the tuple (class, subgroup ID, function ID...
Definition: rpc_utils.hpp:62
std::map< subgroup_id_t, std::queue< PendingBase_ref > > pending_results_to_fulfill
Definition: rpc_manager.hpp:75
STL namespace.
bool in_rpc_handler()
Declared in rpc_manager.hpp.
std::map< subgroup_id_t, std::list< PendingBase_ref > > fulfilled_pending_results
Definition: rpc_manager.hpp:76
std::unique_ptr< sst::P2PConnections > connections
Contains an RDMA connection to each member of the group.
Definition: rpc_manager.hpp:63
const uint32_t getConfUInt32(const std::string &key)
Definition: conf.cpp:118
const node_id_t nid
The ID of the node this RPCManager is running on.
Definition: rpc_manager.hpp:43
std::decay_t< decltype(*std::declval< RPCManager >() .make_remote_invocable_class(std::declval< std::unique_ptr< T > * >(), std::declval< uint32_t >(), std::declval< uint32_t >(), T::register_functions()))> RemoteInvocableOf
auto build_remote_invoker_for_class(const node_id_t nid, const uint32_t type_id, const uint32_t instance_id, std::map< Opcode, receive_fun_t > &rvrs)
Constructs a RemoteInvokerForClass that can act as a client for the class in the template parameter (...
std::mutex pending_results_mutex
This mutex guards both pending_results_to_fulfill and fulfilled_pending_results.
Definition: rpc_manager.hpp:71
std::thread fifo_worker_thread
P2P sends and queries are queued for the FIFO worker thread.
Definition: rpc_manager.hpp:86
fifo_req(node_id_t _sender_id, char *_msg_buf, uint32_t _buffer_size)
Definition: rpc_manager.hpp:94
auto make_remote_invoker(uint32_t type_id, uint32_t instance_id, FunctionTuple funs)
Given a subgroup ID and a list of functions, constructs a RemoteInvokerForClass for the type of objec...
ViewManager & view_manager
Definition: rpc_manager.hpp:60
std::reference_wrapper< PendingBase > PendingBase_ref
Definition: rpc_manager.hpp:40
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
std::mutex thread_start_mutex
Mutex for thread_start_cv.
Definition: rpc_manager.hpp:80
std::vector< RemoteDeserializationContext_p > RemoteDeserialization_v
std::mutex p2p_connections_mutex
This provides mutual exclusion between the P2P listening thread and the view-change thread...
Definition: rpc_manager.hpp:69
std::decay_t< decltype(*std::declval< RPCManager >() .make_remote_invoker< T >(std::declval< uint32_t >(), std::declval< uint32_t >(), T::register_functions()))> RemoteInvokerFor
auto make_remote_invocable_class(std::unique_ptr< UserProvidedClass > *cls, uint32_t type_id, uint32_t instance_id, FunctionTuple funs)
Given a pointer to an object and a list of its methods, constructs a RemoteInvocableClass for that ob...
Abstract base type for PendingResults.
Definition: rpc_utils.hpp:334
RPCManager(ViewManager &group_view_manager, IDeserializationContext *deserialization_context_ptr)
mutils::RemoteDeserialization_v rdv
An empty DeserializationManager, in case we need it later.
Definition: rpc_manager.hpp:54
#define CONF_DERECHO_LOCAL_ID
Definition: conf.hpp:24
If a class which implements ByteRepresentable requires a context in order to correctly deserialize...
std::condition_variable fifo_queue_cv
std::unique_ptr< std::map< Opcode, receive_fun_t > > receivers
A map from FunctionIDs to RPC functions, either the "server" stubs that receive remote calls to invok...
Definition: rpc_manager.hpp:50
std::queue< fifo_req > fifo_queue
std::condition_variable pending_results_cv
This condition variable resolves a race condition in using pending_results_to_fulfill and fulfilled_pending_results...
Definition: rpc_manager.hpp:73