15 #include <condition_variable> 52 size_t size,
size_t block_size, uint32_t group_size, uint32_t num_senders,
58 for(
size_t i = 0; i < iterations * 2; i++) {
65 std::mutex send_mutex;
66 std::condition_variable send_done_cv;
67 atomic<uint64_t> end_time;
68 atomic<uint64_t> end_ptime;
70 size_t num_blocks = (size - 1) / block_size + 1;
71 size_t buffer_size = num_blocks * block_size;
72 auto mr = make_shared<memory_region>(buffer_size * num_senders);
73 char *buffer = mr->buffer;
76 atomic<uint32_t> sends_remaining;
78 for(uint16_t i = 0u; i < num_senders; i++) {
79 vector<uint32_t> members;
80 for(uint32_t j = 0; j < group_size; j++) {
81 members.push_back((j + i) % group_size);
84 base_group_number + i, members, block_size, type,
86 return {mr, buffer_size * i};
88 [&](
char *data, size_t) {
89 if(--sends_remaining == 0) {
93 unique_lock<mutex> lk(send_mutex);
94 send_done_cv.notify_all();
97 [group_number = base_group_number + i](std::optional<uint32_t>) {
98 LOG_EVENT(group_number, -1, -1,
"send_failed");
103 vector<double> rates;
104 vector<double> times;
105 vector<double> cpu_usages;
107 for(
size_t i = 0; i < iterations; i++) {
108 sends_remaining = num_senders;
113 for(
size_t j = 0; j < size; j += 256)
114 buffer[
node_rank * buffer_size + j] = (rand() >> 5) % 256;
128 unique_lock<mutex> lk(send_mutex);
129 send_done_cv.wait(lk, [&] {
return end_time != 0; });
135 uint64_t time_diff = end_time - start_time;
136 uint64_t ptime_diff = end_ptime - start_ptime;
137 rates.push_back(8.0 * size * num_senders / time_diff);
138 times.push_back(1.0e-6 * time_diff);
139 cpu_usages.push_back((
double)ptime_diff / time_diff);
142 for(
auto i = 0u; i < group_size; i++) {
162 uint32_t group_size,
size_t iterations,
164 bool use_cv =
true) {
166 1, iterations, type, use_cv);
170 const size_t min_block_size = 16ull << 10;
171 const size_t max_block_size = 16ull << 20;
173 puts(
"=========================================================");
174 puts(
"= Block Size vs. Bandwdith (Gb/s) =");
175 puts(
"=========================================================");
176 printf(
"Group Size = %d\n", (
int)gsize);
177 printf(
"Send Size, ");
178 for(
auto block_size = min_block_size; block_size <= max_block_size;
180 if(block_size >= 1 << 20)
181 printf(
"%d MB, ", (
int)(block_size >> 20));
183 printf(
"%d KB, ", (
int)(block_size >> 10));
185 for(
auto block_size = min_block_size; block_size <= max_block_size;
187 if(block_size >= 1 << 20)
188 printf(
"%d MB stddev, ", (
int)(block_size >> 20));
190 printf(
"%d KB stddev, ", (
int)(block_size >> 10));
194 for(
auto size : {256ull << 20, 64ull << 20, 16ull << 20, 4ull << 20,
195 1ull << 20, 256ull << 10, 64ull << 10, 16ull << 10}) {
197 printf(
"%d MB, ", (
int)(size >> 20));
198 else if(size >= 1 << 10)
199 printf(
"%d KB, ", (
int)(size >> 10));
201 printf(
"%d B, ", (
int)(size));
203 vector<double> stddevs;
204 for(
auto block_size = min_block_size; block_size <= max_block_size;
206 if(block_size > size) {
211 printf(
"%f, ", s.bandwidth.mean);
214 stddevs.push_back(s.bandwidth.stddev);
216 for(
auto s : stddevs) {
226 puts(
"=========================================================");
227 puts(
"= Compare Send Types - Bandwidth (Gb/s) =");
228 puts(
"=========================================================");
231 "Binomial Pipeline (256 MB),Chain Send (256 MB),Sequential Send (256 " 232 "MB),Tree Send (256 MB)," 233 "Binomial Pipeline (64 MB),Chain Send (64 MB),Sequential Send (64 " 234 "MB),Tree Send (64 MB)," 235 "Binomial Pipeline (8 MB),Chain Send (8 MB),Sequential Send (8 " 236 "MB),Tree Send (8 MB),");
239 const size_t block_size = 1 << 20;
240 const size_t iterations = 64;
241 for(
int gsize =
num_nodes; gsize >= 2; --gsize) {
267 "%d, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f, " 268 "%f, %f, %f, %f, %f, %f, %f, %f, %f\n",
269 gsize, bp256.bandwidth.mean, cs256.bandwidth.mean,
270 ss256.bandwidth.mean, ts256.bandwidth.mean, bp64.bandwidth.mean,
271 cs64.bandwidth.mean, ss64.bandwidth.mean, ts64.bandwidth.mean,
272 bp8.bandwidth.mean, cs8.bandwidth.mean, ss8.bandwidth.mean,
273 ts8.bandwidth.mean, bp256.bandwidth.stddev, cs256.bandwidth.stddev,
274 ss256.bandwidth.stddev, ts256.bandwidth.stddev,
275 bp64.bandwidth.stddev, cs64.bandwidth.stddev, ss64.bandwidth.stddev,
276 ts64.bandwidth.stddev, bp8.bandwidth.stddev, cs8.bandwidth.stddev,
277 ss8.bandwidth.stddev, ts8.bandwidth.stddev);
292 puts(
"=========================================================");
293 puts(
"= Bandwidth vs. Group Size =");
294 puts(
"=========================================================");
296 "Group Size, 256 MB, 64 MB, 16 MB, 4 MB, " 297 "256stddev, 64stddev, 16stddev, 4stddev");
300 for(
int gsize =
num_nodes; gsize >= 2; --gsize) {
308 printf(
"%d, %f, %f, %f, %f, %f, %f, %f, %f\n", gsize,
309 bp256.bandwidth.mean, bp64.bandwidth.mean, bp16.bandwidth.mean,
310 bp4.bandwidth.mean, bp256.bandwidth.stddev,
311 bp64.bandwidth.stddev, bp16.bandwidth.stddev,
312 bp4.bandwidth.stddev);
319 puts(
"=========================================================");
320 puts(
"= Concurrent Bandwidth vs. Group Size =");
321 puts(
"=========================================================");
323 "Group Size, 256 MB, 64 MB, 16 MB, 4 MB, " 324 "256stddev, 64stddev, 16stddev, 4stddev");
327 for(
int gsize =
num_nodes; gsize >= 2; --gsize) {
336 printf(
"%d, %f, %f, %f, %f, %f, %f, %f, %f\n", gsize,
337 bp256.bandwidth.mean, bp64.bandwidth.mean, bp16.bandwidth.mean,
338 bp4.bandwidth.mean, bp256.bandwidth.stddev,
339 bp64.bandwidth.stddev, bp16.bandwidth.stddev,
340 bp4.bandwidth.stddev);
347 auto compute_block_size = [](
size_t message_size) ->
size_t {
348 if(message_size < 4096 * 2)
return message_size;
349 if(message_size < 4096 * 10)
return 4096;
350 if(message_size < 10 * (1 << 20))
return message_size / 10;
354 auto compute_iterations = [](
size_t message_size) ->
size_t {
355 if(message_size == 1)
return 20000;
356 if(message_size == 10000)
return 10000;
357 if(message_size == 1
'000'000)
return 1000;
358 if(message_size == 100
'000'000)
return 100;
359 return max<size_t>(100000000 / message_size, 4u);
363 printf(
"Interrupts, Message Size, Group Size, 1-sender Bandwidth, " 364 "half-sending Bandwidth, all-sending Bandwidth, 1-sender CPU, " 365 "half-sending CPU, all-sending CPU\n");
370 for(
size_t message_size : {1, 10000, 1
'000'000, 100
'000'000}) {
371 for(uint32_t group_size = 3; group_size <=
num_nodes; group_size++) {
372 printf(
"%s, %d, %d, ", interrupts ?
"enabled" :
"disabled",
373 (
int)message_size, (
int)group_size);
376 vector<double> cpu_usage;
377 for(uint32_t num_senders : {1u, (group_size + 1) / 2, group_size}) {
379 message_size, compute_block_size(message_size), group_size,
380 num_senders, compute_iterations(message_size),
382 printf(
"%f, ", s.bandwidth.mean);
384 cpu_usage.push_back(s.cpu_usage.mean * 100);
386 for(
double usage : cpu_usage) {
387 printf(
"%f, ", usage);
395 puts(
"=========================================================");
396 puts(
"= Latency vs. Group Size =");
397 puts(
"=========================================================");
399 "Group Size,64 KB,16 KB,4 KB,1 KB,256 B," 400 "64stddev,16stddev,4stddev,1stddev,256stddev");
403 size_t iterations = 10000;
405 for(
int gsize =
num_nodes; gsize >= 2; gsize /= 2) {
415 printf(
"%d, %f, %f, %f, %f, %f, %f, %f, %f, %f, %f\n", gsize,
416 bp64.time.mean, bp16.time.mean, bp4.time.mean, bp1.time.mean,
417 bp256.time.mean, bp64.time.stddev, bp16.time.stddev,
418 bp4.time.stddev, bp1.time.stddev, bp256.time.stddev);
448 LOG_EVENT(-1, -1, -1,
"start_large_send");
452 printf(
"Bandwidth = %f(%f) Gb/s\n", s.bandwidth.mean, s.bandwidth.stddev);
453 printf(
"Latency = %f(%f) ms\n", s.time.mean, s.time.stddev);
460 LOG_EVENT(-1, -1, -1,
"start_concurrent_send");
464 printf(
"Bandwidth = %f(%f) Gb/s\n", s.bandwidth.mean, s.bandwidth.stddev);
465 printf(
"Latency = %f(%f) ms\n", s.time.mean, s.time.stddev);
483 static volatile atomic<bool> done_flag;
486 auto nop_handler = [](
auto,
auto,
auto) {};
487 auto done_handler = [](uint64_t
tag, uint32_t immediate,
size_t length) {
488 if(tag == 0x6000000) done_flag =
true;
491 static message_type mtype_done(
"ccc.done", nop_handler, nop_handler,
494 const int steps = 128;
495 const size_t chunk_size = 1024;
496 const size_t buffer_size = (steps + 1) * chunk_size;
500 memset(mr.buffer, 1 +
node_rank, buffer_size);
501 memset(mr.buffer,
node_rank * 10 + 10, chunk_size);
502 mr.buffer[buffer_size - 1] = 0;
504 auto mqp = make_shared<manager_queue_pair>();
506 qp->
post_recv(mr, chunk_size, chunk_size, 125, mtype_done);
510 for(
int i = 1; i < steps; i++) {
511 t.append_recv(qp, mr, (i + 1) * chunk_size, chunk_size);
513 for(
int i = 0; i < steps; i++) {
514 t.append_send(qp, mr, i * chunk_size, chunk_size, 0);
516 for(
int i = 0; i < steps; i++) {
517 t.append_enable_send(qp, i + 1);
518 t.append_wait(qp.
rcq, i + 1,
false,
false, 0x321000 + i, mtype_done);
520 t.append_wait(qp.
scq, 0,
true,
true, 0x6000000, mtype_done);
537 puts(
"FAILURE: must run with at least 3 nodes");
540 puts(
"Node 0 exiting...");
544 std::this_thread::sleep_for(std::chrono::seconds(1));
546 vector<uint32_t> members;
547 for(uint32_t i = 0; i <
num_nodes; i++) {
548 members.push_back(i);
551 puts(
"Starting test...");
556 puts(
"FAILURE: incoming message called");
559 [&](
char *data, size_t) { puts(
"FAILURE: received message called"); },
564 puts(
"FAILURE: Managed to create group containing failed node");
566 printf(
"time taken: %f ms\n", t * 1e-6);
571 #define ANSI_COLOR_RED "\x1b[31m" 572 #define ANSI_COLOR_GREEN "\x1b[32m" 573 #define ANSI_COLOR_YELLOW "\x1b[33m" 574 #define ANSI_COLOR_BLUE "\x1b[34m" 575 #define ANSI_COLOR_MAGENTA "\x1b[35m" 576 #define ANSI_COLOR_CYAN "\x1b[36m" 577 #define ANSI_COLOR_RESET "\x1b[0m" 581 for(
size_t group_size = 2; group_size <= 64; group_size++) {
582 for(
size_t message_size = 1; message_size <= 32; message_size++) {
583 size_t total_steps = message_size + ceil(log2(group_size)) - 1;
584 for(
unsigned int step = 0; step < total_steps; step++) {
585 for(
unsigned int node = 0; node < group_size; node++) {
588 node, step, group_size, floor(log2(group_size)),
589 message_size, total_steps);
595 transfer->target, step, group_size,
596 floor(log2(group_size)), message_size, total_steps);
600 if(!reverse)
throw false;
601 if(transfer->block_number != reverse->block_number)
607 for(
int s = step - 1; s >= 0; s--) {
610 floor(log2(group_size)), message_size,
613 if(prev && prev->block_number == transfer->block_number)
625 node, step, group_size, floor(log2(group_size)),
626 message_size, total_steps);
632 transfer->target, step, group_size,
633 floor(log2(group_size)), message_size, total_steps);
635 if(!reverse)
throw false;
636 if(transfer->block_number != reverse->block_number)
638 if(reverse->target != node)
throw false;
642 for(
int s = step - 1; s >= 0; s--) {
644 node, s, group_size, floor(log2(group_size)),
645 message_size, total_steps);
647 if(prev && prev->block_number == transfer->block_number) {
656 for(
unsigned int node = 1; node < group_size; node++) {
658 for(
unsigned int step = 0; step < total_steps; step++) {
660 node, step, group_size, floor(log2(group_size)),
661 message_size, total_steps);
664 if(transfer) blocks.insert(transfer->block_number);
666 if(blocks.size() != message_size)
throw false;
671 printf(
"average time = %f ns\n", (
double)diff / n);
681 if(argc >= 2 && strcmp(argv[1],
"test_pattern") == 0) {
684 }
else if(argc >= 2 && strcmp(argv[1],
"spin") == 0) {
685 volatile bool b =
true;
691 LOG_EVENT(-1, -1, -1,
"querying_addresses");
692 map<uint32_t, string> addresses;
693 rdmc::query_addresses(addresses,
node_rank);
699 LOG_EVENT(-1, -1, -1,
"creating_barrier_group");
700 vector<uint32_t> members;
701 for(uint32_t i = 0; i <
num_nodes; i++) members.push_back(i);
702 universal_barrier_group = make_unique<rdmc::barrier_group>(members);
713 "Synchronized clocks.\nTotal possible variation = %5.3f us\n" 714 "Max possible variation from local = %5.3f us\n",
715 (t3 - t1) * 1e-3f, max(t2 - t1, t3 - t2) * 1e-3f);
718 TRACE(
"Finished initializing.");
720 printf(
"Experiment Name: %s\n", argv[1]);
721 if(argc <= 1 || strcmp(argv[1],
"custom") == 0) {
722 for(
int i = 0; i < 3; i++) {
725 }
else if(strcmp(argv[1],
"blocksize4") == 0) {
727 }
else if(strcmp(argv[1],
"blocksize16") == 0) {
729 }
else if(strcmp(argv[1],
"sendtypes") == 0) {
731 }
else if(strcmp(argv[1],
"bandwidth") == 0) {
733 }
else if(strcmp(argv[1],
"overhead") == 0) {
735 }
else if(strcmp(argv[1],
"smallsend") == 0) {
737 }
else if(strcmp(argv[1],
"concurrent") == 0) {
739 }
else if(strcmp(argv[1],
"active_senders") == 0) {
741 }
else if(strcmp(argv[1],
"polling_interrupts") == 0) {
744 }
else if(strcmp(argv[1],
"test_create_group_failure") == 0) {
747 }
else if(strcmp(argv[1],
"test_cross_channel") == 0) {
751 puts(
"Unrecognized experiment name.");
755 TRACE(
"About to trigger shutdown");
757 universal_barrier_group.reset();
bool initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &addresses, uint32_t node_rank) __attribute__((warn_unused_result))
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
void active_senders(bool interrupts=false, bool labels=true)
double compute_stddev(std::vector< double > v)
#define LOG_EVENT(group_number, message_number, block_number, event_name)
static optional< block_transfer > get_incoming_transfer(uint32_t node, size_t step, uint32_t num_members, unsigned int log2_num_members, size_t num_blocks, size_t total_steps)
uint16_t next_group_number
void test_create_group_failure()
double compute_mean(std::vector< double > v)
unique_ptr< rdmc::barrier_group > universal_barrier_group
void destroy_group(uint16_t group_number)
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and q...
void concurrent_bandwidth_group_size()
void latency_group_size()
int main(int argc, char *argv[])
void test_cross_channel()
send_stats measure_multicast(size_t size, size_t block_size, uint32_t group_size, size_t iterations, rdmc::send_algorithm type=rdmc::BINOMIAL_SEND, bool use_cv=true)
void compare_send_types()
uint64_t get_process_time()
bool post_recv(const memory_region &mr, size_t offset, size_t length, uint64_t wr_id, const message_type &type)
static optional< block_transfer > get_outgoing_transfer(uint32_t node, size_t step, uint32_t num_members, unsigned int log2_num_members, size_t num_blocks, size_t total_steps)
bool create_group(uint16_t group_number, std::vector< uint32_t > members, size_t block_size, send_algorithm algorithm, incoming_message_callback_t incoming_receive, completion_callback_t send_callback, failure_callback_t failure_callback) __attribute__((warn_unused_result))
Creates a new RDMC group.
void blocksize_v_bandwidth(uint16_t gsize)
void bandwidth_group_size()
send_stats measure_partially_concurrent_multicast(size_t size, size_t block_size, uint32_t group_size, uint32_t num_senders, size_t iterations, rdmc::send_algorithm type=rdmc::BINOMIAL_SEND, bool use_cv=true)