Derecho  0.9
Distributed systems toolkit for RDMA
lf.cpp
Go to the documentation of this file.
1 
#include <atomic>
#include <iostream>
#include <thread>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <byteswap.h>
#include <rdma/fabric.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_rma.h>
#include <rdma/fi_errno.h>

#include <derecho/conf/conf.hpp>
#include <derecho/utils/logger.hpp>
#include <derecho/tcp/tcp.hpp>

using std::cout;
using std::endl;
29 
30 //from verbs.cpp
31 #if __BYTE_ORDER == __LITTLE_ENDIAN
32 static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
33 static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
34 #elif __BYTE_ORDER == __BIG_ENDIAN
35 static inline uint64_t htonll(uint64_t x) { return x; }
36 static inline uint64_t ntohll(uint64_t x) { return x; }
37 #else
38 #error __BYTE_ORDER is neither
39 __LITTLE_ENDIAN nor __BIG_ENDIAN
40 #endif
41 
#if !defined(NDEBUG)
// ANSI terminal escape sequences for coloured debug output (debug builds only).
#define RED "\x1B[31m"   // red foreground
#define GRN "\x1B[32m"   // green foreground
#define YEL "\x1B[33m"   // yellow foreground
#define BLU "\x1B[34m"   // blue foreground
#define MAG "\x1B[35m"   // magenta foreground
#define CYN "\x1B[36m"   // cyan foreground
#define WHT "\x1B[37m"   // white foreground
#define RESET "\x1B[0m"  // restore default attributes
#endif  // !defined(NDEBUG)
52 
53 namespace sst{
57  struct cm_con_data_t {
58  #define MAX_LF_ADDR_SIZE ((128)-sizeof(uint32_t)-2*sizeof(uint64_t))
59  uint32_t pep_addr_len; // local endpoint address length
61  // local endpoint address
62  uint64_t mr_key; // local memory key
63  uint64_t vaddr; // virtual addr
64  } __attribute__((packed));
65 
69  class lf_ctxt {
70  public:
71  // libfabric resources
72  struct fi_info * hints; // hints
73  struct fi_info * fi; // fabric information
74  struct fid_fabric * fabric; // fabric handle
75  struct fid_domain * domain; // domain handle
76  struct fid_pep * pep; // passive endpoint for receiving connection
77  struct fid_eq * peq; // event queue for connection management
78  // struct fid_eq * eq; // event queue for transmitting events --> now move to resources.
79  struct fid_cq * cq; // completion queue for all rma operations
80  size_t pep_addr_len; // length of local pep address
82  // local pep address
83  // configuration resources
84  struct fi_eq_attr eq_attr; // event queue attributes
85  struct fi_cq_attr cq_attr; // completion queue attributes
86  // #define DEFAULT_TX_DEPTH (4096)
87  // uint32_t tx_depth; // transfer depth
88  // #define DEFAULT_RX_DEPTH (4096)
89  // uint32_t rx_depth; // transfer depth
90  // #define DEFAULT_SGE_BATCH_SIZE (8)
91  // uint32_t sge_bat_size; // maximum scatter/gather batch size
92  virtual ~lf_ctxt() {
93  lf_destroy();
94  }
95  };
96  #define LF_CONFIG_FILE "rdma.cfg"
97  #define LF_USE_VADDR ((g_ctxt.fi->domain_attr->mr_mode) & (FI_MR_VIRT_ADDR|FI_MR_BASIC))
98  static bool shutdown = false;
99  std::thread polling_thread;
101  // singlton: global states
103 
/*
 * Internal tool: prints a printf-style message to stderr, flushes stderr and
 * terminates the process with exit(-1).
 * The expansion is a single do { ... } while (0) statement WITHOUT a trailing
 * semicolon, so `CRASH_WITH_MESSAGE(...);` behaves like one statement (e.g. in
 * an unbraced if/else).
 */
#define CRASH_WITH_MESSAGE(...)       \
    do {                              \
        fprintf(stderr, __VA_ARGS__); \
        fflush(stderr);               \
        exit(-1);                     \
    } while(0)
// Failure policy for FAIL_IF_ZERO / FAIL_IF_NONZERO_RETRY_EAGAIN:
// REPORT_ON_FAILURE only logs the failure, CRASH_ON_FAILURE also exits.
enum NextOnFailure {
    REPORT_ON_FAILURE = 0,
    CRASH_ON_FAILURE = 1
};
/*
 * FAIL_IF_NONZERO_RETRY_EAGAIN(x, desc, next)
 *   Evaluates `x` repeatedly while it yields -FI_EAGAIN. If the final value is
 *   non-zero, logs it (dbg_default_error and stderr, tagged with file:line and
 *   `desc`); when `next` is CRASH_ON_FAILURE the logs are flushed and the
 *   process exits with -1. `x` may be an assignment (e.g. `ret = fi_call(...)`)
 *   so the caller can inspect the final return value.
 *   The value is printed via `long long`/%lld so the format is correct on every
 *   platform, whatever the underlying type of int64_t.
 */
#define FAIL_IF_NONZERO_RETRY_EAGAIN(x, desc, next)                               \
    do {                                                                           \
        int64_t _int64_r_;                                                         \
        do {                                                                       \
            _int64_r_ = (int64_t)(x);                                              \
        } while(_int64_r_ == -FI_EAGAIN);                                          \
        if(_int64_r_ != 0) {                                                       \
            dbg_default_error("{}:{},ret={},{}", __FILE__, __LINE__, _int64_r_,    \
                              (desc));                                             \
            fprintf(stderr, "%s:%d,ret=%lld,%s\n", __FILE__, __LINE__,             \
                    (long long)_int64_r_, (desc));                                 \
            if((next) == CRASH_ON_FAILURE) {                                       \
                fflush(stderr);                                                    \
                dbg_default_flush();                                               \
                exit(-1);                                                          \
            }                                                                      \
        }                                                                          \
    } while(0)
/*
 * FAIL_IF_ZERO(x, desc, next)
 *   Evaluates `x` once. A non-zero (non-null, true) value is success and
 *   nothing else happens. A zero value is logged (dbg_default_error and stderr,
 *   tagged with file:line and `desc`); with next == CRASH_ON_FAILURE the logs
 *   are flushed and the process exits with -1.
 */
#define FAIL_IF_ZERO(x, desc, next)                                  \
    do {                                                             \
        const int64_t _zero_check_value_ = (int64_t)(x);             \
        if(_zero_check_value_ != 0) {                                \
            break;                                                   \
        }                                                            \
        dbg_default_error("{}:{},{}", __FILE__, __LINE__, desc);     \
        fprintf(stderr, "%s:%d,%s\n", __FILE__, __LINE__, desc);     \
        if(next == CRASH_ON_FAILURE) {                               \
            fflush(stderr);                                          \
            dbg_default_flush();                                     \
            exit(-1);                                                \
        }                                                            \
    } while(0)
147 
149  static void default_context() {
150  memset((void*)&g_ctxt,0,sizeof(lf_ctxt));
151  FAIL_IF_ZERO(g_ctxt.hints = fi_allocinfo(),"Fail to allocate fi hints",CRASH_ON_FAILURE);
152  //defaults the hints:
153  g_ctxt.hints->caps = FI_MSG|FI_RMA|FI_READ|FI_WRITE|FI_REMOTE_READ|FI_REMOTE_WRITE;
154  g_ctxt.hints->ep_attr->type = FI_EP_MSG; // use connection based endpoint by default.
155  g_ctxt.hints->mode = ~0; // all modes
156 
157  // g_ctxt.hints->tx_attr->rma_iov_limit = DEFAULT_SGE_BATCH_SIZE; //
158  // g_ctxt.hints->tx_attr->iov_limit = DEFAULT_SGE_BATCH_SIZE;
159  // g_ctxt.hints->rx_attr->iov_limit = DEFAULT_SGE_BATCH_SIZE;
160 
161  if (g_ctxt.cq_attr.format == FI_CQ_FORMAT_UNSPEC) {
162  g_ctxt.cq_attr.format = FI_CQ_FORMAT_CONTEXT;
163  }
164  g_ctxt.cq_attr.wait_obj = FI_WAIT_UNSPEC;
165 
166  g_ctxt.pep_addr_len = MAX_LF_ADDR_SIZE;
167  }
168 
170  static void load_configuration() {
171  FAIL_IF_ZERO(g_ctxt.hints,"hints is not initialized.", CRASH_ON_FAILURE);
172 
173  // dbg_default_info("No RDMA conf file, use the default values.");
174  // provider:
175  FAIL_IF_ZERO(g_ctxt.hints->fabric_attr->prov_name = strdup(derecho::getConfString(CONF_RDMA_PROVIDER).c_str()),
176  "strdup provider name.", CRASH_ON_FAILURE);
177  // domain:
178  FAIL_IF_ZERO(g_ctxt.hints->domain_attr->name = strdup(derecho::getConfString(CONF_RDMA_DOMAIN).c_str()),
179  "strdup domain name.", CRASH_ON_FAILURE);
180  if (strcmp(g_ctxt.hints->fabric_attr->prov_name,"sockets")==0) {
181  g_ctxt.hints->domain_attr->mr_mode = FI_MR_BASIC;
182  } else { // default
183  g_ctxt.hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
184  }
185 
186  // scatter/gather batch size
187  // g_ctxt.sge_bat_size = DEFAULT_SGE_BATCH_SIZE;
188  // send pipeline depth
189  // g_ctxt.tx_depth = DEFAULT_TX_DEPTH;
190  // recv pipeline depth
191  // g_ctxt.rx_depth = DEFAULT_RX_DEPTH;
192 
193  // tx_depth
194  g_ctxt.hints->tx_attr->size = derecho::Conf::get()->getInt32(CONF_RDMA_TX_DEPTH);
195  g_ctxt.hints->rx_attr->size = derecho::Conf::get()->getInt32(CONF_RDMA_RX_DEPTH);
196  }
197 
198  int _resources::init_endpoint(struct fi_info *fi) {
199  int ret = 0;
200  // struct fi_cq_attr cq_attr = g_ctxt.cq_attr;
201 
202  // 1 - open completion queue - use unified cq
203  // cq_attr.size = fi->tx_attr->size;
204  // FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_cq_open(g_ctxt.domain, &cq_attr, &(this->txcq), NULL)"initialize tx completion queue.",REPORT_ON_FAILURE);
205  // if(ret) return ret;
206  // FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_cq_open(g_ctxt.domain, &cq_attr, &(this->rxcq), NULL)"initialize rx completion queue.",REPORT_ON_FAILURE);
207  // if(ret) return ret;
208  // 2 - open endpoint
209  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_endpoint(g_ctxt.domain, fi, &(this->ep), NULL), "open endpoint.", REPORT_ON_FAILURE);
210  if(ret) return ret;
211  dbg_default_debug("{}:{} init_endpoint:ep->fid={}",__FILE__,__func__,(void*)&this->ep->fid);
212 
213  // 2.5 - open an event queue.
214  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_eq_open(g_ctxt.fabric,&g_ctxt.eq_attr,&this->eq,NULL),"open the event queue for rdma transmission.", CRASH_ON_FAILURE);
215  dbg_default_debug("{}:{} event_queue opened={}",__FILE__,__func__,(void*)&this->eq->fid);
216 
217  // 3 - bind them and global event queue together
218  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_ep_bind(this->ep, &(this->eq)->fid, 0), "bind endpoint and event queue", REPORT_ON_FAILURE);
219  if(ret) return ret;
220  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_ep_bind(this->ep, &(g_ctxt.cq)->fid, FI_RECV | FI_TRANSMIT | FI_SELECTIVE_COMPLETION), "bind endpoint and tx completion queue", REPORT_ON_FAILURE);
221  if(ret) return ret;
222  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_enable(this->ep), "enable endpoint", REPORT_ON_FAILURE);
223  return ret;
224  }
225 
226  void _resources::connect_endpoint(bool is_lf_server) {
227  dbg_default_trace("preparing connection to remote node(id=%d)...\n",this->remote_id);
228  struct cm_con_data_t local_cm_data,remote_cm_data;
229 
230  // STEP 1 exchange CM info
231  dbg_default_trace("Exchanging connection management info.");
232  local_cm_data.pep_addr_len = (uint32_t)htonl((uint32_t)g_ctxt.pep_addr_len);
233  memcpy((void*)&local_cm_data.pep_addr,&g_ctxt.pep_addr,g_ctxt.pep_addr_len);
234  local_cm_data.mr_key = (uint64_t)htonll(this->mr_lwkey);
235  local_cm_data.vaddr = (uint64_t)htonll((uint64_t)this->write_buf); // for pull mode
236 
237  FAIL_IF_ZERO(sst_connections->exchange(this->remote_id,local_cm_data,remote_cm_data),"exchange connection management info.",CRASH_ON_FAILURE);
238 
239  remote_cm_data.pep_addr_len = (uint32_t)ntohl(remote_cm_data.pep_addr_len);
240  this->mr_rwkey = (uint64_t)ntohll(remote_cm_data.mr_key);
241  this->remote_fi_addr = (fi_addr_t)ntohll(remote_cm_data.vaddr);
242  dbg_default_trace("Exchanging connection management info succeeds.");
243 
244  // STEP 2 connect to remote
245  dbg_default_trace("connect to remote node.");
246  ssize_t nRead;
247  struct fi_eq_cm_entry entry;
248  uint32_t event;
249 
250  if (is_lf_server) {
251  dbg_default_trace("connecting as a server.");
252  dbg_default_trace("waiting for connection.");
253 
254  nRead = fi_eq_sread(g_ctxt.peq, &event, &entry, sizeof(entry), -1, 0);
255  if(nRead != sizeof(entry)) {
256  dbg_default_error("failed to get connection from remote.");
257  CRASH_WITH_MESSAGE("failed to get connection from remote. nRead=%ld\n",nRead);
258  }
259  if(init_endpoint(entry.info)){
260  fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0);
261  fi_freeinfo(entry.info);
262  CRASH_WITH_MESSAGE("failed to initialize server endpoint.\n");
263  }
264  if(fi_accept(this->ep, NULL, 0)){
265  fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0);
266  fi_freeinfo(entry.info);
267  CRASH_WITH_MESSAGE("failed to accept connection.\n");
268  }
269  fi_freeinfo(entry.info);
270  } else {
271  // libfabric connection client
272  dbg_default_trace("connecting as a client.\n");
273  dbg_default_trace("initiating a connection.\n");
274 
275  struct fi_info * client_hints = fi_dupinfo(g_ctxt.hints);
276  struct fi_info * client_info = NULL;
277 
278  FAIL_IF_ZERO(client_hints->dest_addr = malloc(remote_cm_data.pep_addr_len),"failed to malloc address space for server pep.",CRASH_ON_FAILURE);
279  memcpy((void*)client_hints->dest_addr,(void*)remote_cm_data.pep_addr,(size_t)remote_cm_data.pep_addr_len);
280  client_hints->dest_addrlen = remote_cm_data.pep_addr_len;
281  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_getinfo(LF_VERSION,NULL,NULL,0,client_hints,&client_info),"fi_getinfo() failed.",CRASH_ON_FAILURE);
282  if(init_endpoint(client_info)){
283  fi_freeinfo(client_hints);
284  fi_freeinfo(client_info);
285  CRASH_WITH_MESSAGE("failed to initialize client endpoint.\n");
286  }
287 
288  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_connect(this->ep, remote_cm_data.pep_addr, NULL, 0),"fi_connect()",CRASH_ON_FAILURE);
289 
290  nRead = fi_eq_sread(this->eq, &event, &entry, sizeof(entry), -1, 0);
291  if (nRead != sizeof(entry)) {
292  dbg_default_error("failed to connect remote.");
293  CRASH_WITH_MESSAGE("failed to connect remote. nRead=%ld.\n",nRead);
294  }
295  dbg_default_debug("{}:{} entry.fid={},this->ep->fid={}",__FILE__,__func__,(void*)entry.fid,(void*)&(this->ep->fid));
296  if (event != FI_CONNECTED || entry.fid != &(this->ep->fid)) {
297  fi_freeinfo(client_hints);
298  fi_freeinfo(client_info);
300  CRASH_WITH_MESSAGE("SST: Unexpected CM event: %d.\n", event);
301  }
302 
303  fi_freeinfo(client_hints);
304  fi_freeinfo(client_info);
305  }
306  }
307 
// NOTE(review): this listing lost several source lines of this constructor:
// its header (per this page's index:
// `_resources(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)`),
// and, around each fi_mr_reg() call below, the opening
// `FAIL_IF_NONZERO_RETRY_EAGAIN(` and the closing failure-policy argument.
// Restore them from the original lf.cpp before building.
//
// Constructor: records the remote node id and the local write/read buffers
// (warning if either is NULL), registers both buffers with g_ctxt.domain,
// caches their memory keys with fi_mr_key() (terminating the process if a key
// is unavailable), then connects the endpoint via connect_endpoint().
312  int r_id,
313  char *write_addr,
314  char *read_addr,
315  int size_w,
316  int size_r,
317  int is_lf_server) {
318 
319  dbg_default_trace("resources constructor: this={}",(void*)this);
320 
321  // set remote id
322  this->remote_id = r_id;
323 
324  // set write and read buffer
325  this->write_buf = write_addr;
326  if (!write_addr) {
327  dbg_default_warn("{}:{} called with NULL write_addr!",__FILE__,__func__);
328  }
329  this->read_buf = read_addr;
330  if (!read_addr) {
331  dbg_default_warn("{}:{} called with NULL read_addr!",__FILE__,__func__);
332  }
333 
// Requested-key helpers. Only referenced by the commented-out fi_mr_reg()
// arguments below; the active calls pass requested_key 0 instead.
334 #define LF_RMR_KEY(rid) (((uint64_t)0xf0000000)<<32 | (uint64_t)(rid))
335 #define LF_WMR_KEY(rid) (((uint64_t)0xf8000000)<<32 | (uint64_t)(rid))
336  // register the write buffer
338  fi_mr_reg(
339  g_ctxt.domain,write_buf,size_w,FI_SEND|FI_RECV|FI_READ|FI_WRITE|FI_REMOTE_READ|FI_REMOTE_WRITE,
340  0, 0, 0, &this->write_mr, NULL),
341  // 0, LF_WMR_KEY(r_id), 0, &this->write_mr, NULL),
342  "register memory buffer for write",
344  dbg_default_trace("{}:{} registered memory for remote write: {}:{}",__FILE__,__func__,(void*)write_addr,size_w);
345  // register the read buffer
347  fi_mr_reg(
348  g_ctxt.domain,read_buf,size_r,FI_SEND|FI_RECV|FI_READ|FI_WRITE|FI_REMOTE_READ|FI_REMOTE_WRITE,
349  0, 0, 0, &this->read_mr, NULL),
350  //0, LF_RMR_KEY(r_id), 0, &this->read_mr, NULL),
351  "register memory buffer for read",
353  dbg_default_trace("{}:{} registered memory for remote read: {}:{}",__FILE__,__func__,(void*)read_addr,size_r);
354 
// Cache both keys: mr_lwkey is what connect_endpoint() advertises to the peer,
// and mr_lrkey is used when posting operations from read_buf.
355  this->mr_lrkey = fi_mr_key(this->read_mr);
356  if (this->mr_lrkey == FI_KEY_NOTAVAIL) {
357  CRASH_WITH_MESSAGE("fail to get read memory key.");
358  }
359  this->mr_lwkey = fi_mr_key(this->write_mr);
360  dbg_default_trace("{}:{} local write key:{}, local read key:{}",__FILE__,__func__,(uint64_t)this->mr_lwkey,(uint64_t)this->mr_lrkey);
361  if (this->mr_lwkey == FI_KEY_NOTAVAIL) {
362  CRASH_WITH_MESSAGE("fail to get write memory key.");
363  }
364  // set up the endpoint
365  connect_endpoint(is_lf_server);
366  }
367 
// NOTE(review): the destructor's header line (~_resources(), per this page's
// index) is missing from this listing; restore it from the original lf.cpp.
//
// Destructor: closes the endpoint and its event queue, then unregisters the
// write and read memory regions. Each handle is closed only if non-null;
// failures are reported (REPORT_ON_FAILURE) and cleanup continues.
369  dbg_default_trace("resources destructor:this={}",(void*)this);
370  // if(this->txcq)
371  // FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&this->txcq->fid),"close txcq",REPORT_ON_FAILURE);
372  // if(this->rxcq)
373  // FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&this->rxcq->fid),"close rxcq",REPORT_ON_FAILURE);
// The endpoint is closed before the event queue and memory regions it uses.
374  if(this->ep)
375  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&this->ep->fid),"close endpoint",REPORT_ON_FAILURE);
376  if(this->eq)
377  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&this->eq->fid),"close event",REPORT_ON_FAILURE);
378  if(this->write_mr)
379  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&this->write_mr->fid),"unregister write mr",REPORT_ON_FAILURE);
380  if(this->read_mr)
381  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&this->read_mr->fid),"unregister read mr",REPORT_ON_FAILURE);
382  }
383 
// NOTE(review): this listing lost the function header (per this page's index:
// `int post_remote_send(struct lf_sender_ctxt *ctxt, const long long int offset,
// const long long int size, const int op, const bool completion)`) and the
// failure-policy argument that closes each FAIL_IF_NONZERO_RETRY_EAGAIN call
// below; restore them from the original lf.cpp.
//
// Posts one operation on this connection's endpoint using `size` bytes of
// read_buf starting at `offset`:
//   op == 2 : two-sided send (fi_sendmsg)
//   op == 1 : one-sided RDMA write (fi_writemsg) to the peer's buffer at `offset`
//   op == 0 : one-sided RDMA read  (fi_readmsg)  from the peer's buffer at `offset`
// `ctxt` is attached as the operation context (returned by lf_poll_completion);
// `completion` requests FI_COMPLETION. Returns the final libfabric return code.
//
// NOTE(review): `desc` is set to the address of the memory KEY (mr_lrkey,
// from fi_mr_key()), not the local descriptor from fi_mr_desc(read_mr).
// Providers needing FI_MR_LOCAL (requested for non-"sockets" providers in
// load_configuration) read desc[0] as the local descriptor, so this only works
// where that descriptor happens to equal the key. TODO confirm; likely fix:
// `void *desc = fi_mr_desc(this->read_mr); msg.desc = &desc;`.
385  struct lf_sender_ctxt *ctxt,
386  const long long int offset,
387  const long long int size,
388  const int op,
389  const bool completion) {
390  // dbg_default_trace("resources::post_remote_send(),this={}",(void*)this);
391  // #ifdef !NDEBUG
392  // printf(YEL "resources::post_remote_send(),this=%p\n" RESET, this);
393  // fflush(stdout);
394  // #endif
395  // dbg_default_trace("resources::post_remote_send(ctxt=({},{}),offset={},size={},op={},completion={})",ctxt?ctxt->ce_idx:0,ctxt?ctxt->remote_id:0,offset,size,op,completion);
396 
397  int ret = 0;
398 
399  if (op == 2) { // two sided send
400  struct fi_msg msg;
401  struct iovec msg_iov;
402 
403  msg_iov.iov_base = read_buf + offset;
404  msg_iov.iov_len = size;
405 
406  msg.msg_iov = &msg_iov;
407  msg.desc = (void**)&this->mr_lrkey;
408  msg.iov_count = 1;
409  msg.addr = 0;
410  msg.context = (void*)ctxt;
411  msg.data = 0l; // not used
412 
413  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_sendmsg(this->ep,&msg,(completion)?(FI_COMPLETION|FI_REMOTE_CQ_DATA):(FI_REMOTE_CQ_DATA)),
414  "fi_sendmsg failed.",
416  } else { // one-sided RMA: write (op==1) or read (op==0)
417  struct iovec msg_iov;
418  struct fi_rma_iov rma_iov;
419  struct fi_msg_rma msg;
420 
421  msg_iov.iov_base = read_buf + offset;
422  msg_iov.iov_len = size;
423 
// remote_fi_addr holds the peer's write_buf address received in connect_endpoint();
// it is used as the base only when the provider addresses RMA by virtual address.
424  rma_iov.addr = ((LF_USE_VADDR)?remote_fi_addr:0) + offset;
425  rma_iov.len = size;
426  rma_iov.key = this->mr_rwkey;
427 
428  msg.msg_iov = &msg_iov;
429  msg.desc = (void**)&this->mr_lrkey;
430  msg.iov_count = 1;
431  msg.addr = 0; // not used for a connection endpoint
432  msg.rma_iov = &rma_iov;
433  msg.rma_iov_count = 1;
434  msg.context = (void*)ctxt;
435  msg.data = 0l; // not used
436 
437  // dbg_default_trace("{}:{} calling fi_writemsg/fi_readmsg with",__FILE__,__func__);
438  // dbg_default_trace("remote addr = {} len = {} key = {}",(void*)rma_iov.addr,rma_iov.len,(uint64_t)this->mr_rwkey);
439  // dbg_default_trace("local addr = {} len = {} key = {}",(void*)msg_iov.iov_base,msg_iov.iov_len,(uint64_t)this->mr_lrkey);
440  // dbg_default_flush();
441 
442  if(op == 1) { //write
443  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_writemsg(this->ep,&msg,(completion)?FI_COMPLETION:0),
444  "fi_writemsg failed.",
446  } else { // read op==0
447  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_readmsg(this->ep,&msg,(completion)?FI_COMPLETION:0),
448  "fi_readmsg failed.",
450  }
451  }
452  // dbg_default_trace("post_remote_send return with ret={}",ret);
453  // dbg_default_flush();
454  // #ifdef !NDEBUG
455  // printf(YEL "resources::post_remote_send return with ret=%d\n" RESET, ret);
456  // fflush(stdout);
457  // #endif//!NDEBUG
458  return ret;
459  }
460 
461  void resources::post_remote_read(const long long int size){
462  FAIL_IF_NONZERO_RETRY_EAGAIN(post_remote_send(NULL,0,size,0,false),"post_remote_read(1) failed.",REPORT_ON_FAILURE);
463  }
464 
465  void resources::post_remote_read(const long long int offset, const long long int size){
466  FAIL_IF_NONZERO_RETRY_EAGAIN(post_remote_send(NULL,offset,size,0,false),"post_remote_read(2) failed.",REPORT_ON_FAILURE);
467  }
468 
469  void resources::post_remote_write(const long long int size){
470  FAIL_IF_NONZERO_RETRY_EAGAIN(post_remote_send(NULL,0,size,1,false),"post_remote_write(1) failed.",REPORT_ON_FAILURE);
471  }
472 
473  void resources::post_remote_write(const long long int offset, long long int size){
474  FAIL_IF_NONZERO_RETRY_EAGAIN(post_remote_send(NULL,offset,size,1,false),"post_remote_write(2) failed.",REPORT_ON_FAILURE);
475  }
476 
477  void resources::post_remote_write_with_completion(struct lf_sender_ctxt *ctxt, const long long int size){
478  FAIL_IF_NONZERO_RETRY_EAGAIN(post_remote_send(ctxt,0,size,1,true),"post_remote_write(3) failed.",REPORT_ON_FAILURE);
479  }
480 
481  void resources::post_remote_write_with_completion(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size){
482  FAIL_IF_NONZERO_RETRY_EAGAIN(post_remote_send(ctxt,offset,size,1,true),"post_remote_write(4) failed.",REPORT_ON_FAILURE);
483  }
484 
485 
490  void resources_two_sided::post_two_sided_send(const long long int size) {
491  int rc = post_remote_send(NULL, 0, size, 2, false);
492  if(rc) {
493  cout << "Could not post RDMA two sided send (with no offset), error code is " << rc << endl;
494  }
495  }
496 
503  void resources_two_sided::post_two_sided_send(const long long int offset, const long long int size) {
504  int rc = post_remote_send(NULL, offset, size, 2, false);
505  if(rc) {
506  cout << "Could not post RDMA two sided send with offset, error code is " << rc << endl;
507  }
508  }
509 
510  void resources_two_sided::post_two_sided_send_with_completion(struct lf_sender_ctxt *ctxt, const long long int size) {
511  int rc = post_remote_send(ctxt, 0, size, 2, true);
512  if(rc) {
513  cout << "Could not post RDMA two sided send (with no offset) with completion, error code is " << rc << endl;
514  }
515  }
516 
517  void resources_two_sided::post_two_sided_send_with_completion(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size) {
518  int rc = post_remote_send(ctxt, offset, size, 2, true);
519  if(rc) {
520  cout << "Could not post RDMA two sided send with offset and completion, error code is " << rc << ", remote_id is " << ctxt->remote_id << endl;
521  }
522  }
523 
524  void resources_two_sided::post_two_sided_receive(struct lf_sender_ctxt *ctxt, const long long int size) {
525  int rc = post_receive(ctxt, 0, size);
526  if(rc) {
527  cout << "Could not post RDMA two sided receive (with no offset), error code is " << rc << ", remote_id is " << ctxt->remote_id << endl;
528  }
529  }
530 
531  void resources_two_sided::post_two_sided_receive(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size) {
532  int rc = post_receive(ctxt, offset, size);
533  if(rc) {
534  cout << "Could not post RDMA two sided receive with offset, error code is " << rc << ", remote_id is " << ctxt->remote_id << endl;
535  }
536  }
537 
// Posts a receive of `size` bytes into write_buf + offset on this connection's
// endpoint, with `ctxt` as the operation context and FI_COMPLETION requested.
// Returns the final fi_recvmsg() return code.
// NOTE(review): the failure-policy argument closing the
// FAIL_IF_NONZERO_RETRY_EAGAIN call is missing from this listing; restore it
// from the original lf.cpp.
// NOTE(review): `desc` points at the memory KEY (mr_lwkey) rather than the
// descriptor from fi_mr_desc(write_mr); this is only correct where the
// provider's local descriptor equals the key. TODO confirm.
538  int resources_two_sided::post_receive(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size) {
539  struct iovec msg_iov;
540  struct fi_msg msg;
541  int ret;
542 
543  msg_iov.iov_base = write_buf + offset;
544  msg_iov.iov_len = size;
545 
546  msg.msg_iov = &msg_iov;
547  msg.desc = (void**)&this->mr_lwkey;
548  msg.iov_count = 1;
549  msg.addr = 0; // not used
550  msg.context = (void*)ctxt;
551  FAIL_IF_NONZERO_RETRY_EAGAIN(ret = fi_recvmsg(this->ep, &msg, FI_COMPLETION|FI_REMOTE_CQ_DATA),
552  "fi_recvmsg",
554  return ret;
555  }
556 
557  bool add_node(uint32_t new_id, const std::pair<ip_addr_t, uint16_t>& new_ip_addr_and_port) {
558  return sst_connections->add_node(new_id, new_ip_addr_and_port);
559  }
560 
561  bool remove_node(uint32_t node_id) {
562  return sst_connections->delete_node(node_id);
563  }
564 
565  bool sync(uint32_t r_id) {
566  int s = 0, t = 0;
567  return sst_connections->exchange(r_id, s, t);
568  }
569 
570  void polling_loop() {
571  pthread_setname_np(pthread_self(), "sst_poll");
572  dbg_default_trace("Polling thread starting.");
573 
574  struct timespec last_time, cur_time;
575  clock_gettime(CLOCK_REALTIME, &last_time);
576 
577  while(!shutdown) {
578  auto ce = lf_poll_completion();
579  if (shutdown) {
580  break;
581  }
582  if (ce.first != 0xFFFFFFFF) {
583  util::polling_data.insert_completion_entry(ce.first, ce.second);
584 
585  // update last time
586  clock_gettime(CLOCK_REALTIME, &last_time);
587  } else {
588  clock_gettime(CLOCK_REALTIME, &cur_time);
589  // check if the system has been inactive for enough time to induce sleep
590  double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
591  + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
592  if(time_elapsed_in_ms > 1) {
593  using namespace std::chrono_literals;
594  std::this_thread::sleep_for(1ms);
595  }
596  }
597  }
598  dbg_default_trace("Polling thread ending.");
599  }
600 
610  std::pair<uint32_t, std::pair<int32_t, int32_t>> lf_poll_completion() {
611  struct fi_cq_entry entry;
612  int poll_result = 0;
613 
614  struct timespec last_time, cur_time;
615  clock_gettime(CLOCK_REALTIME, &last_time);
616 
617  while(!shutdown) {
618  clock_gettime(CLOCK_REALTIME, &cur_time);
619  // check if the system has been inactive for enough time to induce sleep
620  double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
621  + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
622  if(time_elapsed_in_ms > 1) {
623  using namespace std::chrono_literals;
624  std::this_thread::sleep_for(1ms);
625  }
626 
627  poll_result = 0;
628  for(int i = 0; i < 50; ++i) {
629  poll_result = fi_cq_read(g_ctxt.cq, &entry, 1);
630  if(poll_result && (poll_result!=-FI_EAGAIN)) {
631  break;
632  }
633  }
634  if(poll_result && (poll_result!=-FI_EAGAIN)) {
635  break;
636  }
637  // util::polling_data.wait_for_requests();
638  }
639  // not sure what to do when we cannot read entries off the CQ
640  // this means that something is wrong with the local node
641  if((poll_result < 0) && (poll_result != -FI_EAGAIN)) {
642  struct fi_cq_err_entry eentry;
643  fi_cq_readerr(g_ctxt.cq, &eentry, 0);
644 
645  dbg_default_error("fi_cq_readerr() read the following error entry:");
646  if (eentry.op_context == NULL) {
647  dbg_default_error("\top_context:NULL");
648  } else {
649 #ifndef NOLOG
650  struct lf_sender_ctxt *sctxt = (struct lf_sender_ctxt *)eentry.op_context;
651 #endif
652  dbg_default_error("\top_context:ce_idx={},remote_id={}",sctxt->ce_idx,sctxt->remote_id);
653  }
654 #ifdef DEBUG_FOR_RELEASE
655  printf("\tflags=%x\n",eentry.flags);
656  printf("\tlen=%x\n",eentry.len);
657  printf("\tbuf=%p\n",eentry.buf);
658  printf("\tdata=0x%x\n",eentry.data);
659  printf("\ttag=0x%x\n",eentry.tag);
660  printf("\tolen=0x%x\n",eentry.olen);
661  printf("\terr=0x%x\n",eentry.err);
662 #endif//DEBUG_FOR_RELEASE
663  dbg_default_error("\tflags={}",eentry.flags);
664  dbg_default_error("\tlen={}",eentry.len);
665  dbg_default_error("\tbuf={}",eentry.buf);
666  dbg_default_error("\tdata={}",eentry.data);
667  dbg_default_error("\ttag={}",eentry.tag);
668  dbg_default_error("\tolen={}",eentry.olen);
669  dbg_default_error("\terr={}",eentry.err);
670 #ifndef NOLOG
671  char errbuf[1024];
672 #endif
673  dbg_default_error("\tprov_errno={}:{}",eentry.prov_errno,
674  fi_cq_strerror(g_ctxt.cq,eentry.prov_errno,eentry.err_data,errbuf,1024));
675 #ifdef DEBUG_FOR_RELEASE
676  printf("\tproverr=0x%x,%s\n",eentry.prov_errno,
677  fi_cq_strerror(g_ctxt.cq,eentry.prov_errno,eentry.err_data,errbuf,1024));
678 #endif//DEBUG_FOR_RELEASE
679  dbg_default_error("\terr_data={}",eentry.err_data);
680  dbg_default_error("\terr_data_size={}",eentry.err_data_size);
681 #ifdef DEBUG_FOR_RELEASE
682  printf("\terr_data_size=%d\n",eentry.err_data_size);
683 #endif//DEBUG_FOR_RELEASE
684  if (eentry.op_context!=NULL){
685  struct lf_sender_ctxt * sctxt = (struct lf_sender_ctxt *)eentry.op_context;
686  return {sctxt->ce_idx, {sctxt->remote_id, -1}};
687  } else {
688  dbg_default_error("\tFailed polling the completion queue");
689  fprintf(stderr,"Failed polling the completion queue");
690  return {(uint32_t)0xFFFFFFFF,{0,-1}}; // we don't know who sent the message.
691  // CRASH_WITH_MESSAGE("failed polling the completion queue");
692  }
693  }
694  if (!shutdown) {
695  struct lf_sender_ctxt * sctxt = (struct lf_sender_ctxt *)entry.op_context;
696  if (sctxt == NULL) {
697  dbg_default_debug("WEIRD: we get an entry with op_context = NULL.");
698  return {0xFFFFFFFFu,{0,0}}; // return a bad entry: weird!!!!
699  } else {
700 // dbg_default_trace("Normal: we get an entry with op_context = {}.",(long long unsigned)sctxt);
701  return {sctxt->ce_idx, {sctxt->remote_id, 1}};
702  }
703  } else { // shutdown return a bad entry
704  return {0,{0,0}};
705  }
706  }
707 
708  void lf_initialize(const std::map<node_id_t, std::pair<ip_addr_t, uint16_t>>
709  &ip_addrs_and_ports,
710  uint32_t node_rank) {
711  // initialize derecho connection manager: This is derived from Sagar's code.
712  // May there be a better desgin?
713  sst_connections = new tcp::tcp_connections(node_rank, ip_addrs_and_ports);
714 
715  // initialize global resources:
716  // STEP 1: initialize with configuration.
717  default_context(); // default the context
718  load_configuration(); // load configuration
719 
720  //dbg_default_info(fi_tostr(g_ctxt.hints,FI_TYPE_INFO));
721  // STEP 2: initialize fabric, domain, and completion queue
722  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_getinfo(LF_VERSION,NULL,NULL,0,g_ctxt.hints,&(g_ctxt.fi)),"fi_getinfo()",CRASH_ON_FAILURE);
723  dbg_default_trace("going to use virtual address?{}",LF_USE_VADDR);
724  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_fabric(g_ctxt.fi->fabric_attr, &(g_ctxt.fabric), NULL),"fi_fabric()",CRASH_ON_FAILURE);
725  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_domain(g_ctxt.fabric, g_ctxt.fi, &(g_ctxt.domain), NULL),"fi_domain()",CRASH_ON_FAILURE);
726  g_ctxt.cq_attr.size = g_ctxt.fi->tx_attr->size;
727  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_cq_open(g_ctxt.domain, &(g_ctxt.cq_attr), &(g_ctxt.cq), NULL),"initialize tx completion queue.",REPORT_ON_FAILURE);
728 
729  // STEP 3: prepare local PEP
730  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_eq_open(g_ctxt.fabric,&g_ctxt.eq_attr,&g_ctxt.peq,NULL),"open the event queue for passive endpoint",CRASH_ON_FAILURE);
731  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_passive_ep(g_ctxt.fabric,g_ctxt.fi,&g_ctxt.pep,NULL),"open a local passive endpoint",CRASH_ON_FAILURE);
732  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_pep_bind(g_ctxt.pep,&g_ctxt.peq->fid,0),"binding event queue to passive endpoint",CRASH_ON_FAILURE);
733  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_listen(g_ctxt.pep),"preparing passive endpoint for incoming connections",CRASH_ON_FAILURE);
734  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_getname(&g_ctxt.pep->fid, g_ctxt.pep_addr, &g_ctxt.pep_addr_len),"get the local PEP address",CRASH_ON_FAILURE);
735  FAIL_IF_NONZERO_RETRY_EAGAIN((g_ctxt.pep_addr_len > MAX_LF_ADDR_SIZE),"local name is too big to fit in local buffer",CRASH_ON_FAILURE);
736  // FAIL_IF_NONZERO_RETRY_EAGAIN(fi_eq_open(g_ctxt.fabric,&g_ctxt.eq_attr,&g_ctxt.eq,NULL),"open the event queue for rdma transmission.", CRASH_ON_FAILURE);
737 
738  // STEP 4: start polling thread.
739  polling_thread = std::thread(polling_loop);
740  // polling_thread.detach();
741  }
742 
// NOTE(review): the header line of shutdown_polling_thread() is missing from
// this listing; restore it from the original lf.cpp.
// Raises the `shutdown` flag checked by polling_loop() and lf_poll_completion(),
// then joins polling_thread if it is joinable (i.e. it was started and not yet joined).
744  shutdown = true;
745  if(polling_thread.joinable()) {
746  polling_thread.join();
747  }
748  }
749 
750  void lf_destroy(){
752  // TODO: make sure all resources are destroyed first.
753  if (g_ctxt.pep) {
754  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&g_ctxt.pep->fid),"close passive endpoint",REPORT_ON_FAILURE);
755  }
756  if (g_ctxt.peq) {
757  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&g_ctxt.peq->fid),"close event queue for passive endpoint",REPORT_ON_FAILURE);
758  }
759  if (g_ctxt.cq) {
760  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&g_ctxt.cq->fid),"close completion queue",REPORT_ON_FAILURE);
761  }
762  // g_ctxt.eq has been moved to resources
763  // if (g_ctxt.eq) {
764  // FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&g_ctxt.eq->fid),"close event queue",REPORT_ON_FAILURE);
765  // }
766  if (g_ctxt.domain) {
767  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&g_ctxt.domain->fid),"close domain",REPORT_ON_FAILURE);
768  }
769  if (g_ctxt.fabric) {
770  FAIL_IF_NONZERO_RETRY_EAGAIN(fi_close(&g_ctxt.fabric->fid),"close fabric",REPORT_ON_FAILURE);
771  }
772  if (g_ctxt.fi) {
773  fi_freeinfo(g_ctxt.fi);
774  g_ctxt.hints = nullptr;
775  }
776  if (g_ctxt.hints) {
777  fi_freeinfo(g_ctxt.hints);
778  g_ctxt.hints = nullptr;
779  }
780  }
781 }
uint32_t pep_addr_len
Definition: lf.cpp:59
uint32_t remote_id
Definition: lf.hpp:24
Definition: util.hpp:38
#define MAX_LF_ADDR_SIZE
Definition: lf.cpp:34
void insert_completion_entry(uint32_t index, std::pair< int32_t, int32_t > ce)
Definition: poll_utils.cpp:23
uint32_t ce_idx
Definition: lf.hpp:23
#define CRASH_WITH_MESSAGE(...)
Internal Tools.
Definition: lf.cpp:107
bool add_node(node_id_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a TCP connection to a new node.
bool add_node(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a new node to the SST TCP connections set.
Definition: lf.cpp:557
std::thread polling_thread
Definition: lf.cpp:99
struct fi_info * hints
Definition: lf.cpp:72
const int32_t getInt32(const std::string &key) const
Definition: conf.hpp:146
void shutdown_polling_thread()
Shutdown the polling thread.
Definition: lf.cpp:743
struct fi_cq_attr cq_attr
Definition: lf.cpp:85
struct fid_pep * pep
Definition: lf.cpp:76
const std::string & getConfString(const std::string &key)
Definition: conf.cpp:110
struct fid_fabric * fabric
Definition: lf.cpp:74
void connect_endpoint(bool is_lf_server)
Connects this resource's libfabric endpoint to the remote node.
Definition: lf.cpp:226
bool exchange(node_id_t node_id, T local, T &remote)
virtual ~_resources()
Destroys the resources.
Definition: lf.cpp:368
std::pair< uint32_t, std::pair< int32_t, int32_t > > lf_poll_completion()
Polls for the completion of a single posted operation (read, write, send or receive).
Definition: lf.cpp:610
bool remove_node(uint32_t node_id)
Removes a node from the SST TCP connections set.
Definition: lf.cpp:561
Global States.
Definition: lf.cpp:69
virtual ~lf_ctxt()
Definition: lf.cpp:92
#define dbg_default_debug(...)
Definition: logger.hpp:42
struct fi_eq_attr eq_attr
Definition: lf.cpp:84
uint32_t node_rank
Definition: experiment.cpp:45
size_t pep_addr_len
Definition: lf.cpp:80
#define dbg_default_error(...)
Definition: logger.hpp:48
PollingData polling_data
Definition: poll_utils.cpp:17
#define CONF_RDMA_PROVIDER
Definition: conf.hpp:45
void lf_initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
Initializes the global libfabric resources.
Definition: lf.cpp:708
tcp::tcp_connections * sst_connections
Definition: lf.cpp:100
void post_two_sided_send_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
Definition: lf.cpp:510
static const Conf * get() noexcept
Definition: conf.cpp:102
char pep_addr[MAX_LF_ADDR_SIZE]
Definition: lf.cpp:60
#define FAIL_IF_ZERO(x, desc, next)
Definition: lf.cpp:134
lf_ctxt g_ctxt
Definition: lf.cpp:102
int init_endpoint(struct fi_info *fi)
Initialize resource endpoint using fi_info.
Definition: lf.cpp:198
#define CONF_RDMA_RX_DEPTH
Definition: conf.hpp:48
uint64_t vaddr
Definition: lf.cpp:63
bool delete_node(node_id_t remove_id)
Removes a node from the managed set of TCP connections, closing the socket connected to it...
int post_receive(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size)
Definition: lf.cpp:538
void post_two_sided_receive(struct lf_sender_ctxt *ctxt, const long long int size)
Definition: lf.cpp:524
void post_remote_read(const long long int size)
Post an RDMA read at the beginning address of remote memory.
Definition: lf.cpp:461
NextOnFailure
Definition: lf.cpp:114
uint64_t mr_key
Definition: lf.cpp:62
bool sync(uint32_t r_id)
Blocks the current thread until both this node and a remote node reach this function, which exchanges some trivial data over a TCP connection.
Definition: lf.cpp:565
void post_two_sided_send(const long long int size)
Definition: lf.cpp:490
#define dbg_default_trace(...)
Definition: logger.hpp:40
#define dbg_default_flush()
Definition: logger.hpp:52
struct fid_eq * peq
Definition: lf.cpp:77
#define LF_VERSION
Definition: lf_helper.hpp:16
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
#define CONF_RDMA_TX_DEPTH
Definition: conf.hpp:47
#define CONF_RDMA_DOMAIN
Definition: conf.hpp:46
void lf_destroy()
Destroys the global libfabric resources.
Definition: lf.cpp:750
struct fid_cq * cq
Definition: lf.cpp:79
#define FAIL_IF_NONZERO_RETRY_EAGAIN(x, desc, next)
Definition: lf.cpp:118
char pep_addr[MAX_LF_ADDR_SIZE]
Definition: lf.cpp:81
#define LF_USE_VADDR
Definition: lf.cpp:97
struct fid_domain * domain
Definition: lf.cpp:75
int post_remote_send(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size, const int op, const bool completion)
post read/write request
Definition: lf.cpp:384
struct fi_info * fi
Definition: lf.cpp:73
void post_remote_write_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
Definition: lf.cpp:477
Structure to exchange the data needed to connect the Queue Pairs.
Definition: verbs.hpp:17
_resources(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
Constructor Initializes the resources.
Definition: lf.cpp:311
struct sst::verbs_sender_ctxt __attribute__
void post_remote_write(const long long int size)
Post an RDMA write at the beginning address of remote memory.
Definition: lf.cpp:469
void polling_loop()
Definition: lf.cpp:570
#define dbg_default_warn(...)
Definition: logger.hpp:46