Derecho  0.9
Distributed systems toolkit for RDMA
group_send.cpp
Go to the documentation of this file.
4 
5 #ifdef USE_VERBS_API
7 #else
9 #endif
10 
11 #include <cassert>
12 #include <cstring>
13 
14 using namespace std;
15 using namespace rdma;
16 using namespace rdmc;
17 
// Declarations of the shared group table and the mutex guarding it. The member
// index at the end of this page places both definitions in rdmc.cpp.
18 namespace rdmc {
// Maps a 16-bit group number to its group object.
19 extern map<uint16_t, shared_ptr<group>> groups;
// Held while `groups` is searched (see find_group further down this listing).
20 extern mutex groups_lock;
21 }; // namespace rdmc
22 
24 
// Base-class constructor: copies the supplied configuration into the group's
// members. The schedule is moved into transfer_schedule, num_members is taken
// from the size of the stored member list, and the constructor body is empty.
// NOTE(review): listing line 27 is absent, so the `upcall` argument used on
// listing line 37 has no visible declaration here. The member index at the end
// of this page shows an `incoming_message_callback_t upcall` parameter between
// member_index and callback — confirm against the header.
25 group::group(uint16_t _group_number, size_t _block_size,
26  vector<uint32_t> _members, uint32_t _member_index,
28  completion_callback_t callback,
29  unique_ptr<schedule> _schedule)
30  : members(_members),
31  group_number(_group_number),
32  block_size(_block_size),
33  num_members(members.size()),
34  member_index(_member_index),
35  transfer_schedule(std::move(_schedule)),
36  completion_callback(callback),
37  incoming_message_upcall(upcall) {}
// Destructor: acquires `monitor` for the duration of its otherwise empty body.
// NOTE(review): presumably this waits for any handler that still holds the
// monitor to finish before the object is torn down — confirm intent.
38 group::~group() { unique_lock<mutex> lock(monitor); }
39 
// Body of the routine that registers the two message types used below
// ("rdmc.data_block" and "rdmc.ready_for_block") together with their send and
// receive handlers.
// NOTE(review): the signature (listing line 40) is absent from this listing; the
// member index at the end of this page places
// `static void initialize_message_types()` at group_send.cpp:40.

// Looks up a group by number while holding groups_lock; yields nullptr when the
// number is not in the table.
41  auto find_group = [](uint16_t group_number) {
42  unique_lock<mutex> lock(groups_lock);
43  auto it = groups.find(group_number);
44  return it != groups.end() ? it->second : nullptr;
45  };
// Send-side handler for a data block: notifies the owning group, if it still
// exists, via complete_block_send(). `immediate` and `length` are unused here.
46  auto send_data_block = [find_group](uint64_t tag, uint32_t immediate,
47  size_t length) {
48  ParsedTag parsed_tag = parse_tag(tag);
49  shared_ptr<group> g = find_group(parsed_tag.group_number);
50  if(g) g->complete_block_send();
51  };
// Receive-side handler for a data block: forwards the immediate value and the
// length to the owning group's receive_block().
52  auto receive_data_block = [find_group](uint64_t tag, uint32_t immediate,
53  size_t length) {
54  ParsedTag parsed_tag = parse_tag(tag);
55  shared_ptr<group> g = find_group(parsed_tag.group_number);
56  if(g) g->receive_block(immediate, length);
57  };
// Send-side handler for ready-for-block: intentionally does nothing.
58  auto send_ready_for_block = [](uint64_t, uint32_t, size_t) {};
// Receive-side handler for ready-for-block: forwards the immediate value and the
// target parsed from the tag to the owning group. `length` is unused here.
59  auto receive_ready_for_block = [find_group](
60  uint64_t tag, uint32_t immediate, size_t length) {
61  ParsedTag parsed_tag = parse_tag(tag);
62  shared_ptr<group> g = find_group(parsed_tag.group_number);
63  if(g) g->receive_ready_for_block(immediate, parsed_tag.target);
64  };
65 
// Bind each name to its (send handler, receive handler) pair.
66  message_types.data_block = message_type("rdmc.data_block", send_data_block, receive_data_block);
67  message_types.ready_for_block = message_type(
68  "rdmc.ready_for_block", send_ready_for_block, receive_ready_for_block);
69 }
// Constructs a polling_group: forwards the configuration to the group base
// class, allocates a staging buffer for the first block on every member except
// member 0, connects to each neighbour named by the schedule, and (again on
// every member except member 0) posts the receive for the first block and
// announces readiness to its sender.
// NOTE(review): listing line 72 is absent; the `upcall` argument forwarded on
// listing line 75 has no visible declaration. The member index at the end of
// this page shows an `incoming_message_callback_t upcall` parameter — confirm.
70 polling_group::polling_group(uint16_t _group_number, size_t _block_size,
71  vector<uint32_t> _members, uint32_t _member_index,
73  completion_callback_t callback,
74  unique_ptr<schedule> _schedule)
75  : group(_group_number, _block_size, _members, _member_index, upcall,
76  callback, std::move(_schedule)),
77  first_block_buffer(nullptr) {
// Zero-filled, block_size-byte staging buffer wrapped in its own memory_region.
78  if(member_index != 0) {
79  first_block_buffer = unique_ptr<char[]>(new char[block_size]);
80  memset(first_block_buffer.get(), 0, block_size);
81  first_block_mr = make_unique<memory_region>(first_block_buffer.get(), block_size);
82  }
83 
// Open a connection to every neighbour the transfer schedule reports.
84  auto connections = transfer_schedule->get_connections();
85  for(auto c : connections) {
86  connect(c);
87  }
88 
// Arm reception of the first block.
// NOTE(review): num_blocks is read here before anything in this constructor
// assigns it — confirm it has an initializer in the header.
89  if(member_index > 0) {
90  auto transfer = transfer_schedule->get_first_block(num_blocks);
91  first_block_number = transfer->block_number;
92  post_recv(*transfer);
93  incoming_block = transfer->block_number;
94  send_ready_for_block(transfer->target);
95  // puts("Issued Ready For Block CCCCCCCCC");
96  }
97 }
// Handles the arrival of one data block. It is called from the receive_data_block
// handler with the message's immediate value and its byte length. The monitor is
// held for the whole call, and the function asserts that this node is not
// member 0.
// NOTE(review): listing lines 104, 107, 114, 120, 122, 124, 126, 142, 171, 176,
// 180 and 234 are absent, so several statements below appear truncated (for
// example the string literals on listing lines 127, 143, 172 and 177 are the
// tails of calls whose first line is missing, and the `if` bodies on listing
// lines 180 and 234 are empty in this listing).
98 void polling_group::receive_block(uint32_t send_imm, size_t received_block_size) {
99  unique_lock<mutex> lock(monitor);
100 
101  assert(member_index > 0);
102 
// Path taken while receive_step is still 0: the block that just arrived is
// checked against first_block_number and per-message state is initialised.
103  if(receive_step == 0) {
// Clamp the schedule's first-block number to the last valid block index.
105  first_block_number = min(transfer_schedule->get_first_block(num_blocks)->block_number,
106  num_blocks - 1);
// With a single block, the number of bytes received is the message size.
108  if(num_blocks == 1) {
109  message_size = received_block_size;
110  }
111 
112  assert(*first_block_number == parse_immediate(send_imm).block_number);
113 
// Ask the upcall for a destination for message_size bytes and remember the
// memory region and offset it returns.
115  auto destination = incoming_message_upcall(message_size);
116  mr_offset = destination.offset;
117  mr = destination.mr;
118 
// The returned region must be able to hold the whole message at that offset.
119  assert(mr->size >= mr_offset + message_size);
121 
// One flag per block, all initially false.
123  received_blocks = vector<bool>(num_blocks);
125 
127  "initialized_internal_datastructures");
128 
// Advance receive_step past steps that have no incoming transfer or whose
// incoming block is the first block, stopping at the schedule's total step count.
129  assert(receive_step == 0);
130  auto transfer = transfer_schedule->get_incoming_transfer(num_blocks,
131  receive_step);
132  while((!transfer || transfer->block_number == *first_block_number) && receive_step < transfer_schedule->get_total_steps(num_blocks)) {
133  transfer = transfer_schedule->get_incoming_transfer(num_blocks, ++receive_step);
134  }
135 
136  // cout << "receive_step = " << receive_step
137  // << " transfer->block_number = "
138  // << transfer->block_number
139  // << " first_block_number = " << *first_block_number
140  // << " total_steps = " << get_total_steps() << endl;
141 
143  "found_next_transfer");
144 
// If another incoming transfer exists: post its receive, record it as the
// expected block, and tell its sender we are ready.
145  if(transfer) {
146  LOG_EVENT(group_number, message_number, transfer->block_number,
147  "posting_recv");
148  // printf("Posting recv #%d (receive_step = %d,
149  // *first_block_number =
150  // %d, total_steps = %d)\n",
151  // (int)transfer->block_number, (int)receive_step,
152  // (int)*first_block_number, (int)get_total_steps());
153  post_recv(*transfer);
154  incoming_block = transfer->block_number;
155  send_ready_for_block(transfer->target);
156  // cout << "Issued Ready For Block AAAAAAAA (receive_step = "
157  // << receive_step << ", target = " << transfer->target << ")"
158  // << endl;
159 
// Look ahead: also post the receive for the next later step that has an
// incoming transfer (only the first such step; note the break).
160  for(auto r = receive_step + 1; r < transfer_schedule->get_total_steps(num_blocks); r++) {
161  auto t = transfer_schedule->get_incoming_transfer(num_blocks, r);
162  if(t) {
163  // cout << "posting block for step " << (int)r
164  // << " (block #" << (*t).block_number << ")" << endl;
165  post_recv(*t);
166  break;
167  }
168  }
169  }
170 
172  "calling_send_next_block");
173 
// Having received a block, try to forward one.
174  send_next_block();
175 
177  "returned_from_send_next_block");
178 
// Everything received and every send step finished.
// NOTE(review): the body of this `if` (listing line 180) is absent from the
// listing.
179  if(!sending && num_received_blocks == num_blocks && send_step == transfer_schedule->get_total_steps(num_blocks)) {
181  }
// Path for every later block of the message.
182  } else {
183  // assert(tag.index() <= tag.message_size());
// The arriving block must be the one recorded in incoming_block; print a
// diagnostic before the assertion fires so the mismatch is visible.
184  size_t block_number = incoming_block;
185  if(block_number != parse_immediate(send_imm).block_number) {
186  printf("Expected block #%d but got #%d on step %d\n",
187  (int)block_number,
188  (int)parse_immediate(send_imm).block_number,
189  (int)receive_step);
190  fflush(stdout);
191  }
192  assert(block_number == parse_immediate(send_imm).block_number);
193 
// Only the last block may be short; its length fixes the exact message size.
194  if(block_number == num_blocks - 1) {
195  message_size = (num_blocks - 1) * block_size + received_block_size;
196  } else {
197  assert(received_block_size == block_size);
198  }
199 
200  received_blocks[block_number] = true;
201 
202  LOG_EVENT(group_number, message_number, block_number, "received_block");
203 
204  // Figure out the next block to receive.
205  std::optional<schedule::block_transfer> transfer;
206  while(!transfer && receive_step + 1 < transfer_schedule->get_total_steps(num_blocks)) {
207  transfer = transfer_schedule->get_incoming_transfer(num_blocks, ++receive_step);
208  }
209 
210  // Post a receive for it.
// NOTE(review): no post_recv is issued for `transfer` itself in this branch;
// presumably its buffer was already posted by the look-ahead loop of the
// previous call — confirm.
211  if(transfer) {
212  incoming_block = transfer->block_number;
213  send_ready_for_block(transfer->target);
214  // cout << "Issued Ready For Block BBBBBBBB (receive_step = "
215  // << receive_step << ", target = " << transfer->target
216  // << ", total_steps = " << get_total_steps() << ")" << endl;
217  for(auto r = receive_step + 1; r < transfer_schedule->get_total_steps(num_blocks); r++) {
218  auto t = transfer_schedule->get_incoming_transfer(num_blocks, r);
219  if(t) {
220  post_recv(*t);
221  break;
222  }
223  }
224  }
225 
226  // If we just finished receiving a block and we weren't
227  // previously sending, then try to send now.
228  if(!sending) {
229  send_next_block();
230  }
231  // If we just received the last block and aren't still sending then
232  // issue a completion callback
// The received-block counter is incremented as part of this condition.
// NOTE(review): the body of this `if` (listing line 234) is absent from the
// listing.
233  if(++num_received_blocks == num_blocks && !sending && send_step == transfer_schedule->get_total_steps(num_blocks)) {
235  }
236  }
237 }
// Handles a ready-for-block notification from `sender`: re-posts the empty
// receive used for that neighbour's notifications, records the neighbour as
// ready, and starts a send if none is in flight and a message region is set.
// Holds `monitor` for the whole call. The `step` parameter is not used in the
// body.
238 void polling_group::receive_ready_for_block(uint32_t step, uint32_t sender) {
239  unique_lock<mutex> lock(monitor);
240 
// Find the ready-for-block connection for this sender; the container depends on
// which transport API the file was built with.
241 #ifdef USE_VERBS_API
242  auto it = rfb_queue_pairs.find(sender);
243  assert(it != rfb_queue_pairs.end());
244 #else
245  auto it = rfb_endpoints.find(sender);
246  assert(it != rfb_endpoints.end());
247 #endif
// Post a fresh empty receive on that connection.
248  it->second.post_empty_recv(form_tag(group_number, sender),
249  message_types.ready_for_block);
250 
251  receivers_ready.insert(sender);
252 
// Only try to send when idle and a message memory region is currently set.
253  if(!sending && mr) {
254  send_next_block();
255  }
256 }
// Body of the send-completion handler, which the send_data_block message handler
// calls on the owning group. It takes the monitor, tries to start the next
// send, and then checks whether the whole message is finished.
// NOTE(review): the signature (listing line 257) is absent from this listing; the
// member index at the end of this page places
// `virtual void complete_block_send()` at group_send.cpp:257. Listing lines 260
// and 269 are absent as well, so the logging call is truncated and the final
// `if` has no visible body.
258  unique_lock<mutex> lock(monitor);
259 
261  "finished_sending_block");
262 
263  send_next_block();
264 
265  // If we just sent the last block, and were already done
266  // receiving, then signal completion and prepare for the next
267  // message.
// Member 0 has nothing to receive, so only its send steps must be exhausted.
268  if(!sending && send_step == transfer_schedule->get_total_steps(num_blocks) && (member_index == 0 || num_received_blocks == num_blocks)) {
270  }
271 }
// Starts sending `length` bytes of `message_mr`, beginning at `offset`, to the
// group. May only be called on member 0 and only while no transfer is under way.
//
// Throws rdmc::invalid_args if length is zero, if offset + length exceeds the
// region's size, or if the message needs more blocks than a uint16_t can count.
// Throws rdmc::nonroot_sender if member_index > 0.
// Throws rdmc::group_busy if receive_step or send_step is non-zero.
// NOTE(review): `offset + length` is evaluated in size_t and could wrap for very
// large arguments, and message_mr is dereferenced without a null check —
// confirm callers cannot pass such values.
272 void polling_group::send_message(shared_ptr<memory_region> message_mr, size_t offset,
273  size_t length) {
// Logged before the monitor is taken.
274  LOG_EVENT(group_number, -1, -1, "send()");
275 
276  unique_lock<mutex> lock(monitor);
277 
278  if(length == 0) throw rdmc::invalid_args();
279  if(offset + length > message_mr->size) throw rdmc::invalid_args();
280  if(member_index > 0) throw rdmc::nonroot_sender();
281 
282  // Queueing sends is not supported
283  if(receive_step > 0) throw rdmc::group_busy();
284  if(send_step > 0) throw rdmc::group_busy();
285 
286  mr = message_mr;
287  mr_offset = offset;
288  message_size = length;
// Ceiling division; length is known to be non-zero here.
289  num_blocks = (message_size - 1) / block_size + 1;
// form_immediate takes the block count as a uint16_t (per the member index at
// the end of this page), so larger counts are rejected.
290  if(num_blocks > std::numeric_limits<uint16_t>::max())
291  throw rdmc::invalid_args();
292  // printf("message_size = %lu, block_size = %lu, num_blocks = %lu\n",
293  // message_size, block_size, num_blocks);
294  LOG_EVENT(group_number, message_number, -1, "send_message");
295 
296  send_next_block();
297  // No need to worry about completion here. We must send at least
298  // one block, so we can't be done already.
299 }
// Body of the routine that tries to start the next outgoing block transfer. It
// returns without sending when the schedule is exhausted, when this non-root
// node has not yet received the block it should forward, or when the target has
// not signalled readiness. Callers in this listing hold `monitor` when invoking
// it.
// NOTE(review): the signature (listing line 300) is absent from this listing; the
// member index at the end of this page places `void send_next_block()` at
// group_send.cpp:300.
301  sending = false;
302  if(send_step == transfer_schedule->get_total_steps(num_blocks)) {
303  return;
304  }
// Advance send_step past steps that have no outgoing transfer for this node.
305  auto transfer = transfer_schedule->get_outgoing_transfer(num_blocks, send_step);
306  while(!transfer) {
307  if(++send_step == transfer_schedule->get_total_steps(num_blocks)) return;
308 
309  transfer = transfer_schedule->get_outgoing_transfer(num_blocks, send_step);
310  }
311 
312  size_t target = transfer->target;
313  size_t block_number = transfer->block_number;
314  // size_t forged_block_number = transfer->forged_block_number;
315 
// A non-root member cannot forward a block it has not received yet.
316  if(member_index > 0 && !received_blocks[block_number]) return;
317 
// The target must have announced readiness (see receive_ready_for_block).
318  if(receivers_ready.count(transfer->target) == 0) {
319  LOG_EVENT(group_number, message_number, block_number,
320  "receiver_not_ready");
321  return;
322  }
323 
// Consume the readiness token, mark a send as in flight, and move to the next
// step before posting.
324  receivers_ready.erase(transfer->target);
325  sending = true;
326  ++send_step;
327 
328  // printf("sending block #%d to node #%d on step %d\n", (int)block_number,
329  // (int)target, (int)send_step-1);
330  // fflush(stdout);
// Look up the data connection to the target; the container depends on which
// transport API the file was built with.
331 #ifdef USE_VERBS_API
332  auto it = queue_pairs.find(target);
333  assert(it != queue_pairs.end());
334 #else
335  auto it = endpoints.find(target);
336  assert(it != endpoints.end());
337 #endif
// The first block is sent from the first-block staging region as a full
// block_size-byte send.
338  if(first_block_number && block_number == *first_block_number) {
339  CHECK(it->second.post_send(*first_block_mr, 0, block_size,
340  form_tag(group_number, target),
341  form_immediate(num_blocks, block_number),
342  message_types.data_block));
// Every other block is sent straight from the message region; the byte count is
// capped at what remains of the message.
343  } else {
344  size_t offset = block_number * block_size;
345  size_t nbytes = min(block_size, message_size - offset);
346  CHECK(it->second.post_send(*mr, mr_offset + offset, nbytes,
347  form_tag(group_number, target),
348  form_immediate(num_blocks, block_number),
349  message_types.data_block));
350  }
351  outgoing_block = block_number;
352  LOG_EVENT(group_number, message_number, block_number,
353  "started_sending_block");
354 }
// Body of the routine that finishes the current message and resets per-message
// state. On non-root members it copies the staged first block into the message
// region (the memcpy on listing line 373), and afterwards re-arms reception of
// the next message's first block.
// NOTE(review): the signature (listing line 355) is absent from this listing; the
// member index at the end of this page places `void complete_message()` at
// group_send.cpp:355. Listing lines 358, 374, 376, 379 and 396 are absent too, so
// the memcpy's remaining arguments and any statements on those lines are not
// visible here.
356  // remap first_block into buffer
357  if(member_index > 0 && first_block_number) {
359  "starting_remap_first_block");
360  // if(block_size > (128 << 10) && (block_size % 4096 == 0)) {
361  // char *tmp_buffer =
362  // (char *)mmap(NULL, block_size, PROT_READ | PROT_WRITE,
363  // MAP_ANON | MAP_PRIVATE, -1, 0);
364 
365  // mremap(buffer + block_size * (*first_block_number), block_size,
366  // block_size, MREMAP_FIXED | MREMAP_MAYMOVE, tmp_buffer);
367 
368  // mremap(first_block_buffer, block_size, block_size,
369  // MREMAP_FIXED | MREMAP_MAYMOVE,
370  // buffer + block_size * (*first_block_number));
371  // first_block_buffer = tmp_buffer;
372  // } else {
373  memcpy(mr->buffer + mr_offset + block_size * (*first_block_number),
375  // }
377  "finished_remap_first_block");
378  }
380 
// Reset per-message state: bump the message counter, clear the send/receive
// progress, and drop the reference to the message's memory region.
381  ++message_number;
382  sending = false;
383  send_step = 0;
384  receive_step = 0;
385  mr.reset();
386  // if(first_block_buffer == nullptr && member_index > 0){
387  // first_block_buffer = (char*)mmap(NULL, block_size,
388  // PROT_READ|PROT_WRITE,
389  // MAP_ANON|MAP_PRIVATE, -1, 0);
390  // memset(first_block_buffer, 1, block_size);
391  // memset(first_block_buffer, 0, block_size);
392  // }
393  first_block_number = std::nullopt;
394 
// Non-root members re-arm for the next message: post the first-block receive
// and tell its sender we are ready, as the constructor does.
395  if(member_index != 0) {
397  received_blocks.clear();
398  auto transfer = transfer_schedule->get_first_block(num_blocks);
399  assert(transfer);
400  first_block_number = transfer->block_number;
401  post_recv(*transfer);
402  incoming_block = transfer->block_number;
403  send_ready_for_block(transfer->target);
404  // cout << "Issued Ready For Block DDDDDDD (target = " <<
405  // transfer->target
406  // << ")" << endl;
407  }
408 }
// Body of the routine that posts a receive buffer for one scheduled incoming
// block, on the data connection to transfer.target.
// NOTE(review): the signature (listing line 409) is absent from this listing; the
// member index at the end of this page places
// `void post_recv(schedule::block_transfer transfer)` at group_send.cpp:409.
// Listing line 421 (the `if` whose `else` appears on listing line 425) and
// listing line 435 are absent as well, so the condition selecting the first
// branch is not visible here.
410 #ifdef USE_VERBS_API
411  auto it = queue_pairs.find(transfer.target);
412  assert(it != queue_pairs.end());
413 #else
414  auto it = endpoints.find(transfer.target);
415  assert(it != endpoints.end());
416 #endif
417  // printf("Posting receive buffer for block #%d from node #%d\n",
418  // (int)transfer.block_number, (int)transfer.target);
419  // fflush(stdout);
420 
// First branch: receive a full block_size bytes into the first-block staging
// region.
422  CHECK(it->second.post_recv(*first_block_mr, 0, block_size,
423  form_tag(group_number, transfer.target),
424  message_types.data_block));
// Second branch: receive directly into the message region at the block's offset.
425  } else {
426  size_t offset = block_size * transfer.block_number;
// NOTE(review): message_size - offset is an unsigned subtraction; if offset were
// ever larger than message_size it would wrap and `length` would become
// block_size — confirm the schedule never produces such a block here.
427  size_t length = min(block_size, (size_t)(message_size - offset));
428 
// Nothing is posted for a zero-length remainder.
429  if(length > 0) {
430  CHECK(it->second.post_recv(*mr, mr_offset + offset, length,
431  form_tag(group_number, transfer.target),
432  message_types.data_block));
433  }
434  }
436  "posted_receive_buffer");
437 }
// Creates the two connections used with `neighbor` (an index into `members`):
// one stored in the data-connection map and one stored in the ready-for-block
// map. The second is constructed with a callback that posts an empty
// ready-for-block receive on the connection it is handed.
438 void polling_group::connect(uint32_t neighbor) {
439 #ifdef USE_VERBS_API
440  queue_pairs.emplace(neighbor, queue_pair(members[neighbor]));
441 
442  auto post_recv = [this, neighbor](rdma::queue_pair* qp) {
443  qp->post_empty_recv(form_tag(group_number, neighbor),
444  message_types.ready_for_block);
445  };
446 
// NOTE(review): this branch constructs an `endpoint` for rfb_queue_pairs while
// the callback above takes an rdma::queue_pair*; the line above builds a
// queue_pair for queue_pairs. Looks like this may have been meant to be
// queue_pair(members[neighbor], post_recv) — confirm against the verbs build.
447  rfb_queue_pairs.emplace(neighbor, endpoint(members[neighbor], post_recv));
448 #else
449  // Decide whether the endpoint will act as a server in the connection
// The side with the smaller member id gets is_lf_server == true.
450  bool is_lf_server = members[member_index] < members[neighbor];
451  endpoints.emplace(neighbor, endpoint(members[neighbor], is_lf_server));
452 
453  auto post_recv = [this, neighbor](rdma::endpoint* ep) {
454  ep->post_empty_recv(form_tag(group_number, neighbor),
455  message_types.ready_for_block);
456  };
457 
458  rfb_endpoints.emplace(neighbor, endpoint(members[neighbor], is_lf_server, post_recv));
459 #endif
460 }
461 
// Sends an empty ready-for-block message to `neighbor` over the ready-for-block
// connection created for it in connect(). Asserts that such a connection exists.
462 void polling_group::send_ready_for_block(uint32_t neighbor) {
// The container searched depends on which transport API the file was built with.
463 #ifdef USE_VERBS_API
464  auto it = rfb_queue_pairs.find(neighbor);
465  assert(it != rfb_queue_pairs.end());
466 #else
467  auto it = rfb_endpoints.find(neighbor);
468  assert(it != rfb_endpoints.end());
469 #endif
470 
// The tag carries the group number and the neighbour; the immediate value is 0.
471  it->second.post_empty_send(form_tag(group_number, neighbor), 0,
472  message_types.ready_for_block);
473 }
const uint16_t group_number
Definition: group_send.hpp:30
static void initialize_message_types()
Definition: group_send.cpp:40
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
virtual void receive_block(uint32_t send_imm, size_t size)
Definition: group_send.cpp:98
Definition: rdmc.hpp:20
size_t num_blocks
Definition: group_send.hpp:42
const size_t block_size
Definition: group_send.hpp:31
mutex groups_lock
Definition: rdmc.cpp:35
#define LOG_EVENT(group_number, message_number, block_number, event_name)
Definition: util.hpp:64
size_t message_number
Definition: group_send.hpp:74
STL namespace.
uint32_t form_immediate(uint16_t total_blocks, uint16_t block_number)
Definition: message.hpp:31
const unique_ptr< schedule > transfer_schedule
Definition: group_send.hpp:35
void send_next_block()
Definition: group_send.cpp:300
size_t send_step
Definition: group_send.hpp:78
const uint32_t member_index
Definition: group_send.hpp:33
size_t receive_step
Definition: group_send.hpp:83
unique_ptr< rdma::memory_region > first_block_mr
Definition: group_send.hpp:69
std::mutex monitor
Definition: group_send.hpp:37
void post_recv(schedule::block_transfer transfer)
Definition: group_send.cpp:409
A C++ wrapper for the IB Verbs ibv_qp struct and its associated functions.
size_t outgoing_block
Definition: group_send.hpp:76
map< size_t, rdma::endpoint > rfb_endpoints
Definition: group_send.hpp:92
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and q...
Definition: lf_helper.hpp:28
uint16_t group_number
Definition: message.hpp:10
unique_ptr< char[]> first_block_buffer
Definition: group_send.hpp:71
size_t message_size
Definition: group_send.hpp:41
size_t incoming_block
Definition: group_send.hpp:73
ParsedImmediate parse_immediate(uint32_t imm)
Definition: message.hpp:27
virtual void send_message(std::shared_ptr< rdma::memory_region > message_mr, size_t offset, size_t length)
Definition: group_send.cpp:272
virtual ~group()
Definition: group_send.cpp:38
map< size_t, rdma::endpoint > endpoints
Definition: group_send.hpp:91
vector< bool > received_blocks
Definition: group_send.hpp:84
uint64_t form_tag(uint16_t group_number, uint32_t target)
Definition: message.hpp:18
virtual void receive_ready_for_block(uint32_t step, uint32_t sender)=0
group(uint16_t group_number, size_t block_size, vector< uint32_t > members, uint32_t member_index, incoming_message_callback_t upcall, completion_callback_t callback, unique_ptr< schedule > transfer_schedule)
Definition: group_send.cpp:25
ParsedTag parse_tag(uint64_t t)
Definition: message.hpp:14
std::function< receive_destination(size_t size)> incoming_message_callback_t
Definition: rdmc.hpp:41
A C++ wrapper for the libfabric fid_ep struct and its associated functions.
Definition: lf_helper.hpp:157
void connect(uint32_t neighbor)
Definition: group_send.cpp:438
void CHECK(bool b)
Definition: util.hpp:68
std::set< uint32_t > receivers_ready
Definition: group_send.hpp:67
std::function< void(char *buffer, size_t size)> completion_callback_t
Definition: rdmc.hpp:42
static struct polling_group::@2 message_types
std::shared_ptr< rdma::memory_region > mr
Definition: group_send.hpp:39
polling_group(uint16_t group_number, size_t block_size, vector< uint32_t > members, uint32_t member_index, incoming_message_callback_t upcall, completion_callback_t callback, unique_ptr< schedule > transfer_schedule)
Definition: group_send.cpp:70
uint32_t target
Definition: message.hpp:11
size_t num_received_blocks
Definition: group_send.hpp:82
void send_ready_for_block(uint32_t neighbor)
Definition: group_send.cpp:462
optional< size_t > first_block_number
Definition: group_send.hpp:70
uint16_t total_blocks
Definition: message.hpp:23
size_t mr_offset
Definition: group_send.hpp:40
incoming_message_callback_t incoming_message_upcall
Definition: group_send.hpp:45
virtual void receive_ready_for_block(uint32_t step, uint32_t sender)
Definition: group_send.cpp:238
completion_callback_t completion_callback
Definition: group_send.hpp:44
const vector< uint32_t > members
Definition: group_send.hpp:29
void complete_message()
Definition: group_send.cpp:355
virtual void complete_block_send()
Definition: group_send.cpp:257
map< uint16_t, shared_ptr< group > > groups
Definition: rdmc.cpp:34