// Derecho 0.9 — Distributed systems toolkit for RDMA
// verbs_helper.cpp
1 #include <atomic>
2 #include <cstring>
3 #include <fcntl.h>
4 #include <iostream>
5 #include <list>
6 #include <mutex>
7 #include <poll.h>
8 #include <thread>
9 #include <vector>
10 
11 #include "derecho/derecho_ports.h"
12 #include "tcp/tcp.h"
13 #include "util.h"
14 #include "verbs_helper.h"
15 
16 #error "Verbs implementation is obsolete. Compilation stopped."
17 
18 extern "C" {
19 #include <infiniband/verbs.h>
20 }
21 
22 #ifdef INFINIBAND_VERBS_EXP_H
23 #define MELLANOX_EXPERIMENTAL_VERBS
24 #endif
25 
26 using namespace std;
27 
28 namespace rdma {
29 
// Runtime-selectable InfiniBand device configuration for this process.
struct config_t {
    const char *dev_name; // IB device name (may be overridden via RDMC_DEVICE_NAME)
    int ib_port = 1; // local IB port to work with
    int gid_idx = 0; // gid index to use
};
35 
// structure to exchange data which is needed to connect the QPs.
// Sent verbatim over the TCP control sockets during QP setup, hence packed:
// both sides must agree on the exact byte layout.
struct cm_con_data_t {
    uint32_t qp_num; // QP number
    uint16_t lid; // LID of the IB port
    uint8_t gid[16]; // gid (used for GRH-based/global addressing)
} __attribute__((packed));
42 
// TCP sockets for each connection, keyed by remote node rank. Used to
// exchange QP parameters out-of-band before RDMA connections are live.
static map<uint32_t, tcp::socket> sockets;

// listener to detect new incoming connections
static unique_ptr<tcp::connection_listener> connection_listener;

// Local device/port/GID selection; populated during verbs_initialize().
static config_t local_config;
// structure of system resources
// NOTE(review): part of this declaration is not visible in this view — the
// struct appears to end after `cc`, and the recv/write/name members below
// most likely belong to a separate completion_handler_set struct (a `send`
// member is probably elided as well); confirm against the original source.
struct ibv_resources {
    ibv_device_attr device_attr; // Device attributes
    ibv_port_attr port_attr; // IB port attributes
    ibv_context *ib_ctx; // device handle
    ibv_pd *pd; // PD handle
    ibv_cq *cq; // CQ handle
    ibv_comp_channel *cc; // Completion channel

    completion_handler recv;
    completion_handler write;
    string name;
};
// Registered completion handler sets, indexed by message_type tag.
static vector<completion_handler_set> completion_handlers;
// Guards completion_handlers during dispatch in polling_loop().
static std::mutex completion_handlers_mutex;

// When true, the polling loop sleeps on the completion channel instead of
// busy-polling the CQ.
static atomic<bool> interrupt_mode;
// When true, memory_region allocates driver-contiguous memory (Mellanox only).
static atomic<bool> contiguous_memory_mode;

// Hardware capabilities detected during initialization.
static feature_set supported_features;

// Set to ask the polling thread to exit.
static atomic<bool> polling_loop_shutdown_flag;
// Main completion-processing loop, run on a detached thread spawned from
// verbs_initialize(). Polls the shared CQ and dispatches each work completion
// to the completion_handlers entry selected by the tag packed into the high
// bits of the work request id.
static void polling_loop() {
    pthread_setname_np(pthread_self(), "rdmc_poll");
    TRACE("Spawned main loop");

    const int max_work_completions = 1024;
    unique_ptr<ibv_wc[]> work_completions(new ibv_wc[max_work_completions]);

    while(true) {
        int num_completions = 0;
        while(num_completions == 0) {
            if(polling_loop_shutdown_flag) return;
            // Busy-poll for up to 50ms (or not at all in interrupt mode)
            // before falling back to the completion channel.
            uint64_t poll_end = get_time() + (interrupt_mode ? 0L : 50000000L);
            do {
                if(polling_loop_shutdown_flag) return;
                num_completions = ibv_poll_cq(verbs_resources.cq, max_work_completions,
                                              work_completions.get());
            } while(num_completions == 0 && get_time() < poll_end);

            if(num_completions == 0) {
                // Arm the completion channel, then poll once more to close the
                // race between the last poll and the notification request.
                if(ibv_req_notify_cq(verbs_resources.cq, 0))
                    throw rdma::exception();

                num_completions = ibv_poll_cq(verbs_resources.cq, max_work_completions,
                                              work_completions.get());

                if(num_completions == 0) {
                    // Wait on the channel fd in 50ms slices so a shutdown
                    // request is still noticed promptly.
                    pollfd file_descriptor;
                    file_descriptor.fd = verbs_resources.cc->fd;
                    file_descriptor.events = POLLIN;
                    file_descriptor.revents = 0;
                    int rc = 0;
                    while(rc == 0 && !polling_loop_shutdown_flag) {
                        if(polling_loop_shutdown_flag) return;
                        rc = poll(&file_descriptor, 1, 50);
                    }

                    if(rc > 0) {
                        // Consume and acknowledge the event so the channel
                        // can signal again later.
                        ibv_cq *ev_cq;
                        void *ev_ctx;
                        ibv_get_cq_event(verbs_resources.cc, &ev_cq, &ev_ctx);
                        ibv_ack_cq_events(ev_cq, 1);
                    }
                }
            }
        }

        if(num_completions < 0) { // Negative indicates an IBV error.
            fprintf(stderr, "Failed to poll completion queue.");
            continue;
        }

        std::lock_guard<std::mutex> l(completion_handlers_mutex);
        for(int i = 0; i < num_completions; i++) {
            ibv_wc &wc = work_completions[i];

            if(wc.status == 5) continue; // Queue Flush (5 == IBV_WC_WR_FLUSH_ERR)
            if(wc.status != 0) {
                string opcode = "[unknown]";
                if(wc.opcode == IBV_WC_SEND) opcode = "IBV_WC_SEND";
                if(wc.opcode == IBV_WC_RECV) opcode = "IBV_WC_RECV";
                if(wc.opcode == IBV_WC_RDMA_WRITE) opcode = "IBV_WC_RDMA_WRITE";

                // Failed operation
                printf("wc.status = %d; wc.wr_id = 0x%llx; imm = 0x%x; "
                       "opcode = %s\n",
                       (int)wc.status, (long long)wc.wr_id,
                       (unsigned int)wc.imm_data, opcode.c_str());
                fflush(stdout);
            }

            // The high bits of wr_id carry the message-type tag; the all-ones
            // tag marks completions that should be ignored.
            message_type::tag_type type = wc.wr_id >> message_type::shift_bits;
            if(type == std::numeric_limits<message_type::tag_type>::max())
                continue;

            // NOTE(review): this mask assumes shift_bits == 56 — confirm it
            // stays in sync with message_type::shift_bits.
            uint64_t masked_wr_id = wc.wr_id & 0x00ffffffffffffff;
            if(type >= completion_handlers.size()) {
                // Unrecognized message type
            } else if(wc.status != 0) {
                // Failed operation
            } else if(wc.opcode == IBV_WC_SEND) {
                completion_handlers[type].send(masked_wr_id, wc.imm_data,
                                               wc.byte_len);
            } else if(wc.opcode == IBV_WC_RECV) {
                completion_handlers[type].recv(masked_wr_id, wc.imm_data,
                                               wc.byte_len);
            } else if(wc.opcode == IBV_WC_RDMA_WRITE) {
                completion_handlers[type].write(masked_wr_id, wc.imm_data,
                                                wc.byte_len);
            } else {
                puts("Sent unrecognized completion type?!");
            }
        }
    }
}
170 
171 static int modify_qp_to_init(struct ibv_qp *qp, int ib_port) {
172  struct ibv_qp_attr attr;
173  int flags;
174  int rc;
175  memset(&attr, 0, sizeof(attr));
176  attr.qp_state = IBV_QPS_INIT;
177  attr.port_num = ib_port;
178  attr.pkey_index = 0;
179  attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
180  flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
181  rc = ibv_modify_qp(qp, &attr, flags);
182  if(rc) fprintf(stderr, "failed to modify QP state to INIT\n");
183  return rc;
184 }
185 
186 static int modify_qp_to_rtr(struct ibv_qp *qp, uint32_t remote_qpn,
187  uint16_t dlid, uint8_t *dgid, int ib_port,
188  int gid_idx) {
189  struct ibv_qp_attr attr;
190  int flags;
191  int rc;
192  memset(&attr, 0, sizeof(attr));
193  attr.qp_state = IBV_QPS_RTR;
194  attr.path_mtu = verbs_resources.port_attr.active_mtu;
195  attr.dest_qp_num = remote_qpn;
196  attr.rq_psn = 0;
197  attr.max_dest_rd_atomic = 1;
198  attr.min_rnr_timer = 16;
199  attr.ah_attr.is_global = 1;
200  attr.ah_attr.dlid = dlid;
201  attr.ah_attr.sl = 0;
202  attr.ah_attr.src_path_bits = 0;
203  attr.ah_attr.port_num = ib_port;
204  if(gid_idx >= 0) {
205  attr.ah_attr.is_global = 1;
206  attr.ah_attr.port_num = 1;
207  memcpy(&attr.ah_attr.grh.dgid, dgid, 16);
208  attr.ah_attr.grh.flow_label = 0;
209  attr.ah_attr.grh.hop_limit = 0xFF;
210  attr.ah_attr.grh.sgid_index = gid_idx;
211  attr.ah_attr.grh.traffic_class = 0;
212  }
213  flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
214  rc = ibv_modify_qp(qp, &attr, flags);
215  if(rc) fprintf(stderr, "failed to modify QP state to RTR\n");
216  return rc;
217 }
218 
219 static int modify_qp_to_rts(struct ibv_qp *qp) {
220  struct ibv_qp_attr attr;
221  int flags;
222  int rc;
223  memset(&attr, 0, sizeof(attr));
224  attr.qp_state = IBV_QPS_RTS;
225  attr.timeout = 4;
226  attr.retry_cnt = 6;
227  attr.rnr_retry = 6;
228  attr.sq_psn = 0;
229  attr.max_rd_atomic = 1;
230  flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
231  rc = ibv_modify_qp(qp, &attr, flags);
232  if(rc) fprintf(stderr, "failed to modify QP state to RTS. ERRNO=%d\n", rc);
233  return rc;
234 }
235 
namespace impl {
// NOTE(review): the function header for this cleanup routine (presumably
// `void verbs_destroy()`) is not visible in this view. The statements below
// release the global verbs resources in reverse order of their creation in
// verbs_initialize(), logging (but not propagating) any failure.
    if(verbs_resources.cq && ibv_destroy_cq(verbs_resources.cq)) {
        fprintf(stderr, "failed to destroy CQ\n");
    }
    if(verbs_resources.cc && ibv_destroy_comp_channel(verbs_resources.cc)) {
        fprintf(stderr, "failed to destroy Completion Channel\n");
    }
    if(verbs_resources.pd && ibv_dealloc_pd(verbs_resources.pd)) {
        fprintf(stderr, "failed to deallocate PD\n");
    }
    if(verbs_resources.ib_ctx && ibv_close_device(verbs_resources.ib_ctx)) {
        fprintf(stderr, "failed to close device context\n");
    }
}
251 
252  bool verbs_initialize(const map<uint32_t, std::pair<ip_addr_t, uint16_t>> &ip_addrs_and_ports,
253  uint32_t node_rank) {
254  memset(&verbs_resources, 0, sizeof(verbs_resources));
255 
256  connection_listener = make_unique<tcp::connection_listener>(derecho::rdmc_tcp_port);
257 
258  TRACE("Starting connection phase");
259 
260  // Connect to other nodes in group. Since map traversal is ordered, we don't
261  // have to worry about circular waits, so deadlock can't occur.
262  for(auto it = ip_addrs_and_ports.begin(); it != ip_addrs_and_ports.end(); it++) {
263  if(it->first != node_rank) {
264  if(!verbs_add_connection(it->first, it->second, node_rank)) {
265  fprintf(stderr, "WARNING: failed to connect to node %d at %s\n",
266  (int)it->first, it->second.c_str());
267  }
268  }
269  }
270  TRACE("Done connecting");
271 
272  auto res = &verbs_resources;
273 
274  ibv_device **dev_list = NULL;
275  ibv_device *ib_dev = NULL;
276  int i;
277  int cq_size = 0;
278  int num_devices = 0;
279 
280  fprintf(stdout, "searching for IB devices in host\n");
281  /* get device names in the system */
282  dev_list = ibv_get_device_list(&num_devices);
283  if(!dev_list) {
284  fprintf(stderr, "failed to get IB devices list\n");
285  goto resources_create_exit;
286  }
287  /* if there isn't any IB device in host */
288  if(!num_devices) {
289  fprintf(stderr, "found %d device(s)\n", num_devices);
290  goto resources_create_exit;
291  }
292 
293  local_config.dev_name = getenv("RDMC_DEVICE_NAME");
294  fprintf(stdout, "found %d device(s)\n", num_devices);
295  /* search for the specific device we want to work with */
296  for(i = 1; i < num_devices; i++) {
297  if(!local_config.dev_name) {
298  local_config.dev_name = strdup(ibv_get_device_name(dev_list[i]));
299  fprintf(stdout, "device not specified, using first one found: %s\n",
300  local_config.dev_name);
301  }
302  if(!strcmp(ibv_get_device_name(dev_list[i]), local_config.dev_name)) {
303  ib_dev = dev_list[i];
304  break;
305  }
306  }
307  /* if the device wasn't found in host */
308  if(!ib_dev) {
309  fprintf(stderr, "IB device %s wasn't found\n", local_config.dev_name);
310  goto resources_create_exit;
311  }
312  /* get device handle */
313  res->ib_ctx = ibv_open_device(ib_dev);
314  if(!res->ib_ctx) {
315  fprintf(stderr, "failed to open device %s\n", local_config.dev_name);
316  goto resources_create_exit;
317  }
318  /* We are now done with device list, free it */
319  ibv_free_device_list(dev_list);
320  dev_list = NULL;
321  ib_dev = NULL;
322  /* query port properties */
323  if(ibv_query_port(res->ib_ctx, local_config.ib_port, &res->port_attr)) {
324  fprintf(stderr, "ibv_query_port on port %u failed\n",
325  local_config.ib_port);
326  goto resources_create_exit;
327  }
328  /* allocate Protection Domain */
329  res->pd = ibv_alloc_pd(res->ib_ctx);
330  if(!res->pd) {
331  fprintf(stderr, "ibv_alloc_pd failed\n");
332  goto resources_create_exit;
333  }
334 
335  res->cc = ibv_create_comp_channel(res->ib_ctx);
336  if(!res->cc) {
337  fprintf(stderr, "ibv_create_comp_channel failed\n");
338  goto resources_create_exit;
339  }
340 
341  if(fcntl(res->cc->fd, F_SETFL, fcntl(res->cc->fd, F_GETFL) | O_NONBLOCK)) {
342  fprintf(stderr,
343  "failed to change file descriptor for completion channel\n");
344  goto resources_create_exit;
345  }
346 
347  cq_size = 1024;
348  res->cq = ibv_create_cq(res->ib_ctx, cq_size, NULL, res->cc, 0);
349  if(!res->cq) {
350  fprintf(stderr, "failed to create CQ with %u entries\n", cq_size);
351  goto resources_create_exit;
352  }
353 
354  set_interrupt_mode(false);
356 
357  // Initialize the ignored message type.
358  (void)message_type::ignored();
359 
360 // Detect experimental features
361 #ifdef MELLANOX_EXPERIMENTAL_VERBS
362  {
363  supported_features.contiguous_memory = true;
364 
365  ibv_exp_device_attr attr;
366  attr.comp_mask = 0;
367  int ret = ibv_exp_query_device(res->ib_ctx, &attr);
368  if(ret == 0) {
369  supported_features.cross_channel = attr.exp_device_cap_flags & IBV_EXP_DEVICE_CROSS_CHANNEL;
370  }
371  }
372 #endif
373 
374  {
375  thread t(polling_loop);
376  t.detach();
377  }
378 
379  TRACE("verbs_initialize() - SUCCESS");
380  return true;
381 resources_create_exit:
382  TRACE("verbs_initialize() - ERROR!!!!!!!!!!!!!!");
383  if(res->cq) {
384  ibv_destroy_cq(res->cq);
385  res->cq = NULL;
386  }
387  if(res->cq) {
388  ibv_destroy_comp_channel(res->cc);
389  res->cc = NULL;
390  }
391  if(res->pd) {
392  ibv_dealloc_pd(res->pd);
393  res->pd = NULL;
394  }
395  if(res->ib_ctx) {
396  ibv_close_device(res->ib_ctx);
397  res->ib_ctx = NULL;
398  }
399  if(dev_list) {
400  ibv_free_device_list(dev_list);
401  dev_list = NULL;
402  }
403  return false;
404 }
405 bool verbs_add_connection(uint32_t index, const string &address,
406  uint32_t node_rank) {
407  if(index < node_rank) {
408  if(sockets.count(index) > 0) {
409  fprintf(stderr,
410  "WARNING: attempted to connect to node %u at %s:%d but we "
411  "already have a connection to a node with that index.",
412  (unsigned int)index, address.c_str(), derecho::rdmc_tcp_port);
413  return false;
414  }
415 
416  try {
417  sockets[index] = tcp::socket(address, derecho::rdmc_tcp_port);
418  } catch(tcp::exception) {
419  fprintf(stderr, "WARNING: failed to node %u at %s:%d",
420  (unsigned int)index, address.c_str(), derecho::rdmc_tcp_port);
421  return false;
422  }
423 
424  // Make sure that the connection works, and that we've connected to the
425  // right node.
426  uint32_t remote_rank = 0;
427  if(!sockets[index].exchange(node_rank, remote_rank)) {
428  fprintf(stderr,
429  "WARNING: failed to exchange rank with node %u at %s:%d",
430  (unsigned int)index, address.c_str(), derecho::rdmc_tcp_port);
431  sockets.erase(index);
432  return false;
433  } else if(remote_rank != index) {
434  fprintf(stderr,
435  "WARNING: node at %s:%d replied with wrong rank (expected"
436  "%d but got %d)",
437  address.c_str(), derecho::rdmc_tcp_port, (unsigned int)index,
438  (unsigned int)remote_rank);
439 
440  sockets.erase(index);
441  return false;
442  }
443  return true;
444  } else if(index > node_rank) {
445  try {
446  tcp::socket s = connection_listener->accept();
447 
448  uint32_t remote_rank = 0;
449  if(!s.exchange(node_rank, remote_rank)) {
450  fprintf(stderr, "WARNING: failed to exchange rank with node");
451  return false;
452  } else {
453  sockets[remote_rank] = std::move(s);
454  return true;
455  }
456  } catch(tcp::exception) {
457  fprintf(stderr, "Got error while attempting to listing on port");
458  return false;
459  }
460  }
461 
462  return false; // we can't connect to ourselves...
463 }
464 bool verbs_remove_connection(uint32_t index) {
465  return sockets.erase(index) > 0;
466 }
467 bool set_interrupt_mode(bool enabled) {
468  interrupt_mode = enabled;
469  return true;
470 }
// Selects whether memory_region defaults to driver-contiguous allocation.
// Returns false when the Mellanox experimental verbs API (the only backend
// providing contiguous registration) is unavailable.
bool set_contiguous_memory_mode(bool enabled) {
#ifdef MELLANOX_EXPERIMENTAL_VERBS
    contiguous_memory_mode.store(enabled);
    return true;
#else
    return false;
#endif
}
479 } // namespace impl
480 
// Owning handle for a memory registration; the deleter deregisters the MR.
using ibv_mr_unique_ptr = unique_ptr<ibv_mr, std::function<void(ibv_mr *)>>;

// Registers an existing caller-supplied buffer with the protection domain.
// Throws rdma::invalid_args for a null buffer or zero size.
static ibv_mr_unique_ptr create_mr(char *buffer, size_t size) {
    if(!buffer || size == 0) throw rdma::invalid_args();

    int mr_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;

    // NOTE(review): the first line of this declaration is not visible in this
    // view; it wraps the ibv_reg_mr result in an ibv_mr_unique_ptr named `mr`
    // whose deleter calls ibv_dereg_mr.
        ibv_reg_mr(verbs_resources.pd, (void *)buffer, size, mr_flags),
        [](ibv_mr *m) { ibv_dereg_mr(m); });

    if(!mr) {
        // NOTE(review): the error path is not visible here — presumably
        // throws an mr-creation-failure exception.
    }
    return mr;
}
496 #ifdef MELLANOX_EXPERIMENTAL_VERBS
// Allocates and registers driver-contiguous memory of the given size using
// the Mellanox experimental verbs API (IBV_EXP_REG_MR_CREATE_CONTIG with
// IBV_EXP_ACCESS_ALLOCATE_MR); the driver chooses the address (in.addr = 0).
// Throws rdma::invalid_args for a zero size.
static ibv_mr_unique_ptr create_contiguous_mr(size_t size) {
    if(size == 0) throw rdma::invalid_args();

    ibv_exp_reg_mr_in in;
    in.pd = verbs_resources.pd;
    in.addr = 0;
    in.length = size;
    in.exp_access = IBV_EXP_ACCESS_LOCAL_WRITE | IBV_EXP_ACCESS_REMOTE_READ | IBV_EXP_ACCESS_REMOTE_WRITE | IBV_EXP_ACCESS_ALLOCATE_MR;
    in.create_flags = IBV_EXP_REG_MR_CREATE_CONTIG;
    in.comp_mask = IBV_EXP_REG_MR_CREATE_FLAGS;
    ibv_mr_unique_ptr mr = ibv_mr_unique_ptr(ibv_exp_reg_mr(&in),
                                             [](ibv_mr *m) { ibv_dereg_mr(m); });
    if(!mr) {
        // NOTE(review): the error path is not visible here — presumably
        // throws an mr-creation-failure exception.
    }
    return mr;
}
514 memory_region::memory_region(size_t s, bool contiguous)
515  : mr(contiguous ? create_contiguous_mr(s) : create_mr(new char[s], s)),
516  buffer((char *)mr->addr),
517  size(s) {
518  if(contiguous) {
519  memset(buffer, 0, size);
520  } else {
521  allocated_buffer.reset(buffer);
522  }
523 }
524 #else
// Non-Mellanox build: contiguous allocation is unavailable, so the
// `contiguous` flag is accepted for interface compatibility but ignored and
// a heap buffer is always used (and owned by this region).
memory_region::memory_region(size_t s, bool contiguous) : memory_region(new char[s], s) {
    allocated_buffer.reset(buffer);
}
528 #endif
529 
// Defaults to the globally configured contiguous-memory preference.
memory_region::memory_region(size_t s) : memory_region(s, contiguous_memory_mode) {}
// Registers caller-owned storage; the region does not take ownership of buf.
memory_region::memory_region(char *buf, size_t s) : mr(create_mr(buf, s)), buffer(buf), size(s) {}

// Remote key that peers need in order to RDMA into this region.
uint32_t memory_region::get_rkey() const { return mr->rkey; }
534 
// NOTE(review): the constructor header (presumably
// `completion_queue::completion_queue(bool cross_channel)`) is not visible in
// this view. The body creates either a plain CQ or, when cross-channel
// operation is requested, a Mellanox experimental cross-channel CQ.
    ibv_cq *cq_ptr = nullptr;
    if(!cross_channel) {
        cq_ptr = ibv_create_cq(verbs_resources.ib_ctx, 1024, nullptr, nullptr, 0);
    } else {
#ifdef MELLANOX_EXPERIMENTAL_VERBS
        ibv_exp_cq_init_attr attr;
        attr.comp_mask = IBV_EXP_CQ_INIT_ATTR_FLAGS;
        attr.flags = IBV_EXP_CQ_CREATE_CROSS_CHANNEL;
        cq_ptr = ibv_exp_create_cq(verbs_resources.ib_ctx, 1024, nullptr,
                                   nullptr, 0, &attr);

        // Disable overrun checking on the cross-channel CQ — presumably
        // because its entries are consumed by hardware waits rather than
        // polled by software; confirm against Mellanox documentation.
        ibv_exp_cq_attr mod_attr;
        mod_attr.comp_mask = IBV_EXP_CQ_ATTR_CQ_CAP_FLAGS;
        mod_attr.cq_cap_flags = IBV_EXP_CQ_IGNORE_OVERRUN;
        ibv_exp_modify_cq(cq_ptr, &mod_attr, IBV_EXP_CQ_CAP_FLAGS);
#else
        // Cross-channel CQs require the Mellanox experimental verbs API.
        throw invalid_args();
#endif
    }
    if(!cq_ptr) {
        throw cq_creation_failure();
    }

    // Own the CQ; it is destroyed when this completion_queue is destroyed.
    cq = decltype(cq)(cq_ptr, [](ibv_cq *q) { ibv_destroy_cq(q); });
}
561 
// NOTE(review): the destructor header (`queue_pair::~queue_pair()`) is not
// visible in this view; the QP itself is released by the unique_ptr deleter.
    // if(qp) cout << "Destroying Queue Pair..." << endl;
}
// Convenience constructor: no receives need to be posted before the
// connection handshake completes.
queue_pair::queue_pair(size_t remote_index)
        : queue_pair(remote_index, [](queue_pair *) {}) {}
567 
// The post_recvs lambda will be called before queue_pair creation completes on
// either end of the connection. This enables the user to avoid race conditions
// between post_send() and post_recv().
//
// Creates a reliable-connected QP on the shared CQ, exchanges QP number, LID,
// and GID with the remote node over the pre-established TCP socket, then walks
// the QP through INIT -> RTR -> RTS and performs a final barrier exchange.
queue_pair::queue_pair(size_t remote_index,
                       std::function<void(queue_pair *)> post_recvs) {
    auto it = sockets.find(remote_index);
    if(it == sockets.end()) throw rdma::invalid_args();

    auto &sock = it->second;

    ibv_qp_init_attr qp_init_attr;
    memset(&qp_init_attr, 0, sizeof(qp_init_attr));
    qp_init_attr.qp_type = IBV_QPT_RC;
    qp_init_attr.sq_sig_all = 1; // every send generates a completion
    qp_init_attr.send_cq = verbs_resources.cq;
    qp_init_attr.recv_cq = verbs_resources.cq;
    qp_init_attr.cap.max_send_wr = 16;
    qp_init_attr.cap.max_recv_wr = 16;
    qp_init_attr.cap.max_send_sge = 1;
    qp_init_attr.cap.max_recv_sge = 1;

    qp = unique_ptr<ibv_qp, std::function<void(ibv_qp *)>>(
        ibv_create_qp(verbs_resources.pd, &qp_init_attr),
        [](ibv_qp *q) { ibv_destroy_qp(q); });

    if(!qp) {
        fprintf(stderr, "failed to create QP\n");
        // NOTE(review): the error path is not visible in this view —
        // presumably throws rdma::qp_creation_failure.
    }

    struct cm_con_data_t local_con_data;
    struct cm_con_data_t remote_con_data;
    memset(&local_con_data, 0, sizeof(local_con_data));
    memset(&remote_con_data, 0, sizeof(remote_con_data));
    union ibv_gid my_gid;

    if(local_config.gid_idx >= 0) {
        int rc = ibv_query_gid(verbs_resources.ib_ctx, local_config.ib_port,
                               local_config.gid_idx, &my_gid);
        if(rc) {
            fprintf(stderr, "could not get gid for port %d, index %d\n",
                    local_config.ib_port, local_config.gid_idx);
            return;
        }
    } else {
        memset(&my_gid, 0, sizeof my_gid);
    }

    /* exchange using TCP sockets info required to connect QPs */
    local_con_data.qp_num = qp->qp_num;
    local_con_data.lid = verbs_resources.port_attr.lid;
    memcpy(local_con_data.gid, &my_gid, 16);
    // fprintf(stdout, "Local QP number = 0x%x\n", qp->qp_num);
    // fprintf(stdout, "Local LID = 0x%x\n",
    // verbs_resources.port_attr.lid);

    // NOTE(review): the statement controlled by this `if` is not visible in
    // this view — presumably a throw on exchange failure.
    if(!sock.exchange(local_con_data, remote_con_data))

    bool success = !modify_qp_to_init(qp.get(), local_config.ib_port) && !modify_qp_to_rtr(qp.get(), remote_con_data.qp_num, remote_con_data.lid, remote_con_data.gid, local_config.ib_port, local_config.gid_idx) && !modify_qp_to_rts(qp.get());

    if(!success) printf("Failed to initialize QP\n");

    post_recvs(this);

    /* sync to make sure that both sides are in states that they can connect to
     * prevent packet loss */
    /* just send a dummy char back and forth */
    int tmp = -1;
    if(!sock.exchange(0, tmp) || tmp != 0) throw rdma::qp_creation_failure();
}
639 bool queue_pair::post_send(const memory_region &mr, size_t offset,
640  size_t length, uint64_t wr_id, uint32_t immediate,
641  const message_type &type) {
642  if(mr.size < offset + length || wr_id >> type.shift_bits || !type.tag)
643  throw invalid_args();
644 
645  ibv_send_wr sr;
646  ibv_sge sge;
647  ibv_send_wr *bad_wr = NULL;
648 
649  // prepare the scatter/gather entry
650  memset(&sge, 0, sizeof(sge));
651  sge.addr = (uintptr_t)(mr.buffer + offset);
652  sge.length = length;
653  sge.lkey = mr.mr->lkey;
654 
655  // prepare the send work request
656  memset(&sr, 0, sizeof(sr));
657  sr.next = NULL;
658  sr.wr_id = wr_id | ((uint64_t)*type.tag << type.shift_bits);
659  sr.imm_data = immediate;
660  sr.sg_list = &sge;
661  sr.num_sge = 1;
662  sr.opcode = IBV_WR_SEND_WITH_IMM;
663  sr.send_flags = IBV_SEND_SIGNALED; // | IBV_SEND_INLINE;
664 
665  if(ibv_post_send(qp.get(), &sr, &bad_wr)) {
666  fprintf(stderr, "failed to post SR\n");
667  return false;
668  }
669  return true;
670 }
671 bool queue_pair::post_empty_send(uint64_t wr_id, uint32_t immediate,
672  const message_type &type) {
673  if(wr_id >> type.shift_bits || !type.tag) throw invalid_args();
674 
675  ibv_send_wr sr;
676  ibv_send_wr *bad_wr = NULL;
677 
678  // prepare the send work request
679  memset(&sr, 0, sizeof(sr));
680  sr.next = NULL;
681  sr.wr_id = wr_id | ((uint64_t)*type.tag << type.shift_bits);
682  sr.imm_data = immediate;
683  sr.sg_list = NULL;
684  sr.num_sge = 0;
685  sr.opcode = IBV_WR_SEND_WITH_IMM;
686  sr.send_flags = IBV_SEND_SIGNALED; // | IBV_SEND_INLINE;
687 
688  if(ibv_post_send(qp.get(), &sr, &bad_wr)) {
689  fprintf(stderr, "failed to post SR\n");
690  return false;
691  }
692  return true;
693 }
694 
695 bool queue_pair::post_recv(const memory_region &mr, size_t offset,
696  size_t length, uint64_t wr_id,
697  const message_type &type) {
698  if(mr.size < offset + length || wr_id >> type.shift_bits || !type.tag)
699  throw invalid_args();
700 
701  ibv_recv_wr rr;
702  ibv_sge sge;
703  ibv_recv_wr *bad_wr;
704 
705  // prepare the scatter/gather entry
706  memset(&sge, 0, sizeof(sge));
707  sge.addr = (uintptr_t)(mr.buffer + offset);
708  sge.length = length;
709  sge.lkey = mr.mr->lkey;
710 
711  // prepare the receive work request
712  memset(&rr, 0, sizeof(rr));
713  rr.next = NULL;
714  rr.wr_id = wr_id | ((uint64_t)*type.tag << type.shift_bits);
715  rr.sg_list = &sge;
716  rr.num_sge = 1;
717 
718  if(ibv_post_recv(qp.get(), &rr, &bad_wr)) {
719  fprintf(stderr, "failed to post RR\n");
720  fflush(stdout);
721  return false;
722  }
723  return true;
724 }
725 bool queue_pair::post_empty_recv(uint64_t wr_id, const message_type &type) {
726  if(wr_id >> type.shift_bits || !type.tag) throw invalid_args();
727 
728  ibv_recv_wr rr;
729  ibv_recv_wr *bad_wr;
730 
731  // prepare the receive work request
732  memset(&rr, 0, sizeof(rr));
733  rr.next = NULL;
734  rr.wr_id = wr_id | ((uint64_t)*type.tag << type.shift_bits);
735  rr.sg_list = NULL;
736  rr.num_sge = 0;
737 
738  if(ibv_post_recv(qp.get(), &rr, &bad_wr)) {
739  fprintf(stderr, "failed to post RR\n");
740  fflush(stdout);
741  return false;
742  }
743  return true;
744 }
745 bool queue_pair::post_write(const memory_region &mr, size_t offset,
746  size_t length, uint64_t wr_id,
747  remote_memory_region remote_mr,
748  size_t remote_offset, const message_type &type,
749  bool signaled, bool send_inline) {
750  if(wr_id >> type.shift_bits || !type.tag) throw invalid_args();
751  if(mr.size < offset + length || remote_mr.size < remote_offset + length) {
752  cout << "mr.size = " << mr.size << " offset = " << offset
753  << " length = " << length << " remote_mr.size = " << remote_mr.size
754  << " remote_offset = " << remote_offset;
755  return false;
756  }
757 
758  ibv_send_wr sr;
759  ibv_sge sge;
760  ibv_send_wr *bad_wr = NULL;
761 
762  // prepare the scatter/gather entry
763  memset(&sge, 0, sizeof(sge));
764  sge.addr = (uintptr_t)(mr.buffer + offset);
765  sge.length = length;
766  sge.lkey = mr.mr->lkey;
767 
768  // prepare the send work request
769  memset(&sr, 0, sizeof(sr));
770  sr.next = NULL;
771  sr.wr_id = wr_id | ((uint64_t)*type.tag << type.shift_bits);
772  sr.sg_list = &sge;
773  sr.num_sge = 1;
774  sr.opcode = IBV_WR_RDMA_WRITE;
775  sr.send_flags = (signaled ? IBV_SEND_SIGNALED : 0) | (send_inline ? IBV_SEND_INLINE : 0);
776  sr.wr.rdma.remote_addr = remote_mr.buffer + remote_offset;
777  sr.wr.rdma.rkey = remote_mr.rkey;
778 
779  if(ibv_post_send(qp.get(), &sr, &bad_wr)) {
780  fprintf(stderr, "failed to post SR\n");
781  return false;
782  }
783  return true;
784 }
785 
786 #ifdef MELLANOX_EXPERIMENTAL_VERBS
// NOTE(review): the start of this constructor's header (presumably
// `managed_queue_pair::managed_queue_pair(`) is not visible in this view.
// Creates a cross-channel, managed-send QP with dedicated cross-channel
// send/recv CQs, then performs the same TCP-based parameter exchange and
// INIT -> RTR -> RTS transition sequence as queue_pair's main constructor.
    size_t remote_index, std::function<void(managed_queue_pair *)> post_recvs)
    : queue_pair(), scq(true), rcq(true) {
    auto it = sockets.find(remote_index);
    if(it == sockets.end()) throw rdma::invalid_args();

    auto &sock = it->second;

    ibv_exp_qp_init_attr attr;
    memset(&attr, 0, sizeof(attr));
    attr.qp_context = nullptr;
    attr.send_cq = scq.cq.get();
    attr.recv_cq = rcq.cq.get();
    attr.srq = nullptr;
    attr.cap.max_send_wr = 1024;
    attr.cap.max_recv_wr = 1024;
    attr.cap.max_send_sge = 1;
    attr.cap.max_recv_sge = 1;
    attr.cap.max_inline_data = 0;
    attr.qp_type = IBV_QPT_RC;
    attr.sq_sig_all = 0; // completions only for explicitly signaled sends
    attr.comp_mask = IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS | IBV_EXP_QP_INIT_ATTR_PD;
    attr.pd = verbs_resources.pd;
    attr.xrcd = nullptr;
    // Sends are held until enabled via the management QP (managed send).
    attr.exp_create_flags = IBV_EXP_QP_CREATE_CROSS_CHANNEL | IBV_EXP_QP_CREATE_MANAGED_SEND;
    attr.max_inl_recv = 0;

    qp = decltype(qp)(ibv_exp_create_qp(verbs_resources.ib_ctx, &attr),
                      [](ibv_qp *q) { ibv_destroy_qp(q); });

    if(!qp) {
        fprintf(stderr, "failed to create QP, (errno = %s)\n", strerror(errno));
        // NOTE(review): the error path is not visible in this view —
        // presumably throws rdma::qp_creation_failure.
    }

    struct cm_con_data_t local_con_data;
    struct cm_con_data_t remote_con_data;
    memset(&local_con_data, 0, sizeof(local_con_data));
    memset(&remote_con_data, 0, sizeof(remote_con_data));
    union ibv_gid my_gid;

    if(local_config.gid_idx >= 0) {
        int rc = ibv_query_gid(verbs_resources.ib_ctx, local_config.ib_port,
                               local_config.gid_idx, &my_gid);
        if(rc) {
            fprintf(stderr, "could not get gid for port %d, index %d\n",
                    local_config.ib_port, local_config.gid_idx);
            return;
        }
    } else {
        memset(&my_gid, 0, sizeof my_gid);
    }

    /* exchange using TCP sockets info required to connect QPs */
    local_con_data.qp_num = qp->qp_num;
    local_con_data.lid = verbs_resources.port_attr.lid;
    memcpy(local_con_data.gid, &my_gid, 16);
    // fprintf(stdout, "Local QP number = 0x%x\n", qp->qp_num);
    // fprintf(stdout, "Local LID = 0x%x\n",
    // verbs_resources.port_attr.lid);

    // NOTE(review): the statement controlled by this `if` is not visible in
    // this view — presumably a throw on exchange failure.
    if(!sock.exchange(local_con_data, remote_con_data))

    bool success = !modify_qp_to_init(qp.get(), local_config.ib_port) && !modify_qp_to_rtr(qp.get(), remote_con_data.qp_num, remote_con_data.lid, remote_con_data.gid, local_config.ib_port, local_config.gid_idx) && !modify_qp_to_rts(qp.get());

    if(!success) throw rdma::qp_creation_failure();

    post_recvs(this);

    // Sync to make sure that both sides are in states that they can connect to
    // prevent packet loss.
    int tmp = -1;
    if(!sock.exchange(0, tmp) || tmp != 0) throw rdma::qp_creation_failure();
}
// NOTE(review): the constructor header (presumably
// `manager_queue_pair::manager_queue_pair()`) is not visible in this view.
// Creates the cross-channel "management" QP used to enqueue wait/enable work
// requests that orchestrate managed QPs. Note it is connected to its own
// qp_num with no GRH (gid_idx == -1) — a loopback control QP.
    ibv_exp_qp_init_attr attr;
    memset(&attr, 0, sizeof(attr));
    attr.qp_context = nullptr;
    attr.send_cq = verbs_resources.cq;
    attr.recv_cq = verbs_resources.cq;
    attr.srq = nullptr;
    attr.cap.max_send_wr = 1024;
    attr.cap.max_recv_wr = 0; // control-only QP: never posts receives
    attr.cap.max_send_sge = 1;
    attr.cap.max_recv_sge = 1;
    attr.cap.max_inline_data = 0;
    attr.qp_type = IBV_QPT_RC;
    attr.sq_sig_all = 0;
    attr.comp_mask = IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS | IBV_EXP_QP_INIT_ATTR_PD;
    attr.pd = verbs_resources.pd;
    attr.xrcd = nullptr;
    attr.exp_create_flags = IBV_EXP_QP_CREATE_CROSS_CHANNEL;
    attr.max_inl_recv = 0;

    qp = decltype(qp)(ibv_exp_create_qp(verbs_resources.ib_ctx, &attr),
                      [](ibv_qp *q) { ibv_destroy_qp(q); });

    if(!qp) {
        fprintf(stderr, "failed to create QP, (errno = %s)\n", strerror(errno));
        // NOTE(review): the error path is not visible in this view —
        // presumably throws rdma::qp_creation_failure.
    }

    bool success = !modify_qp_to_init(qp.get(), local_config.ib_port) && !modify_qp_to_rtr(qp.get(), qp->qp_num, 0, nullptr, local_config.ib_port, -1) && !modify_qp_to_rts(qp.get());

    if(!success) throw rdma::qp_creation_failure();
}
894 
// Implementation state for a task: the accumulated work requests plus the
// per-QP index lists used to chain them together at post() time.
struct task::task_impl {
    struct wr_list {
        size_t start_index;
        size_t end_index;
    };

    // std::list so that addresses of SGEs stay stable as entries are added
    // (work requests hold raw pointers into this container).
    std::list<ibv_sge> sges;
    vector<ibv_recv_wr> recv_wrs;
    vector<ibv_exp_send_wr> send_wrs;

    // Indices into recv_wrs/send_wrs, grouped by destination QP.
    map<ibv_qp *, vector<size_t>> recv_list;
    map<ibv_qp *, vector<size_t>> send_list;
    // Indices of send_wrs (waits/enables) destined for the management QP.
    vector<size_t> mqp_list;
    ibv_qp *mqp; // management QP; owned by the associated manager_queue_pair

    task_impl(ibv_qp *mqp_ptr) : mqp(mqp_ptr) {}
};
912 
// A task accumulates work requests to be posted against the manager QP; the
// shared_ptr keeps the manager_queue_pair alive for the task's lifetime.
task::task(std::shared_ptr<manager_queue_pair> manager_qp)
        : impl(new task_impl(manager_qp->qp.get())), mqp(manager_qp) {}
// Defined out-of-line where task_impl is complete — presumably required by
// impl's smart-pointer deleter; confirm impl's declared type in the header.
task::~task() {}
916 
917 void task::append_wait(const completion_queue &cq, int count, bool signaled,
918  bool last, uint64_t wr_id, const message_type &type) {
919  impl->send_wrs.emplace_back();
920  auto &wr = impl->send_wrs.back();
921  wr.wr_id = wr_id | ((uint64_t)*type.tag << type.shift_bits);
922  wr.sg_list = nullptr;
923  wr.num_sge = 0;
924  wr.exp_opcode = IBV_EXP_WR_CQE_WAIT;
925  wr.exp_send_flags = (signaled ? IBV_SEND_SIGNALED : 0) | (last ? IBV_EXP_SEND_WAIT_EN_LAST : 0);
926  wr.ex.imm_data = 0;
927  wr.task.cqe_wait.cq = cq.cq.get();
928  wr.task.cqe_wait.cq_count = count;
929  wr.comp_mask = 0;
930  wr.next = nullptr;
931  impl->mqp_list.push_back(impl->send_wrs.size() - 1);
932 }
933 void task::append_enable_send(const managed_queue_pair &qp, int count) {
934  impl->send_wrs.emplace_back();
935  auto &wr = impl->send_wrs.back();
936  wr.wr_id = 0xfffffffff1f1f1f1;
937  wr.sg_list = nullptr;
938  wr.num_sge = 0;
939  wr.exp_opcode = IBV_EXP_WR_SEND_ENABLE;
940  wr.exp_send_flags = 0;
941  wr.ex.imm_data = 0;
942  wr.task.wqe_enable.qp = qp.qp.get();
943  wr.task.wqe_enable.wqe_count = count;
944  wr.comp_mask = 0;
945  wr.next = nullptr;
946  impl->mqp_list.push_back(impl->send_wrs.size() - 1);
947 }
948 void task::append_send(const managed_queue_pair &qp, const memory_region &mr,
949  size_t offset, size_t length, uint32_t immediate) {
950  impl->sges.emplace_back();
951  auto &sge = impl->sges.back();
952  sge.addr = (uintptr_t)mr.buffer + offset;
953  sge.length = length;
954  sge.lkey = mr.mr->lkey;
955 
956  impl->send_wrs.emplace_back();
957  auto &wr = impl->send_wrs.back();
958  wr.wr_id = 0xfffffffff2f2f2f2;
959  wr.next = nullptr;
960  wr.sg_list = &impl->sges.back();
961  wr.num_sge = 1;
962  wr.exp_opcode = IBV_EXP_WR_SEND;
963  wr.exp_send_flags = 0;
964  wr.ex.imm_data = immediate;
965  wr.comp_mask = 0;
966  impl->send_list[qp.qp.get()].push_back(impl->send_wrs.size() - 1);
967 }
968 void task::append_recv(const managed_queue_pair &qp, const memory_region &mr,
969  size_t offset, size_t length) {
970  impl->sges.emplace_back();
971  auto &sge = impl->sges.back();
972  sge.addr = (uintptr_t)mr.buffer + offset;
973  sge.length = length;
974  sge.lkey = mr.mr->lkey;
975 
976  impl->recv_wrs.emplace_back();
977  auto &wr = impl->recv_wrs.back();
978  wr.wr_id = 0xfffffffff3f3f3f3;
979  wr.next = nullptr;
980  wr.sg_list = &impl->sges.back();
981  wr.num_sge = 1;
982  impl->recv_list[qp.qp.get()].push_back(impl->recv_wrs.size() - 1);
983 }
/**
 * Links every queued work request into per-QP chains and submits the whole
 * batch to the NIC in a single ibv_exp_post_task() call.
 *
 * Task layout: one IBV_EXP_TASK_RECV per QP with queued receives, then one
 * IBV_EXP_TASK_SEND per QP with queued sends, and finally one send task for
 * the manager QP carrying the wait/enable chain.
 *
 * @return true if ibv_exp_post_task returned 0 (success), false otherwise.
 */
bool task::post() {
    // One task slot per recv-QP, per send-QP, plus the trailing manager task.
    size_t num_tasks = 1 + impl->send_list.size() + impl->recv_list.size();
    auto tasks = make_unique<ibv_exp_task[]>(num_tasks);

    size_t index = 0;
    for(auto &&l : impl->recv_list) {
        // Chain this QP's receive WRs together in queue order.
        for(size_t i = 0; i + 1 < l.second.size(); i++) {
            impl->recv_wrs[l.second[i]].next = &impl->recv_wrs[l.second[i + 1]];
        }

        tasks[index].item.qp = l.first;
        tasks[index].item.recv_wr = &impl->recv_wrs[l.second.front()];
        tasks[index].task_type = IBV_EXP_TASK_RECV;
        // Pointing at index+1 is safe even on the last map entry: the
        // manager-QP task below always occupies the following slot.
        tasks[index].next = &tasks[index + 1];
        tasks[index].comp_mask = 0;
        ++index;
    }
    for(auto &&l : impl->send_list) {
        // Chain this QP's send WRs together in queue order.
        for(size_t i = 0; i + 1 < l.second.size(); i++) {
            impl->send_wrs[l.second[i]].next = &impl->send_wrs[l.second[i + 1]];
        }

        tasks[index].item.qp = l.first;
        tasks[index].item.send_wr = &impl->send_wrs[l.second.front()];
        tasks[index].task_type = IBV_EXP_TASK_SEND;
        tasks[index].next = &tasks[index + 1];
        tasks[index].comp_mask = 0;
        ++index;
    }

    // Chain the manager-QP wait/enable WRs (indices stored in mqp_list).
    for(size_t i = 0; i + 1 < impl->mqp_list.size(); i++) {
        impl->send_wrs[impl->mqp_list[i]].next = &impl->send_wrs[impl->mqp_list[i + 1]];
    }

    // Final task: the manager QP's chain terminates the task list.
    // NOTE(review): mqp_list.front() is undefined behavior if no wait/enable
    // WR was ever appended — confirm callers always add at least one.
    tasks[index].item.qp = impl->mqp;
    tasks[index].item.send_wr = &impl->send_wrs[impl->mqp_list.front()];
    tasks[index].task_type = IBV_EXP_TASK_SEND;
    tasks[index].next = nullptr;
    tasks[index].comp_mask = 0;

    ibv_exp_task *bad = nullptr;
    // ibv_exp_post_task returns 0 on success; negate to get a bool.
    return !ibv_exp_post_task(verbs_resources.ib_ctx, &tasks[0], &bad);
}
1027 
1028 #else
1030  std::function<void(managed_queue_pair *)> post_recvs)
1031  : queue_pair(), scq(true), rcq(true) {
1032  throw rdma::qp_creation_failure();
1033 }
1034 
1036  throw rdma::qp_creation_failure();
1037 }
1038 
// Fallback definitions used when Mellanox experimental verbs (cross-channel
// tasks) are not available: construction of a task is unsupported.
struct task::task_impl {};
task::~task() {}
task::task(std::shared_ptr<manager_queue_pair> manager_qp) {
    throw unsupported_feature();
}
// Stub: CQE_WAIT requests require Mellanox experimental verbs.
void task::append_wait(const completion_queue &cq, int count, bool signaled,
                       bool last, uint64_t wr_id, const message_type &type) {
    throw unsupported_feature();
}
// Stub: SEND_ENABLE requests require Mellanox experimental verbs.
void task::append_enable_send(const managed_queue_pair &qp, int count) {
    throw unsupported_feature();
}
1052  size_t offset, size_t length, uint32_t immediate) {
1053  throw unsupported_feature();
1054 }
1056  size_t offset, size_t length) {
1057  throw unsupported_feature();
1058 }
// Stub: posting a task batch requires Mellanox experimental verbs.
bool task::post() {
    throw unsupported_feature();
}
1062 
1063 #endif
1064 
/**
 * Registers a new message type: allocates the next free tag and stores the
 * three completion handlers (send/recv/write) under that tag.
 *
 * @param name          Human-readable name kept alongside the handlers.
 * @param send_handler  Invoked when a send completion carries this tag.
 * @param recv_handler  Invoked when a receive completion carries this tag.
 * @param write_handler Invoked when an RDMA-write completion carries this tag.
 * @throws message_types_exhausted if all tag values are already in use.
 */
message_type::message_type(const string &name, completion_handler send_handler,
                           completion_handler recv_handler,
                           completion_handler write_handler) {
    // Serialize registration: the tag is the index into completion_handlers.
    std::lock_guard<std::mutex> l(completion_handlers_mutex);

    // Tags are packed into the high bits of wr_id, so only tag_type's range
    // of distinct message types can ever exist.
    if(completion_handlers.size() >= std::numeric_limits<tag_type>::max())
        throw message_types_exhausted();

    tag = completion_handlers.size();

    // NOTE(review): the declaration of `set` (presumably
    // `completion_handler_set set;`) appears to be missing from this listing —
    // confirm against the original source file.
    set.send = send_handler;
    set.recv = recv_handler;
    set.write = write_handler;
    set.name = name;
    completion_handlers.push_back(set);
}
1082 
1084  static message_type m(std::numeric_limits<tag_type>::max());
1085  return m;
1086 }
1087 
1089  return supported_features;
1090 }
1091 
1092 // int poll_for_completions(int num, ibv_wc *wcs, atomic<bool> &shutdown_flag) {
1093 // while(true) {
1094 // int poll_result = ibv_poll_cq(verbs_resources.cq, num, wcs);
1095 // if(poll_result != 0 || shutdown_flag) {
1096 // return poll_result;
1097 // }
1098 // }
1099 
1100 // // if(poll_result < 0) {
1101 // // /* poll CQ failed */
1102 // // fprintf(stderr, "poll CQ failed\n");
1103 // // } else {
1104 // // return
1105 // // /* CQE found */
1106 // // fprintf(stdout, "completion was found in CQ with status 0x%x\n",
1107 // // wc.status);
1108 // // /* check the completion status (here we don't care about the
1109 // // completion
1110 // // * opcode */
1111 // // if(wc.status != IBV_WC_SUCCESS) {
1112 // // fprintf(
1113 // // stderr,
1114 // // "got bad completion with status: 0x%x, vendor syndrome:
1115 // // 0x%x\n",
1116 // // wc.status, wc.vendor_err);
1117 // // }
1118 // // }
1119 // }
1120 namespace impl {
1121 map<uint32_t, remote_memory_region> verbs_exchange_memory_regions(
1122  const vector<uint32_t> &members, uint32_t node_rank,
1123  const memory_region &mr) {
1124  map<uint32_t, remote_memory_region> remote_mrs;
1125  for(uint32_t m : members) {
1126  if(m == node_rank) {
1127  continue;
1128  }
1129 
1130  auto it = sockets.find(m);
1131  if(it == sockets.end()) {
1132  throw rdma::connection_broken();
1133  }
1134 
1135  uintptr_t buffer;
1136  size_t size;
1137  uint32_t rkey;
1138 
1139  bool still_connected = it->second.exchange((uintptr_t)mr.buffer, buffer) && it->second.exchange((size_t)mr.size, size) && it->second.exchange((uint32_t)mr.get_rkey(), rkey);
1140 
1141  if(!still_connected) {
1142  fprintf(stderr, "WARNING: lost connection to node %u\n",
1143  (unsigned int)it->first);
1144  throw rdma::connection_broken();
1145  }
1146 
1147  remote_mrs.emplace(it->first, remote_memory_region(buffer, size, rkey));
1148  }
1149  return remote_mrs;
1150 }
1151 ibv_cq *verbs_get_cq() { return verbs_resources.cq; }
1152 ibv_comp_channel *verbs_get_completion_channel() { return verbs_resources.cc; }
1153 } // namespace impl
1154 } // namespace rdma
uint8_t gid[16]
socket accept()
Blocks until a remote client makes a connection to this connection listener, then returns a new socket connected to that client.
Definition: tcp.cpp:236
void append_recv(const managed_endpoint &ep, const memory_region &mr, size_t offset, size_t length)
Definition: lf_helper.cpp:556
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member function.
ibv_cq * verbs_get_cq()
ibv_comp_channel * verbs_get_completion_channel()
std::function< void(uint64_t tag, uint32_t immediate, size_t length)> completion_handler
Definition: lf_helper.hpp:131
completion_handler send
Definition: lf_helper.cpp:105
uint64_t get_time()
Definition: time.h:13
std::map< uint32_t, remote_memory_region > verbs_exchange_memory_regions(const std::vector< uint32_t > &members, uint32_t node_rank, const memory_region &mr)
int ib_port
Local IB port to work with.
Definition: verbs.cpp:55
std::unique_ptr< fid_mr, std::function< void(fid_mr *)> > mr
Smart pointer for managing the registered memory region.
Definition: lf_helper.hpp:55
std::unique_ptr< fid_cq, std::function< void(fid_cq *)> > cq
Smart pointer for managing the completion queue.
Definition: lf_helper.hpp:117
A wrapper for fi_close.
Definition: lf_helper.hpp:53
bool verbs_add_connection(uint32_t index, const std::string &address, uint32_t node_rank)
STL namespace.
ibv_device_attr device_attr
static message_type ignored()
Definition: lf_helper.cpp:528
bool post_empty_recv(uint64_t wr_id, const message_type &type)
void append_send(const managed_endpoint &ep, const memory_region &mr, size_t offset, size_t length, uint32_t immediate)
Definition: lf_helper.cpp:552
A C++ wrapper for the libfabric fid_cq struct and its associated functions.
Definition: lf_helper.hpp:115
void append_wait(const completion_queue &cq, int count, bool signaled, bool last, uint64_t wr_id, const message_type &type)
Definition: lf_helper.cpp:543
#define TRACE(x)
Definition: util.hpp:33
uint32_t node_rank
Definition: experiment.cpp:45
A C++ wrapper for the IB Verbs ibv_qp struct and its associated functions.
Listener to detect new incoming connections.
Definition: lf_helper.cpp:104
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
uint32_t rkey
Remote key.
Definition: verbs.hpp:290
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and queue pairs.
Definition: lf_helper.hpp:28
static constexpr unsigned int shift_bits
Definition: lf_helper.hpp:136
completion_queue()
Constructor Uses the libfabrics API to open a completion queue.
Definition: lf_helper.cpp:231
void append_enable_send(const managed_endpoint &ep, int count)
Definition: lf_helper.cpp:548
task(std::shared_ptr< manager_endpoint > manager_ep)
Definition: lf_helper.cpp:537
ibv_port_attr port_attr
memory_region(size_t size)
Constructor Creates a buffer of the specified size and then calls the second constructor with the new...
Definition: lf_helper.cpp:203
struct rdma::ibv_resources verbs_resources
unique_ptr< ibv_mr, std::function< void(ibv_mr *)> > ibv_mr_unique_ptr
const char * dev_name
bool set_contiguous_memory_mode(bool enabled)
bool post_write(const memory_region &mr, size_t offset, size_t length, uint64_t wr_id, remote_memory_region remote_mr, size_t remote_offset, const message_type &type, bool signaled=false, bool send_inline=false)
const size_t size
Definition: lf_helper.hpp:89
bool verbs_initialize(const map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
void verbs_destroy()
std::unique_ptr< char[]> allocated_buffer
Smart pointer for managing the buffer the mr uses.
Definition: lf_helper.hpp:57
ibv_comp_channel * cc
bool post_empty_send(uint64_t wr_id, uint32_t immediate, const message_type &type)
bool post_recv(const memory_region &mr, size_t offset, size_t length, uint64_t wr_id, const message_type &type)
bool set_interrupt_mode(bool enabled)
Definition: lf_helper.cpp:752
uint32_t get_rkey() const
char *const buffer
Definition: lf_helper.hpp:88
managed_queue_pair(size_t remote_index, std::function< void(managed_queue_pair *)> post_recvs)
std::optional< tag_type > tag
Definition: lf_helper.hpp:139
virtual ~task()
Definition: lf_helper.cpp:541
int gid_idx
GID index to use.
Definition: verbs.cpp:57
bool post() __attribute__((warn_unused_result))
Definition: lf_helper.cpp:561
feature_set get_supported_features()
struct sst::verbs_sender_ctxt __attribute__
bool verbs_remove_connection(uint32_t index)
bool exchange(T local, T &remote)
Definition: tcp.hpp:120
ibv_context * ib_ctx
bool post_send(const memory_region &mr, size_t offset, size_t length, uint64_t wr_id, uint32_t immediate, const message_type &type)
std::unique_ptr< ibv_qp, std::function< void(ibv_qp *)> > qp