11 #include <type_traits> 16 #include <mutils/FunctionalMap.hpp> 17 #include <mutils/tuple_extras.hpp> 18 #include <spdlog/spdlog.h> 30 template <FunctionTag,
typename>
33 template <FunctionTag,
typename>
46 template <
FunctionTag Tag,
typename Ret,
typename... Args>
65 #define MAX_CONCURRENT_RPCS_PER_INVOKER (4096) 69 using lock_t = std::unique_lock<std::mutex>;
75 std::integral_constant<FunctionTag, Tag>
const*
const,
85 template <
typename A,
typename... Rest>
88 return size + serialize_one(v + size, rest...);
92 return serialize_one(v, args...);
114 send_return
send(
const std::function<
char*(
int)>& out_alloc,
115 const std::decay_t<Args>&... remote_args) {
117 std::size_t invocation_id = invocation_id_sequencer++;
122 size += std::accumulate(t.begin(), t.end(), 0);
124 char* serialized_args = out_alloc(size);
126 auto v = serialized_args +
mutils::to_bytes(invocation_id, serialized_args);
127 auto check_size =
mutils::bytes_size(invocation_id) + serialize_all(v, remote_args...);
132 results_vector[invocation_id].
reset();
135 dbg_default_trace(
"Ready to send an RPC call message with invocation ID {}", invocation_id);
136 return send_return{size, serialized_args, pending_results.
get_future(),
146 template <
typename definitely_
char>
150 const node_id_t& nid,
const char* response,
151 const std::function<definitely_char*(
int)>&) {
153 bool is_exception = response[0];
154 long int invocation_id = ((
long int*)(response + 1))[0];
185 dbg_default_trace(
"Received an RPC response for invocation ID {} from node {}", invocation_id, nid);
186 results_vector[invocation_id].
set_value(nid, *mutils::from_bytes<Ret>(dsm, response + 1 +
sizeof(invocation_id)));
197 const node_id_t& nid,
const char* response,
198 const std::function<
char*(
int)>&) {
214 const node_id_t& nid,
const char* response,
215 const std::function<
char*(
int)>& f) {
216 constexpr std::is_same<void, Ret>* choice{
nullptr};
219 return receive_response(choice, &dsm, nid, response, f);
246 std::map<Opcode, receive_fun_t>& receivers)
247 : invoke_opcode{class_id, instance_id, Tag,
false},
248 reply_opcode{class_id, instance_id, Tag,
true},
249 invocation_id_sequencer(0) {
250 receivers.emplace(reply_opcode, [
this](
auto... a) {
251 return this->receive_response(a...);
266 template <
FunctionTag Tag,
typename Ret,
typename... Args>
277 std::integral_constant<FunctionTag, Tag>
const*
const,
291 const node_id_t& caller,
const char* _recv_buf,
292 const std::function<
char*(
int)>& out_alloc) {
293 long int invocation_id = ((
long int*)_recv_buf)[0];
294 auto recv_buf = _recv_buf +
sizeof(
long int);
298 auto out = out_alloc(result_size);
300 ((
long int*)(out + 1))[0] = invocation_id;
302 dbg_default_trace(
"Ready to send an RPC reply for invocation ID {} to node {}", invocation_id, caller);
303 return recv_ret{reply_opcode, result_size, out,
nullptr};
305 char* out = out_alloc(
sizeof(
long int) + 1);
307 ((
long int*)(out + 1))[0] = invocation_id;
308 return recv_ret{reply_opcode,
sizeof(
long int) + 1, out,
309 std::current_exception()};
320 const std::function<
char*(
int)>&) {
322 auto recv_buf = _recv_buf +
sizeof(
long int);
324 return recv_ret{reply_opcode, 0,
nullptr};
339 const node_id_t& who,
const char* recv_buf,
340 const std::function<
char*(
int)>& out_alloc) {
341 constexpr std::is_same<Ret, void>* choice{
nullptr};
344 return this->receive_call(choice, &dsm, who, recv_buf, out_alloc);
357 std::map<Opcode, receive_fun_t>& receivers,
358 std::function<Ret(Args...)> f)
359 : remote_invocable_function(f),
360 invoke_opcode{class_id, instance_id, Tag,
false},
361 reply_opcode{class_id, instance_id, Tag,
true} {
362 receivers.emplace(invoke_opcode, [
this](
auto... a) {
363 return this->receive_call(a...);
370 template <FunctionTag Tag,
typename NotAFunction>
380 template <
FunctionTag Tag,
typename Ret,
typename... Arguments>
382 using fun_t = std::function<Ret(Arguments...)>;
392 template <
FunctionTag Tag,
typename Ret,
typename Class,
typename... Arguments>
394 using fun_t = Ret (Class::*)(Arguments...);
402 template <
FunctionTag Tag,
typename Ret,
typename Class,
typename... Arguments>
404 using fun_t = Ret (Class::*)(Arguments...)
const;
419 template <
typename NewClass,
FunctionTag Tag,
typename Ret,
typename... Args>
423 return wrapped<Tag, std::function<Ret(Args...)>>{
424 [_this, fun = partial.
fun](Args... arguments) {
426 assert(_this->get());
427 return (_this->get()->*fun)(arguments...);
435 template <
typename NewClass,
FunctionTag Tag,
typename Ret,
typename... Args>
439 return wrapped<Tag, std::function<Ret(Args...)>>{
440 [_this, fun = partial.
fun](Args... arguments) {
441 return (_this->get()->*fun)(arguments...);
459 template <
FunctionTag Tag,
typename NewClass,
typename Ret,
typename... Args>
461 static_assert(!std::is_reference<Ret>::value && !std::is_pointer<Ret>::value,
"RPC-registered functions cannot return references or pointers!");
463 static_assert(((std::is_reference<Args>::value ||
sizeof(Args) < 2 *
sizeof(
void*)) && ...),
"RPC-registered functions must take non-pointer-size arguments by reference to avoid extra copying.");
468 template <
FunctionTag Tag,
typename NewClass,
typename Ret,
typename... Args>
470 static_assert(!std::is_reference<Ret>::value && !std::is_pointer<Ret>::value,
"RPC-registered functions cannot return references or pointers!");
471 static_assert(((std::is_reference<Args>::value ||
sizeof(Args) < 2 *
sizeof(
void*)) && ...),
"RPC-registered functions must take non-pointer-size arguments by reference to avoid extra copying.");
480 template <
typename...>
487 template <FunctionTag
id,
typename FunType>
491 uint32_t instance_id,
492 std::map<Opcode, receive_fun_t>& receivers, FunType function_ptr)
493 :
RemoteInvoker<id, FunType>(class_id, instance_id, receivers),
494 RemoteInvocable<id, FunType>(class_id, instance_id, receivers, function_ptr) {}
512 template <
FunctionTag id,
typename FunType,
typename... rest>
515 template <
typename... RestFunTypes>
517 uint32_t instance_id,
518 std::map<Opcode, receive_fun_t>& receivers,
519 FunType function_ptr,
520 RestFunTypes&&... function_ptrs)
521 :
RemoteInvoker<id, FunType>(class_id, instance_id, receivers),
522 RemoteInvocable<id, FunType>(class_id, instance_id, receivers, function_ptr),
523 RemoteInvocablePairs<rest...>(class_id, instance_id, receivers,
std::forward<RestFunTypes>(function_ptrs)...) {}
536 template <
typename...>
543 template <FunctionTag Tag,
typename FunType>
546 uint32_t instance_id,
547 std::map<Opcode, receive_fun_t>& receivers)
548 :
RemoteInvoker<Tag, FunType>(class_id, instance_id, receivers) {}
563 template <
FunctionTag Tag,
typename FunType,
typename... RestWrapped>
567 uint32_t instance_id,
568 std::map<Opcode, receive_fun_t>& receivers)
569 :
RemoteInvoker<Tag, FunType>(class_id, instance_id, receivers),
570 RemoteInvokers<RestWrapped...>(class_id, instance_id, receivers) {}
586 template <
class IdentifyingClass,
typename... WrappedFuns>
592 std::map<Opcode, receive_fun_t>& rvrs,
const WrappedFuns&... fs)
599 long int invocation_id = 0;
610 constexpr std::integral_constant<FunctionTag, Tag>* choice{
nullptr};
611 return this->get_invoker(choice, args...).returnRet();
625 auto send(
const std::function<
char*(
int)>& out_alloc, Args&&... args) {
626 using namespace remote_invocation_utilities;
628 constexpr std::integral_constant<FunctionTag, Tag>* choice{
nullptr};
629 auto& invoker = this->get_invoker(choice, args...);
631 auto sent_return = invoker.send(
632 [&out_alloc, &header_size](std::size_t size) {
633 return out_alloc(size + header_size) + header_size;
635 std::forward<Args>(args)...);
637 std::size_t payload_size = sent_return.size;
638 char* buf = sent_return.buf - header_size;
654 using Ret =
typename decltype(sent_return.results)::type;
674 return send_return{std::move(sent_return.results),
675 sent_return.pending};
687 template <
class IdentifyingClass>
693 std::map<Opcode, receive_fun_t>& rvrs)
698 auto invocation_id = mutils::long_rand();
702 size += std::accumulate(t.begin(), t.end(), 0);
709 void* null_and_void(
nullptr);
710 return null_and_void;
735 template <
class IdentifyingClass,
typename... WrappedFuns>
737 std::map<Opcode, receive_fun_t>& rvrs,
738 const WrappedFuns&... fs) {
739 return std::make_unique<
RemoteInvocableClass<IdentifyingClass, WrappedFuns...>>(nid, type_id, instance_id, rvrs, fs...);
753 template <
class IdentifyingClass,
typename... WrappedFuns>
759 std::map<Opcode, receive_fun_t>& rvrs)
764 auto send(
const std::function<
char*(
int)>& out_alloc, Args&&... args) {
765 using namespace remote_invocation_utilities;
767 constexpr std::integral_constant<FunctionTag, Tag>* choice{
nullptr};
768 auto& invoker = this->get_invoker(choice, args...);
770 auto sent_return = invoker.send(
771 [&out_alloc, &header_size](std::size_t size) {
772 return out_alloc(size + header_size) + header_size;
774 std::forward<Args>(args)...);
776 std::size_t payload_size = sent_return.size;
777 char* buf = sent_return.buf - header_size;
785 using Ret =
typename decltype(sent_return.results)::type;
794 return send_return{std::move(sent_return.results),
795 sent_return.pending};
802 template <
class IdentifyingClass>
808 std::map<Opcode, receive_fun_t>& rvrs)
827 template <
class IdentifyingClass,
typename... WrappedFuns>
829 std::map<Opcode, receive_fun_t>& rvrs) {
830 return std::make_unique<
RemoteInvokerForClass<IdentifyingClass, WrappedFuns...>>(nid, type_id, instance_id, rvrs);
RemoteInvokers(uint32_t class_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &receivers)
const Opcode reply_opcode
RemoteInvokers(uint32_t class_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &receivers)
wrapped< Tag, std::function< Ret(Args...)> > bind_to_instance(std::unique_ptr< NewClass > *_this, const partial_wrapped< Tag, Ret, NewClass, Args... > &partial)
Converts a partial_wrapped<> containing a pointer-to-member-function to a wrapped<> containing the sa...
recv_ret receive_response(mutils::RemoteDeserialization_v *rdv, const node_id_t &nid, const char *response, const std::function< char *(int)> &f)
Entry point for responses; called when a message is received that contains a response to this RemoteI...
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
unsigned long long FunctionTag
Data structure that holds a set of promises for a single RPC function call; the promises transmit one...
Transforms a class into a "replicated object" with methods that can be invoked by RPC...
An RPC function call can be uniquely identified by the tuple (class, subgroup ID, function ID...
IdentifyingClass specialized_to
std::size_t header_space()
const Opcode invoke_opcode
Ret(Class::*)(Arguments...) const fun_t
auto * getReturnType(Args &&... args)
std::unique_lock< std::mutex > lock_t
auto serialize_one(barray v, const A &a, const Rest &... rest)
send_return send(const std::function< char *(int)> &out_alloc, const std::decay_t< Args > &... remote_args)
Called to construct an RPC message to send that will invoke the remote- invocable function targeted b...
Ret(Class::*)(Arguments...) fun_t
RemoteInvokerForClass(node_id_t nid, uint32_t type_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &rvrs)
bool in_rpc_handler()
defined in rpc_manager.h
const Opcode invoke_opcode
RemoteInvoker(uint32_t class_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &receivers)
Constructs a RemoteInvoker that provides RPC call marshalling and response-handling for a specific fu...
const remote_function_type remote_invocable_function
This matches uses of wrapped<> where the second argument is not a function, and does nothing...
PendingResults< Ret > & pending
auto build_remote_invocable_class(const node_id_t nid, const uint32_t type_id, const uint32_t instance_id, std::map< Opcode, receive_fun_t > &rvrs, const WrappedFuns &... fs)
Constructs a RemoteInvocableClass instance that proxies for an instance of the class in the template ...
std::atomic< unsigned short > invocation_id_sequencer
void reset()
reset this object.
Exactly the same as the partial_wrapped template, but for pointer-to-member-functions that are const...
RemoteInvocableClass(node_id_t nid, uint32_t type_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &rvrs)
recv_ret receive_call(std::true_type const *const, mutils::DeserializationManager *dsm, const node_id_t &, const char *_recv_buf, const std::function< char *(int)> &)
Specialization of receive_call for void functions, which do not need to send a response.
auto bytes_size(const T &)
Just calls sizeof(T)
auto build_remote_invoker_for_class(const node_id_t nid, const uint32_t type_id, const uint32_t instance_id, std::map< Opcode, receive_fun_t > &rvrs)
Constructs a RemoteInvokerForClass that can act as a client for the class in the template parameter (...
recv_ret receive_response(std::true_type *, mutils::DeserializationManager *, const node_id_t &nid, const char *response, const std::function< char *(int)> &)
Specialization of receive_response for void functions (which don't expect any response).
RemoteInvoker & get_invoker(std::integral_constant< FunctionTag, Tag > const *const, const Args &...)
RemoteInvocablePairs(uint32_t class_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &receivers, FunType function_ptr, RestFunTypes &&... function_ptrs)
std::function< Ret(Args...)> remote_function_type
recv_ret receive_call(std::false_type const *const, mutils::DeserializationManager *dsm, const node_id_t &caller, const char *_recv_buf, const std::function< char *(int)> &out_alloc)
Specialization of receive_call for non-void functions.
The manager for any RemoteDeserializationContexts.
void fulfill_pending_results_vector(long int invocation_id, const node_list_t &who)
Populates the pending-results map of a particular invocation of the remote-invocable function...
#define MAX_CONCURRENT_RPCS_PER_INVOKER
auto serialize_one(barray)
std::vector< node_id_t > node_list_t
void populate_header(char *reply_buf, const std::size_t &payload_size, const Opcode &op, const node_id_t &from, const uint32_t &flags)
std::size_t get_size(Args &&... a)
IdentifyingClass specialized_to
std::function< Ret(Args...)> remote_function_type
auto * getReturnType(Args &&... args)
void fulfill_map(const node_list_t &who)
Fill pending_map and reply_promises with one promise/future pair for each node that was contacted in ...
auto send(const std::function< char *(int)> &out_alloc, Args &&... args)
Constructs a message that will remotely invoke a method of this class, supplying the specified argume...
#define dbg_default_trace(...)
RemoteInvocable(uint32_t class_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &receivers, std::function< Ret(Args...)> f)
Constructs a RemoteInvocable that provides RPC call handling for a specific function, and registers the RPC-handling functions in the given "receivers" map.
Template that pairs a FunctionTag with a pointer-to-member-function.
const Opcode reply_opcode
RemoteInvocableClass(node_id_t nid, uint32_t type_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &rvrs, const WrappedFuns &... fs)
RemoteInvocableClass & for_class(IdentifyingClass *)
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
Return type of all the RemoteInvocable::receive_* methods.
Transforms a class into an RPC client for the methods of that class, given a place to store RPC messa...
auto send(const std::function< char *(int)> &out_alloc, Args &&... args)
QueryResults< Ret > results
QueryResults< Ret > get_future()
Constructs and returns a QueryResults representing the "future" end of the response promises in this ...
RemoteInvocableClass & for_class(IdentifyingClass *)
auto deserialize_and_run(DeserializationManager *dsm, char *v, const F &fun)
Calls mutils::from_bytes_noalloc<T>(ctx,v), dereferences the result, and passes it to fun...
recv_ret receive_call(mutils::RemoteDeserialization_v *rdv, const node_id_t &who, const char *recv_buf, const std::function< char *(int)> &out_alloc)
Entry point for handling an RPC function call to this RemoteInvocable function.
#define dbg_default_info(...)
void set_value(const node_id_t &nid, const Ret &v)
Fulfills a promise for a single node's reply by setting the value that the node returned for the RPC ...
std::vector< RemoteDeserializationContext_p > RemoteDeserialization_v
void set_exception(const node_id_t &nid, const std::exception_ptr e)
Fulfills a promise for a single node's reply by setting an exception that was thrown by the RPC funct...
#define assert_always(x...)
std::size_t get_size_for_ordered_send(Args &&... a)
std::size_t to_bytes(const ByteRepresentable &b, char *v)
calls b.to_bytes(v) when b is a ByteRepresentable; calls std::memcpy() when b is POD.
Technically, RemoteInvokers is a specialization of this template, but it's the only specialization...
Data structure that (indirectly) holds a set of futures for a single RPC function call; there is one ...
auto serialize_all(barray v, const Args &... args)
recv_ret receive_response(std::false_type *, mutils::DeserializationManager *dsm, const node_id_t &nid, const char *response, const std::function< definitely_char *(int)> &)
Specialization of receive_response for non-void functions.
std::function< Ret(Arguments...)> fun_t
RemoteInvokerForClass(node_id_t nid, uint32_t type_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &rvrs)
Indicates that an RPC call failed because executing the RPC function on the remote node resulted in a...
RemoteInvocablePairs(uint32_t class_id, uint32_t instance_id, std::map< Opcode, receive_fun_t > &receivers, FunType function_ptr)
RemoteInvocable & get_handler(std::integral_constant< FunctionTag, Tag > const *const, const Args &...)
#define RPC_HEADER_FLAG_SET(f, name)