11 #include "derecho/derecho_ports.h" 14 #include "verbs_helper.h" 16 #error "Verbs implementation is obsolete. Compilation stopped." 19 #include <infiniband/verbs.h> 22 #ifdef INFINIBAND_VERBS_EXP_H 23 #define MELLANOX_EXPERIMENTAL_VERBS 44 static map<uint32_t, tcp::socket> sockets;
47 static unique_ptr<tcp::connection_listener> connection_listener;
67 static vector<completion_handler_set> completion_handlers;
68 static std::mutex completion_handlers_mutex;
70 static atomic<bool> interrupt_mode;
71 static atomic<bool> contiguous_memory_mode;
75 static atomic<bool> polling_loop_shutdown_flag;
76 static void polling_loop() {
77 pthread_setname_np(pthread_self(),
"rdmc_poll");
78 TRACE(
"Spawned main loop");
80 const int max_work_completions = 1024;
81 unique_ptr<ibv_wc[]> work_completions(
new ibv_wc[max_work_completions]);
84 int num_completions = 0;
85 while(num_completions == 0) {
86 if(polling_loop_shutdown_flag)
return;
87 uint64_t poll_end =
get_time() + (interrupt_mode ? 0L : 50000000L);
89 if(polling_loop_shutdown_flag)
return;
91 work_completions.get());
92 }
while(num_completions == 0 &&
get_time() < poll_end);
94 if(num_completions == 0) {
99 work_completions.get());
101 if(num_completions == 0) {
102 pollfd file_descriptor;
104 file_descriptor.events = POLLIN;
105 file_descriptor.revents = 0;
107 while(rc == 0 && !polling_loop_shutdown_flag) {
108 if(polling_loop_shutdown_flag)
return;
109 rc = poll(&file_descriptor, 1, 50);
116 ibv_ack_cq_events(ev_cq, 1);
122 if(num_completions < 0) {
123 fprintf(stderr,
"Failed to poll completion queue.");
127 std::lock_guard<std::mutex> l(completion_handlers_mutex);
128 for(
int i = 0; i < num_completions; i++) {
129 ibv_wc &wc = work_completions[i];
131 if(wc.status == 5)
continue;
133 string opcode =
"[unknown]";
134 if(wc.opcode == IBV_WC_SEND) opcode =
"IBV_WC_SEND";
135 if(wc.opcode == IBV_WC_RECV) opcode =
"IBV_WC_RECV";
136 if(wc.opcode == IBV_WC_RDMA_WRITE) opcode =
"IBV_WC_RDMA_WRITE";
139 printf(
"wc.status = %d; wc.wr_id = 0x%llx; imm = 0x%x; " 141 (
int)wc.status, (
long long)wc.wr_id,
142 (
unsigned int)wc.imm_data, opcode.c_str());
146 message_type::tag_type type = wc.wr_id >> message_type::shift_bits;
147 if(type == std::numeric_limits<message_type::tag_type>::max())
150 uint64_t masked_wr_id = wc.wr_id & 0x00ffffffffffffff;
151 if(type >= completion_handlers.size()) {
153 }
else if(wc.status != 0) {
155 }
else if(wc.opcode == IBV_WC_SEND) {
156 completion_handlers[type].send(masked_wr_id, wc.imm_data,
158 }
else if(wc.opcode == IBV_WC_RECV) {
159 completion_handlers[type].recv(masked_wr_id, wc.imm_data,
161 }
else if(wc.opcode == IBV_WC_RDMA_WRITE) {
162 completion_handlers[type].write(masked_wr_id, wc.imm_data,
165 puts(
"Sent unrecognized completion type?!");
171 static int modify_qp_to_init(
struct ibv_qp *qp,
int ib_port) {
172 struct ibv_qp_attr attr;
175 memset(&attr, 0,
sizeof(attr));
176 attr.qp_state = IBV_QPS_INIT;
179 attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
180 flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
181 rc = ibv_modify_qp(qp, &attr, flags);
182 if(rc) fprintf(stderr,
"failed to modify QP state to INIT\n");
186 static int modify_qp_to_rtr(
struct ibv_qp *qp, uint32_t remote_qpn,
187 uint16_t dlid, uint8_t *dgid,
int ib_port,
189 struct ibv_qp_attr attr;
192 memset(&attr, 0,
sizeof(attr));
193 attr.qp_state = IBV_QPS_RTR;
195 attr.dest_qp_num = remote_qpn;
197 attr.max_dest_rd_atomic = 1;
198 attr.min_rnr_timer = 16;
199 attr.ah_attr.is_global = 1;
200 attr.ah_attr.dlid = dlid;
202 attr.ah_attr.src_path_bits = 0;
203 attr.ah_attr.port_num =
ib_port;
205 attr.ah_attr.is_global = 1;
206 attr.ah_attr.port_num = 1;
207 memcpy(&attr.ah_attr.grh.dgid, dgid, 16);
208 attr.ah_attr.grh.flow_label = 0;
209 attr.ah_attr.grh.hop_limit = 0xFF;
210 attr.ah_attr.grh.sgid_index =
gid_idx;
211 attr.ah_attr.grh.traffic_class = 0;
213 flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
214 rc = ibv_modify_qp(qp, &attr, flags);
215 if(rc) fprintf(stderr,
"failed to modify QP state to RTR\n");
219 static int modify_qp_to_rts(
struct ibv_qp *qp) {
220 struct ibv_qp_attr attr;
223 memset(&attr, 0,
sizeof(attr));
224 attr.qp_state = IBV_QPS_RTS;
229 attr.max_rd_atomic = 1;
230 flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
231 rc = ibv_modify_qp(qp, &attr, flags);
232 if(rc) fprintf(stderr,
"failed to modify QP state to RTS. ERRNO=%d\n", rc);
239 fprintf(stderr,
"failed to destroy CQ\n");
242 fprintf(stderr,
"failed to destroy Completion Channel\n");
245 fprintf(stderr,
"failed to deallocate PD\n");
248 fprintf(stderr,
"failed to close device context\n");
252 bool verbs_initialize(
const map<uint32_t, std::pair<ip_addr_t, uint16_t>> &ip_addrs_and_ports,
256 connection_listener = make_unique<tcp::connection_listener>(derecho::rdmc_tcp_port);
258 TRACE(
"Starting connection phase");
262 for(
auto it = ip_addrs_and_ports.begin(); it != ip_addrs_and_ports.end(); it++) {
263 if(it->first != node_rank) {
265 fprintf(stderr,
"WARNING: failed to connect to node %d at %s\n",
266 (
int)it->first, it->second.c_str());
270 TRACE(
"Done connecting");
274 ibv_device **dev_list = NULL;
275 ibv_device *ib_dev = NULL;
280 fprintf(stdout,
"searching for IB devices in host\n");
282 dev_list = ibv_get_device_list(&num_devices);
284 fprintf(stderr,
"failed to get IB devices list\n");
285 goto resources_create_exit;
289 fprintf(stderr,
"found %d device(s)\n", num_devices);
290 goto resources_create_exit;
293 local_config.
dev_name = getenv(
"RDMC_DEVICE_NAME");
294 fprintf(stdout,
"found %d device(s)\n", num_devices);
296 for(i = 1; i < num_devices; i++) {
298 local_config.
dev_name = strdup(ibv_get_device_name(dev_list[i]));
299 fprintf(stdout,
"device not specified, using first one found: %s\n",
302 if(!strcmp(ibv_get_device_name(dev_list[i]), local_config.
dev_name)) {
303 ib_dev = dev_list[i];
309 fprintf(stderr,
"IB device %s wasn't found\n", local_config.
dev_name);
310 goto resources_create_exit;
313 res->ib_ctx = ibv_open_device(ib_dev);
315 fprintf(stderr,
"failed to open device %s\n", local_config.
dev_name);
316 goto resources_create_exit;
319 ibv_free_device_list(dev_list);
323 if(ibv_query_port(res->ib_ctx, local_config.
ib_port, &res->port_attr)) {
324 fprintf(stderr,
"ibv_query_port on port %u failed\n",
326 goto resources_create_exit;
329 res->pd = ibv_alloc_pd(res->ib_ctx);
331 fprintf(stderr,
"ibv_alloc_pd failed\n");
332 goto resources_create_exit;
335 res->cc = ibv_create_comp_channel(res->ib_ctx);
337 fprintf(stderr,
"ibv_create_comp_channel failed\n");
338 goto resources_create_exit;
341 if(fcntl(res->cc->fd, F_SETFL, fcntl(res->cc->fd, F_GETFL) | O_NONBLOCK)) {
343 "failed to change file descriptor for completion channel\n");
344 goto resources_create_exit;
348 res->cq = ibv_create_cq(res->ib_ctx, cq_size, NULL, res->cc, 0);
350 fprintf(stderr,
"failed to create CQ with %u entries\n", cq_size);
351 goto resources_create_exit;
358 (void)message_type::ignored();
361 #ifdef MELLANOX_EXPERIMENTAL_VERBS 365 ibv_exp_device_attr attr;
367 int ret = ibv_exp_query_device(res->ib_ctx, &attr);
369 supported_features.
cross_channel = attr.exp_device_cap_flags & IBV_EXP_DEVICE_CROSS_CHANNEL;
375 thread t(polling_loop);
379 TRACE(
"verbs_initialize() - SUCCESS");
381 resources_create_exit:
382 TRACE(
"verbs_initialize() - ERROR!!!!!!!!!!!!!!");
384 ibv_destroy_cq(res->cq);
388 ibv_destroy_comp_channel(res->cc);
392 ibv_dealloc_pd(res->pd);
396 ibv_close_device(res->ib_ctx);
400 ibv_free_device_list(dev_list);
407 if(index < node_rank) {
408 if(sockets.count(index) > 0) {
410 "WARNING: attempted to connect to node %u at %s:%d but we " 411 "already have a connection to a node with that index.",
412 (
unsigned int)index, address.c_str(), derecho::rdmc_tcp_port);
417 sockets[index] =
tcp::socket(address, derecho::rdmc_tcp_port);
418 }
catch(tcp::exception) {
419 fprintf(stderr,
"WARNING: failed to node %u at %s:%d",
420 (
unsigned int)index, address.c_str(), derecho::rdmc_tcp_port);
426 uint32_t remote_rank = 0;
427 if(!sockets[index].exchange(node_rank, remote_rank)) {
429 "WARNING: failed to exchange rank with node %u at %s:%d",
430 (
unsigned int)index, address.c_str(), derecho::rdmc_tcp_port);
431 sockets.erase(index);
433 }
else if(remote_rank != index) {
435 "WARNING: node at %s:%d replied with wrong rank (expected" 437 address.c_str(), derecho::rdmc_tcp_port, (
unsigned int)index,
438 (
unsigned int)remote_rank);
440 sockets.erase(index);
444 }
else if(index > node_rank) {
448 uint32_t remote_rank = 0;
449 if(!s.
exchange(node_rank, remote_rank)) {
450 fprintf(stderr,
"WARNING: failed to exchange rank with node");
453 sockets[remote_rank] = std::move(s);
456 }
catch(tcp::exception) {
457 fprintf(stderr,
"Got error while attempting to listing on port");
465 return sockets.erase(index) > 0;
468 interrupt_mode = enabled;
472 #ifdef MELLANOX_EXPERIMENTAL_VERBS 473 contiguous_memory_mode = enabled;
485 int mr_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
489 [](ibv_mr *m) { ibv_dereg_mr(m); });
496 #ifdef MELLANOX_EXPERIMENTAL_VERBS 500 ibv_exp_reg_mr_in in;
504 in.exp_access = IBV_EXP_ACCESS_LOCAL_WRITE | IBV_EXP_ACCESS_REMOTE_READ | IBV_EXP_ACCESS_REMOTE_WRITE | IBV_EXP_ACCESS_ALLOCATE_MR;
505 in.create_flags = IBV_EXP_REG_MR_CREATE_CONTIG;
506 in.comp_mask = IBV_EXP_REG_MR_CREATE_FLAGS;
508 [](ibv_mr *m) { ibv_dereg_mr(m); });
514 memory_region::memory_region(
size_t s,
bool contiguous)
515 : mr(contiguous ? create_contiguous_mr(s) : create_mr(
new char[s], s)),
516 buffer((
char *)mr->addr),
519 memset(buffer, 0, size);
521 allocated_buffer.reset(buffer);
525 memory_region::memory_region(
size_t s,
bool contiguous) :
memory_region(new char[s], s) {
536 ibv_cq *cq_ptr =
nullptr;
540 #ifdef MELLANOX_EXPERIMENTAL_VERBS 541 ibv_exp_cq_init_attr attr;
542 attr.comp_mask = IBV_EXP_CQ_INIT_ATTR_FLAGS;
543 attr.flags = IBV_EXP_CQ_CREATE_CROSS_CHANNEL;
547 ibv_exp_cq_attr mod_attr;
548 mod_attr.comp_mask = IBV_EXP_CQ_ATTR_CQ_CAP_FLAGS;
549 mod_attr.cq_cap_flags = IBV_EXP_CQ_IGNORE_OVERRUN;
550 ibv_exp_modify_cq(cq_ptr, &mod_attr, IBV_EXP_CQ_CAP_FLAGS);
559 cq = decltype(cq)(cq_ptr, [](ibv_cq *q) { ibv_destroy_cq(q); });
572 std::function<
void(
queue_pair *)> post_recvs) {
573 auto it = sockets.find(remote_index);
576 auto &sock = it->second;
578 ibv_qp_init_attr qp_init_attr;
579 memset(&qp_init_attr, 0,
sizeof(qp_init_attr));
580 qp_init_attr.qp_type = IBV_QPT_RC;
581 qp_init_attr.sq_sig_all = 1;
584 qp_init_attr.cap.max_send_wr = 16;
585 qp_init_attr.cap.max_recv_wr = 16;
586 qp_init_attr.cap.max_send_sge = 1;
587 qp_init_attr.cap.max_recv_sge = 1;
589 qp = unique_ptr<ibv_qp, std::function<void(ibv_qp *)>>(
591 [](ibv_qp *q) { ibv_destroy_qp(q); });
594 fprintf(stderr,
"failed to create QP\n");
600 memset(&local_con_data, 0,
sizeof(local_con_data));
601 memset(&remote_con_data, 0,
sizeof(remote_con_data));
602 union ibv_gid my_gid;
604 if(local_config.
gid_idx >= 0) {
606 local_config.
gid_idx, &my_gid);
608 fprintf(stderr,
"could not get gid for port %d, index %d\n",
613 memset(&my_gid, 0,
sizeof my_gid);
617 local_con_data.
qp_num = qp->qp_num;
619 memcpy(local_con_data.
gid, &my_gid, 16);
624 if(!sock.exchange(local_con_data, remote_con_data))
627 bool success = !modify_qp_to_init(qp.get(), local_config.
ib_port) && !modify_qp_to_rtr(qp.get(), remote_con_data.
qp_num, remote_con_data.
lid, remote_con_data.
gid, local_config.
ib_port, local_config.
gid_idx) && !modify_qp_to_rts(qp.get());
629 if(!success) printf(
"Failed to initialize QP\n");
640 size_t length, uint64_t wr_id, uint32_t immediate,
647 ibv_send_wr *bad_wr = NULL;
650 memset(&sge, 0,
sizeof(sge));
651 sge.addr = (uintptr_t)(mr.
buffer + offset);
653 sge.lkey = mr.
mr->lkey;
656 memset(&sr, 0,
sizeof(sr));
659 sr.imm_data = immediate;
662 sr.opcode = IBV_WR_SEND_WITH_IMM;
663 sr.send_flags = IBV_SEND_SIGNALED;
665 if(ibv_post_send(qp.get(), &sr, &bad_wr)) {
666 fprintf(stderr,
"failed to post SR\n");
676 ibv_send_wr *bad_wr = NULL;
679 memset(&sr, 0,
sizeof(sr));
682 sr.imm_data = immediate;
685 sr.opcode = IBV_WR_SEND_WITH_IMM;
686 sr.send_flags = IBV_SEND_SIGNALED;
688 if(ibv_post_send(qp.get(), &sr, &bad_wr)) {
689 fprintf(stderr,
"failed to post SR\n");
696 size_t length, uint64_t wr_id,
706 memset(&sge, 0,
sizeof(sge));
707 sge.addr = (uintptr_t)(mr.
buffer + offset);
709 sge.lkey = mr.
mr->lkey;
712 memset(&rr, 0,
sizeof(rr));
718 if(ibv_post_recv(qp.get(), &rr, &bad_wr)) {
719 fprintf(stderr,
"failed to post RR\n");
732 memset(&rr, 0,
sizeof(rr));
738 if(ibv_post_recv(qp.get(), &rr, &bad_wr)) {
739 fprintf(stderr,
"failed to post RR\n");
746 size_t length, uint64_t wr_id,
749 bool signaled,
bool send_inline) {
751 if(mr.
size < offset + length || remote_mr.
size < remote_offset + length) {
752 cout <<
"mr.size = " << mr.
size <<
" offset = " << offset
753 <<
" length = " << length <<
" remote_mr.size = " << remote_mr.
size 754 <<
" remote_offset = " << remote_offset;
760 ibv_send_wr *bad_wr = NULL;
763 memset(&sge, 0,
sizeof(sge));
764 sge.addr = (uintptr_t)(mr.
buffer + offset);
766 sge.lkey = mr.
mr->lkey;
769 memset(&sr, 0,
sizeof(sr));
774 sr.opcode = IBV_WR_RDMA_WRITE;
775 sr.send_flags = (signaled ? IBV_SEND_SIGNALED : 0) | (send_inline ? IBV_SEND_INLINE : 0);
776 sr.wr.rdma.remote_addr = remote_mr.
buffer + remote_offset;
777 sr.wr.rdma.rkey = remote_mr.
rkey;
779 if(ibv_post_send(qp.get(), &sr, &bad_wr)) {
780 fprintf(stderr,
"failed to post SR\n");
786 #ifdef MELLANOX_EXPERIMENTAL_VERBS 790 auto it = sockets.find(remote_index);
793 auto &sock = it->second;
795 ibv_exp_qp_init_attr attr;
796 memset(&attr, 0,
sizeof(attr));
797 attr.qp_context =
nullptr;
798 attr.send_cq = scq.cq.get();
799 attr.recv_cq = rcq.cq.get();
801 attr.cap.max_send_wr = 1024;
802 attr.cap.max_recv_wr = 1024;
803 attr.cap.max_send_sge = 1;
804 attr.cap.max_recv_sge = 1;
805 attr.cap.max_inline_data = 0;
806 attr.qp_type = IBV_QPT_RC;
808 attr.comp_mask = IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS | IBV_EXP_QP_INIT_ATTR_PD;
811 attr.exp_create_flags = IBV_EXP_QP_CREATE_CROSS_CHANNEL | IBV_EXP_QP_CREATE_MANAGED_SEND;
812 attr.max_inl_recv = 0;
815 [](ibv_qp *q) { ibv_destroy_qp(q); });
818 fprintf(stderr,
"failed to create QP, (errno = %s)\n", strerror(errno));
824 memset(&local_con_data, 0,
sizeof(local_con_data));
825 memset(&remote_con_data, 0,
sizeof(remote_con_data));
826 union ibv_gid my_gid;
828 if(local_config.
gid_idx >= 0) {
830 local_config.
gid_idx, &my_gid);
832 fprintf(stderr,
"could not get gid for port %d, index %d\n",
837 memset(&my_gid, 0,
sizeof my_gid);
841 local_con_data.
qp_num = qp->qp_num;
843 memcpy(local_con_data.
gid, &my_gid, 16);
848 if(!sock.exchange(local_con_data, remote_con_data))
851 bool success = !modify_qp_to_init(qp.get(), local_config.
ib_port) && !modify_qp_to_rtr(qp.get(), remote_con_data.
qp_num, remote_con_data.
lid, remote_con_data.
gid, local_config.
ib_port, local_config.
gid_idx) && !modify_qp_to_rts(qp.get());
863 ibv_exp_qp_init_attr attr;
864 memset(&attr, 0,
sizeof(attr));
865 attr.qp_context =
nullptr;
869 attr.cap.max_send_wr = 1024;
870 attr.cap.max_recv_wr = 0;
871 attr.cap.max_send_sge = 1;
872 attr.cap.max_recv_sge = 1;
873 attr.cap.max_inline_data = 0;
874 attr.qp_type = IBV_QPT_RC;
876 attr.comp_mask = IBV_EXP_QP_INIT_ATTR_CREATE_FLAGS | IBV_EXP_QP_INIT_ATTR_PD;
879 attr.exp_create_flags = IBV_EXP_QP_CREATE_CROSS_CHANNEL;
880 attr.max_inl_recv = 0;
883 [](ibv_qp *q) { ibv_destroy_qp(q); });
886 fprintf(stderr,
"failed to create QP, (errno = %s)\n", strerror(errno));
890 bool success = !modify_qp_to_init(qp.get(), local_config.
ib_port) && !modify_qp_to_rtr(qp.get(), qp->qp_num, 0,
nullptr, local_config.
ib_port, -1) && !modify_qp_to_rts(qp.get());
901 std::list<ibv_sge> sges;
902 vector<ibv_recv_wr> recv_wrs;
903 vector<ibv_exp_send_wr> send_wrs;
905 map<ibv_qp *, vector<size_t>> recv_list;
906 map<ibv_qp *, vector<size_t>> send_list;
907 vector<size_t> mqp_list;
910 task_impl(ibv_qp *mqp_ptr) : mqp(mqp_ptr) {}
913 task::task(std::shared_ptr<manager_queue_pair> manager_qp)
914 : impl(
new task_impl(manager_qp->qp.get())), mqp(manager_qp) {}
919 impl->send_wrs.emplace_back();
920 auto &wr = impl->send_wrs.back();
922 wr.sg_list =
nullptr;
924 wr.exp_opcode = IBV_EXP_WR_CQE_WAIT;
925 wr.exp_send_flags = (signaled ? IBV_SEND_SIGNALED : 0) | (last ? IBV_EXP_SEND_WAIT_EN_LAST : 0);
927 wr.task.cqe_wait.cq = cq.
cq.get();
928 wr.task.cqe_wait.cq_count = count;
931 impl->mqp_list.push_back(impl->send_wrs.size() - 1);
934 impl->send_wrs.emplace_back();
935 auto &wr = impl->send_wrs.back();
936 wr.wr_id = 0xfffffffff1f1f1f1;
937 wr.sg_list =
nullptr;
939 wr.exp_opcode = IBV_EXP_WR_SEND_ENABLE;
940 wr.exp_send_flags = 0;
942 wr.task.wqe_enable.qp = qp.
qp.get();
943 wr.task.wqe_enable.wqe_count = count;
946 impl->mqp_list.push_back(impl->send_wrs.size() - 1);
949 size_t offset,
size_t length, uint32_t immediate) {
950 impl->sges.emplace_back();
951 auto &sge = impl->sges.back();
952 sge.addr = (uintptr_t)mr.
buffer + offset;
954 sge.lkey = mr.
mr->lkey;
956 impl->send_wrs.emplace_back();
957 auto &wr = impl->send_wrs.back();
958 wr.wr_id = 0xfffffffff2f2f2f2;
960 wr.sg_list = &impl->sges.back();
962 wr.exp_opcode = IBV_EXP_WR_SEND;
963 wr.exp_send_flags = 0;
964 wr.ex.imm_data = immediate;
966 impl->send_list[qp.
qp.get()].push_back(impl->send_wrs.size() - 1);
969 size_t offset,
size_t length) {
970 impl->sges.emplace_back();
971 auto &sge = impl->sges.back();
972 sge.addr = (uintptr_t)mr.
buffer + offset;
974 sge.lkey = mr.
mr->lkey;
976 impl->recv_wrs.emplace_back();
977 auto &wr = impl->recv_wrs.back();
978 wr.wr_id = 0xfffffffff3f3f3f3;
980 wr.sg_list = &impl->sges.back();
982 impl->recv_list[qp.
qp.get()].push_back(impl->recv_wrs.size() - 1);
985 size_t num_tasks = 1 + impl->send_list.size() + impl->recv_list.size();
986 auto tasks = make_unique<ibv_exp_task[]>(num_tasks);
989 for(
auto &&l : impl->recv_list) {
990 for(
size_t i = 0; i + 1 < l.second.size(); i++) {
991 impl->recv_wrs[l.second[i]].next = &impl->recv_wrs[l.second[i + 1]];
994 tasks[index].item.qp = l.first;
995 tasks[index].item.recv_wr = &impl->recv_wrs[l.second.front()];
996 tasks[index].task_type = IBV_EXP_TASK_RECV;
997 tasks[index].next = &tasks[index + 1];
998 tasks[index].comp_mask = 0;
1001 for(
auto &&l : impl->send_list) {
1002 for(
size_t i = 0; i + 1 < l.second.size(); i++) {
1003 impl->send_wrs[l.second[i]].next = &impl->send_wrs[l.second[i + 1]];
1006 tasks[index].item.qp = l.first;
1007 tasks[index].item.send_wr = &impl->send_wrs[l.second.front()];
1008 tasks[index].task_type = IBV_EXP_TASK_SEND;
1009 tasks[index].next = &tasks[index + 1];
1010 tasks[index].comp_mask = 0;
1014 for(
size_t i = 0; i + 1 < impl->mqp_list.size(); i++) {
1015 impl->send_wrs[impl->mqp_list[i]].next = &impl->send_wrs[impl->mqp_list[i + 1]];
1018 tasks[index].item.qp = impl->mqp;
1019 tasks[index].item.send_wr = &impl->send_wrs[impl->mqp_list.front()];
1020 tasks[index].task_type = IBV_EXP_TASK_SEND;
1021 tasks[index].next =
nullptr;
1022 tasks[index].comp_mask = 0;
1024 ibv_exp_task *bad =
nullptr;
1052 size_t offset,
size_t length, uint32_t immediate) {
1056 size_t offset,
size_t length) {
1068 std::lock_guard<std::mutex> l(completion_handlers_mutex);
1070 if(completion_handlers.size() >= std::numeric_limits<tag_type>::max())
1073 tag = completion_handlers.size();
1076 set.
send = send_handler;
1077 set.recv = recv_handler;
1078 set.write = write_handler;
1080 completion_handlers.push_back(
set);
1084 static message_type m(std::numeric_limits<tag_type>::max());
1089 return supported_features;
1122 const vector<uint32_t> &members, uint32_t
node_rank,
1124 map<uint32_t, remote_memory_region> remote_mrs;
1125 for(uint32_t m : members) {
1126 if(m == node_rank) {
1130 auto it = sockets.find(m);
1131 if(it == sockets.end()) {
1139 bool still_connected = it->second.exchange((uintptr_t)mr.
buffer, buffer) && it->second.exchange((
size_t)mr.
size, size) && it->second.exchange((uint32_t)mr.
get_rkey(),
rkey);
1141 if(!still_connected) {
1142 fprintf(stderr,
"WARNING: lost connection to node %u\n",
1143 (
unsigned int)it->first);
socket accept()
Blocks until a remote client makes a connection to this connection listener, then returns a new socke...
void append_recv(const managed_endpoint &ep, const memory_region &mr, size_t offset, size_t length)
partial_wrapped< Tag, Ret, NewClass, Args... > tag(Ret(NewClass::*fun)(Args...))
User-facing entry point for the series of functions that binds a FunctionTag to a class's member func...
ibv_comp_channel * verbs_get_completion_channel()
std::function< void(uint64_t tag, uint32_t immediate, size_t length)> completion_handler
std::map< uint32_t, remote_memory_region > verbs_exchange_memory_regions(const std::vector< uint32_t > &members, uint32_t node_rank, const memory_region &mr)
int ib_port
Local IB port to work with.
std::unique_ptr< fid_mr, std::function< void(fid_mr *)> > mr
Smart pointer for managing the registered memory region.
std::unique_ptr< fid_cq, std::function< void(fid_cq *)> > cq
Smart pointer for managing the completion queue.
bool verbs_add_connection(uint32_t index, const std::string &address, uint32_t node_rank)
ibv_device_attr device_attr
static message_type ignored()
bool post_empty_recv(uint64_t wr_id, const message_type &type)
void append_send(const managed_endpoint &ep, const memory_region &mr, size_t offset, size_t length, uint32_t immediate)
A C++ wrapper for the libfabric fid_cq struct and its associated functions.
void append_wait(const completion_queue &cq, int count, bool signaled, bool last, uint64_t wr_id, const message_type &type)
A C++ wrapper for the IB Verbs ibv_qp struct and its associated functions.
Listener to detect new incoming connections.
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and q...
static constexpr unsigned int shift_bits
completion_queue()
Constructor Uses the libfabrics API to open a completion queue.
void append_enable_send(const managed_endpoint &ep, int count)
task(std::shared_ptr< manager_endpoint > manager_ep)
memory_region(size_t size)
Constructor Creates a buffer of the specified size and then calls the second constructor with the new...
struct rdma::ibv_resources verbs_resources
unique_ptr< ibv_mr, std::function< void(ibv_mr *)> > ibv_mr_unique_ptr
bool set_contiguous_memory_mode(bool enabled)
bool post_write(const memory_region &mr, size_t offset, size_t length, uint64_t wr_id, remote_memory_region remote_mr, size_t remote_offset, const message_type &type, bool signaled=false, bool send_inline=false)
bool verbs_initialize(const map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
std::unique_ptr< char[]> allocated_buffer
Smart pointer for managing the buffer the mr uses.
bool post_empty_send(uint64_t wr_id, uint32_t immediate, const message_type &type)
bool post_recv(const memory_region &mr, size_t offset, size_t length, uint64_t wr_id, const message_type &type)
bool set_interrupt_mode(bool enabled)
uint32_t get_rkey() const
managed_queue_pair(size_t remote_index, std::function< void(managed_queue_pair *)> post_recvs)
std::optional< tag_type > tag
int gid_idx
GID index to use.
bool post() __attribute__((warn_unused_result))
feature_set get_supported_features()
struct sst::verbs_sender_ctxt __attribute__
bool verbs_remove_connection(uint32_t index)
bool exchange(T local, T &remote)
bool post_send(const memory_region &mr, size_t offset, size_t length, uint64_t wr_id, uint32_t immediate, const message_type &type)
std::unique_ptr< ibv_qp, std::function< void(ibv_qp *)> > qp