Derecho  0.9
Distributed systems toolkit for RDMA
verbs.cpp
Go to the documentation of this file.
1 
#include <arpa/inet.h>
#include <atomic>
#include <byteswap.h>
#include <cstring>
#include <endian.h>
#include <errno.h>
#include <getopt.h>
#include <infiniband/verbs.h>
#include <inttypes.h>
#include <iostream>
#include <netdb.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>
24 
25 #include "derecho/connection_manager.h"
26 #include "derecho/derecho_ports.h"
27 #include "poll_utils.h"
28 #include "tcp/tcp.h"
29 #include "verbs.h"
30 
31 #error "Verbs implementation is obsolete. Compilation stopped."
32 
33 using std::cout;
34 using std::endl;
35 
// String constants naming the three kinds of operation, and the size of MSG
// including its terminating NUL byte.
#define MSG "SEND operation "
#define RDMAMSGR "RDMA read operation "
#define RDMAMSGW "RDMA write operation"
#define MSG_SIZE (strlen(MSG) + 1)

// 64-bit host <-> network byte-order conversion: a byte swap on little-endian
// hosts, the identity on big-endian hosts.
#if __BYTE_ORDER == __LITTLE_ENDIAN
static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
#elif __BYTE_ORDER == __BIG_ENDIAN
static inline uint64_t htonll(uint64_t x) { return x; }
static inline uint64_t ntohll(uint64_t x) { return x; }
#else
// The whole message must stay on the #error line; a continuation line would be
// parsed as code.
#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN
#endif
50 
51 namespace sst {
53 const char *dev_name = NULL;
55 int ib_port = 1;
57 int gid_idx = 0;
58 
60 
61 // unsigned int max_time_to_completion = 0;
62 
66  struct ibv_device_attr device_attr;
68  struct ibv_port_attr port_attr;
70  struct ibv_context *ib_ctx;
72  struct ibv_pd *pd;
74  struct ibv_cq *cq;
75 };
78 
/** Thread that runs polling_loop(). */
std::thread polling_thread;
/**
 * Set to true to ask the polling thread to stop. Atomic because it is written
 * by the thread that shuts the system down while the polling thread reads it.
 */
static std::atomic<bool> shutdown{false};
81 
95 _resources::_resources(int r_index, char *write_addr, char *read_addr, int size_w,
96  int size_r) {
97  // set the remote index
98  remote_index = r_index;
99 
100  write_buf = write_addr;
101  if(!write_buf) {
102  cout << "Write address is NULL" << endl;
103  }
104 
105  read_buf = read_addr;
106  if(!read_buf) {
107  cout << "Read address is NULL" << endl;
108  }
109 
110  // register the memory buffer
111  int mr_flags = 0;
112  // allow access for only local writes and remote reads
113  mr_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
114  // register memory with the protection domain and the buffer
115  write_mr = ibv_reg_mr(g_res->pd, write_buf, size_w, mr_flags);
116  read_mr = ibv_reg_mr(g_res->pd, read_buf, size_r, mr_flags);
117  if(!write_mr) {
118  cout << "Could not register memory region : write_mr, error code is: " << errno << endl;
119  }
120  if(!read_mr) {
121  cout << "Could not register memory region : read_mr, error code is: " << errno << endl;
122  }
123 
124  // set the queue pair up for creation
125  struct ibv_qp_init_attr qp_init_attr;
126  memset(&qp_init_attr, 0, sizeof(qp_init_attr));
127  qp_init_attr.qp_type = IBV_QPT_RC;
128  qp_init_attr.sq_sig_all = 0;
129  // same completion queue for both send and receive operations
130  qp_init_attr.send_cq = g_res->cq;
131  qp_init_attr.recv_cq = g_res->cq;
132  // allow a lot of requests at a time
133  qp_init_attr.cap.max_send_wr = 4000;
134  qp_init_attr.cap.max_recv_wr = 4000;
135  qp_init_attr.cap.max_send_sge = 1;
136  qp_init_attr.cap.max_recv_sge = 1;
137  // create the queue pair
138  qp = ibv_create_qp(g_res->pd, &qp_init_attr);
139 
140  if(!qp) {
141  cout << "Could not create queue pair, error code is: " << errno << endl;
142  }
143 
144  // connect the QPs
145  connect_qp();
146  cout << "Established RDMA connection with node " << r_index << endl;
147 }
148 
153  int rc = 0;
154  if(qp) {
155  rc = ibv_destroy_qp(qp);
156  if(!qp) {
157  cout << "Could not destroy queue pair, error code is " << rc << endl;
158  }
159  }
160 
161  if(write_mr) {
162  rc = ibv_dereg_mr(write_mr);
163  if(rc) {
164  cout << "Could not de-register memory region : write_mr, error code is " << rc << endl;
165  }
166  }
167  if(read_mr) {
168  rc = ibv_dereg_mr(read_mr);
169  if(rc) {
170  cout << "Could not de-register memory region : read_mr, error code is " << rc << endl;
171  }
172  }
173 }
174 
179  struct ibv_qp_attr attr;
180  int flags;
181  int rc;
182  memset(&attr, 0, sizeof(attr));
183  // the init state
184  attr.qp_state = IBV_QPS_INIT;
185  attr.port_num = ib_port;
186  attr.pkey_index = 0;
187  // give access to local writes and remote reads
188  attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
189  flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
190  // modify the queue pair to init state
191  rc = ibv_modify_qp(qp, &attr, flags);
192  if(rc) {
193  cout << "Failed to modify queue pair to init state, error code is " << rc << endl;
194  }
195 }
196 
198  struct ibv_qp_attr attr;
199  int flags, rc;
200  memset(&attr, 0, sizeof(attr));
201  // change the state to ready to receive
202  attr.qp_state = IBV_QPS_RTR;
203  attr.path_mtu = IBV_MTU_256;
204  // set the queue pair number of the remote side
205  attr.dest_qp_num = remote_props.qp_num;
206  attr.rq_psn = 0;
207  attr.max_dest_rd_atomic = 1;
208  attr.min_rnr_timer = 0x12;
209  attr.ah_attr.is_global = 0;
210  // set the local id of the remote side
211  attr.ah_attr.dlid = remote_props.lid;
212  attr.ah_attr.sl = 0;
213  attr.ah_attr.src_path_bits = 0;
214  // the infiniband port to associate with
215  attr.ah_attr.port_num = ib_port;
216  if(gid_idx >= 0) {
217  attr.ah_attr.is_global = 1;
218  attr.ah_attr.port_num = 1;
219  memcpy(&attr.ah_attr.grh.dgid, remote_props.gid, 16);
220  attr.ah_attr.grh.flow_label = 0;
221  attr.ah_attr.grh.hop_limit = 1;
222  attr.ah_attr.grh.sgid_index = gid_idx;
223  attr.ah_attr.grh.traffic_class = 0;
224  }
225  flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
226  rc = ibv_modify_qp(qp, &attr, flags);
227  if(rc) {
228  cout << "Failed to modify queue pair to ready-to-receive state, error code is " << rc << endl;
229  }
230 }
231 
233  struct ibv_qp_attr attr;
234  int flags, rc;
235  memset(&attr, 0, sizeof(attr));
236  // set the state to ready to send
237  attr.qp_state = IBV_QPS_RTS;
238  attr.timeout = 4; // The timeout is 4.096x2^(timeout) microseconds
239  attr.retry_cnt = 6;
240  attr.rnr_retry = 0;
241  attr.sq_psn = 0;
242  attr.max_rd_atomic = 1;
243  flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
244  rc = ibv_modify_qp(qp, &attr, flags);
245  if(rc) {
246  cout << "Failed to modify queue pair to ready-to-send state, error code is " << rc << endl;
247  }
248 }
249 
255  // local connection data
256  struct cm_con_data_t local_con_data;
257  // remote connection data. Obtained via TCP
258  struct cm_con_data_t remote_con_data;
259  // this is used to ensure that host byte order is correct at each node
260  struct cm_con_data_t tmp_con_data;
261 
262  union ibv_gid my_gid;
263  if(gid_idx >= 0) {
264  int rc = ibv_query_gid(g_res->ib_ctx, ib_port, gid_idx, &my_gid);
265  if(rc) {
266  cout << "ibv_query_gid failed, error code is " << errno << endl;
267  }
268  } else {
269  memset(&my_gid, 0, sizeof my_gid);
270  }
271 
272  // exchange using TCP sockets info required to connect QPs
273  local_con_data.addr = htonll((uintptr_t)(char *)write_buf);
274  local_con_data.rkey = htonl(write_mr->rkey);
275  local_con_data.qp_num = htonl(qp->qp_num);
276  local_con_data.lid = htons(g_res->port_attr.lid);
277  memcpy(local_con_data.gid, &my_gid, 16);
278  bool success = sst_connections->exchange(remote_index, local_con_data, tmp_con_data);
279  if(!success) {
280  cout << "Could not exchange qp data in connect_qp" << endl;
281  }
282  remote_con_data.addr = ntohll(tmp_con_data.addr);
283  remote_con_data.rkey = ntohl(tmp_con_data.rkey);
284  remote_con_data.qp_num = ntohl(tmp_con_data.qp_num);
285  remote_con_data.lid = ntohs(tmp_con_data.lid);
286  memcpy(remote_con_data.gid, tmp_con_data.gid, 16);
287  // save the remote side attributes, we will need it for the post SR
288  remote_props = remote_con_data;
289 
290  // modify the QP to init
291  set_qp_initialized();
292 
293  // modify the QP to RTR
294  set_qp_ready_to_receive();
295 
296  // modify it to RTS
297  set_qp_ready_to_send();
298 
299  // sync to make sure that both sides are in states that they can connect to
300  // prevent packet loss
301  // just send a dummy char back and forth
302  success = sync(remote_index);
303  if(!success) {
304  cout << "Could not sync in connect_qp after qp transition to RTS state" << endl;
305  }
306 }
307 
317 int _resources::post_remote_send(const uint32_t id, const long long int offset, const long long int size,
318  const int op, const bool completion) {
319  struct ibv_send_wr sr;
320  struct ibv_sge sge;
321  struct ibv_send_wr *bad_wr = NULL;
322 
323  // don't care where the read buffer is saved
324  sge.addr = (uintptr_t)(read_buf + offset);
325  sge.length = size;
326  sge.lkey = read_mr->lkey;
327  // prepare the send work request
328  memset(&sr, 0, sizeof(sr));
329  sr.next = NULL;
330  // set the id for the work request, useful at the time of polling
331  sr.wr_id = id;
332  sr.sg_list = &sge;
333  sr.num_sge = 1;
334  // set opcode depending on op parameter
335  if(op == 0) {
336  sr.opcode = IBV_WR_RDMA_READ;
337  } else if(op == 1) {
338  sr.opcode = IBV_WR_RDMA_WRITE;
339  } else {
340  sr.opcode = IBV_WR_SEND;
341  }
342  if(completion) {
343  sr.send_flags = IBV_SEND_SIGNALED;
344  }
345  if(op == 0 || op == 1) {
346  // set the remote rkey and virtual address
347  sr.wr.rdma.remote_addr = remote_props.addr + offset;
348  sr.wr.rdma.rkey = remote_props.rkey;
349  }
350  // there is a receive request in the responder side
351  // , so we won't get any into RNR flow
352  auto ret = ibv_post_send(qp, &sr, &bad_wr);
353  return ret;
354 }
355 
356 resources::resources(int r_index, char *write_addr, char *read_addr, int size_w,
357  int size_r) : _resources(r_index, write_addr, read_addr, size_w, size_r) {
358 }
359 
363 void resources::post_remote_read(const uint32_t id, const long long int size) {
364  int rc = post_remote_send(id, 0, size, 0, false);
365  if(rc) {
366  cout << "Could not post RDMA read, error code is " << rc << ", remote_index is " << remote_index << endl;
367  }
368 }
374 void resources::post_remote_read(const uint32_t id, const long long int offset, const long long int size) {
375  int rc = post_remote_send(id, offset, size, 0, false);
376  if(rc) {
377  cout << "Could not post RDMA read, error code is " << rc << ", remote_index is " << remote_index << endl;
378  }
379 }
384 void resources::post_remote_write(const uint32_t id, const long long int size) {
385  int rc = post_remote_send(id, 0, size, 1, false);
386  if(rc) {
387  cout << "Could not post RDMA write (with no offset), error code is " << rc << ", remote_index is " << remote_index << endl;
388  }
389 }
390 
397 void resources::post_remote_write(const uint32_t id, const long long int offset, const long long int size) {
398  int rc = post_remote_send(id, offset, size, 1, false);
399  if(rc) {
400  cout << "Could not post RDMA write with offset, error code is " << rc << ", remote_index is " << remote_index << endl;
401  }
402 }
403 
404 void resources::post_remote_write_with_completion(const uint32_t id, const long long int size) {
405  int rc = post_remote_send(id, 0, size, 1, true);
406  if(rc) {
407  cout << "Could not post RDMA write (with no offset) with completion, error code is " << rc << ", remote_index is " << remote_index << endl;
408  }
409 }
410 
411 void resources::post_remote_write_with_completion(const uint32_t id, const long long int offset, const long long int size) {
412  int rc = post_remote_send(id, offset, size, 1, true);
413  if(rc) {
414  cout << "Could not post RDMA write with offset and completion, error code is " << rc << ", remote_index is " << remote_index << endl;
415  }
416 }
417 
418 resources_two_sided::resources_two_sided(int r_index, char *write_addr, char *read_addr, int size_w,
419  int size_r) : _resources(r_index, write_addr, read_addr, size_w, size_r) {
420 }
421 
426 void resources_two_sided::post_two_sided_send(const uint32_t id, const long long int size) {
427  int rc = post_remote_send(id, 0, size, 2, false);
428  if(rc) {
429  cout << "Could not post RDMA two sided send (with no offset), error code is " << rc << ", remote_index is " << remote_index << endl;
430  }
431 }
432 
439 void resources_two_sided::post_two_sided_send(const uint32_t id, const long long int offset, const long long int size) {
440  int rc = post_remote_send(id, offset, size, 2, false);
441  if(rc) {
442  cout << "Could not post RDMA two sided send with offset, error code is " << rc << ", remote_index is " << remote_index << endl;
443  }
444 }
445 
446 void resources_two_sided::post_two_sided_send_with_completion(const uint32_t id, const long long int size) {
447  int rc = post_remote_send(id, 0, size, 2, true);
448  if(rc) {
449  cout << "Could not post RDMA two sided send (with no offset) with completion, error code is " << rc << ", remote_index is " << remote_index << endl;
450  }
451 }
452 
453 void resources_two_sided::post_two_sided_send_with_completion(const uint32_t id, const long long int offset, const long long int size) {
454  int rc = post_remote_send(id, offset, size, 2, true);
455  if(rc) {
456  cout << "Could not post RDMA two sided send with offset and completion, error code is " << rc << ", remote_index is " << remote_index << endl;
457  }
458 }
459 
460 int resources_two_sided::post_receive(const uint32_t id, const long long int offset, const long long int size) {
461  struct ibv_recv_wr rr;
462  struct ibv_sge sge;
463  struct ibv_recv_wr *bad_wr;
464 
465  /* prepare the scatter/gather entry */
466  memset(&sge, 0, sizeof(sge));
467  sge.addr = (uintptr_t)(write_buf + offset);
468  sge.length = size;
469  sge.lkey = write_mr->lkey;
470  /* prepare the receive work request */
471  memset(&rr, 0, sizeof(rr));
472  rr.next = NULL;
473  rr.wr_id = id;
474  rr.sg_list = &sge;
475  rr.num_sge = 1;
476 
477  /* post the Receive Request to the RQ */
478  auto ret = ibv_post_recv(qp, &rr, &bad_wr);
479  return ret;
480 }
481 
482 void resources_two_sided::post_two_sided_receive(const uint32_t id, const long long int size) {
483  int rc = post_receive(id, 0, size);
484  if(rc) {
485  cout << "Could not post RDMA two sided receive (with no offset), error code is " << rc << ", remote_index is " << remote_index << endl;
486  }
487 }
488 
489 void resources_two_sided::post_two_sided_receive(const uint32_t id, const long long int offset, const long long int size) {
490  int rc = post_receive(id, offset, size);
491  if(rc) {
492  cout << "Could not post RDMA two sided receive with offset, error code is " << rc << ", remote_index is " << remote_index << endl;
493  }
494 }
495 
496 void polling_loop() {
497  pthread_setname_np(pthread_self(), "sst_poll");
498  cout << "Polling thread starting" << endl;
499  while(!shutdown) {
500  auto ce = verbs_poll_completion();
501  util::polling_data.insert_completion_entry(ce.first, ce.second);
502  }
503  cout << "Polling thread ending" << endl;
504 }
505 
515 std::pair<uint32_t, std::pair<int, int>> verbs_poll_completion() {
516  struct ibv_wc wc;
517  int poll_result;
518 
519  while(!shutdown) {
520  poll_result = 0;
521  for(int i = 0; i < 50; ++i) {
522  poll_result = ibv_poll_cq(g_res->cq, 1, &wc);
523  if(poll_result) {
524  break;
525  }
526  }
527  if(poll_result) {
528  break;
529  }
530  // util::polling_data.wait_for_requests();
531  }
532  // not sure what to do when we cannot read entries off the CQ
533  // this means that something is wrong with the local node
534  if(poll_result < 0) {
535  cout << "Poll completion failed" << endl;
536  exit(-1);
537  }
538  // check the completion status (here we don't care about the completion
539  // opcode)
540  if(wc.status != IBV_WC_SUCCESS) {
541  cout << "got bad completion with status: "
542  << wc.status << ", vendor syndrome: " << wc.vendor_err;
543  return {wc.wr_id, {wc.qp_num, -1}};
544  }
545  return {wc.wr_id, {wc.qp_num, 1}};
546 }
547 
550  // initialize the global resources
551  g_res = (global_resources *)malloc(sizeof(global_resources));
552  memset(g_res, 0, sizeof *g_res);
553 }
554 
557  struct ibv_device **dev_list = NULL;
558  struct ibv_device *ib_dev = NULL;
559  int i;
560  int num_devices;
561  int rc = 0;
562 
563  // get device names in the system
564  dev_list = ibv_get_device_list(&num_devices);
565  if(!dev_list) {
566  cout << "ibv_get_device_list failed; returned a NULL list" << endl;
567  }
568 
569  // if there isn't any IB device in host
570  if(!num_devices) {
571  cout << "NO RDMA device present" << endl;
572  }
573  // search for the specific device we want to work with
574  for(i = 1; i < num_devices; i++) {
575  if(!dev_name) {
576  dev_name = strdup(ibv_get_device_name(dev_list[i]));
577  fprintf(stdout, "device not specified, using first one found: %s\n",
578  dev_name);
579  }
580  if(!strcmp(ibv_get_device_name(dev_list[i]), dev_name)) {
581  ib_dev = dev_list[i];
582  break;
583  }
584  }
585  // if the device wasn't found in host
586  if(!ib_dev) {
587  cout << "No RDMA devices found in the host" << endl;
588  }
589  // get device handle
590  g_res->ib_ctx = ibv_open_device(ib_dev);
591  if(!g_res->ib_ctx) {
592  cout << "Could not open RDMA device" << endl;
593  }
594  // we are now done with device list, free it
595  ibv_free_device_list(dev_list);
596  dev_list = NULL;
597  ib_dev = NULL;
598  // query port properties
599  rc = ibv_query_port(g_res->ib_ctx, ib_port, &g_res->port_attr);
600  if(rc) {
601  cout << "Could not query port properties, error code is " << rc << endl;
602  }
603 
604  // allocate Protection Domain
605  g_res->pd = ibv_alloc_pd(g_res->ib_ctx);
606  if(!g_res->pd) {
607  cout << "Could not allocate protection domain" << endl;
608  }
609 
610  // get the device attributes for the device
611  ibv_query_device(g_res->ib_ctx, &g_res->device_attr);
612 
613  // cout << "device_attr.max_qp_wr = " << g_res->device_attr.max_qp_wr << endl;
614  // cout << "device_attr.max_cqe = " << g_res->device_attr.max_cqe << endl;
615 
616  // set to many entries
617  int cq_size = 1000;
618  g_res->cq = ibv_create_cq(g_res->ib_ctx, cq_size, NULL, NULL, 0);
619  if(!g_res->cq) {
620  cout << "Could not create completion queue, error code is " << errno << endl;
621  }
622 
623  // start the polling thread
624  polling_thread = std::thread(polling_loop);
625  polling_thread.detach();
626 }
627 
628 bool add_node(uint32_t new_id, const std::string new_ip_addr) {
629  return sst_connections->add_node(new_id, new_ip_addr);
630 }
631 bool remove_node(uint32_t node_id) {
632  return sst_connections->delete_node(node_id);
633 }
634 
635 bool sync(uint32_t r_index) {
636  int s = 0, t = 0;
637  return sst_connections->exchange(r_index, s, t);
638 }
639 
644  void verbs_initialize(const std::map<uint32_t, std::pair<ip_addr_t, uint16_t>> &ip_addrs_and_ports, uint32_t node_rank) {
645  sst_connections = new tcp::tcp_connections(node_rank, ip_addrs_and_ports);
646 
647  // init all of the resources, so cleanup will be easy
648  resources_init();
649  // create resources before using them
651 
652  cout << "Initialized global RDMA resources" << endl;
653 }
654 
656  shutdown = true;
657 }
658 
665  shutdown = true;
666  // int rc;
667  // if(g_res->cq) {
668  // rc = ibv_destroy_cq(g_res->cq);
669  // if(rc) {
670  // cout << "Could not destroy completion queue" << endl;
671  // }
672  // }
673  // if(g_res->pd) {
674  // rc = ibv_dealloc_pd(g_res->pd);
675  // if(rc) {
676  // cout << "Could not deallocate protection domain" << endl;
677  // }
678  // }
679  // if(g_res->ib_ctx) {
680  // rc = ibv_close_device(g_res->ib_ctx);
681  // if(rc) {
682  // cout << "Could not close RDMA device" << endl;
683  // }
684  // }
685  delete sst_connections;
686  cout << "SST Verbs shutting down" << endl;
687 }
688 
689 } // namespace sst
void insert_completion_entry(uint32_t index, std::pair< int32_t, int32_t > ce)
Definition: poll_utils.cpp:23
struct ibv_port_attr port_attr
IB port attributes.
Definition: verbs.cpp:68
bool add_node(node_id_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a TCP connection to a new node.
struct fid_mr * write_mr
memory region for remote writer
Definition: lf.hpp:69
void resources_create()
Creates global RDMA resources.
Definition: verbs.cpp:556
uint32_t qp_num
Queue Pair number.
Definition: verbs.hpp:23
bool add_node(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a new node to the SST TCP connections set.
Definition: lf.cpp:557
std::thread polling_thread
Definition: lf.cpp:99
int ib_port
Local IB port to work with.
Definition: verbs.cpp:55
struct global_resources * g_res
The single instance of global_resources for the SST system.
Definition: verbs.cpp:77
void shutdown_polling_thread()
Shutdown the polling thread.
Definition: lf.cpp:743
bool exchange(node_id_t node_id, T local, T &remote)
std::pair< uint32_t, std::pair< int, int > > verbs_poll_completion()
Polls for completion of a single posted remote write.
Definition: verbs.cpp:515
void set_qp_ready_to_send()
Transitions the queue pair to the ready-to-send state.
Definition: verbs.cpp:232
virtual ~_resources()
Destroys the resources.
Definition: lf.cpp:368
bool remove_node(uint32_t node_id)
Removes a node from the SST TCP connections set.
Definition: lf.cpp:561
char * write_buf
Pointer to the memory buffer used for local writes.
Definition: lf.hpp:73
struct ibv_cq * cq
Completion Queue handle.
Definition: verbs.cpp:74
uint32_t rkey
Remote key.
Definition: verbs.hpp:21
uint32_t node_rank
Definition: experiment.cpp:45
void resources_init()
Allocates memory for global RDMA resources.
Definition: verbs.cpp:549
PollingData polling_data
Definition: poll_utils.cpp:17
struct ibv_pd * pd
PD handle.
Definition: verbs.cpp:72
tcp::tcp_connections * sst_connections
Definition: lf.cpp:100
void post_two_sided_send_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
Definition: lf.cpp:510
void connect_qp()
Connect the queue pairs.
Definition: verbs.cpp:254
void set_qp_ready_to_receive()
Transitions the queue pair to the ready-to-receive state.
Definition: verbs.cpp:197
int remote_index
Index of the remote node.
Definition: verbs.hpp:56
void verbs_initialize(const std::map< uint32_t, std::string > &ip_addrs, uint32_t node_rank)
Initializes the global verbs resources.
bool delete_node(node_id_t remove_id)
Removes a node from the managed set of TCP connections, closing the socket connected to it...
int post_receive(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size)
Definition: lf.cpp:538
void post_two_sided_receive(struct lf_sender_ctxt *ctxt, const long long int size)
Definition: lf.cpp:524
void post_remote_read(const long long int size)
Post an RDMA read at the beginning address of remote memory.
Definition: lf.cpp:461
const char * dev_name
IB device name.
Definition: verbs.cpp:53
bool sync(uint32_t r_id)
Blocks the current thread until both this node and a remote node reach this function, which exchanges some trivial data over a TCP connection.
Definition: lf.cpp:565
void post_two_sided_send(const long long int size)
Definition: lf.cpp:490
uint64_t addr
Buffer address.
Definition: verbs.hpp:19
struct ibv_qp * qp
Handle for the IB Verbs Queue Pair object.
Definition: verbs.hpp:58
void set_qp_initialized()
Initializes the queue pair.
Definition: verbs.cpp:178
uint8_t gid[16]
GID.
Definition: verbs.hpp:27
void verbs_destroy()
Destroys the global verbs resources.
Definition: verbs.cpp:664
struct ibv_context * ib_ctx
Device handle.
Definition: verbs.cpp:70
uint16_t lid
LID of the InfiniBand port.
Definition: verbs.hpp:25
resources_two_sided(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
constructor: simply forwards to _resources::_resources
Definition: lf.hpp:149
resources(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
Constructor: simply forwards to _resources::_resources.
Definition: lf.hpp:118
Structure containing global system resources.
Definition: verbs.cpp:64
Represents the set of RDMA resources needed to maintain a two-way connection to a single remote node...
Definition: lf.hpp:31
struct ibv_device_attr device_attr
RDMA device attributes.
Definition: verbs.cpp:66
int post_remote_send(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size, const int op, const bool completion)
post read/write request
Definition: lf.cpp:384
int gid_idx
GID index to use.
Definition: verbs.cpp:57
void post_remote_write_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
Definition: lf.cpp:477
Structure to exchange the data needed to connect the Queue Pairs.
Definition: verbs.hpp:17
_resources(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
Constructor Initializes the resources.
Definition: lf.cpp:311
void post_remote_write(const long long int size)
Post an RDMA write at the beginning address of remote memory.
Definition: lf.cpp:469
void polling_loop()
Definition: lf.cpp:570