Derecho 0.9
Distributed systems toolkit for RDMA
experiment.cpp
#include <derecho/rdmc/rdmc.hpp>
#include <derecho/rdmc/detail/schedule.hpp>  // block-transfer schedule (assumed path)
#include <derecho/rdmc/detail/util.hpp>      // CHECK/TRACE/LOG_EVENT, timing helpers (assumed path)

#ifdef USE_VERBS_API
#include <derecho/rdmc/detail/verbs_helper.hpp>  // ibverbs backend (assumed path)
#else
#include <derecho/rdmc/detail/lf_helper.hpp>     // libfabric backend (assumed path)
#endif

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

using namespace std;
using namespace rdma;

template <class T>
struct stat {
    T mean;
    T stddev;
};

struct send_stats {
    stat<double> time;       // in ms
    stat<double> bandwidth;  // in Gb/s
    stat<double> cpu_usage;  // core seconds/wall time seconds

    size_t size;
    size_t block_size;
    size_t group_size;
    size_t iterations;
};

uint32_t node_rank;
uint32_t num_nodes;

unique_ptr<rdmc::barrier_group> universal_barrier_group;
uint16_t next_group_number;

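// Measures aggregate bandwidth when num_senders of the group_size nodes
// multicast concurrently. One RDMC group is created per sender; each group
// holds the same node set rotated so that sender i comes first (an RDMC
// send originates at a group's first member). Every iteration is bracketed
// by barriers so that all nodes time the same send.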
send_stats measure_partially_concurrent_multicast(
        size_t size, size_t block_size, uint32_t group_size, uint32_t num_senders,
        size_t iterations, rdmc::send_algorithm type = rdmc::BINOMIAL_SEND,
        bool use_cv = true) {
    if(node_rank >= group_size) {
        // Each iteration involves two barriers: one at the start and one at the
        // end.
        for(size_t i = 0; i < iterations * 2; i++) {
            universal_barrier_group->barrier_wait();
        }

        return send_stats();
    }

    std::mutex send_mutex;
    std::condition_variable send_done_cv;
    atomic<uint64_t> end_time;
    atomic<uint64_t> end_ptime;
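
    // One buffer_size slice per sender within a single registered memory
    // region; num_blocks rounds the message size up to whole blocks.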
    size_t num_blocks = (size - 1) / block_size + 1;
    size_t buffer_size = num_blocks * block_size;
    auto mr = make_shared<memory_region>(buffer_size * num_senders);
    char *buffer = mr->buffer;

    uint16_t base_group_number = next_group_number;
    atomic<uint32_t> sends_remaining;

    for(uint16_t i = 0u; i < num_senders; i++) {
        vector<uint32_t> members;
        for(uint32_t j = 0; j < group_size; j++) {
            members.push_back((j + i) % group_size);
        }
        CHECK(rdmc::create_group(
                base_group_number + i, members, block_size, type,
                [&mr, i, buffer_size](size_t length) -> rdmc::receive_destination {
                    return {mr, buffer_size * i};
                },
                [&](char *data, size_t) {
                    if(--sends_remaining == 0) {
                        universal_barrier_group->barrier_wait();
                        end_ptime = get_process_time();
                        end_time = get_time();
                        unique_lock<mutex> lk(send_mutex);
                        send_done_cv.notify_all();
                    }
                },
                [group_number = base_group_number + i](std::optional<uint32_t>) {
                    LOG_EVENT(group_number, -1, -1, "send_failed");
                    CHECK(false);
                }));
    }

    vector<double> rates;
    vector<double> times;
    vector<double> cpu_usages;

    for(size_t i = 0; i < iterations; i++) {
        sends_remaining = num_senders;
        end_time = 0;
        end_ptime = 0;

        if(node_rank < num_senders) {
            for(size_t j = 0; j < size; j += 256)
                buffer[node_rank * buffer_size + j] = (rand() >> 5) % 256;
        }

        universal_barrier_group->barrier_wait();

        uint64_t start_ptime = get_process_time();
        uint64_t start_time = get_time();

        if(node_rank < num_senders) {
            CHECK(rdmc::send(base_group_number + node_rank, mr,
                             buffer_size * node_rank, size));
        }

        if(use_cv) {
            unique_lock<mutex> lk(send_mutex);
            send_done_cv.wait(lk, [&] { return end_time != 0; });
        } else {
            while(end_time == 0)
                /* do nothing*/;
        }

        uint64_t time_diff = end_time - start_time;
        uint64_t ptime_diff = end_ptime - start_ptime;
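        // get_time()/get_process_time() report nanoseconds, so
        // 8 * bytes / ns == Gbit/s and 1e-6 * ns == ms, matching the units
        // documented on send_stats.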
        rates.push_back(8.0 * size * num_senders / time_diff);
        times.push_back(1.0e-6 * time_diff);
        cpu_usages.push_back((double)ptime_diff / time_diff);
    }

    // Only num_senders groups were created above, so destroy exactly those.
    for(auto i = 0u; i < num_senders; i++) {
        rdmc::destroy_group(base_group_number + i);
    }

    send_stats s;
    s.size = size;
    s.block_size = block_size;
    s.group_size = group_size;
    s.iterations = iterations;

    s.time.mean = compute_mean(times);
    s.time.stddev = compute_stddev(times);
    s.bandwidth.mean = compute_mean(rates);
    s.bandwidth.stddev = compute_stddev(rates);
    s.cpu_usage.mean = compute_mean(cpu_usages);
    s.cpu_usage.stddev = compute_stddev(cpu_usages);
    return s;
}

send_stats measure_multicast(size_t size, size_t block_size,
                             uint32_t group_size, size_t iterations,
                             rdmc::send_algorithm type = rdmc::BINOMIAL_SEND,
                             bool use_cv = true) {
    return measure_partially_concurrent_multicast(size, block_size, group_size,
                                                  1, iterations, type, use_cv);
}

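// Prints a CSV matrix of bandwidth versus block size (16 KB to 16 MB) for
// message sizes from 16 KB to 256 MB; cells where the block size exceeds the
// message size are left blank.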
void blocksize_v_bandwidth(uint16_t gsize) {
    const size_t min_block_size = 16ull << 10;
    const size_t max_block_size = 16ull << 20;

    puts("=========================================================");
    puts("=            Block Size vs. Bandwidth (Gb/s)            =");
    puts("=========================================================");
    printf("Group Size = %d\n", (int)gsize);
    printf("Send Size, ");
    for(auto block_size = min_block_size; block_size <= max_block_size;
        block_size *= 2) {
        if(block_size >= 1 << 20)
            printf("%d MB, ", (int)(block_size >> 20));
        else
            printf("%d KB, ", (int)(block_size >> 10));
    }
    for(auto block_size = min_block_size; block_size <= max_block_size;
        block_size *= 2) {
        if(block_size >= 1 << 20)
            printf("%d MB stddev, ", (int)(block_size >> 20));
        else
            printf("%d KB stddev, ", (int)(block_size >> 10));
    }
    puts("");
    fflush(stdout);
    for(auto size : {256ull << 20, 64ull << 20, 16ull << 20, 4ull << 20,
                     1ull << 20, 256ull << 10, 64ull << 10, 16ull << 10}) {
        if(size >= 1 << 20)
            printf("%d MB, ", (int)(size >> 20));
        else if(size >= 1 << 10)
            printf("%d KB, ", (int)(size >> 10));
        else
            printf("%d B, ", (int)(size));

        vector<double> stddevs;
        for(auto block_size = min_block_size; block_size <= max_block_size;
            block_size *= 2) {
            if(block_size > size) {
                printf(", ");
                continue;
            }
            auto s = measure_multicast(size, block_size, gsize, 8);
            printf("%f, ", s.bandwidth.mean);
            fflush(stdout);

            stddevs.push_back(s.bandwidth.stddev);
        }
        for(auto s : stddevs) {
            printf("%f, ", s);
        }
        puts("");
        fflush(stdout);
    }
    puts("");
    fflush(stdout);
}
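
// Compares the four RDMC dissemination algorithms (binomial pipeline, chain,
// sequential, and tree) at 8, 64, and 256 MB message sizes, printing one CSV
// row per group size from num_nodes down to 2.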
void compare_send_types() {
    puts("=========================================================");
    puts("=         Compare Send Types - Bandwidth (Gb/s)         =");
    puts("=========================================================");
    puts(
            "Group Size,"
            "Binomial Pipeline (256 MB),Chain Send (256 MB),Sequential Send (256 "
            "MB),Tree Send (256 MB),"
            "Binomial Pipeline (64 MB),Chain Send (64 MB),Sequential Send (64 "
            "MB),Tree Send (64 MB),"
            "Binomial Pipeline (8 MB),Chain Send (8 MB),Sequential Send (8 "
            "MB),Tree Send (8 MB),");
    fflush(stdout);

    const size_t block_size = 1 << 20;
    const size_t iterations = 64;
    for(int gsize = num_nodes; gsize >= 2; --gsize) {
        auto bp8 = measure_multicast(8 << 20, block_size, gsize, iterations,
                                     rdmc::BINOMIAL_SEND);
        auto bp64 = measure_multicast(64 << 20, block_size, gsize, iterations,
                                      rdmc::BINOMIAL_SEND);
        auto bp256 = measure_multicast(256 << 20, block_size, gsize, iterations,
                                       rdmc::BINOMIAL_SEND);
        auto cs8 = measure_multicast(8 << 20, block_size, gsize, iterations,
                                     rdmc::CHAIN_SEND);
        auto cs64 = measure_multicast(64 << 20, block_size, gsize, iterations,
                                      rdmc::CHAIN_SEND);
        auto cs256 = measure_multicast(256 << 20, block_size, gsize, iterations,
                                       rdmc::CHAIN_SEND);
        auto ss8 = measure_multicast(8 << 20, block_size, gsize, iterations,
                                     rdmc::SEQUENTIAL_SEND);
        auto ss64 = measure_multicast(64 << 20, block_size, gsize, iterations,
                                      rdmc::SEQUENTIAL_SEND);
        auto ss256 = measure_multicast(256 << 20, block_size, gsize, iterations,
                                       rdmc::SEQUENTIAL_SEND);
        auto ts8 = measure_multicast(8 << 20, block_size, gsize, iterations,
                                     rdmc::TREE_SEND);
        auto ts64 = measure_multicast(64 << 20, block_size, gsize, iterations,
                                      rdmc::TREE_SEND);
        auto ts256 = measure_multicast(256 << 20, block_size, gsize, iterations,
                                       rdmc::TREE_SEND);
        printf(
                "%d, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, "
                "%f, %f, %f, %f, %f, %f, %f, %f, %f\n",
                gsize, bp256.bandwidth.mean, cs256.bandwidth.mean,
                ss256.bandwidth.mean, ts256.bandwidth.mean, bp64.bandwidth.mean,
                cs64.bandwidth.mean, ss64.bandwidth.mean, ts64.bandwidth.mean,
                bp8.bandwidth.mean, cs8.bandwidth.mean, ss8.bandwidth.mean,
                ts8.bandwidth.mean, bp256.bandwidth.stddev, cs256.bandwidth.stddev,
                ss256.bandwidth.stddev, ts256.bandwidth.stddev,
                bp64.bandwidth.stddev, cs64.bandwidth.stddev, ss64.bandwidth.stddev,
                ts64.bandwidth.stddev, bp8.bandwidth.stddev, cs8.bandwidth.stddev,
                ss8.bandwidth.stddev, ts8.bandwidth.stddev);

        // ss256.bandwidth.mean, 0.0f /*ts256.bandwidth.mean*/,
        // bp64.bandwidth.mean, cs64.bandwidth.mean, ss64.bandwidth.mean,
        // 0.0f /*ts64.bandwidth.mean*/, bp256.bandwidth.stddev,
        // cs256.bandwidth.stddev, ss256.bandwidth.stddev,
        // 0.0f /*ts256.bandwidth.stddev*/, bp64.bandwidth.stddev,
        // cs64.bandwidth.stddev, ss64.bandwidth.stddev,
        // 0.0f /*ts64.bandwidth.stddev*/);
        fflush(stdout);
    }
    puts("");
    fflush(stdout);
}
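
// Binomial-pipeline bandwidth as the group shrinks from num_nodes down to 2,
// measured at 4/16/64/256 MB messages with a 1 MB block size.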
void bandwidth_group_size() {
    puts("=========================================================");
    puts("=               Bandwidth vs. Group Size                =");
    puts("=========================================================");
    puts(
            "Group Size, 256 MB, 64 MB, 16 MB, 4 MB, "
            "256stddev, 64stddev, 16stddev, 4stddev");
    fflush(stdout);

    for(int gsize = num_nodes; gsize >= 2; --gsize) {
        auto bp256 = measure_multicast(256 << 20, 1 << 20, gsize, 64,
                                       rdmc::BINOMIAL_SEND);
        auto bp64 = measure_multicast(64 << 20, 1 << 20, gsize, 64,
                                      rdmc::BINOMIAL_SEND);
        auto bp16 = measure_multicast(16 << 20, 1 << 20, gsize, 64,
                                      rdmc::BINOMIAL_SEND);
        auto bp4 = measure_multicast(4 << 20, 1 << 20, gsize, 64, rdmc::BINOMIAL_SEND);
        printf("%d, %f, %f, %f, %f, %f, %f, %f, %f\n", gsize,
               bp256.bandwidth.mean, bp64.bandwidth.mean, bp16.bandwidth.mean,
               bp4.bandwidth.mean, bp256.bandwidth.stddev,
               bp64.bandwidth.stddev, bp16.bandwidth.stddev,
               bp4.bandwidth.stddev);
        fflush(stdout);
    }
    puts("");
    fflush(stdout);
}
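
// Same sweep as bandwidth_group_size(), but with every group member sending
// concurrently (num_senders == gsize) and 16 iterations per data point.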
void concurrent_bandwidth_group_size() {
    puts("=========================================================");
    puts("=          Concurrent Bandwidth vs. Group Size          =");
    puts("=========================================================");
    puts(
            "Group Size, 256 MB, 64 MB, 16 MB, 4 MB, "
            "256stddev, 64stddev, 16stddev, 4stddev");
    fflush(stdout);

    for(int gsize = num_nodes; gsize >= 2; --gsize) {
        auto bp256 = measure_partially_concurrent_multicast(256 << 20, 1 << 20,
                                                            gsize, gsize, 16, rdmc::BINOMIAL_SEND);
        auto bp64 = measure_partially_concurrent_multicast(64 << 20, 1 << 20,
                                                           gsize, gsize, 16, rdmc::BINOMIAL_SEND);
        auto bp16 = measure_partially_concurrent_multicast(16 << 20, 1 << 20,
                                                           gsize, gsize, 16, rdmc::BINOMIAL_SEND);
        auto bp4 = measure_partially_concurrent_multicast(4 << 20, 1 << 20,
                                                          gsize, gsize, 16, rdmc::BINOMIAL_SEND);
        printf("%d, %f, %f, %f, %f, %f, %f, %f, %f\n", gsize,
               bp256.bandwidth.mean, bp64.bandwidth.mean, bp16.bandwidth.mean,
               bp4.bandwidth.mean, bp256.bandwidth.stddev,
               bp64.bandwidth.stddev, bp16.bandwidth.stddev,
               bp4.bandwidth.stddev);
        fflush(stdout);
    }
    puts("");
    fflush(stdout);
}
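
// Measures how bandwidth and CPU usage change as the number of active senders
// grows: one sender, half the group, then the whole group, for each message
// size and group size.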
void active_senders(bool interrupts = false, bool labels = true) {
    auto compute_block_size = [](size_t message_size) -> size_t {
        if(message_size < 4096 * 2) return message_size;
        if(message_size < 4096 * 10) return 4096;
        if(message_size < 10 * (1 << 20)) return message_size / 10;
        return 1 << 20;
    };

    auto compute_iterations = [](size_t message_size) -> size_t {
        if(message_size == 1) return 20000;
        if(message_size == 10000) return 10000;
        if(message_size == 1'000'000) return 1000;
        if(message_size == 100'000'000) return 100;
        return max<size_t>(100000000 / message_size, 4u);
    };

    if(labels) {
        printf("Interrupts, Message Size, Group Size, 1-sender Bandwidth, "
               "half-sending Bandwidth, all-sending Bandwidth, 1-sender CPU, "
               "half-sending CPU, all-sending CPU\n");
    }

    // rdma::impl::set_interrupt_mode(interrupts);

    for(size_t message_size : {1, 10000, 1'000'000, 100'000'000}) {
        for(uint32_t group_size = 3; group_size <= num_nodes; group_size++) {
            printf("%s, %d, %d, ", interrupts ? "enabled" : "disabled",
                   (int)message_size, (int)group_size);
            fflush(stdout);

            vector<double> cpu_usage;
            for(uint32_t num_senders : {1u, (group_size + 1) / 2, group_size}) {
                auto s = measure_partially_concurrent_multicast(
                        message_size, compute_block_size(message_size), group_size,
                        num_senders, compute_iterations(message_size),
                        rdmc::BINOMIAL_SEND);
                printf("%f, ", s.bandwidth.mean);
                fflush(stdout);
                cpu_usage.push_back(s.cpu_usage.mean * 100);
            }
            for(double usage : cpu_usage) {
                printf("%f, ", usage);
            }
            printf("\n");
            fflush(stdout);
        }
    }
}
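
// Multicast completion time (ms) for small messages, halving the group size
// each step; each message fits in one or two blocks.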
void latency_group_size() {
    puts("=========================================================");
    puts("=                Latency vs. Group Size                 =");
    puts("=========================================================");
    puts(
            "Group Size,64 KB,16 KB,4 KB,1 KB,256 B,"
            "64stddev,16stddev,4stddev,1stddev,256stddev");
    fflush(stdout);

    size_t iterations = 10000;

    for(int gsize = num_nodes; gsize >= 2; gsize /= 2) {
        auto bp64 = measure_multicast(64 << 10, 32 << 10, gsize, iterations,
                                      rdmc::BINOMIAL_SEND);
        auto bp16 = measure_multicast(16 << 10, 8 << 10, gsize, iterations,
                                      rdmc::BINOMIAL_SEND);
        auto bp4 = measure_multicast(4 << 10, 4 << 10, gsize, iterations,
                                     rdmc::BINOMIAL_SEND);
        auto bp1 = measure_multicast(1 << 10, 1 << 10, gsize, iterations,
                                     rdmc::BINOMIAL_SEND);
        auto bp256 = measure_multicast(256, 256, gsize, iterations, rdmc::BINOMIAL_SEND);
        printf("%d, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f\n", gsize,
               bp64.time.mean, bp16.time.mean, bp4.time.mean, bp1.time.mean,
               bp256.time.mean, bp64.time.stddev, bp16.time.stddev,
               bp4.time.stddev, bp1.time.stddev, bp256.time.stddev);
        fflush(stdout);
    }
    puts("");
    fflush(stdout);
}
// void small_send_latency_group_size() {
//     puts("=========================================================");
//     puts("=                Latency vs. Group Size                 =");
//     puts("=========================================================");
//     puts(
//         "Group Size, 16 KB, 4 KB, 1 KB, 256 Bytes, "
//         "64stddev, 16stddev, 4stddev, 1stddev");
//     fflush(stdout);

//     for(int gsize = num_nodes; gsize >= 2; --gsize) {
//         auto bp16 = measure_small_multicast(16 << 10, gsize, 16, 512);
//         auto bp4 = measure_small_multicast(4 << 10, gsize, 16, 512);
//         auto bp1 = measure_small_multicast(1 << 10, gsize, 16, 512);
//         auto bp256 = measure_small_multicast(256, gsize, 16, 512);
//         printf("%d, %f, %f, %f, %f, %f, %f, %f, %f\n", gsize, bp16.time.mean,
//                bp4.time.mean, bp1.time.mean, bp256.time.mean,
//                bp16.time.stddev,
//                bp4.time.stddev, bp1.time.stddev, bp256.time.stddev);
//         fflush(stdout);
//     }
//     puts("");
//     fflush(stdout);
// }
void large_send() {
    LOG_EVENT(-1, -1, -1, "start_large_send");
    auto s = measure_multicast(16 << 20, 1 << 20, num_nodes, 16,
                               rdmc::BINOMIAL_SEND);
    // flush_events();
    printf("Bandwidth = %f(%f) Gb/s\n", s.bandwidth.mean, s.bandwidth.stddev);
    printf("Latency = %f(%f) ms\n", s.time.mean, s.time.stddev);
    // uint64_t eTime = get_time();
    // double diff = 1.e-6 * (eTime - sTime);
    // printf("Percent time sending: %f%%", 100.0 * s.time.mean * 16 / diff);
    fflush(stdout);
}
void concurrent_send() {
    LOG_EVENT(-1, -1, -1, "start_concurrent_send");
    auto s = measure_partially_concurrent_multicast(128 << 20, 1 << 20,
                                                    num_nodes, num_nodes, 16);
    // flush_events();
    printf("Bandwidth = %f(%f) Gb/s\n", s.bandwidth.mean, s.bandwidth.stddev);
    printf("Latency = %f(%f) ms\n", s.time.mean, s.time.stddev);
    // uint64_t eTime = get_time();
    // double diff = 1.e-6 * (eTime - sTime);
    // printf("Percent time sending: %f%%", 100.0 * s.time.mean * 16 / diff);
    fflush(stdout);
}
// void small_send() {
//     auto s = measure_small_multicast(1024, num_nodes, 4, 128);
//     printf("Latency = %.2f(%.2f) us\n", s.time.mean * 1000.0,
//            s.time.stddev * 1000.0);
//     fflush(stdout);
// }

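// Exercises the cross-channel task API between nodes 0 and 1: each node posts
// a chain of 128 sends and receives in which (apparently) every send after
// the first forwards the chunk delivered by the preceding receive, and a
// final wait on the send completion queue fires the "ccc.done" handler.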
void test_cross_channel() {
    if(node_rank > 1) {
        return;
    }

    static volatile atomic<bool> done_flag;
    done_flag = false;

    auto nop_handler = [](auto, auto, auto) {};
    auto done_handler = [](uint64_t tag, uint32_t immediate, size_t length) {
        if(tag == 0x6000000) done_flag = true;
    };

    static message_type mtype_done("ccc.done", nop_handler, nop_handler,
                                   done_handler);

    const int steps = 128;
    const size_t chunk_size = 1024;
    const size_t buffer_size = (steps + 1) * chunk_size;

    // Setup memory region
    memory_region mr{buffer_size};
    memset(mr.buffer, 1 + node_rank, buffer_size);
    memset(mr.buffer, node_rank * 10 + 10, chunk_size);
    mr.buffer[buffer_size - 1] = 0;

    auto mqp = make_shared<manager_queue_pair>();
    managed_queue_pair qp(node_rank == 0 ? 1 : 0, [&](managed_queue_pair *qp) {
        qp->post_recv(mr, chunk_size, chunk_size, 125, mtype_done);
    });

    rdma::task t(mqp);
    for(int i = 1; i < steps; i++) {
        t.append_recv(qp, mr, (i + 1) * chunk_size, chunk_size);
    }
    for(int i = 0; i < steps; i++) {
        t.append_send(qp, mr, i * chunk_size, chunk_size, 0);
    }
    for(int i = 0; i < steps; i++) {
        t.append_enable_send(qp, i + 1);
        t.append_wait(qp.rcq, i + 1, false, false, 0x321000 + i, mtype_done);
    }
    t.append_wait(qp.scq, 0, true, true, 0x6000000, mtype_done);
    CHECK(t.post());

    while(!done_flag) {
    }

    // std::this_thread::sleep_for(std::chrono::seconds(1));
    // for(int i = 0; i < steps && i < 16; i++) {
    //     printf("%2d ", mr.buffer[i * chunk_size]);
    // }
    // printf("\n");

    puts("PASS");
}

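// Node 0 exits immediately; the surviving nodes then attempt to create a
// group that still lists node 0 as a member, and the test passes only if
// rdmc::create_group reports failure.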
void test_create_group_failure() {
    if(num_nodes <= 2) {
        puts("FAILURE: must run with at least 3 nodes");
    }
    if(node_rank == 0) {
        puts("Node 0 exiting...");
        exit(0);
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    vector<uint32_t> members;
    for(uint32_t i = 0; i < num_nodes; i++) {
        members.push_back(i);
    }

    puts("Starting test...");
    uint64_t t = get_time();
    bool ret = rdmc::create_group(
            0, members, 1 << 20, rdmc::BINOMIAL_SEND,
            [&](size_t length) -> rdmc::receive_destination {
                puts("FAILURE: incoming message called");
                return {nullptr, 0};
            },
            [&](char *data, size_t) { puts("FAILURE: received message called"); },
            [group_number = next_group_number](std::optional<uint32_t>) {});

    t = get_time() - t;
    if(ret) {
        puts("FAILURE: Managed to create group containing failed node");
    } else {
        printf("time taken: %f ms\n", t * 1e-6);
        puts("PASS");
    }
}

#define ANSI_COLOR_RED "\x1b[31m"
#define ANSI_COLOR_GREEN "\x1b[32m"
#define ANSI_COLOR_YELLOW "\x1b[33m"
#define ANSI_COLOR_BLUE "\x1b[34m"
#define ANSI_COLOR_MAGENTA "\x1b[35m"
#define ANSI_COLOR_CYAN "\x1b[36m"
#define ANSI_COLOR_RESET "\x1b[0m"
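
// Exhaustively validates the block-transfer schedule for group sizes 2-64 and
// message sizes 1-32 blocks: every outgoing transfer must have a matching
// incoming transfer at its target, a forwarded block must have been received
// on an earlier step, no block may be received twice, and every node must end
// up holding every block.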
void test_pattern() {
    size_t n = 0;
    auto t = get_time();
    for(size_t group_size = 2; group_size <= 64; group_size++) {
        for(size_t message_size = 1; message_size <= 32; message_size++) {
            size_t total_steps = message_size + ceil(log2(group_size)) - 1;
            for(unsigned int step = 0; step < total_steps; step++) {
                for(unsigned int node = 0; node < group_size; node++) {
                    // Compute the outgoing transfer for this node/step
                    auto transfer = get_outgoing_transfer(
                            node, step, group_size, floor(log2(group_size)),
                            message_size, total_steps);
                    n++;

                    if(transfer) {
                        // See what the supposed sender is doing this step
                        auto reverse = get_incoming_transfer(
                                transfer->target, step, group_size,
                                floor(log2(group_size)), message_size, total_steps);
                        n++;

                        // Make sure the two nodes agree
                        if(!reverse) throw false;
                        if(transfer->block_number != reverse->block_number)
                            throw false;

                        // If we aren't the root sender, also check that the
                        // node got this block on a past step.
                        if(node != 0) {
                            for(int s = step - 1; s >= 0; s--) {
                                auto prev = get_incoming_transfer(
                                        node, s, group_size,
                                        floor(log2(group_size)), message_size,
                                        total_steps);
                                n++;
                                if(prev && prev->block_number == transfer->block_number)
                                    break;

                                if(s == 0) {
                                    throw false;
                                }
                            }
                        }
                    }

                    // Compute the incoming transfer for this node/step
                    transfer = get_incoming_transfer(
                            node, step, group_size, floor(log2(group_size)),
                            message_size, total_steps);
                    n++;

                    if(transfer) {
                        // Again make sure the supposed receiver agrees
                        auto reverse = get_outgoing_transfer(
                                transfer->target, step, group_size,
                                floor(log2(group_size)), message_size, total_steps);
                        n++;
                        if(!reverse) throw false;
                        if(transfer->block_number != reverse->block_number)
                            throw false;
                        if(reverse->target != node) throw false;

                        // Make sure we don't already have the block we're
                        // getting.
                        for(int s = step - 1; s >= 0; s--) {
                            auto prev = get_incoming_transfer(
                                    node, s, group_size, floor(log2(group_size)),
                                    message_size, total_steps);
                            n++;
                            if(prev && prev->block_number == transfer->block_number) {
                                throw false;
                            }
                        }
                    }
                }
            }

            // Make sure that all nodes get every block
            for(unsigned int node = 1; node < group_size; node++) {
                set<size_t> blocks;
                for(unsigned int step = 0; step < total_steps; step++) {
                    auto transfer = get_incoming_transfer(
                            node, step, group_size, floor(log2(group_size)),
                            message_size, total_steps);
                    n++;

                    if(transfer) blocks.insert(transfer->block_number);
                }
                if(blocks.size() != message_size) throw false;
            }
        }
    }
    auto diff = get_time() - t;
    printf("average time = %f ns\n", (double)diff / n);
    puts("PASS");
}

int main(int argc, char *argv[]) {
    // rlimit rlim;
    // rlim.rlim_cur = RLIM_INFINITY;
    // rlim.rlim_max = RLIM_INFINITY;
    // setrlimit(RLIMIT_CORE, &rlim);

    if(argc >= 2 && strcmp(argv[1], "test_pattern") == 0) {
        test_pattern();
        exit(0);
    } else if(argc >= 2 && strcmp(argv[1], "spin") == 0) {
        volatile bool b = true;
        while(b)
            ;
        CHECK(false);
    }

    LOG_EVENT(-1, -1, -1, "querying_addresses");
    map<uint32_t, string> addresses;
    rdmc::query_addresses(addresses, node_rank);
    num_nodes = addresses.size();

    LOG_EVENT(-1, -1, -1, "calling_init");
    assert(rdmc::initialize(addresses, node_rank));

    LOG_EVENT(-1, -1, -1, "creating_barrier_group");
    vector<uint32_t> members;
    for(uint32_t i = 0; i < num_nodes; i++) members.push_back(i);
    universal_barrier_group = make_unique<rdmc::barrier_group>(members);

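    // Gauge clock agreement across nodes: three barriers timestamped at
    // t1/t2/t3, with reset_epoch() resetting the shared time epoch before
    // the last one.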
    universal_barrier_group->barrier_wait();
    uint64_t t1 = get_time();
    universal_barrier_group->barrier_wait();
    uint64_t t2 = get_time();
    reset_epoch();
    universal_barrier_group->barrier_wait();
    uint64_t t3 = get_time();

    printf(
            "Synchronized clocks.\nTotal possible variation = %5.3f us\n"
            "Max possible variation from local = %5.3f us\n",
            (t3 - t1) * 1e-3f, max(t2 - t1, t3 - t2) * 1e-3f);
    fflush(stdout);

    TRACE("Finished initializing.");

    // Guard against argc == 1, in which case argv[1] is null.
    printf("Experiment Name: %s\n", argc > 1 ? argv[1] : "custom");
    if(argc <= 1 || strcmp(argv[1], "custom") == 0) {
        for(int i = 0; i < 3; i++) {
            large_send();
        }
    } else if(strcmp(argv[1], "blocksize4") == 0) {
        blocksize_v_bandwidth(4);
    } else if(strcmp(argv[1], "blocksize16") == 0) {
        blocksize_v_bandwidth(16);
    } else if(strcmp(argv[1], "sendtypes") == 0) {
        compare_send_types();
    } else if(strcmp(argv[1], "bandwidth") == 0) {
        bandwidth_group_size();
    } else if(strcmp(argv[1], "overhead") == 0) {
        latency_group_size();
    } else if(strcmp(argv[1], "smallsend") == 0) {
        // small_send_latency_group_size();
    } else if(strcmp(argv[1], "concurrent") == 0) {
        concurrent_send();
    } else if(strcmp(argv[1], "active_senders") == 0) {
        active_senders();
    } else if(strcmp(argv[1], "polling_interrupts") == 0) {
        active_senders(false, true);
        active_senders(true, false);
    } else if(strcmp(argv[1], "test_create_group_failure") == 0) {
        test_create_group_failure();
        exit(0);
    } else if(strcmp(argv[1], "test_cross_channel") == 0) {
        test_cross_channel();
        exit(0);
    } else {
        puts("Unrecognized experiment name.");
        fflush(stdout);
    }

    TRACE("About to trigger shutdown");
    universal_barrier_group->barrier_wait();
    universal_barrier_group.reset();
    rdmc::shutdown();
}