11 #include <arpa/inet.h> 13 #include <rdma/fabric.h> 14 #include <rdma/fi_domain.h> 15 #include <rdma/fi_cm.h> 16 #include <rdma/fi_rma.h> 17 #include <rdma/fi_errno.h> 31 #if __BYTE_ORDER == __LITTLE_ENDIAN 32 static inline uint64_t htonll(uint64_t x) {
return bswap_64(x); }
33 static inline uint64_t ntohll(uint64_t x) {
return bswap_64(x); }
34 #elif __BYTE_ORDER == __BIG_ENDIAN 35 static inline uint64_t htonll(uint64_t x) {
return x; }
36 static inline uint64_t ntohll(uint64_t x) {
return x; }
38 #error __BYTE_ORDER is neither 39 __LITTLE_ENDIAN nor __BIG_ENDIAN
43 #define RED "\x1B[31m" 44 #define GRN "\x1B[32m" 45 #define YEL "\x1B[33m" 46 #define BLU "\x1B[34m" 47 #define MAG "\x1B[35m" 48 #define CYN "\x1B[36m" 49 #define WHT "\x1B[37m" 50 #define RESET "\x1B[0m" 57 struct cm_con_data_t {
58 #define MAX_LF_ADDR_SIZE ((128)-sizeof(uint32_t)-2*sizeof(uint64_t)) 84 struct fi_eq_attr eq_attr;
85 struct fi_cq_attr cq_attr;
96 #define LF_CONFIG_FILE "rdma.cfg" 97 #define LF_USE_VADDR ((g_ctxt.fi->domain_attr->mr_mode) & (FI_MR_VIRT_ADDR|FI_MR_BASIC)) 98 static bool shutdown =
false;
107 #define CRASH_WITH_MESSAGE(...) \ 109 fprintf(stderr,__VA_ARGS__); \ 118 #define FAIL_IF_NONZERO_RETRY_EAGAIN(x,desc,next) \ 122 _int64_r_ = (int64_t)(x); \ 123 } while ( _int64_r_ == -FI_EAGAIN ); \ 124 if (_int64_r_ != 0) { \ 125 dbg_default_error("{}:{},ret={},{}",__FILE__,__LINE__,_int64_r_,desc); \ 126 fprintf(stderr,"%s:%d,ret=%ld,%s\n",__FILE__,__LINE__,_int64_r_,desc); \ 127 if (next == CRASH_ON_FAILURE) { \ 129 dbg_default_flush(); \ 134 #define FAIL_IF_ZERO(x,desc,next) \ 136 int64_t _int64_r_ = (int64_t)(x); \ 137 if (_int64_r_ == 0) { \ 138 dbg_default_error("{}:{},{}",__FILE__,__LINE__,desc); \ 139 fprintf(stderr,"%s:%d,%s\n",__FILE__,__LINE__,desc); \ 140 if (next == CRASH_ON_FAILURE) { \ 142 dbg_default_flush(); \ 149 static void default_context() {
150 memset((
void*)&g_ctxt,0,
sizeof(lf_ctxt));
153 g_ctxt.hints->caps = FI_MSG|FI_RMA|FI_READ|FI_WRITE|FI_REMOTE_READ|FI_REMOTE_WRITE;
154 g_ctxt.hints->ep_attr->type = FI_EP_MSG;
155 g_ctxt.hints->mode = ~0;
161 if (g_ctxt.cq_attr.format == FI_CQ_FORMAT_UNSPEC) {
162 g_ctxt.cq_attr.format = FI_CQ_FORMAT_CONTEXT;
164 g_ctxt.cq_attr.wait_obj = FI_WAIT_UNSPEC;
170 static void load_configuration() {
180 if (strcmp(g_ctxt.
hints->fabric_attr->prov_name,
"sockets")==0) {
181 g_ctxt.
hints->domain_attr->mr_mode = FI_MR_BASIC;
183 g_ctxt.
hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
211 dbg_default_debug(
"{}:{} init_endpoint:ep->fid={}",__FILE__,__func__,(
void*)&this->ep->fid);
215 dbg_default_debug(
"{}:{} event_queue opened={}",__FILE__,__func__,(
void*)&this->eq->fid);
// The dbg_default_* loggers take fmt-style "{}" placeholders (printf-style "%d" is
// only valid for fprintf); with "%d" the remote id would never appear in the log.
227 dbg_default_trace(
"preparing connection to remote node(id={})...\n",this->remote_id);
232 local_cm_data.pep_addr_len = (uint32_t)htonl((uint32_t)g_ctxt.
pep_addr_len);
234 local_cm_data.mr_key = (uint64_t)htonll(this->mr_lwkey);
235 local_cm_data.vaddr = (uint64_t)htonll((uint64_t)this->write_buf);
239 remote_cm_data.pep_addr_len = (uint32_t)ntohl(remote_cm_data.pep_addr_len);
240 this->mr_rwkey = (uint64_t)ntohll(remote_cm_data.mr_key);
241 this->remote_fi_addr = (fi_addr_t)ntohll(remote_cm_data.vaddr);
247 struct fi_eq_cm_entry entry;
254 nRead = fi_eq_sread(g_ctxt.
peq, &event, &entry,
sizeof(entry), -1, 0);
255 if(nRead !=
sizeof(entry)) {
259 if(init_endpoint(entry.info)){
260 fi_reject(g_ctxt.
pep, entry.info->handle, NULL, 0);
261 fi_freeinfo(entry.info);
264 if(fi_accept(this->ep, NULL, 0)){
265 fi_reject(g_ctxt.
pep, entry.info->handle, NULL, 0);
266 fi_freeinfo(entry.info);
269 fi_freeinfo(entry.info);
275 struct fi_info * client_hints = fi_dupinfo(g_ctxt.
hints);
276 struct fi_info * client_info = NULL;
278 FAIL_IF_ZERO(client_hints->dest_addr = malloc(remote_cm_data.pep_addr_len),
"failed to malloc address space for server pep.",
CRASH_ON_FAILURE);
279 memcpy((
void*)client_hints->dest_addr,(
void*)remote_cm_data.pep_addr,(
size_t)remote_cm_data.pep_addr_len);
280 client_hints->dest_addrlen = remote_cm_data.pep_addr_len;
282 if(init_endpoint(client_info)){
283 fi_freeinfo(client_hints);
284 fi_freeinfo(client_info);
290 nRead = fi_eq_sread(this->eq, &event, &entry,
sizeof(entry), -1, 0);
291 if (nRead !=
sizeof(entry)) {
295 dbg_default_debug(
"{}:{} entry.fid={},this->ep->fid={}",__FILE__,__func__,(
void*)entry.fid,(
void*)&(this->ep->fid));
296 if (event != FI_CONNECTED || entry.fid != &(this->ep->fid)) {
297 fi_freeinfo(client_hints);
298 fi_freeinfo(client_info);
303 fi_freeinfo(client_hints);
304 fi_freeinfo(client_info);
322 this->remote_id = r_id;
325 this->write_buf = write_addr;
329 this->read_buf = read_addr;
334 #define LF_RMR_KEY(rid) (((uint64_t)0xf0000000)<<32 | (uint64_t)(rid)) 335 #define LF_WMR_KEY(rid) (((uint64_t)0xf8000000)<<32 | (uint64_t)(rid)) 339 g_ctxt.
domain,write_buf,size_w,FI_SEND|FI_RECV|FI_READ|FI_WRITE|FI_REMOTE_READ|FI_REMOTE_WRITE,
340 0, 0, 0, &this->write_mr, NULL),
342 "register memory buffer for write",
344 dbg_default_trace(
"{}:{} registered memory for remote write: {}:{}",__FILE__,__func__,(
void*)write_addr,size_w);
348 g_ctxt.
domain,read_buf,size_r,FI_SEND|FI_RECV|FI_READ|FI_WRITE|FI_REMOTE_READ|FI_REMOTE_WRITE,
349 0, 0, 0, &this->read_mr, NULL),
351 "register memory buffer for read",
353 dbg_default_trace(
"{}:{} registered memory for remote read: {}:{}",__FILE__,__func__,(
void*)read_addr,size_r);
355 this->mr_lrkey = fi_mr_key(this->read_mr);
356 if (this->mr_lrkey == FI_KEY_NOTAVAIL) {
359 this->mr_lwkey = fi_mr_key(this->write_mr);
360 dbg_default_trace(
"{}:{} local write key:{}, local read key:{}",__FILE__,__func__,(uint64_t)this->mr_lwkey,(uint64_t)this->mr_lrkey);
361 if (this->mr_lwkey == FI_KEY_NOTAVAIL) {
365 connect_endpoint(is_lf_server);
386 const long long int offset,
387 const long long int size,
389 const bool completion) {
401 struct iovec msg_iov;
403 msg_iov.iov_base = read_buf + offset;
404 msg_iov.iov_len = size;
406 msg.msg_iov = &msg_iov;
407 msg.desc = (
void**)&this->mr_lrkey;
410 msg.context = (
void*)ctxt;
414 "fi_sendmsg failed.",
417 struct iovec msg_iov;
418 struct fi_rma_iov rma_iov;
419 struct fi_msg_rma msg;
421 msg_iov.iov_base = read_buf + offset;
422 msg_iov.iov_len = size;
424 rma_iov.addr = ((
LF_USE_VADDR)?remote_fi_addr:0) + offset;
426 rma_iov.key = this->mr_rwkey;
428 msg.msg_iov = &msg_iov;
429 msg.desc = (
void**)&this->mr_lrkey;
432 msg.rma_iov = &rma_iov;
433 msg.rma_iov_count = 1;
434 msg.context = (
void*)ctxt;
444 "fi_writemsg failed.",
448 "fi_readmsg failed.",
491 int rc = post_remote_send(NULL, 0, size, 2,
false);
493 cout <<
"Could not post RDMA two sided send (with no offset), error code is " << rc << endl;
504 int rc = post_remote_send(NULL, offset, size, 2,
false);
506 cout <<
"Could not post RDMA two sided send with offset, error code is " << rc << endl;
511 int rc = post_remote_send(ctxt, 0, size, 2,
true);
513 cout <<
"Could not post RDMA two sided send (with no offset) with completion, error code is " << rc << endl;
518 int rc = post_remote_send(ctxt, offset, size, 2,
true);
520 cout <<
"Could not post RDMA two sided send with offset and completion, error code is " << rc <<
", remote_id is " << ctxt->
remote_id << endl;
525 int rc = post_receive(ctxt, 0, size);
527 cout <<
"Could not post RDMA two sided receive (with no offset), error code is " << rc <<
", remote_id is " << ctxt->
remote_id << endl;
532 int rc = post_receive(ctxt, offset, size);
534 cout <<
"Could not post RDMA two sided receive with offset, error code is " << rc <<
", remote_id is " << ctxt->
remote_id << endl;
539 struct iovec msg_iov;
543 msg_iov.iov_base = write_buf + offset;
544 msg_iov.iov_len = size;
546 msg.msg_iov = &msg_iov;
547 msg.desc = (
void**)&this->mr_lwkey;
550 msg.context = (
void*)ctxt;
557 bool add_node(uint32_t new_id,
const std::pair<ip_addr_t, uint16_t>& new_ip_addr_and_port) {
558 return sst_connections->
add_node(new_id, new_ip_addr_and_port);
567 return sst_connections->
exchange(r_id, s, t);
571 pthread_setname_np(pthread_self(),
"sst_poll");
574 struct timespec last_time, cur_time;
575 clock_gettime(CLOCK_REALTIME, &last_time);
582 if (ce.first != 0xFFFFFFFF) {
586 clock_gettime(CLOCK_REALTIME, &last_time);
588 clock_gettime(CLOCK_REALTIME, &cur_time);
590 double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
591 + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
592 if(time_elapsed_in_ms > 1) {
594 std::this_thread::sleep_for(1ms);
611 struct fi_cq_entry entry;
614 struct timespec last_time, cur_time;
615 clock_gettime(CLOCK_REALTIME, &last_time);
618 clock_gettime(CLOCK_REALTIME, &cur_time);
620 double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
621 + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
622 if(time_elapsed_in_ms > 1) {
624 std::this_thread::sleep_for(1ms);
628 for(
int i = 0; i < 50; ++i) {
629 poll_result = fi_cq_read(g_ctxt.
cq, &entry, 1);
630 if(poll_result && (poll_result!=-FI_EAGAIN)) {
634 if(poll_result && (poll_result!=-FI_EAGAIN)) {
641 if((poll_result < 0) && (poll_result != -FI_EAGAIN)) {
642 struct fi_cq_err_entry eentry;
643 fi_cq_readerr(g_ctxt.
cq, &eentry, 0);
646 if (eentry.op_context == NULL) {
654 #ifdef DEBUG_FOR_RELEASE 655 printf(
"\tflags=%x\n",eentry.flags);
656 printf(
"\tlen=%x\n",eentry.len);
657 printf(
"\tbuf=%p\n",eentry.buf);
658 printf(
"\tdata=0x%x\n",eentry.data);
659 printf(
"\ttag=0x%x\n",eentry.tag);
660 printf(
"\tolen=0x%x\n",eentry.olen);
661 printf(
"\terr=0x%x\n",eentry.err);
662 #endif//DEBUG_FOR_RELEASE 674 fi_cq_strerror(g_ctxt.
cq,eentry.prov_errno,eentry.err_data,errbuf,1024));
675 #ifdef DEBUG_FOR_RELEASE 676 printf(
"\tproverr=0x%x,%s\n",eentry.prov_errno,
677 fi_cq_strerror(g_ctxt.
cq,eentry.prov_errno,eentry.err_data,errbuf,1024));
678 #endif//DEBUG_FOR_RELEASE 681 #ifdef DEBUG_FOR_RELEASE 682 printf(
"\terr_data_size=%d\n",eentry.err_data_size);
683 #endif//DEBUG_FOR_RELEASE 684 if (eentry.op_context!=NULL){
689 fprintf(stderr,
"Failed polling the completion queue");
690 return {(uint32_t)0xFFFFFFFF,{0,-1}};
698 return {0xFFFFFFFFu,{0,0}};
718 load_configuration();
void insert_completion_entry(uint32_t index, std::pair< int32_t, int32_t > ce)
#define CRASH_WITH_MESSAGE(...)
Internal Tools.
bool add_node(node_id_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a TCP connection to a new node.
bool add_node(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a new node to the SST TCP connections set.
std::thread polling_thread
const int32_t getInt32(const std::string &key) const
void shutdown_polling_thread()
Shutdown the polling thread.
struct fi_cq_attr cq_attr
const std::string & getConfString(const std::string &key)
struct fid_fabric * fabric
void connect_endpoint(bool is_lf_server)
Connect the queue pair.
bool exchange(node_id_t node_id, T local, T &remote)
virtual ~_resources()
Destroys the resources.
std::pair< uint32_t, std::pair< int32_t, int32_t > > lf_poll_completion()
Polls for completion of a single posted remote write.
bool remove_node(uint32_t node_id)
Removes a node from the SST TCP connections set.
#define dbg_default_debug(...)
struct fi_eq_attr eq_attr
#define dbg_default_error(...)
#define CONF_RDMA_PROVIDER
void lf_initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
Initializes the global libfabric resources.
tcp::tcp_connections * sst_connections
void post_two_sided_send_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
static const Conf * get() noexcept
char pep_addr[MAX_LF_ADDR_SIZE]
#define FAIL_IF_ZERO(x, desc, next)
int init_endpoint(struct fi_info *fi)
Initialize resource endpoint using fi_info.
#define CONF_RDMA_RX_DEPTH
bool delete_node(node_id_t remove_id)
Removes a node from the managed set of TCP connections, closing the socket connected to it...
int post_receive(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size)
void post_two_sided_receive(struct lf_sender_ctxt *ctxt, const long long int size)
void post_remote_read(const long long int size)
Post an RDMA read at the beginning address of remote memory.
bool sync(uint32_t r_id)
Blocks the current thread until both this node and a remote node reach this function, which exchanges some trivial data over a TCP connection.
void post_two_sided_send(const long long int size)
#define dbg_default_trace(...)
#define dbg_default_flush()
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
#define CONF_RDMA_TX_DEPTH
void lf_destroy()
Destroys the global libfabric resources.
#define FAIL_IF_NONZERO_RETRY_EAGAIN(x, desc, next)
char pep_addr[MAX_LF_ADDR_SIZE]
struct fid_domain * domain
int post_remote_send(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size, const int op, const bool completion)
Posts a read/write request.
void post_remote_write_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
Structure to exchange the data needed to connect the Queue Pairs.
_resources(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
Constructor: initializes the resources.
struct sst::verbs_sender_ctxt __attribute__
void post_remote_write(const long long int size)
Post an RDMA write at the beginning address of remote memory.
#define dbg_default_warn(...)