19 extern map<uint16_t, shared_ptr<group>>
groups;
26 vector<uint32_t> _members, uint32_t _member_index,
29 unique_ptr<schedule> _schedule)
31 group_number(_group_number),
32 block_size(_block_size),
33 num_members(members.size()),
34 member_index(_member_index),
35 transfer_schedule(
std::move(_schedule)),
36 completion_callback(callback),
37 incoming_message_upcall(upcall) {}
44 return it !=
groups.end() ? it->second :
nullptr;
46 auto send_data_block = [find_group](uint64_t
tag, uint32_t immediate,
49 shared_ptr<group> g = find_group(parsed_tag.
group_number);
50 if(g) g->complete_block_send();
52 auto receive_data_block = [find_group](uint64_t
tag, uint32_t immediate,
55 shared_ptr<group> g = find_group(parsed_tag.
group_number);
56 if(g) g->receive_block(immediate, length);
58 auto send_ready_for_block = [](uint64_t, uint32_t, size_t) {};
60 uint64_t
tag, uint32_t immediate,
size_t length) {
62 shared_ptr<group> g = find_group(parsed_tag.
group_number);
63 if(g) g->receive_ready_for_block(immediate, parsed_tag.
target);
66 message_types.data_block =
message_type(
"rdmc.data_block", send_data_block, receive_data_block);
71 vector<uint32_t> _members, uint32_t _member_index,
74 unique_ptr<schedule> _schedule)
75 :
group(_group_number, _block_size, _members, _member_index, upcall,
76 callback,
std::move(_schedule)),
77 first_block_buffer(nullptr) {
85 for(
auto c : connections) {
99 unique_lock<mutex> lock(
monitor);
109 message_size = received_block_size;
127 "initialized_internal_datastructures");
143 "found_next_transfer");
172 "calling_send_next_block");
177 "returned_from_send_next_block");
186 printf(
"Expected block #%d but got #%d on step %d\n",
205 std::optional<schedule::block_transfer> transfer;
239 unique_lock<mutex> lock(
monitor);
242 auto it = rfb_queue_pairs.find(sender);
243 assert(it != rfb_queue_pairs.end());
258 unique_lock<mutex> lock(
monitor);
261 "finished_sending_block");
276 unique_lock<mutex> lock(
monitor);
290 if(
num_blocks > std::numeric_limits<uint16_t>::max())
312 size_t target = transfer->target;
313 size_t block_number = transfer->block_number;
320 "receiver_not_ready");
332 auto it = queue_pairs.find(target);
333 assert(it != queue_pairs.end());
353 "started_sending_block");
359 "starting_remap_first_block");
377 "finished_remap_first_block");
411 auto it = queue_pairs.find(transfer.
target);
412 assert(it != queue_pairs.end());
436 "posted_receive_buffer");
464 auto it = rfb_queue_pairs.find(neighbor);
465 assert(it != rfb_queue_pairs.end());
const uint16_t group_number
static void initialize_message_types()
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
virtual void receive_block(uint32_t send_imm, size_t size)
#define LOG_EVENT(group_number, message_number, block_number, event_name)
uint32_t form_immediate(uint16_t total_blocks, uint16_t block_number)
const unique_ptr< schedule > transfer_schedule
const uint32_t member_index
unique_ptr< rdma::memory_region > first_block_mr
void post_recv(schedule::block_transfer transfer)
A C++ wrapper for the IB Verbs ibv_qp struct and its associated functions.
map< size_t, rdma::endpoint > rfb_endpoints
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and q...
unique_ptr< char[]> first_block_buffer
ParsedImmediate parse_immediate(uint32_t imm)
virtual void send_message(std::shared_ptr< rdma::memory_region > message_mr, size_t offset, size_t length)
map< size_t, rdma::endpoint > endpoints
vector< bool > received_blocks
uint64_t form_tag(uint16_t group_number, uint32_t target)
virtual void receive_ready_for_block(uint32_t step, uint32_t sender)=0
group(uint16_t group_number, size_t block_size, vector< uint32_t > members, uint32_t member_index, incoming_message_callback_t upcall, completion_callback_t callback, unique_ptr< schedule > transfer_schedule)
ParsedTag parse_tag(uint64_t t)
std::function< receive_destination(size_t size)> incoming_message_callback_t
A C++ wrapper for the libfabric fid_ep struct and its associated functions.
void connect(uint32_t neighbor)
std::set< uint32_t > receivers_ready
std::function< void(char *buffer, size_t size)> completion_callback_t
static struct polling_group::@2 message_types
std::shared_ptr< rdma::memory_region > mr
polling_group(uint16_t group_number, size_t block_size, vector< uint32_t > members, uint32_t member_index, incoming_message_callback_t upcall, completion_callback_t callback, unique_ptr< schedule > transfer_schedule)
size_t num_received_blocks
void send_ready_for_block(uint32_t neighbor)
optional< size_t > first_block_number
incoming_message_callback_t incoming_message_upcall
virtual void receive_ready_for_block(uint32_t step, uint32_t sender)
completion_callback_t completion_callback
const vector< uint32_t > members
virtual void complete_block_send()
map< uint16_t, shared_ptr< group > > groups