11 #include <infiniband/verbs.h> 19 #include <sys/socket.h> 21 #include <sys/types.h> 25 #include "derecho/connection_manager.h" 26 #include "derecho/derecho_ports.h" 27 #include "poll_utils.h" 31 #error "Verbs implementation is obsolete. Compilation stopped." 36 #define MSG "SEND operation " 37 #define RDMAMSGR "RDMA read operation " 38 #define RDMAMSGW "RDMA write operation" 39 #define MSG_SIZE (strlen(MSG) + 1) 40 #if __BYTE_ORDER == __LITTLE_ENDIAN 41 static inline uint64_t htonll(uint64_t x) {
return bswap_64(x); }
/* Little-endian branch of the __BYTE_ORDER check: returns x with its
 * eight bytes reversed (bswap_64). Identical in effect to htonll here. */
42 static inline uint64_t ntohll(uint64_t x) {
return bswap_64(x); }
43 #elif __BYTE_ORDER == __BIG_ENDIAN 44 static inline uint64_t htonll(uint64_t x) {
return x; }
/* Big-endian branch of the __BYTE_ORDER check: returns x unchanged,
 * so no byte swap is performed. */
45 static inline uint64_t ntohll(uint64_t x) {
return x; }
47 #error __BYTE_ORDER is neither 48 __LITTLE_ENDIAN nor __BIG_ENDIAN
80 static bool shutdown =
false;
98 remote_index = r_index;
100 write_buf = write_addr;
102 cout <<
"Write address is NULL" << endl;
105 read_buf = read_addr;
107 cout <<
"Read address is NULL" << endl;
113 mr_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
115 write_mr = ibv_reg_mr(g_res->
pd, write_buf, size_w, mr_flags);
116 read_mr = ibv_reg_mr(g_res->
pd, read_buf, size_r, mr_flags);
118 cout <<
"Could not register memory region : write_mr, error code is: " << errno << endl;
121 cout <<
"Could not register memory region : read_mr, error code is: " << errno << endl;
125 struct ibv_qp_init_attr qp_init_attr;
126 memset(&qp_init_attr, 0,
sizeof(qp_init_attr));
127 qp_init_attr.qp_type = IBV_QPT_RC;
128 qp_init_attr.sq_sig_all = 0;
130 qp_init_attr.send_cq = g_res->
cq;
131 qp_init_attr.recv_cq = g_res->
cq;
133 qp_init_attr.cap.max_send_wr = 4000;
134 qp_init_attr.cap.max_recv_wr = 4000;
135 qp_init_attr.cap.max_send_sge = 1;
136 qp_init_attr.cap.max_recv_sge = 1;
138 qp = ibv_create_qp(g_res->
pd, &qp_init_attr);
141 cout <<
"Could not create queue pair, error code is: " << errno << endl;
146 cout <<
"Established RDMA connection with node " << r_index << endl;
155 rc = ibv_destroy_qp(qp);
157 cout <<
"Could not destroy queue pair, error code is " << rc << endl;
162 rc = ibv_dereg_mr(write_mr);
164 cout <<
"Could not de-register memory region : write_mr, error code is " << rc << endl;
168 rc = ibv_dereg_mr(read_mr);
170 cout <<
"Could not de-register memory region : read_mr, error code is " << rc << endl;
179 struct ibv_qp_attr attr;
182 memset(&attr, 0,
sizeof(attr));
184 attr.qp_state = IBV_QPS_INIT;
188 attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
189 flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
191 rc = ibv_modify_qp(qp, &attr, flags);
193 cout <<
"Failed to modify queue pair to init state, error code is " << rc << endl;
198 struct ibv_qp_attr attr;
200 memset(&attr, 0,
sizeof(attr));
202 attr.qp_state = IBV_QPS_RTR;
203 attr.path_mtu = IBV_MTU_256;
205 attr.dest_qp_num = remote_props.qp_num;
207 attr.max_dest_rd_atomic = 1;
208 attr.min_rnr_timer = 0x12;
209 attr.ah_attr.is_global = 0;
211 attr.ah_attr.dlid = remote_props.lid;
213 attr.ah_attr.src_path_bits = 0;
215 attr.ah_attr.port_num =
ib_port;
217 attr.ah_attr.is_global = 1;
218 attr.ah_attr.port_num = 1;
219 memcpy(&attr.ah_attr.grh.dgid, remote_props.gid, 16);
220 attr.ah_attr.grh.flow_label = 0;
221 attr.ah_attr.grh.hop_limit = 1;
222 attr.ah_attr.grh.sgid_index =
gid_idx;
223 attr.ah_attr.grh.traffic_class = 0;
225 flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
226 rc = ibv_modify_qp(qp, &attr, flags);
228 cout <<
"Failed to modify queue pair to ready-to-receive state, error code is " << rc << endl;
233 struct ibv_qp_attr attr;
235 memset(&attr, 0,
sizeof(attr));
237 attr.qp_state = IBV_QPS_RTS;
242 attr.max_rd_atomic = 1;
243 flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
244 rc = ibv_modify_qp(qp, &attr, flags);
246 cout <<
"Failed to modify queue pair to ready-to-send state, error code is " << rc << endl;
262 union ibv_gid my_gid;
264 int rc = ibv_query_gid(g_res->
ib_ctx, ib_port, gid_idx, &my_gid);
266 cout <<
"ibv_query_gid failed, error code is " << errno << endl;
269 memset(&my_gid, 0,
sizeof my_gid);
273 local_con_data.
addr = htonll((uintptr_t)(
char *)write_buf);
274 local_con_data.
rkey = htonl(write_mr->rkey);
275 local_con_data.
qp_num = htonl(qp->qp_num);
277 memcpy(local_con_data.
gid, &my_gid, 16);
278 bool success = sst_connections->
exchange(remote_index, local_con_data, tmp_con_data);
280 cout <<
"Could not exchange qp data in connect_qp" << endl;
282 remote_con_data.
addr = ntohll(tmp_con_data.
addr);
283 remote_con_data.
rkey = ntohl(tmp_con_data.
rkey);
285 remote_con_data.
lid = ntohs(tmp_con_data.
lid);
286 memcpy(remote_con_data.
gid, tmp_con_data.
gid, 16);
288 remote_props = remote_con_data;
291 set_qp_initialized();
294 set_qp_ready_to_receive();
297 set_qp_ready_to_send();
302 success =
sync(remote_index);
304 cout <<
"Could not sync in connect_qp after qp transition to RTS state" << endl;
318 const int op,
const bool completion) {
319 struct ibv_send_wr sr;
321 struct ibv_send_wr *bad_wr = NULL;
324 sge.addr = (uintptr_t)(read_buf + offset);
326 sge.lkey = read_mr->lkey;
328 memset(&sr, 0,
sizeof(sr));
336 sr.opcode = IBV_WR_RDMA_READ;
338 sr.opcode = IBV_WR_RDMA_WRITE;
340 sr.opcode = IBV_WR_SEND;
343 sr.send_flags = IBV_SEND_SIGNALED;
345 if(op == 0 || op == 1) {
347 sr.wr.rdma.remote_addr = remote_props.addr + offset;
348 sr.wr.rdma.rkey = remote_props.rkey;
352 auto ret = ibv_post_send(qp, &sr, &bad_wr);
357 int size_r) :
_resources(r_index, write_addr, read_addr, size_w, size_r) {
366 cout <<
"Could not post RDMA read, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
377 cout <<
"Could not post RDMA read, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
387 cout <<
"Could not post RDMA write (with no offset), error code is " << rc <<
", remote_index is " <<
remote_index << endl;
400 cout <<
"Could not post RDMA write with offset, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
407 cout <<
"Could not post RDMA write (with no offset) with completion, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
414 cout <<
"Could not post RDMA write with offset and completion, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
419 int size_r) :
_resources(r_index, write_addr, read_addr, size_w, size_r) {
429 cout <<
"Could not post RDMA two sided send (with no offset), error code is " << rc <<
", remote_index is " <<
remote_index << endl;
442 cout <<
"Could not post RDMA two sided send with offset, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
449 cout <<
"Could not post RDMA two sided send (with no offset) with completion, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
456 cout <<
"Could not post RDMA two sided send with offset and completion, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
461 struct ibv_recv_wr rr;
463 struct ibv_recv_wr *bad_wr;
466 memset(&sge, 0,
sizeof(sge));
467 sge.addr = (uintptr_t)(
write_buf + offset);
471 memset(&rr, 0,
sizeof(rr));
478 auto ret = ibv_post_recv(
qp, &rr, &bad_wr);
485 cout <<
"Could not post RDMA two sided receive (with no offset), error code is " << rc <<
", remote_index is " <<
remote_index << endl;
492 cout <<
"Could not post RDMA two sided receive with offset, error code is " << rc <<
", remote_index is " <<
remote_index << endl;
497 pthread_setname_np(pthread_self(),
"sst_poll");
498 cout <<
"Polling thread starting" << endl;
503 cout <<
"Polling thread ending" << endl;
521 for(
int i = 0; i < 50; ++i) {
522 poll_result = ibv_poll_cq(g_res->
cq, 1, &wc);
534 if(poll_result < 0) {
535 cout <<
"Poll completion failed" << endl;
540 if(wc.status != IBV_WC_SUCCESS) {
541 cout <<
"got bad completion with status: " 542 << wc.status <<
", vendor syndrome: " << wc.vendor_err;
543 return {wc.wr_id, {wc.qp_num, -1}};
545 return {wc.wr_id, {wc.qp_num, 1}};
552 memset(g_res, 0,
sizeof *g_res);
557 struct ibv_device **dev_list = NULL;
558 struct ibv_device *ib_dev = NULL;
564 dev_list = ibv_get_device_list(&num_devices);
566 cout <<
"ibv_get_device_list failed; returned a NULL list" << endl;
571 cout <<
"NO RDMA device present" << endl;
574 for(i = 1; i < num_devices; i++) {
576 dev_name = strdup(ibv_get_device_name(dev_list[i]));
577 fprintf(stdout,
"device not specified, using first one found: %s\n",
580 if(!strcmp(ibv_get_device_name(dev_list[i]), dev_name)) {
581 ib_dev = dev_list[i];
587 cout <<
"No RDMA devices found in the host" << endl;
590 g_res->
ib_ctx = ibv_open_device(ib_dev);
592 cout <<
"Could not open RDMA device" << endl;
595 ibv_free_device_list(dev_list);
601 cout <<
"Could not query port properties, error code is " << rc << endl;
605 g_res->
pd = ibv_alloc_pd(g_res->
ib_ctx);
607 cout <<
"Could not allocate protection domain" << endl;
618 g_res->
cq = ibv_create_cq(g_res->
ib_ctx, cq_size, NULL, NULL, 0);
620 cout <<
"Could not create completion queue, error code is " << errno << endl;
625 polling_thread.detach();
628 bool add_node(uint32_t new_id,
const std::string new_ip_addr) {
629 return sst_connections->
add_node(new_id, new_ip_addr);
635 bool sync(uint32_t r_index) {
637 return sst_connections->
exchange(r_index, s, t);
652 cout <<
"Initialized global RDMA resources" << endl;
686 cout <<
"SST Verbs shutting down" << endl;
void insert_completion_entry(uint32_t index, std::pair< int32_t, int32_t > ce)
struct ibv_port_attr port_attr
IB port attributes.
bool add_node(node_id_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a TCP connection to a new node.
struct fid_mr * write_mr
memory region for remote writer
void resources_create()
Creates global RDMA resources.
uint32_t qp_num
Queue Pair number.
bool add_node(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a new node to the SST TCP connections set.
std::thread polling_thread
int ib_port
Local IB port to work with.
struct global_resources * g_res
The single instance of global_resources for the SST system.
void shutdown_polling_thread()
Shutdown the polling thread.
bool exchange(node_id_t node_id, T local, T &remote)
std::pair< uint32_t, std::pair< int, int > > verbs_poll_completion()
Polls for completion of a single posted remote write.
void set_qp_ready_to_send()
Transitions the queue pair to the ready-to-send state.
virtual ~_resources()
Destroys the resources.
bool remove_node(uint32_t node_id)
Removes a node from the SST TCP connections set.
char * write_buf
Pointer to the memory buffer used for local writes.
struct ibv_cq * cq
Completion Queue handle.
void resources_init()
Allocates memory for global RDMA resources.
struct ibv_pd * pd
PD handle.
tcp::tcp_connections * sst_connections
void post_two_sided_send_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
void connect_qp()
Connect the queue pairs.
void set_qp_ready_to_receive()
Transitions the queue pair to the ready-to-receive state.
int remote_index
Index of the remote node.
void verbs_initialize(const std::map< uint32_t, std::string > &ip_addrs, uint32_t node_rank)
Initializes the global verbs resources.
bool delete_node(node_id_t remove_id)
Removes a node from the managed set of TCP connections, closing the socket connected to it...
int post_receive(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size)
void post_two_sided_receive(struct lf_sender_ctxt *ctxt, const long long int size)
void post_remote_read(const long long int size)
Post an RDMA read at the beginning address of remote memory.
const char * dev_name
IB device name.
bool sync(uint32_t r_id)
Blocks the current thread until both this node and a remote node reach this function, which exchanges some trivial data over a TCP connection.
void post_two_sided_send(const long long int size)
uint64_t addr
Buffer address.
struct ibv_qp * qp
Handle for the IB Verbs Queue Pair object.
void set_qp_initialized()
Initializes the queue pair.
void verbs_destroy()
Destroys the global verbs resources.
struct ibv_context * ib_ctx
Device handle.
uint16_t lid
LID of the InfiniBand port.
resources_two_sided(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
Constructor: simply forwards to _resources::_resources.
resources(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
Constructor: simply forwards to _resources::_resources.
Structure containing global system resources.
Represents the set of RDMA resources needed to maintain a two-way connection to a single remote node...
struct ibv_device_attr device_attr
RDMA device attributes.
int post_remote_send(struct lf_sender_ctxt *ctxt, const long long int offset, const long long int size, const int op, const bool completion)
post read/write request
int gid_idx
GID index to use.
void post_remote_write_with_completion(struct lf_sender_ctxt *ctxt, const long long int size)
Structure to exchange the data needed to connect the Queue Pairs.
_resources(int r_id, char *write_addr, char *read_addr, int size_w, int size_r, int is_lf_server)
Constructor: initializes the resources.
void post_remote_write(const long long int size)
Post an RDMA write at the beginning address of remote memory.