Derecho  0.9
Distributed systems toolkit for RDMA
replicated_impl.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <functional>
4 #include <mutex>
5 #include <utility>
6 
7 #include "../replicated.hpp"
8 
10 
11 namespace derecho {
12 
13 template <typename T>
15  uint32_t subgroup_index, uint32_t shard_num,
16  rpc::RPCManager& group_rpc_manager, Factory<T> client_object_factory,
17  _Group* group)
18  : persistent_registry_ptr(std::make_unique<persistent::PersistentRegistry>(
19  this, std::type_index(typeid(T)), subgroup_index, shard_num)),
20  user_object_ptr(std::make_unique<std::unique_ptr<T>>(
21  client_object_factory(persistent_registry_ptr.get()))),
22  node_id(nid),
23  subgroup_id(subgroup_id),
24  subgroup_index(subgroup_index),
25  shard_num(shard_num),
26  group_rpc_manager(group_rpc_manager),
27  wrapped_this(group_rpc_manager.make_remote_invocable_class(user_object_ptr.get(),
28  type_id, subgroup_id,
29  T::register_functions())),
30  group(group) {
31  if constexpr(std::is_base_of_v<GroupReference, T>) {
32  (**user_object_ptr).set_group_pointers(group, subgroup_index);
33  }
34 }
35 
36 template <typename T>
38  uint32_t subgroup_index, uint32_t shard_num,
40  : persistent_registry_ptr(std::make_unique<persistent::PersistentRegistry>(
41  this, std::type_index(typeid(T)), subgroup_index, shard_num)),
42  user_object_ptr(std::make_unique<std::unique_ptr<T>>(nullptr)),
43  node_id(nid),
44  subgroup_id(subgroup_id),
45  subgroup_index(subgroup_index),
46  shard_num(shard_num),
47  group_rpc_manager(group_rpc_manager),
48  wrapped_this(group_rpc_manager.make_remote_invocable_class(user_object_ptr.get(),
49  type_id, subgroup_id,
50  T::register_functions())),
51  group(group) {}
52 
53 template <typename T>
56  node_id(rhs.node_id),
59  shard_num(rhs.shard_num),
61  wrapped_this(std::move(rhs.wrapped_this)),
62  group(rhs.group) {
63  persistent_registry_ptr->updateTemporalFrontierProvider(this);
64 }
65 
66 template <typename T>
68  // hack to check if the object was merely moved
69  if(wrapped_this) {
71  }
72 }
73 
74 template <typename T>
75 template <rpc::FunctionTag tag, typename... Args>
76 auto Replicated<T>::p2p_send(node_id_t dest_node, Args&&... args) {
77  if(is_valid()) {
78  if(group_rpc_manager.view_manager.get_current_view().get().rank_of(dest_node) == -1) {
79  throw invalid_node_exception("Cannot send a p2p request to node "
80  + std::to_string(dest_node) + ": it is not a member of the Group.");
81  }
82  auto return_pair = wrapped_this->template send<tag>(
83  [this, &dest_node](size_t size) -> char* {
84  const std::size_t max_payload_size = group_rpc_manager.view_manager.get_max_payload_sizes().at(subgroup_id);
85  if(size <= max_payload_size) {
86  return (char*)group_rpc_manager.get_sendbuffer_ptr(dest_node,
88  } else {
89  throw derecho_exception("The size of serialized args exceeds the maximum message size.");
90  }
91  },
92  std::forward<Args>(args)...);
93  group_rpc_manager.finish_p2p_send(dest_node, subgroup_id, return_pair.pending);
94  return std::move(return_pair.results);
95  } else {
96  throw empty_reference_exception{"Attempted to use an empty Replicated<T>"};
97  }
98 }
99 
100 template <typename T>
101 template <rpc::FunctionTag tag, typename... Args>
102 auto Replicated<T>::ordered_send(Args&&... args) {
103  if(is_valid()) {
104  size_t payload_size_for_multicast_send = wrapped_this->template get_size_for_ordered_send<tag>(std::forward<Args>(args)...);
105 
106  using Ret = typename std::remove_pointer<decltype(wrapped_this->template getReturnType<tag>(
107  std::forward<Args>(args)...))>::type;
108  rpc::QueryResults<Ret>* results_ptr;
109  rpc::PendingResults<Ret>* pending_ptr;
110  auto serializer = [&](char* buffer) {
111  //By the time this lambda runs, the current thread will be holding a read lock on view_mutex
112  const std::size_t max_payload_size = group_rpc_manager.view_manager.get_max_payload_sizes().at(subgroup_id);
113  auto send_return_struct = wrapped_this->template send<tag>(
114  [&buffer, &max_payload_size](size_t size) -> char* {
115  if(size <= max_payload_size) {
116  return buffer;
117  } else {
118  throw derecho_exception("The size of serialized args exceeds the maximum message size.");
119  }
120  },
121  std::forward<Args>(args)...);
122  results_ptr = new rpc::QueryResults<Ret>(std::move(send_return_struct.results));
123  pending_ptr = &send_return_struct.pending;
124  };
125 
126  std::shared_lock<std::shared_timed_mutex> view_read_lock(group_rpc_manager.view_manager.view_mutex);
127  group_rpc_manager.view_manager.view_change_cv.wait(view_read_lock, [&]() {
129  ->multicast_group->send(subgroup_id, payload_size_for_multicast_send, serializer, true);
130  });
132  return std::move(*results_ptr);
133  } else {
134  throw empty_reference_exception{"Attempted to use an empty Replicated<T>"};
135  }
136 }
137 
138 template <typename T>
139 void Replicated<T>::send(unsigned long long int payload_size,
140  const std::function<void(char* buf)>& msg_generator) {
141  group_rpc_manager.view_manager.send(subgroup_id, payload_size, msg_generator);
142 }
143 
144 template <typename T>
145 std::size_t Replicated<T>::object_size() const {
147 }
148 
149 template <typename T>
150 void Replicated<T>::send_object(tcp::socket& receiver_socket) const {
151  auto bind_socket_write = [&receiver_socket](const char* bytes, std::size_t size) {
152  receiver_socket.write(bytes, size);
153  };
154  mutils::post_object(bind_socket_write, object_size());
155  send_object_raw(receiver_socket);
156 }
157 
158 template <typename T>
159 void Replicated<T>::send_object_raw(tcp::socket& receiver_socket) const {
160  auto bind_socket_write = [&receiver_socket](const char* bytes, std::size_t size) {
161  receiver_socket.write(bytes, size);
162  };
163  mutils::post_object(bind_socket_write, **user_object_ptr);
164 }
165 
166 template <typename T>
167 std::size_t Replicated<T>::receive_object(char* buffer) {
168  // *user_object_ptr = std::move(mutils::from_bytes<T>(&group_rpc_manager.dsm, buffer));
170  rdv.insert(rdv.begin(), persistent_registry_ptr.get());
172  *user_object_ptr = std::move(mutils::from_bytes<T>(&dsm, buffer));
173  if constexpr(std::is_base_of_v<GroupReference, T>) {
174  (**user_object_ptr).set_group_pointers(group, subgroup_index);
175  }
177 }
178 
179 template <typename T>
180 void Replicated<T>::persist(const persistent::version_t version) noexcept(false) {
181  persistent::version_t persisted_ver;
182 
183  // persist variables
184  do {
185  persisted_ver = persistent_registry_ptr->persist();
186  if(persisted_ver == -1) {
187  // for replicated<T> without Persistent fields,
188  // tell the persistent thread that we are done.
189  persisted_ver = version;
190  }
191  } while(persisted_ver < version);
192 };
193 
194 template <typename T>
196  return persistent_registry_ptr->getMinimumLatestPersistedVersion();
197 }
198 
199 template <typename T>
202 }
203 
204 template <typename T>
207  : node_id(nid),
208  subgroup_id(subgroup_id),
209  group_rpc_manager(group_rpc_manager),
210  wrapped_this(group_rpc_manager.make_remote_invoker<T>(type_id, subgroup_id,
211  T::register_functions())) {}
212 
213 //This is literally copied and pasted from Replicated<T>. I wish I could let them share code with inheritance,
214 //but I'm afraid that will introduce unnecessary overheads.
215 template <typename T>
216 template <rpc::FunctionTag tag, typename... Args>
217 auto ExternalCaller<T>::p2p_send(node_id_t dest_node, Args&&... args) {
218  if(is_valid()) {
219  assert(dest_node != node_id);
220  if(group_rpc_manager.view_manager.get_current_view().get().rank_of(dest_node) == -1) {
221  throw invalid_node_exception("Cannot send a p2p request to node "
222  + std::to_string(dest_node) + ": it is not a member of the Group.");
223  }
224  auto return_pair = wrapped_this->template send<tag>(
225  [this, &dest_node](size_t size) -> char* {
226  const std::size_t max_payload_size = group_rpc_manager.view_manager.get_max_payload_sizes().at(subgroup_id);
227  if(size <= max_payload_size) {
228  return (char*)group_rpc_manager.get_sendbuffer_ptr(dest_node,
230  } else {
231  throw derecho_exception("The size of serialized args exceeds the maximum message size.");
232  }
233  },
234  std::forward<Args>(args)...);
235  group_rpc_manager.finish_p2p_send(dest_node, subgroup_id, return_pair.pending);
236  return std::move(return_pair.results);
237  } else {
238  throw empty_reference_exception{"Attempted to use an empty Replicated<T>"};
239  }
240 }
241 
242 template <typename T>
243 template <rpc::FunctionTag tag, typename... Args>
244 auto ShardIterator<T>::p2p_send(Args&&... args) {
245  // shard_reps should have at least one member
246  auto send_result = EC.template p2p_send<tag>(shard_reps.at(0), std::forward<Args>(args)...);
247  std::vector<decltype(send_result)> send_result_vec;
248  send_result_vec.emplace_back(std::move(send_result));
249  for(uint i = 1; i < shard_reps.size(); ++i) {
250  send_result_vec.emplace_back(EC.template p2p_send<tag>(shard_reps[i], std::forward<Args>(args)...));
251  }
252  return send_result_vec;
253 }
254 
255 } // namespace derecho
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
const uint32_t shard_num
The index, within the subgroup, of the shard that replicates this object.
Definition: replicated.hpp:91
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
unsigned long long FunctionTag
Definition: rpc_utils.hpp:54
This file includes all common types internal to derecho and not necessarily being known by a client pr...
std::size_t object_size() const
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
const subgroup_id_t subgroup_id
The internally-generated subgroup ID of the subgroup that replicates this object. ...
Definition: replicated.hpp:84
Data structure that holds a set of promises for a single RPC function call; the promises transmit one...
Definition: rpc_utils.hpp:353
auto p2p_send(node_id_t dest_node, Args &&... args)
Sends a peer-to-peer message to a single member of the subgroup that this ExternalCaller<T> connects ...
const node_id_t node_id
The ID of this node.
Definition: replicated.hpp:322
STL namespace.
std::unique_ptr< rpc::RemoteInvokerFor< T > > wrapped_this
The actual implementation of ExternalCaller, which has lots of ugly template parameters.
Definition: replicated.hpp:328
std::map< subgroup_id_t, uint64_t > get_max_payload_sizes()
std::size_t receive_object(char *buffer)
Updates the state of the "wrapped" object by replacing it with the object serialized in a buffer...
subgroup_id_t subgroup_id
The internally-generated subgroup ID of the subgroup that this ExternalCaller will contact...
Definition: replicated.hpp:324
const persistent::version_t get_minimum_latest_persisted_version() noexcept(false)
Returns the minimum among the "latest version" numbers of all Persistent fields of this object...
auto bytes_size(const T &)
Just calls sizeof(T)
void send(subgroup_id_t subgroup_num, long long unsigned int payload_size, const std::function< void(char *buf)> &msg_generator, bool cooked_send=false)
Instructs the managed DerechoGroup to send the next message.
volatile char * get_sendbuffer_ptr(uint32_t dest_id, sst::REQUEST_TYPE type)
Retrieves a buffer for sending P2P messages from the RPCManager's pool of P2P RDMA connections...
std::unique_ptr< persistent::PersistentRegistry > persistent_registry_ptr
persistent registry for persistent<t>
Definition: replicated.hpp:68
const uint32_t subgroup_index
Definition: replicated.hpp:85
The manager for any RemoteDeserializationContexts.
rpc::RPCManager & group_rpc_manager
Reference to the RPCManager for the Group this Replicated is in.
Definition: replicated.hpp:93
Exception that means the user requested an operation targeting a specific node and that node was not ...
Replicated(subgroup_type_id_t type_id, node_id_t nid, subgroup_id_t subgroup_id, uint32_t subgroup_index, uint32_t shard_num, rpc::RPCManager &group_rpc_manager, Factory< T > client_object_factory, _Group *group)
Constructs a Replicated<T> that enables sending and receiving RPC function calls for an object of typ...
void finish_p2p_send(node_id_t dest_node, subgroup_id_t dest_subgroup_id, PendingBase &pending_results_handle)
Sends the next P2P message buffer over an RDMA connection to the specified node, and registers the "p...
std::function< std::unique_ptr< T >(persistent::PersistentRegistry *)> Factory
The type of factory function the user must provide to the Group constructor, to construct each Replic...
void send_object_raw(tcp::socket &receiver_socket) const
Serializes and sends the state of the "wrapped" object (of type T) for this Replicated<T> over the gi...
bool is_valid() const
Definition: replicated.hpp:162
const uint64_t compute_global_stability_frontier(subgroup_id_t subgroup_num)
SharedLockedReference< View > get_current_view()
virtual void persist(const persistent::version_t version) noexcept(false)
persist the data to the latest version
auto p2p_send(Args &&... args)
ViewManager & view_manager
Definition: rpc_manager.hpp:60
std::shared_timed_mutex view_mutex
Controls access to curr_view.
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
bool finish_rpc_send(subgroup_id_t subgroup_id, PendingBase &pending_results_handle)
Sends the next message in the MulticastGroup's send buffer (which is assumed to be an RPC message pre...
ExternalCaller(uint32_t type_id, node_id_t nid, subgroup_id_t subgroup_id, rpc::RPCManager &group_rpc_manager)
std::unique_ptr< rpc::RemoteInvocableOf< T > > wrapped_this
The actual implementation of Replicated<T>, hiding its ugly template parameters.
Definition: replicated.hpp:95
std::condition_variable_any view_change_cv
Notified when curr_view changes (i.e.
std::vector< RemoteDeserializationContext_p > RemoteDeserialization_v
void destroy_remote_invocable_class(uint32_t instance_id)
Definition: rpc_manager.cpp:36
std::unique_ptr< std::unique_ptr< T > > user_object_ptr
The user-provided state object with some RPC methods.
Definition: replicated.hpp:77
mutils::RemoteDeserialization_v rdv
An empty DeserializationManager, in case we need it later.
Definition: rpc_manager.hpp:54
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
bool write(const char *buffer, size_t size)
Writes size bytes from the given buffer to the socket.
Definition: tcp.cpp:171
Exception that means a reference-like type is "empty" (does not contain a valid object).
Data structure that (indirectly) holds a set of futures for a single RPC function call; there is one ...
Definition: rpc_utils.hpp:158
std::unique_ptr< View > curr_view
The current View, containing the state of the managed group.
void send_object(tcp::socket &receiver_socket) const
Serializes and sends the state of the "wrapped" object (of type T) for this Replicated<T> over the gi...
auto ordered_send(Args &&... args)
Sends a multicast to the entire subgroup that replicates this Replicated<T>, invoking the RPC functio...
auto p2p_send(node_id_t dest_node, Args &&... args)
Sends a peer-to-peer message to a single member of the subgroup that replicates this Replicated<T>...
void send(unsigned long long int payload_size, const std::function< void(char *buf)> &msg_generator)
Submits a call to send a "raw" (byte array) message in a multicast to this object's subgroup; the mes...
Base exception class for all exceptions raised by Derecho.
const node_id_t node_id
The ID of this node.
Definition: replicated.hpp:82
const uint64_t compute_global_stability_frontier()
rpc::RPCManager & group_rpc_manager
Reference to the RPCManager for the Group this ExternalCaller is in.
Definition: replicated.hpp:326