19 #include <type_traits> 24 #include "../derecho_exception.hpp" 25 #include "../derecho_type_definitions.hpp" 29 #include <mutils/macro_utils.hpp> 41 template <
char... str>
43 static constexpr uint64_t
hash() {
44 char string[] = {str...};
45 uint64_t hash_code = 0;
46 for(
const int c :
string) {
48 hash_code = hash_code * 31 + c;
87 +
std::to_string(who)),
100 +
std::to_string(who)
101 +
std::string(
" has been removed from the group.")),
113 "and can no longer send the RPC message.") {}
137 const std::function<
char*(
int)>& out_alloc)>;
143 template <
typename T>
157 template <
typename Ret>
160 using map_fut = std::future<std::unique_ptr<reply_map<Ret>>>;
176 assert(rmap.size() == 0 || rmap.count(nid) != 0);
177 return (rmap.size() > 0) && rmap.at(nid).valid();
186 auto begin() {
return std::begin(rmap); }
188 auto end() {
return std::end(rmap); }
191 if(rmap.size() == 0) {
195 assert(rmap.size() > 0);
196 assert(rmap.count(nid));
197 assert(rmap.at(nid).valid());
198 return rmap.at(nid).get();
210 : pending_rmap{std::move(o.pending_rmap)},
211 replies{std::move(o.replies)} {}
218 template <
typename Time>
220 if(replies.rmap.size() == 0) {
221 if(pending_rmap.wait_for(t) == std::future_status::ready) {
222 replies.
rmap = std::move(*pending_rmap.get());
236 using namespace std::chrono;
238 if(
auto rmap = wait(5min)) {
255 using map_fut = std::future<std::unique_ptr<std::set<node_id_t>>>;
256 using map = std::set<node_id_t>;
267 ReplyMap(
const ReplyMap&) =
delete;
268 ReplyMap(ReplyMap&& rm) : parent(rm.parent), rmap(
std::move(rm.rmap)) {}
271 assert(rmap.size() == 0 || rmap.count(nid) != 0);
272 return (rmap.size() > 0) && rmap.count(nid) > 0;
281 auto begin() {
return std::begin(rmap); }
283 auto end() {
return std::end(rmap); }
289 ReplyMap replies{*
this};
294 : pending_rmap{std::move(o.pending_rmap)},
295 replies{std::move(o.replies)} {}
302 template <
typename Time>
304 if(replies.rmap.size() == 0) {
305 if(pending_rmap.wait_for(t) == std::future_status::ready) {
306 replies.rmap = std::move(*pending_rmap.get());
320 using namespace std::chrono;
322 if(
auto rmap = wait(5min)) {
337 virtual void set_exception_for_removed_node(
const node_id_t&) = 0;
338 virtual void set_exception_for_caller_removed() = 0;
339 virtual bool all_responded()
const = 0;
340 virtual void reset() = 0;
352 template <
typename Ret>
368 bool map_fulfilled =
false;
373 : reply_promises_are_ready(promise_for_reply_promises.get_future()) {}
391 dbg_default_trace(
"Got a call to fulfill_map for PendingResults<{}>",
typeid(Ret).name());
392 std::unique_ptr<reply_map<Ret>> futures_map = std::make_unique<reply_map<Ret>>();
393 std::map<node_id_t, std::promise<Ret>> promises_map;
394 for(
const auto& e : who) {
395 futures_map->emplace(e, promises_map[e].get_future());
397 dest_nodes.insert(who.begin(), who.end());
399 promise_for_reply_promises.set_value(std::move(promises_map));
400 promise_for_pending_map.set_value(std::move(futures_map));
401 map_fulfilled =
true;
410 promise_for_pending_map.set_exception(
413 if(reply_promises.size() == 0) {
414 reply_promises = std::move(reply_promises_are_ready.get());
417 for(
auto& node_and_promise : reply_promises) {
418 if(responded_nodes.find(node_and_promise.first)
419 == responded_nodes.end()) {
420 node_and_promise.second.set_exception(
428 assert(map_fulfilled);
429 if(dest_nodes.find(removed_nid) != dest_nodes.end()
430 && responded_nodes.find(removed_nid) == responded_nodes.end()) {
431 set_exception(removed_nid,
432 std::make_exception_ptr(
444 std::lock_guard<std::mutex> lock(reply_promises_are_ready_mutex);
445 responded_nodes.insert(nid);
446 if(reply_promises.size() == 0) {
447 dbg_default_trace(
"PendingResults<{}>::set_value about to wait on reply_promises_are_ready",
typeid(Ret).name());
449 reply_promises = std::move(reply_promises_are_ready.get());
451 reply_promises.at(nid).set_value(v);
461 responded_nodes.insert(nid);
462 if(reply_promises.size() == 0) {
463 reply_promises = std::move(reply_promises_are_ready.get());
465 reply_promises.at(nid).set_exception(e);
473 return map_fulfilled && (responded_nodes == dest_nodes);
480 promise_for_pending_map = std::promise<std::unique_ptr<reply_map<Ret>>>();
481 promise_for_reply_promises = std::promise<std::map<node_id_t, std::promise<Ret>>>();
482 reply_promises_are_ready = promise_for_reply_promises.get_future();
484 reply_promises.clear();
485 map_fulfilled =
false;
487 responded_nodes.clear();
501 bool map_fulfilled =
false;
509 auto nodes_sent_set = std::make_unique<std::set<node_id_t>>();
510 for(
const node_id_t& node : sent_nodes) {
511 nodes_sent_set->emplace(node);
513 promise_for_pending_map.set_value(std::move(nodes_sent_set));
514 map_fulfilled =
true;
521 promise_for_pending_map.set_exception(
527 return map_fulfilled;
531 promise_for_pending_map = std::promise<std::unique_ptr<std::set<node_id_t>>>();
532 map_fulfilled =
false;
539 namespace remote_invocation_utilities {
540 #define RPC_HEADER_FLAG_TST(f, name) \ 541 ((f) & (((uint32_t)1L) << (_RPC_HEADER_FLAG_##name))) 542 #define RPC_HEADER_FLAG_SET(f, name) \ 543 ((f) |= (((uint32_t)1L) << (_RPC_HEADER_FLAG_##name))) 544 #define RPC_HEADER_FLAG_CLR(f, name) \ 545 ((f) &= ~(((uint32_t)1L) << (_RPC_HEADER_FLAG_##name))) 548 #define _RPC_HEADER_FLAG_CASCADE (0) 549 #define _RPC_HEADER_FLAG_RESERVED (1) 552 return sizeof(std::size_t) +
sizeof(
Opcode) +
sizeof(
node_id_t) +
sizeof(uint32_t);
558 return (
char*)calloc(i + hs,
sizeof(
char)) + hs;
562 const std::size_t& payload_size,
564 const uint32_t& flags) {
565 std::size_t offset = 0;
566 static_assert(
sizeof(op) ==
sizeof(
Opcode),
"Opcode& is not the same size as Opcode!");
567 reinterpret_cast<std::size_t*
>(reply_buf + offset)[0] = payload_size;
568 offset +=
sizeof(payload_size);
569 reinterpret_cast<Opcode*
>(reply_buf + offset)[0] = op;
570 offset +=
sizeof(op);
571 reinterpret_cast<node_id_t*
>(reply_buf + offset)[0] = from;
572 offset +=
sizeof(from);
573 reinterpret_cast<uint32_t*
>(reply_buf + offset)[0] = flags;
578 const char* reply_buf,
579 std::size_t& payload_size,
Opcode& op,
581 std::size_t offset = 0;
582 payload_size =
reinterpret_cast<const std::size_t*
>(reply_buf + offset)[0];
583 offset +=
sizeof(payload_size);
584 op =
reinterpret_cast<const Opcode*
>(reply_buf + offset)[0];
585 offset +=
sizeof(op);
586 from =
reinterpret_cast<const node_id_t*
>(reply_buf + offset)[0];
587 offset +=
sizeof(from);
588 flags =
reinterpret_cast<const uint32_t*
>(reply_buf + offset)[0];
595 #define CT_STRING(...) derecho::rpc::String<MACRO_GET_STR(#__VA_ARGS__)> uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
static constexpr uint64_t hash()
QueryResults(QueryResults &&o)
unsigned long long FunctionTag
node_removed_from_group_exception(node_id_t who)
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
void fulfill_map(const node_list_t &sent_nodes)
subgroup_id_t subgroup_id
Data structure that holds a set of promises for a single RPC function call; the promises transmit one...
An RPC function call can be uniquely identified by the tuple (class, subgroup ID, function ID...
std::map< node_id_t, std::promise< Ret > > reply_promises
std::size_t header_space()
std::set< node_id_t > responded_nodes
bool valid(const node_id_t &nid)
void set_exception_for_caller_removed()
Sets exceptions to indicate to the sender of this RPC call that it has been removed from its subgroup...
bool all_responded() const
sender_removed_from_group_exception()
void set_exception_for_caller_removed()
subgroup_type_id_t class_id
void set_exception_for_removed_node(const node_id_t &removed_nid)
void reset()
Reset this object.
virtual ~PendingResults()
This "compile-time String" puts a short sequence of characters into a type's template parameter...
std::future< std::map< node_id_t, std::promise< Ret > > > reply_promises_are_ready
A future for a map containing one promise for each reply to the RPC function call.
Specialization of QueryResults for void functions, which do not generate replies. ...
std::future< std::unique_ptr< reply_map< Ret > >> map_fut
std::future< std::unique_ptr< std::set< node_id_t > >> map_fut
bool all_responded() const
std::promise< std::map< node_id_t, std::promise< Ret > > > promise_for_reply_promises
bool operator==(const Opcode &lhs, const Opcode &rhs)
void set_exception_for_removed_node(const node_id_t &)
bool contains(const node_id_t &nid)
bool operator<(const Opcode &lhs, const Opcode &rhs)
std::vector< node_id_t > node_list_t
void populate_header(char *reply_buf, const std::size_t &payload_size, const Opcode &op, const node_id_t &from, const uint32_t &flags)
bool valid(const node_id_t &nid)
std::exception_ptr possible_exception
void fulfill_map(const node_list_t &who)
Fill pending_map and reply_promises with one promise/future pair for each node that was contacted in ...
Indicates that an RPC call from this node was aborted because this node was removed from its subgroup...
#define dbg_default_trace(...)
std::map< node_id_t, std::future< T > > reply_map
The type of map contained in a QueryResults::ReplyMap.
bool contains(const node_id_t &nid)
remote_exception_occurred(node_id_t who)
void retrieve_header(mutils::RemoteDeserialization_v *rdv, const char *reply_buf, std::size_t &payload_size, Opcode &op, node_id_t &from, uint32_t &flags)
#define dbg_default_flush()
ReplyMap * wait(Time t)
Wait for the specified duration; if a ReplyMap is available after that duration, return it...
char * extra_alloc(int i)
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
Return type of all the RemoteInvocable::receive_* methods.
std::mutex reply_promises_are_ready_mutex
QueryResults< Ret > get_future()
Constructs and returns a QueryResults representing the "future" end of the response promises in this ...
void set_value(const node_id_t &nid, const Ret &v)
Fulfills a promise for a single node's reply by setting the value that the node returned for the RPC ...
std::vector< RemoteDeserializationContext_p > RemoteDeserialization_v
void set_exception(const node_id_t &nid, const std::exception_ptr e)
Fulfills a promise for a single node's reply by setting an exception that was thrown by the RPC funct...
Indicates that an RPC call to a node failed because the node was removed from the Replicated Object's...
ReplyMap(QueryResults &qr)
Abstract base type for PendingResults.
ReplyMap * wait(Time t)
Wait for the specified duration; if a ReplyMap is available after that duration, return it...
std::function< recv_ret(mutils::RemoteDeserialization_v *rdv, const node_id_t &, const char *recv_buf, const std::function< char *(int)> &out_alloc)> receive_fun_t
Type signature for all the RemoteInvocable::receive_* methods.
std::promise< std::unique_ptr< reply_map< Ret > > > promise_for_pending_map
A promise for a map containing one future for each reply to the RPC function call.
ReplyMap(QueryResults &qr)
Data structure that (indirectly) holds a set of futures for a single RPC function call; there is one ...
std::set< node_id_t > map
Base exception class for all exceptions raised by Derecho.
QueryResults< void > get_future()
std::promise< std::unique_ptr< std::set< node_id_t > > > promise_for_pending_map
Indicates that an RPC call failed because executing the RPC function on the remote node resulted in a...
QueryResults(QueryResults &&o)