7 #include "../replicated.hpp" 15 uint32_t subgroup_index, uint32_t shard_num,
18 : persistent_registry_ptr(
std::make_unique<
persistent::PersistentRegistry>(
19 this,
std::type_index(typeid(T)), subgroup_index, shard_num)),
20 user_object_ptr(
std::make_unique<
std::unique_ptr<T>>(
21 client_object_factory(persistent_registry_ptr.get()))),
23 subgroup_id(subgroup_id),
24 subgroup_index(subgroup_index),
26 group_rpc_manager(group_rpc_manager),
27 wrapped_this(group_rpc_manager.make_remote_invocable_class(user_object_ptr.get(),
29 T::register_functions())),
31 if constexpr(std::is_base_of_v<GroupReference, T>) {
32 (**user_object_ptr).set_group_pointers(group, subgroup_index);
41 this,
std::type_index(typeid(T)), subgroup_index, shard_num)),
44 subgroup_id(subgroup_id),
45 subgroup_index(subgroup_index),
47 group_rpc_manager(group_rpc_manager),
50 T::register_functions())),
80 + std::to_string(dest_node) +
": it is not a member of the Group.");
83 [
this, &dest_node](
size_t size) ->
char* {
85 if(size <= max_payload_size) {
89 throw derecho_exception(
"The size of serialized args exceeds the maximum message size.");
92 std::forward<Args>(args)...);
94 return std::move(return_pair.results);
100 template <
typename T>
104 size_t payload_size_for_multicast_send =
wrapped_this->template get_size_for_ordered_send<tag>(std::forward<Args>(args)...);
106 using Ret =
typename std::remove_pointer<decltype(
wrapped_this->template getReturnType<tag>(
107 std::forward<Args>(args)...))>::type;
110 auto serializer = [&](
char* buffer) {
113 auto send_return_struct =
wrapped_this->template send<tag>(
114 [&buffer, &max_payload_size](
size_t size) ->
char* {
115 if(size <= max_payload_size) {
118 throw derecho_exception(
"The size of serialized args exceeds the maximum message size.");
121 std::forward<Args>(args)...);
123 pending_ptr = &send_return_struct.pending;
129 ->multicast_group->send(
subgroup_id, payload_size_for_multicast_send, serializer,
true);
132 return std::move(*results_ptr);
138 template <
typename T>
140 const std::function<
void(
char* buf)>& msg_generator) {
144 template <
typename T>
149 template <
typename T>
151 auto bind_socket_write = [&receiver_socket](
const char* bytes, std::size_t size) {
152 receiver_socket.
write(bytes, size);
158 template <
typename T>
160 auto bind_socket_write = [&receiver_socket](
const char* bytes, std::size_t size) {
161 receiver_socket.
write(bytes, size);
166 template <
typename T>
173 if constexpr(std::is_base_of_v<GroupReference, T>) {
179 template <
typename T>
186 if(persisted_ver == -1) {
189 persisted_ver = version;
191 }
while(persisted_ver < version);
194 template <
typename T>
199 template <
typename T>
204 template <
typename T>
208 subgroup_id(subgroup_id),
209 group_rpc_manager(group_rpc_manager),
210 wrapped_this(group_rpc_manager.make_remote_invoker<T>(type_id, subgroup_id,
211 T::register_functions())) {}
215 template <
typename T>
222 + std::to_string(dest_node) +
": it is not a member of the Group.");
225 [
this, &dest_node](
size_t size) ->
char* {
227 if(size <= max_payload_size) {
231 throw derecho_exception(
"The size of serialized args exceeds the maximum message size.");
234 std::forward<Args>(args)...);
236 return std::move(return_pair.results);
242 template <
typename T>
246 auto send_result = EC.template p2p_send<tag>(shard_reps.at(0), std::forward<Args>(args)...);
247 std::vector<decltype(send_result)> send_result_vec;
248 send_result_vec.emplace_back(std::move(send_result));
249 for(uint i = 1; i < shard_reps.size(); ++i) {
250 send_result_vec.emplace_back(EC.template p2p_send<tag>(shard_reps[i], std::forward<Args>(args)...));
252 return send_result_vec;
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
const uint32_t shard_num
The index, within the subgroup, of the shard that replicates this object.
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
unsigned long long FunctionTag
This file include all common types internal to derecho and not necessarily being known by a client pr...
std::size_t object_size() const
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
const subgroup_id_t subgroup_id
The internally-generated subgroup ID of the subgroup that replicates this object. ...
Data structure that holds a set of promises for a single RPC function call; the promises transmit one...
auto p2p_send(node_id_t dest_node, Args &&... args)
Sends a peer-to-peer message to a single member of the subgroup that this ExternalCaller<T> connects ...
const node_id_t node_id
The ID of this node.
std::unique_ptr< rpc::RemoteInvokerFor< T > > wrapped_this
The actual implementation of ExternalCaller, which has lots of ugly template parameters.
std::map< subgroup_id_t, uint64_t > get_max_payload_sizes()
std::size_t receive_object(char *buffer)
Updates the state of the "wrapped" object by replacing it with the object serialized in a buffer...
subgroup_id_t subgroup_id
The internally-generated subgroup ID of the subgroup that this ExternalCaller will contact...
const persistent::version_t get_minimum_latest_persisted_version() noexcept(false)
Returns the minimum among the "latest version" numbers of all Persistent fields of this object...
auto bytes_size(const T &)
Just calls sizeof(T)
void send(subgroup_id_t subgroup_num, long long unsigned int payload_size, const std::function< void(char *buf)> &msg_generator, bool cooked_send=false)
Instructs the managed DerechoGroup's to send the next message.
volatile char * get_sendbuffer_ptr(uint32_t dest_id, sst::REQUEST_TYPE type)
Retrieves a buffer for sending P2P messages from the RPCManager's pool of P2P RDMA connections...
std::unique_ptr< persistent::PersistentRegistry > persistent_registry_ptr
persistent registry for persistent<t>
const uint32_t subgroup_index
The manager for any RemoteDeserializationContexts.
rpc::RPCManager & group_rpc_manager
Reference to the RPCManager for the Group this Replicated is in.
Exception that means the user requested an operation targeting a specific node and that node was not ...
Replicated(subgroup_type_id_t type_id, node_id_t nid, subgroup_id_t subgroup_id, uint32_t subgroup_index, uint32_t shard_num, rpc::RPCManager &group_rpc_manager, Factory< T > client_object_factory, _Group *group)
Constructs a Replicated<T> that enables sending and receiving RPC function calls for an object of typ...
void finish_p2p_send(node_id_t dest_node, subgroup_id_t dest_subgroup_id, PendingBase &pending_results_handle)
Sends the next P2P message buffer over an RDMA connection to the specified node, and registers the "p...
std::function< std::unique_ptr< T >(persistent::PersistentRegistry *)> Factory
The type of factory function the user must provide to the Group constructor, to construct each Replic...
void send_object_raw(tcp::socket &receiver_socket) const
Serializes and sends the state of the "wrapped" object (of type T) for this Replicated<T> over the gi...
const uint64_t compute_global_stability_frontier(subgroup_id_t subgroup_num)
SharedLockedReference< View > get_current_view()
virtual void persist(const persistent::version_t version) noexcept(false)
persist the data to the latest version
auto p2p_send(Args &&... args)
ViewManager & view_manager
std::shared_timed_mutex view_mutex
Controls access to curr_view.
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
bool finish_rpc_send(subgroup_id_t subgroup_id, PendingBase &pending_results_handle)
Sends the next message in the MulticastGroup's send buffer (which is assumed to be an RPC message pre...
ExternalCaller(uint32_t type_id, node_id_t nid, subgroup_id_t subgroup_id, rpc::RPCManager &group_rpc_manager)
std::unique_ptr< rpc::RemoteInvocableOf< T > > wrapped_this
The actual implementation of Replicated<T>, hiding its ugly template parameters.
std::condition_variable_any view_change_cv
Notified when curr_view changes (i.e.
std::vector< RemoteDeserializationContext_p > RemoteDeserialization_v
void destroy_remote_invocable_class(uint32_t instance_id)
std::unique_ptr< std::unique_ptr< T > > user_object_ptr
The user-provided state object with some RPC methods.
mutils::RemoteDeserialization_v rdv
An emtpy DeserializationManager, in case we need it later.
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
bool write(const char *buffer, size_t size)
Writes size bytes from the given buffer to the socket.
Exception that means a reference-like type is "empty" (does not contain a valid object).
Data structure that (indirectly) holds a set of futures for a single RPC function call; there is one ...
std::unique_ptr< View > curr_view
The current View, containing the state of the managed group.
void send_object(tcp::socket &receiver_socket) const
Serializes and sends the state of the "wrapped" object (of type T) for this Replicated<T> over the gi...
auto ordered_send(Args &&... args)
Sends a multicast to the entire subgroup that replicates this Replicated<T>, invoking the RPC functio...
auto p2p_send(node_id_t dest_node, Args &&... args)
Sends a peer-to-peer message to a single member of the subgroup that replicates this Replicated<T>...
void send(unsigned long long int payload_size, const std::function< void(char *buf)> &msg_generator)
Submits a call to send a "raw" (byte array) message in a multicast to this object's subgroup; the mes...
Base exception class for all exceptions raised by Derecho.
const node_id_t node_id
The ID of this node.
const uint64_t compute_global_stability_frontier()
rpc::RPCManager & group_rpc_manager
Reference to the RPCManager for the Group this ExternalCaller is in.