Derecho  0.9
Distributed systems toolkit for RDMA
lf_helper.cpp
Go to the documentation of this file.
1 #include <atomic>
2 #include <cstring>
3 #include <fcntl.h>
4 #include <iostream>
5 #include <list>
6 #include <mutex>
7 #include <poll.h>
8 #include <thread>
9 #include <vector>
10 #include <arpa/inet.h>
11 #include <byteswap.h>
12 #include <rdma/fabric.h>
13 #include <rdma/fi_cm.h>
14 #include <rdma/fi_tagged.h>
15 #include <rdma/fi_rma.h>
16 #include <rdma/fi_endpoint.h>
17 #include <rdma/fi_domain.h>
18 
19 #include <derecho/conf/conf.hpp>
22 #include <derecho/tcp/tcp.hpp>
24 #include <derecho/utils/logger.hpp>
25 
/**
 * 64-bit host <-> network byte-order helpers.
 *
 * On little-endian hosts both directions are a byte swap (bswap_64); on
 * big-endian hosts both are the identity. Any other byte order is rejected
 * at compile time.
 */
#if __BYTE_ORDER == __LITTLE_ENDIAN
/** Converts a 64-bit value from host to network byte order. */
static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
/** Converts a 64-bit value from network to host byte order. */
static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
#elif __BYTE_ORDER == __BIG_ENDIAN
/** Converts a 64-bit value from host to network byte order (no-op here). */
static inline uint64_t htonll(uint64_t x) { return x; }
/** Converts a 64-bit value from network to host byte order (no-op here). */
static inline uint64_t ntohll(uint64_t x) { return x; }
#else
// Keep the whole diagnostic on one line: #error only reports the text up to
// the end of its own line.
#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN
#endif
37 
38 using namespace std;
39 
40 namespace rdma {
41 
/**
 * Prints a printf-style message to stderr, flushes stderr, and terminates
 * the process with exit(-1).
 *
 * The do/while(0) wrapper deliberately has no trailing semicolon: the
 * semicolon written at the call site completes the statement, which keeps
 * the macro usable as the body of an unbraced if/else.
 */
#define CRASH_WITH_MESSAGE(...)       \
    do {                              \
        fprintf(stderr, __VA_ARGS__); \
        fflush(stderr);               \
        exit(-1);                     \
    } while(0)
48 
53 };
/**
 * Evaluates expression x, re-evaluating it for as long as it yields
 * -FI_EAGAIN. If the final result is non-zero, the failure is logged through
 * dbg_default_error and printed to stderr together with the file, line and
 * description; when next equals CRASH_ON_FAILURE the process then exits
 * with -1, otherwise execution continues after the macro.
 *
 * Note: x is evaluated once per retry, so it may run several times.
 *
 * @param x    expression returning 0 on success
 * @param desc C string describing the operation
 * @param next REPORT_ON_FAILURE or CRASH_ON_FAILURE
 */
#define FAIL_IF_NONZERO_RETRY_EAGAIN(x, desc, next)                                    \
    do {                                                                               \
        int64_t _int64_r_;                                                             \
        do {                                                                           \
            _int64_r_ = (int64_t)(x);                                                  \
        } while(_int64_r_ == -FI_EAGAIN);                                              \
        if(_int64_r_ != 0) {                                                           \
            dbg_default_error("{}:{},ret={},{}", __FILE__, __LINE__, _int64_r_, desc); \
            /* int64_t is not 'long' everywhere: print through long long. */          \
            fprintf(stderr, "%s:%d,ret=%lld,%s\n", __FILE__, __LINE__,                 \
                    (long long)_int64_r_, desc);                                       \
            if((next) == CRASH_ON_FAILURE) {                                           \
                fflush(stderr);                                                        \
                exit(-1);                                                              \
            }                                                                          \
        }                                                                              \
    } while(0)
/**
 * Evaluates expression x exactly once. If it yields zero (or a null
 * pointer), the failure is logged through dbg_default_error and printed to
 * stderr together with the file, line and description; when next equals
 * CRASH_ON_FAILURE the process then exits with -1, otherwise execution
 * continues after the macro.
 *
 * @param x    expression expected to be non-zero on success
 * @param desc C string describing the operation
 * @param next REPORT_ON_FAILURE or CRASH_ON_FAILURE
 */
#define FAIL_IF_ZERO(x, desc, next)                                     \
    do {                                                                \
        int64_t _int64_r_ = (int64_t)(x);                               \
        if(_int64_r_ == 0) {                                            \
            dbg_default_error("{}:{},{}", __FILE__, __LINE__, desc);    \
            fprintf(stderr, "%s:%d,%s\n", __FILE__, __LINE__, desc);    \
            if((next) == CRASH_ON_FAILURE) {                            \
                fflush(stderr);                                         \
                exit(-1);                                               \
            }                                                           \
        }                                                               \
    } while(0)
81 
85 #define MAX_LF_ADDR_SIZE ((128)-sizeof(uint32_t)-2*sizeof(uint64_t))
86 struct cm_con_data_t {
87  uint32_t pep_addr_len;
89 } __attribute__((packed));
90 
95 
99 //static unique_ptr<tcp::connection_listener> connection_listener;
100 
108  string name;
109 };
110 static vector<completion_handler_set> completion_handlers;
111 static std::mutex completion_handlers_mutex;
112 
116 struct lf_ctxt {
117  struct fi_info * hints;
118  struct fi_info * fi;
119  struct fid_fabric * fabric;
120  struct fid_domain * domain;
121  struct fid_pep * pep;
122  struct fid_eq * peq;
123  // struct fid_eq * eq; /** event queue for transmitting events */ : moved to resources.
124  struct fid_cq * cq;
125  size_t pep_addr_len;
127  struct fi_eq_attr eq_attr;
128  struct fi_cq_attr cq_attr;
129 };
132 
133 #define LF_USE_VADDR ((g_ctxt.fi->domain_attr->mr_mode) & (FI_MR_VIRT_ADDR | FI_MR_BASIC))
134 #define LF_CONFIG_FILE "rdma.cfg"
135 
136 enum RDMAOps {
140 };
/** Bit position of the 8-bit operation code inside a 64-bit context word. */
#define OP_BITS_SHIFT (48)
/** Mask selecting bits 48..55, where the operation code is stored. */
#define OP_BITS_MASK (0x00ff000000000000ull)
/**
 * Extracts the operation code from a context value (integer or pointer).
 * The argument is parenthesized before the cast so that an expression
 * argument is converted as a whole rather than only its first operand.
 */
#define EXTRACT_RDMA_OP_CODE(x) ((uint8_t)((((uint64_t)(x)) & OP_BITS_MASK) >> OP_BITS_SHIFT))
144 
145 namespace impl {
146 
150 static void default_context() {
151  memset((void*)&g_ctxt, 0, sizeof(struct lf_ctxt));
152 
154  FAIL_IF_ZERO(g_ctxt.hints = fi_allocinfo(),"Fail to allocate fi hints",CRASH_ON_FAILURE);
156  g_ctxt.hints->caps = FI_MSG | FI_RMA | FI_READ | FI_WRITE |
157  FI_REMOTE_READ | FI_REMOTE_WRITE;
159  g_ctxt.hints->ep_attr->type = FI_EP_MSG;
161  g_ctxt.hints->mode = ~0;
163  g_ctxt.cq_attr.format = FI_CQ_FORMAT_DATA;
165  // g_ctxt.cq_attr.wait_obj = FI_WAIT_UNSPEC; //FI_WAIT_FD;
166  g_ctxt.cq_attr.wait_obj = FI_WAIT_FD;
168  g_ctxt.pep_addr_len = MAX_LF_ADDR_SIZE;
169 
171  FAIL_IF_ZERO(
172  g_ctxt.hints->fabric_attr->prov_name = strdup(derecho::getConfString(CONF_RDMA_PROVIDER).c_str()),
173  "strdup provider name.", CRASH_ON_FAILURE
174  );
176  FAIL_IF_ZERO(
177  g_ctxt.hints->domain_attr->name = strdup(derecho::getConfString(CONF_RDMA_DOMAIN).c_str()),
178  "strdup domain name.", CRASH_ON_FAILURE
179  );
181  if (strcmp(g_ctxt.hints->fabric_attr->prov_name,"sockets")==0) {
182  g_ctxt.hints->domain_attr->mr_mode = FI_MR_BASIC;
183  } else { // default
185  FAIL_IF_ZERO(
186  g_ctxt.hints->tx_attr->size = derecho::Conf::get()->getInt32(CONF_RDMA_TX_DEPTH),
187  "get tx depth.", CRASH_ON_FAILURE
188  );
189  FAIL_IF_ZERO(
190  g_ctxt.hints->rx_attr->size = derecho::Conf::get()->getInt32(CONF_RDMA_RX_DEPTH),
191  "get rx depth.", CRASH_ON_FAILURE
192  );
193  g_ctxt.hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
194  }
195 }
196 }
197 
198 
203 memory_region::memory_region(size_t s) : memory_region(new char[s], s) {
204  allocated_buffer.reset(buffer);
205 }
206 
207 memory_region::memory_region(char *buf, size_t s) : buffer(buf), size(s) {
208  if (!buffer || size <= 0) throw rdma::invalid_args();
209 
210  const int mr_access = FI_WRITE | FI_REMOTE_READ | FI_REMOTE_WRITE;
211 
213  fid_mr* raw_mr;
215  fi_mr_reg(g_ctxt.domain, (void *)buffer, size, mr_access,
216  0, 0, 0, &raw_mr, nullptr),
217  "Failed to register memory", CRASH_ON_FAILURE
218  );
219  FAIL_IF_ZERO(raw_mr, "Pointer to memory region is null", CRASH_ON_FAILURE);
220 
221  mr = unique_ptr<fid_mr, std::function<void(fid_mr *)>>(
222  raw_mr, [](fid_mr *mr) { fi_close(&mr->fid); }
223  );
224 }
225 
226 uint64_t memory_region::get_key() const { return mr->key; }
227 
232  g_ctxt.cq_attr.size = g_ctxt.fi->tx_attr->size;
233  fid_cq* raw_cq;
235  fi_cq_open(g_ctxt.domain, &(g_ctxt.cq_attr), &raw_cq, NULL),
236  "failed to initialize tx completion queue", CRASH_ON_FAILURE
237  );
238  FAIL_IF_ZERO(raw_cq, "Pointer to completion queue is null", CRASH_ON_FAILURE);
239 
240  cq = unique_ptr<fid_cq, std::function<void(fid_cq *)>>(
241  raw_cq, [](fid_cq *cq) { fi_close(&cq->fid); }
242  );
243 }
244 
246 endpoint::endpoint(size_t remote_index, bool is_lf_server)
247  : endpoint(remote_index, is_lf_server, [](endpoint *){}) {}
248 endpoint::endpoint(size_t remote_index, bool is_lf_server,
249  std::function<void(endpoint *)> post_recvs) {
250  connect(remote_index, is_lf_server, post_recvs);
251 }
252 
253 int endpoint::init(struct fi_info *fi) {
254  int ret;
256  fid_ep* raw_ep;
258  ret = fi_endpoint(g_ctxt.domain, fi, &raw_ep, NULL),
259  "Failed to open endpoint", REPORT_ON_FAILURE
260  );
261  if(ret) return ret;
262  dbg_default_trace("{}:{} created rdmc endpoint: {}",__FILE__,__func__,(void*)&raw_ep->fid);
265  ep = unique_ptr<fid_ep, std::function<void(fid_ep *)>>(
266  raw_ep,
267  [](fid_ep *ep) {
268  fi_close(&ep->fid);
269  }
270  );
271 
273  fid_eq* raw_eq;
275  ret = fi_eq_open(g_ctxt.fabric, &g_ctxt.eq_attr, &raw_eq, NULL),
276  "Failed to open event queue", REPORT_ON_FAILURE
277  );
278  if(ret) return ret;
280  eq = unique_ptr<fid_eq, std::function<void(fid_eq *)>>(
281  raw_eq, [](fid_eq *eq) { fi_close(&eq->fid); }
282  );
283 
286  ret = fi_ep_bind(raw_ep, &(raw_eq)->fid, 0),
287  "Failed to bind endpoint and event queue", REPORT_ON_FAILURE
288  );
289  if(ret) return ret;
290  const uint64_t ep_flags = FI_RECV | FI_TRANSMIT | FI_SELECTIVE_COMPLETION;
292  ret = fi_ep_bind(raw_ep, &(g_ctxt.cq)->fid, ep_flags),
293  "Failed to bind endpoint and tx completion queue", REPORT_ON_FAILURE
294  );
295  if(ret) return ret;
297  ret = fi_enable(raw_ep),
298  "Failed to enable endpoint", REPORT_ON_FAILURE
299  );
300  return ret;
301 }
302 
303 bool sync(uint32_t r_id) {
304  int s = 0, t = 0;
305 
306  return rdmc_connections->exchange(r_id, s, t);
307 }
308 
309 void endpoint::connect(size_t remote_index, bool is_lf_server,
310  std::function<void(endpoint *)> post_recvs) {
311  struct cm_con_data_t local_cm_data, remote_cm_data;
312  memset(&local_cm_data, 0, sizeof(local_cm_data));
313  memset(&remote_cm_data, 0, sizeof(remote_cm_data));
314 
316  local_cm_data.pep_addr_len = (uint32_t)htonl((uint32_t)g_ctxt.pep_addr_len);
317  memcpy((void*)&local_cm_data.pep_addr, &g_ctxt.pep_addr, g_ctxt.pep_addr_len);
318 
319  FAIL_IF_ZERO(
320  rdmc_connections->exchange(remote_index, local_cm_data, remote_cm_data),
321  "Failed to exchange cm info", CRASH_ON_FAILURE
322  );
323 
324  remote_cm_data.pep_addr_len = (uint32_t)ntohl(remote_cm_data.pep_addr_len);
325 
327  ssize_t nRead;
328  struct fi_eq_cm_entry entry;
329  uint32_t event;
330 
331  if (is_lf_server) {
333  nRead = fi_eq_sread(g_ctxt.peq, &event, &entry, sizeof(entry), -1, 0);
334  if(nRead != sizeof(entry)) {
335  CRASH_WITH_MESSAGE("Failed to get connection from remote. nRead=%ld\n",nRead);
336  }
337  if (init(entry.info)){
338  fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0);
339  fi_freeinfo(entry.info);
340  CRASH_WITH_MESSAGE("Failed to initialize server endpoint.\n");
341  }
342  if (fi_accept(ep.get(), NULL, 0)){
343  fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0);
344  fi_freeinfo(entry.info);
345  CRASH_WITH_MESSAGE("Failed to accept connection.\n");
346  }
347  fi_freeinfo(entry.info);
348  } else {
349  struct fi_info * client_hints = fi_dupinfo(g_ctxt.hints);
350  struct fi_info * client_info = NULL;
351 
353  FAIL_IF_ZERO(
354  client_hints->dest_addr = malloc(remote_cm_data.pep_addr_len),
355  "Failed to malloc address space for server pep.", CRASH_ON_FAILURE
356  );
357  memcpy((void*)client_hints->dest_addr,
358  (void*)remote_cm_data.pep_addr,
359  (size_t)remote_cm_data.pep_addr_len);
360  client_hints->dest_addrlen = remote_cm_data.pep_addr_len;
362  fi_getinfo(LF_VERSION, NULL, NULL, 0, client_hints, &client_info),
363  "fi_getinfo() failed.", CRASH_ON_FAILURE
364  );
365 
367  if (init(client_info)){
368  fi_freeinfo(client_hints);
369  fi_freeinfo(client_info);
370  CRASH_WITH_MESSAGE("failed to initialize client endpoint.\n");
371  }
373  fi_connect(ep.get(), remote_cm_data.pep_addr, NULL, 0),
374  "fi_connect() failed", CRASH_ON_FAILURE
375  );
376 
378  nRead = fi_eq_sread(this->eq.get(), &event, &entry, sizeof(entry), -1, 0);
379  if (nRead != sizeof(entry)) {
380  CRASH_WITH_MESSAGE("failed to connect remote. nRead=%ld.\n",nRead);
381  }
382  if (event != FI_CONNECTED || entry.fid != &(ep->fid)) {
383  fi_freeinfo(client_hints);
384  fi_freeinfo(client_info);
385  CRASH_WITH_MESSAGE("RDMC Unexpected CM event: %d.\n", event);
386  }
387  fi_freeinfo(client_hints);
388  fi_freeinfo(client_info);
389  }
390 
391  post_recvs(this);
392  int tmp = -1;
393  if (!rdmc_connections->exchange(remote_index, 0, tmp) || tmp != 0)
394  CRASH_WITH_MESSAGE("Failed to sync after endpoint creation");
395 }
396 
397 bool endpoint::post_send(const memory_region& mr, size_t offset, size_t size,
398  uint64_t wr_id, uint32_t immediate,
399  const message_type& type) {
400  struct iovec msg_iov;
401  struct fi_msg msg;
402 
403  msg_iov.iov_base = mr.buffer + offset;
404  msg_iov.iov_len = size;
405 
406  msg.msg_iov = &msg_iov;
407  msg.desc = (void**)&mr.mr->key;
408  msg.iov_count = 1;
409  msg.addr = 0;
410  msg.context = (void*)(wr_id | ((uint64_t)*type.tag << type.shift_bits) | ((uint64_t)RDMA_OP_SEND) << OP_BITS_SHIFT);
411  msg.data = immediate;
412 
414  fi_sendmsg(ep.get(), &msg, FI_COMPLETION|FI_REMOTE_CQ_DATA),
415  "fi_sendmsg() failed", REPORT_ON_FAILURE
416  );
417  return true;
418 }
419 
420 bool endpoint::post_recv(const memory_region& mr, size_t offset, size_t size,
421  uint64_t wr_id, const message_type& type) {
422  struct iovec msg_iov;
423  struct fi_msg msg;
424 
425  msg_iov.iov_base = mr.buffer + offset;
426  msg_iov.iov_len = size;
427 
428  msg.msg_iov = &msg_iov;
429  msg.desc = (void**)&mr.mr->key;
430  msg.iov_count = 1;
431  msg.addr = 0;
432  msg.context = (void*)(wr_id | ((uint64_t)*type.tag << type.shift_bits) | ((uint64_t)RDMA_OP_RECV) << OP_BITS_SHIFT);
433 
435  fi_recvmsg(ep.get(), &msg, FI_COMPLETION),
436  "fi_recvmsg() failed", REPORT_ON_FAILURE
437  );
438  return true;
439 }
440 
441 bool endpoint::post_empty_send(uint64_t wr_id, uint32_t immediate,
442  const message_type& type) {
443  struct fi_msg msg;
444 
445  memset(&msg, 0, sizeof(msg));
446  msg.context = (void*)(wr_id | ((uint64_t)*type.tag << type.shift_bits) | ((uint64_t)RDMA_OP_SEND) << OP_BITS_SHIFT);
447  msg.data = immediate;
448 
450  fi_sendmsg(ep.get(), &msg, FI_COMPLETION|FI_REMOTE_CQ_DATA),
451  "fi_sendmsg() failed", REPORT_ON_FAILURE
452  );
453  return true;
454 }
455 
456 bool endpoint::post_empty_recv(uint64_t wr_id, const message_type& type) {
457  struct fi_msg msg;
458 
459  memset(&msg, 0, sizeof(msg));
460  msg.context = (void*)(wr_id | ((uint64_t)*type.tag << type.shift_bits) | ((uint64_t)RDMA_OP_RECV) << OP_BITS_SHIFT);
461 
463  fi_recvmsg(ep.get(), &msg, FI_COMPLETION),
464  "fi_recvmsg() failed", REPORT_ON_FAILURE
465  );
466  return true;
467 }
468 
469 bool endpoint::post_write(const memory_region& mr, size_t offset, size_t size,
470  uint64_t wr_id, remote_memory_region remote_mr,
471  size_t remote_offset, const message_type& type,
472  bool signaled, bool send_inline) {
473  if(wr_id >> type.shift_bits || !type.tag) throw invalid_args();
474  if(mr.size < offset + size || remote_mr.size < remote_offset + size) {
475  cout << "mr.size = " << mr.size << " offset = " << offset
476  << " length = " << size << " remote_mr.size = " << remote_mr.size
477  << " remote_offset = " << remote_offset;
478  return false;
479  }
480 
481  struct iovec msg_iov;
482  struct fi_rma_iov rma_iov;
483  struct fi_msg_rma msg;
484 
485  msg_iov.iov_base = mr.buffer + offset;
486  msg_iov.iov_len = size;
487 
488  rma_iov.addr = ((LF_USE_VADDR) ? remote_mr.buffer : 0) + remote_offset;
489  rma_iov.len = size;
490  rma_iov.key = remote_mr.rkey;
491 
492  msg.msg_iov = &msg_iov;
493  msg.desc = (void**)&mr.mr->key;
494  msg.iov_count = 1;
495  msg.addr = 0;
496  msg.rma_iov = &rma_iov;
497  msg.rma_iov_count = 1;
498  msg.context = (void*)(wr_id | ((uint64_t)*type.tag << type.shift_bits) | ((uint64_t)RDMA_OP_WRITE) << OP_BITS_SHIFT);
499  // msg.data = RDMA_OP_WRITE;
500 
502  fi_writemsg(ep.get(), &msg, FI_COMPLETION),
503  "fi_writemsg() failed", REPORT_ON_FAILURE
504  );
505 
506  return true;
507 }
508 
509 message_type::message_type(const std::string& name, completion_handler send_handler,
510  completion_handler recv_handler,
511  completion_handler write_handler) {
512 
513  std::lock_guard<std::mutex> l(completion_handlers_mutex);
514 
515  //if(completion_handlers.size() >= std::numeric_limits<tag_type>::max())
516  // throw message_types_exhausted();
517 
518  tag = completion_handlers.size();
519 
521  set.send = send_handler;
522  set.recv = recv_handler;
523  set.write = write_handler;
524  set.name = name;
525  completion_handlers.push_back(set);
526 }
527 
529  static message_type m(std::numeric_limits<tag_type>::max());
530  return m;
531 }
532 
534  int dummy;
535 };
536 
537 task::task(std::shared_ptr<manager_endpoint> manager_ep) {
538  return;
539 }
540 
542 
543 void task::append_wait(const completion_queue& cq, int count, bool signaled,
544  bool last, uint64_t wr_id, const message_type& type) {
545  throw unsupported_feature();
546 }
547 
549  throw unsupported_feature();
550 }
551 
553  size_t offset, size_t length, uint32_t immediate) {
554  throw unsupported_feature();
555 }
557  size_t offset, size_t length) {
558  throw unsupported_feature();
559 }
560 
561 bool task::post() {
562  throw unsupported_feature();
563 }
564 
565 namespace impl {
570  uint32_t new_id,
571  const std::pair<ip_addr_t, uint16_t> &new_ip_addr_and_port) {
572  return rdmc_connections->add_node(new_id, new_ip_addr_and_port);
573 }
574 
578 bool lf_remove_connection(uint32_t node_id) {
579  return rdmc_connections->delete_node(node_id);
580 }
581 
582 static atomic<bool> interrupt_mode;
583 static atomic<bool> polling_loop_shutdown_flag;
584 static void polling_loop() {
585  pthread_setname_np(pthread_self(), "rdmc_poll");
586 
587  const int max_cq_entries = 1024;
588  unique_ptr<fi_cq_data_entry[]> cq_entries(new fi_cq_data_entry[max_cq_entries]);
589 
590  while (true) {
591  int num_completions = 0;
592  while (num_completions == 0 || num_completions == -FI_EAGAIN) {
593  if (polling_loop_shutdown_flag) return;
594  uint64_t poll_end = get_time() + (interrupt_mode ? 0L : 50000000L);
595  do {
596  if(polling_loop_shutdown_flag) return;
597  num_completions = fi_cq_read(g_ctxt.cq, cq_entries.get(), max_cq_entries);
598  } while((num_completions == 0 || num_completions == -FI_EAGAIN) && get_time() < poll_end);
599 
600  if (num_completions == 0 || num_completions == -FI_EAGAIN) {
603  num_completions = fi_cq_read(g_ctxt.cq, cq_entries.get(), max_cq_entries);
604 
605  if (num_completions == 0 || num_completions == -FI_EAGAIN) {
606  pollfd file_descriptor;
607  fi_control(&g_ctxt.cq->fid, FI_GETWAIT, &file_descriptor);
608  int rc = 0;
609  while (rc == 0 && !polling_loop_shutdown_flag) {
610  if(polling_loop_shutdown_flag) return;
611  file_descriptor.events = POLLIN|POLLERR|POLLHUP;
612  file_descriptor.revents = 0;
613  rc = poll(&file_descriptor, 1, 50);
614  }
615 
616  if (rc > 0) {
617  num_completions = fi_cq_read(g_ctxt.cq, cq_entries.get(), max_cq_entries);
618  }
619  }
620  }
621  }
622 
623  if (num_completions < 0) {
624  cout << "Failed to read from completion queue, fi_cq_read returned "
625  << num_completions << std::endl;
626  }
627 
628  std::lock_guard<std::mutex> l(completion_handlers_mutex);
629  for (int i = 0; i < num_completions; i++) {
630  fi_cq_data_entry &cq_entry = cq_entries[i];
631 
632  message_type::tag_type type = (uint64_t)cq_entry.op_context >> message_type::shift_bits;
633  if (type == std::numeric_limits<message_type::tag_type>::max())
634  continue;
635 
636  uint64_t masked_wr_id = (uint64_t)cq_entry.op_context & 0x0000ffffffffffffull;
637  uint32_t opcode = (uint32_t)EXTRACT_RDMA_OP_CODE(cq_entry.op_context);
638  uint32_t immediate = cq_entry.data;
639  if (type >= completion_handlers.size()) {
640  // Unrecognized message type
641  } else if (opcode == RDMA_OP_SEND) {
642  completion_handlers[type].send(masked_wr_id, immediate,
643  cq_entry.len);
644  } else if (opcode == RDMA_OP_RECV) {
645  completion_handlers[type].recv(masked_wr_id, immediate,
646  cq_entry.len);
647  } else if (opcode == RDMA_OP_WRITE) {
648  completion_handlers[type].write(masked_wr_id, immediate,
649  cq_entry.len);
650  } else {
651  puts("Sent unrecognized completion type?!");
652  }
653  }
654  }
655 }
656 
660 bool lf_initialize(const std::map<node_id_t, std::pair<ip_addr_t, uint16_t>>
661  &ip_addrs_and_ports,
662  uint32_t node_rank) {
663 
665  // connection_listener =
666  // make_unique<tcp::connection_listener>(derecho::rdmc_tcp_port);
667 
669  rdmc_connections = new tcp::tcp_connections(node_rank, ip_addrs_and_ports);
670 
672  default_context();
673  // load_configuration();
674 
675  dbg_default_debug(fi_tostr(g_ctxt.hints, FI_TYPE_INFO));
678  fi_getinfo(LF_VERSION, NULL, NULL, 0, g_ctxt.hints, &(g_ctxt.fi)),
679  "fi_getinfo() failed", CRASH_ON_FAILURE);
680 
681  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_fabric(g_ctxt.fi->fabric_attr, &(g_ctxt.fabric), NULL),
682  "fi_fabric() failed", CRASH_ON_FAILURE);
684  "fi_domain() failed", CRASH_ON_FAILURE);
686  fi_cq_open(g_ctxt.domain, &(g_ctxt.cq_attr), &(g_ctxt.cq), NULL),
687  "failed to initialize tx completion queue", CRASH_ON_FAILURE);
688  FAIL_IF_ZERO(g_ctxt.cq, "Pointer to completion queue is null",
690 
693  "failed to open the event queue for passive endpoint",
695  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_passive_ep(g_ctxt.fabric, g_ctxt.fi, &g_ctxt.pep, NULL),
696  "failed to open a local passive endpoint", CRASH_ON_FAILURE);
697  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_pep_bind(g_ctxt.pep, &g_ctxt.peq->fid, 0),
698  "failed to bind event queue to passive endpoint",
701  "failed to prepare passive endpoint for incoming connections",
704  fi_getname(&g_ctxt.pep->fid, g_ctxt.pep_addr, &g_ctxt.pep_addr_len),
705  "failed to get the local PEP address", CRASH_ON_FAILURE);
707  "local name is too big to fit in local buffer",
709  // event queue moved to endpoint.
710  // FAIL_IF_NONZERO_RETRY_EAGAIN(
711  // fi_eq_open(g_ctxt.fabric, &g_ctxt.eq_attr, &g_ctxt.eq, NULL),
712  // "failed to open the event queue for rdma transmission.",
713  // CRASH_ON_FAILURE
714  // );
715 
717  std::thread polling_thread(polling_loop);
718  polling_thread.detach();
719 
720  return true;
721 }
722 
/**
 * Placeholder for tearing down the libfabric context: no teardown is
 * performed here.
 *
 * @return always false
 */
bool lf_destroy() {
    const bool destroyed = false;
    return destroyed;
}
726 
727 std::map<uint32_t, remote_memory_region> lf_exchange_memory_regions(
728  const std::vector<uint32_t>& members, uint32_t node_rank,
729  const memory_region& mr) {
731  map<uint32_t, remote_memory_region> remote_mrs;
732  for (uint32_t m : members) {
733  if (m == node_rank) {
734  continue;
735  }
736 
737  uint64_t buffer;
738  size_t size;
739  uint64_t rkey;
740 
741  if(!rdmc_connections->exchange(m, (uint64_t)mr.buffer, buffer) ||
742  !rdmc_connections->exchange(m, mr.size, size) ||
743  !rdmc_connections->exchange(m, mr.get_key(), rkey)) {
744  fprintf(stderr, "WARNING: lost connection to node %u\n", m);
745  throw rdma::connection_broken();
746  }
747  remote_mrs.emplace(m, remote_memory_region(buffer, size, rkey));
748  }
749  return remote_mrs;
750 }
751 
752 bool set_interrupt_mode(bool enabled) {
753  interrupt_mode = enabled;
754  return true;
755 }
756 
757 }/* namespace impl */
758 }/* namespace rdma */
Definition: util.hpp:38
void append_recv(const managed_endpoint &ep, const memory_region &mr, size_t offset, size_t length)
Definition: lf_helper.cpp:556
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
struct fi_cq_attr cq_attr
completion queue attributes
Definition: lf_helper.cpp:128
struct fid_cq * cq
completion queue for all rma operations
Definition: lf_helper.cpp:124
bool add_node(node_id_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a TCP connection to a new node.
std::function< void(uint64_t tag, uint32_t immediate, size_t length)> completion_handler
Definition: lf_helper.hpp:131
completion_handler send
Definition: lf_helper.cpp:105
uint64_t get_time()
Definition: time.h:13
std::thread polling_thread
Definition: lf.cpp:99
const int32_t getInt32(const std::string &key) const
Definition: conf.hpp:146
std::unique_ptr< fid_mr, std::function< void(fid_mr *)> > mr
Smart pointer for managing the registered memory region.
Definition: lf_helper.hpp:55
const std::string & getConfString(const std::string &key)
Definition: conf.cpp:110
A wrapper for fi_close.
Definition: lf_helper.hpp:53
#define FAIL_IF_NONZERO_RETRY_EAGAIN(x, desc, next)
Definition: lf_helper.cpp:54
struct fi_info * hints
Definition: lf_helper.cpp:117
bool exchange(node_id_t node_id, T local, T &remote)
STL namespace.
static message_type ignored()
Definition: lf_helper.cpp:528
completion_handler recv
Definition: lf_helper.cpp:106
uint64_t get_key() const
get_key Returns the key associated with the registered memory region, which is used to access the reg...
Definition: lf_helper.cpp:226
completion_handler write
Definition: lf_helper.cpp:107
void append_send(const managed_endpoint &ep, const memory_region &mr, size_t offset, size_t length, uint32_t immediate)
Definition: lf_helper.cpp:552
struct lf_ctxt g_ctxt
The global context for libfabric.
Definition: lf_helper.cpp:131
A C++ wrapper for the libfabric fid_cq struct and its associated functions.
Definition: lf_helper.hpp:115
void append_wait(const completion_queue &cq, int count, bool signaled, bool last, uint64_t wr_id, const message_type &type)
Definition: lf_helper.cpp:543
bool sync(uint32_t r_id)
Definition: lf_helper.cpp:303
#define dbg_default_debug(...)
Definition: logger.hpp:42
char pep_addr[MAX_LF_ADDR_SIZE]
local pep address
Definition: lf_helper.cpp:126
uint32_t node_rank
Definition: experiment.cpp:45
std::unique_ptr< fid_eq, std::function< void(fid_eq *)> > eq
Smart pointer for managing the endpoint.
Definition: lf_helper.hpp:160
bool post_send(const memory_region &mr, size_t offset, size_t size, uint64_t wr_id, uint32_t immediate, const message_type &type)
post_send Uses the libfabrics API to post a buffer to an endpoint.
Definition: lf_helper.cpp:397
Listener to detect new incoming connections.
Definition: lf_helper.cpp:104
#define FAIL_IF_ZERO(x, desc, next)
Definition: lf_helper.cpp:69
uint32_t rkey
Remote key.
Definition: verbs.hpp:290
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and q...
Definition: lf_helper.hpp:28
#define CONF_RDMA_PROVIDER
Definition: conf.hpp:45
static constexpr unsigned int shift_bits
Definition: lf_helper.hpp:136
completion_queue()
Constructor Uses the libfabrics API to open a completion queue.
Definition: lf_helper.cpp:231
void append_enable_send(const managed_endpoint &ep, int count)
Definition: lf_helper.cpp:548
static const Conf * get() noexcept
Definition: conf.cpp:102
task(std::shared_ptr< manager_endpoint > manager_ep)
Definition: lf_helper.cpp:537
char pep_addr[MAX_LF_ADDR_SIZE]
local endpoint address length
Definition: lf_helper.cpp:31
struct fi_eq_attr eq_attr
event queue attributes
Definition: lf_helper.cpp:127
uint32_t pep_addr_len
Definition: lf_helper.cpp:87
void connect(size_t remote_index, bool is_lf_server, std::function< void(endpoint *)> post_recvs)
connect Uses the initialized endpoint to connect to a remote node
Definition: lf_helper.cpp:309
memory_region(size_t size)
Constructor Creates a buffer of the specified size and then calls the second constructor with the new...
Definition: lf_helper.cpp:203
bool lf_remove_connection(uint32_t node_id)
Removes a node's TCP connection, presumably because it has failed.
Definition: lf_helper.cpp:578
#define CONF_RDMA_RX_DEPTH
Definition: conf.hpp:48
bool delete_node(node_id_t remove_id)
Removes a node from the managed set of TCP connections, closing the socket connected to it...
#define CRASH_WITH_MESSAGE(...)
Definition: lf_helper.cpp:42
#define OP_BITS_SHIFT
Definition: lf_helper.cpp:141
size_t pep_addr_len
length of local pep address
Definition: lf_helper.cpp:125
#define dbg_default_trace(...)
Definition: logger.hpp:40
std::map< uint32_t, remote_memory_region > lf_exchange_memory_regions(const std::vector< uint32_t > &members, uint32_t node_rank, const memory_region &mr)
Definition: lf_helper.cpp:727
bool lf_add_connection(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a node to the group via tcp.
Definition: lf_helper.cpp:569
#define dbg_default_flush()
Definition: logger.hpp:52
virtual ~endpoint()
Definition: lf_helper.cpp:245
const size_t size
Definition: lf_helper.hpp:89
bool post_recv(const memory_region &mr, size_t offset, size_t size, uint64_t wr_id, const message_type &type)
post_recv Uses the libfabrics API to post a buffer to the recv queue of an endpoint.
Definition: lf_helper.cpp:420
A C++ wrapper for the libfabric fid_ep struct and its associated functions.
Definition: lf_helper.hpp:157
#define LF_VERSION
Definition: lf_helper.hpp:16
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
#define CONF_RDMA_TX_DEPTH
Definition: conf.hpp:47
#define CONF_RDMA_DOMAIN
Definition: conf.hpp:46
#define EXTRACT_RDMA_OP_CODE(x)
Definition: lf_helper.cpp:143
std::unique_ptr< char[]> allocated_buffer
Smart pointer for managing the buffer the mr uses.
Definition: lf_helper.hpp:57
NextOnFailure
Testing tools from Weijia's sst code.
Definition: lf_helper.cpp:50
struct fid_fabric * fabric
fabric handle
Definition: lf_helper.cpp:119
bool set_interrupt_mode(bool enabled)
Definition: lf_helper.cpp:752
char pep_addr[MAX_LF_ADDR_SIZE]
local endpoint address length
Definition: lf_helper.cpp:88
struct fi_info * fi
fabric information
Definition: lf_helper.cpp:118
int init(struct fi_info *fi)
init Creates an endpoint, and then initializes/enables it
Definition: lf_helper.cpp:253
char *const buffer
Definition: lf_helper.hpp:88
bool lf_destroy()
Definition: lf_helper.cpp:723
std::optional< tag_type > tag
Definition: lf_helper.hpp:139
#define LF_USE_VADDR
Definition: lf_helper.cpp:133
struct rdma::cm_con_data_t __attribute__((packed))
bool lf_initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
Initialize the global context.
Definition: lf_helper.cpp:660
bool post_write(const memory_region &mr, size_t offset, size_t size, uint64_t wr_id, remote_memory_region remote_mr, size_t remote_offset, const message_type &type, bool signaled=false, bool send_inline=false)
Definition: lf_helper.cpp:469
virtual ~task()
Definition: lf_helper.cpp:541
std::unique_ptr< fid_ep, std::function< void(fid_ep *)> > ep
Definition: lf_helper.hpp:161
bool post() __attribute__((warn_unused_result))
Definition: lf_helper.cpp:561
Global states.
Definition: lf_helper.cpp:116
bool post_empty_recv(uint64_t wr_id, const message_type &type)
Definition: lf_helper.cpp:456
bool post_empty_send(uint64_t wr_id, uint32_t immediate, const message_type &type)
Definition: lf_helper.cpp:441
struct fid_pep * pep
passive endpoint for receiving connection
Definition: lf_helper.cpp:121
struct fid_domain * domain
domain handle
Definition: lf_helper.cpp:120
#define MAX_LF_ADDR_SIZE
Passive endpoint info to be exchanged.
Definition: lf_helper.cpp:85
struct fid_eq * peq
event queue for connection management
Definition: lf_helper.cpp:122
tcp::tcp_connections * rdmc_connections
Object to hold the tcp connections for every node.
Definition: lf_helper.cpp:94