#include <arpa/inet.h>
#include <byteswap.h>

#include <rdma/fabric.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_tagged.h>
#include <rdma/fi_rma.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_domain.h>

#if __BYTE_ORDER == __LITTLE_ENDIAN
static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
#elif __BYTE_ORDER == __BIG_ENDIAN
static inline uint64_t htonll(uint64_t x) { return x; }
static inline uint64_t ntohll(uint64_t x) { return x; }
#else
#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN
#endif
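// Illustrative sketch (hypothetical helper, not part of the original file):
// htonll/ntohll put 64-bit values such as remote buffer addresses into network
// byte order before they travel over the TCP exchange channel.
struct example_exchange_blob {
    uint64_t remote_vaddr_be;  // big-endian on the wire
};

static inline example_exchange_blob example_pack_for_wire(uint64_t local_vaddr) {
    example_exchange_blob blob;
    blob.remote_vaddr_be = htonll(local_vaddr);  // host -> network order
    return blob;
}

static inline uint64_t example_unpack_from_wire(const example_exchange_blob& blob) {
    return ntohll(blob.remote_vaddr_be);  // network -> host order
}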
#define CRASH_WITH_MESSAGE(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
        fflush(stderr); \
        exit(-1); \
    } while(0)

#define FAIL_IF_NONZERO_RETRY_EAGAIN(x, desc, next) \
    do { \
        int64_t _int64_r_; \
        do { \
            _int64_r_ = (int64_t)(x); \
        } while(_int64_r_ == -FI_EAGAIN); \
        if(_int64_r_ != 0) { \
            dbg_default_error("{}:{},ret={},{}", __FILE__, __LINE__, _int64_r_, desc); \
            fprintf(stderr, "%s:%d,ret=%ld,%s\n", __FILE__, __LINE__, _int64_r_, desc); \
            if(next == CRASH_ON_FAILURE) { \
                fflush(stderr); \
                exit(-1); \
            } \
        } \
    } while(0)

#define FAIL_IF_ZERO(x, desc, next) \
    do { \
        int64_t _int64_r_ = (int64_t)(x); \
        if(_int64_r_ == 0) { \
            dbg_default_error("{}:{},{}", __FILE__, __LINE__, desc); \
            fprintf(stderr, "%s:%d,%s\n", __FILE__, __LINE__, desc); \
            if(next == CRASH_ON_FAILURE) { \
                fflush(stderr); \
                exit(-1); \
            } \
        } \
    } while(0)

#define MAX_LF_ADDR_SIZE ((128) - sizeof(uint32_t) - 2 * sizeof(uint64_t))

// One completion_handler_set is registered per message_type tag; the vector is
// indexed by tag and guarded by the mutex below.
static vector<completion_handler_set> completion_handlers;
static std::mutex completion_handlers_mutex;
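// Illustrative sketch (hypothetical helper, not part of the original file):
// how the error-handling macros above are meant to wrap libfabric calls. The
// description strings are made up; fi_fabric()/fi_domain() are the standard
// libfabric calls for opening the fabric and domain described by g_ctxt.fi.
static void example_open_fabric_and_domain() {
    FAIL_IF_ZERO(g_ctxt.fi, "fi_info has not been initialized", CRASH_ON_FAILURE);
    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_fabric(g_ctxt.fi->fabric_attr, &g_ctxt.fabric, nullptr),
            "fi_fabric() failed", CRASH_ON_FAILURE);
    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_domain(g_ctxt.fabric, g_ctxt.fi, &g_ctxt.domain, nullptr),
            "fi_domain() failed", CRASH_ON_FAILURE);
}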
    // (members of struct lf_ctxt, the global libfabric context)
    struct fi_eq_attr eq_attr;
    struct fi_cq_attr cq_attr;

#define LF_USE_VADDR ((g_ctxt.fi->domain_attr->mr_mode) & (FI_MR_VIRT_ADDR | FI_MR_BASIC))
#define LF_CONFIG_FILE "rdma.cfg"

// The upper bits of a work-request id carry the RDMA op code.
#define OP_BITS_SHIFT (48)
#define OP_BITS_MASK (0x00ff000000000000ull)
#define EXTRACT_RDMA_OP_CODE(x) ((uint8_t)((((uint64_t)x) & OP_BITS_MASK) >> OP_BITS_SHIFT))
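// Illustrative sketch (hypothetical helper, not part of the original file):
// packing an op code into the upper bits of a 64-bit work-request id and
// recovering it with EXTRACT_RDMA_OP_CODE.
static inline uint64_t example_pack_wr_id(uint8_t op_code, uint64_t user_wr_id) {
    return (((uint64_t)op_code << OP_BITS_SHIFT) & OP_BITS_MASK)
           | (user_wr_id & 0x0000ffffffffffffull);
}
// EXTRACT_RDMA_OP_CODE(example_pack_wr_id(0x2, 42)) == 0x2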
static void default_context() {
    memset((void*)&g_ctxt, 0, sizeof(struct lf_ctxt));
    // hints must be allocated (fi_allocinfo) before any field can be filled in
    FAIL_IF_ZERO(g_ctxt.hints = fi_allocinfo(), "fi_allocinfo() failed", CRASH_ON_FAILURE);

    g_ctxt.hints->caps = FI_MSG | FI_RMA | FI_READ | FI_WRITE
                         | FI_REMOTE_READ | FI_REMOTE_WRITE;
    g_ctxt.hints->ep_attr->type = FI_EP_MSG;    // connection-oriented endpoints
    g_ctxt.hints->mode = ~0;                    // accept all provider mode bits

    g_ctxt.cq_attr.format = FI_CQ_FORMAT_DATA;  // completions carry remote CQ data
    g_ctxt.cq_attr.wait_obj = FI_WAIT_FD;       // allow fd-based waiting (see polling_loop)
}
// While the configuration is loaded, the memory-registration mode is chosen
// according to the provider named in the RDMA config file:
if(strcmp(g_ctxt.hints->fabric_attr->prov_name, "sockets") == 0) {
    g_ctxt.hints->domain_attr->mr_mode = FI_MR_BASIC;
} else {
    // Hardware providers (e.g. verbs) need the explicit mr_mode bits.
    g_ctxt.hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED
                                         | FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
}
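// Illustrative sketch (hypothetical helper, not part of the original file):
// once the hints are filled in, fi_getinfo() is the standard libfabric call
// for discovering a matching provider; LF_VERSION is defined elsewhere in
// this file.
static void example_discover_provider() {
    struct fi_info* discovered = nullptr;
    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_getinfo(LF_VERSION, nullptr, nullptr, 0, g_ctxt.hints, &discovered),
            "fi_getinfo() found no matching provider", CRASH_ON_FAILURE);
    g_ctxt.fi = discovered;  // keep the first (highest-priority) match
}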
const int mr_access = FI_WRITE | FI_REMOTE_READ | FI_REMOTE_WRITE;

// Register the buffer with the domain; retry while the provider reports EAGAIN.
FAIL_IF_NONZERO_RETRY_EAGAIN(
        fi_mr_reg(g_ctxt.domain, buffer, size, mr_access,
                  0, 0, 0, &raw_mr, nullptr),
        "register memory region", CRASH_ON_FAILURE);

// Own the raw fid_mr through a smart pointer that closes it on destruction.
mr = unique_ptr<fid_mr, std::function<void(fid_mr *)>>(
        raw_mr, [](fid_mr* mr) { fi_close(&mr->fid); });
cq = unique_ptr<fid_cq, std::function<void(fid_cq *)>>(
        raw_cq, [](fid_cq* cq) { fi_close(&cq->fid); });
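// Illustrative sketch (hypothetical helper, not part of the original file):
// the RAII wrapper above is normally paired with an fi_cq_open() call like
// this one, using the attributes prepared in default_context().
static fid_cq* example_open_cq() {
    fid_cq* raw_cq = nullptr;
    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_cq_open(g_ctxt.domain, &g_ctxt.cq_attr, &raw_cq, nullptr),
            "open completion queue", CRASH_ON_FAILURE);
    return raw_cq;
}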
endpoint::endpoint(size_t remote_index, bool is_lf_server,
                   std::function<void(endpoint *)> post_recvs) {
    connect(remote_index, is_lf_server, post_recvs);
}
// In endpoint::init(fi): the raw endpoint and event queue are wrapped in RAII
// smart pointers, bound together and to the global completion queue, and
// enabled. (A sketch of the fi_endpoint()/fi_eq_open() calls that produce
// raw_ep and raw_eq appears after this fragment.)
dbg_default_trace("{}:{} created rdmc endpoint: {}", __FILE__, __func__,
                  (void*)&raw_ep->fid);

ep = unique_ptr<fid_ep, std::function<void(fid_ep *)>>(
        raw_ep, [](fid_ep* ep) { fi_close(&ep->fid); });
eq = unique_ptr<fid_eq, std::function<void(fid_eq *)>>(
        raw_eq, [](fid_eq* eq) { fi_close(&eq->fid); });

FAIL_IF_NONZERO_RETRY_EAGAIN(
        ret = fi_ep_bind(raw_ep, &(raw_eq)->fid, 0),
        "bind endpoint to event queue", REPORT_ON_FAILURE);

const uint64_t ep_flags = FI_RECV | FI_TRANSMIT | FI_SELECTIVE_COMPLETION;
FAIL_IF_NONZERO_RETRY_EAGAIN(
        ret = fi_ep_bind(raw_ep, &(g_ctxt.cq)->fid, ep_flags),
        "bind endpoint to completion queue", REPORT_ON_FAILURE);

FAIL_IF_NONZERO_RETRY_EAGAIN(
        ret = fi_enable(raw_ep),
        "enable endpoint", REPORT_ON_FAILURE);

// ... and setup-time data exchange with node r_id is forwarded to the TCP
//     connection manager:
return rdmc_connections->exchange(r_id, s, t);
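// Illustrative sketch (hypothetical helper, not part of the original file):
// the raw_ep/raw_eq wrapped in init() above are normally produced by calls
// like these; the description strings are made up.
static int example_open_ep_and_eq(struct fi_info* fi, fid_ep*& raw_ep, fid_eq*& raw_eq) {
    int ret = 0;
    FAIL_IF_NONZERO_RETRY_EAGAIN(
            ret = fi_endpoint(g_ctxt.domain, fi, &raw_ep, nullptr),
            "open endpoint", REPORT_ON_FAILURE);
    if(ret) return ret;
    FAIL_IF_NONZERO_RETRY_EAGAIN(
            ret = fi_eq_open(g_ctxt.fabric, &g_ctxt.eq_attr, &raw_eq, nullptr),
            "open event queue", REPORT_ON_FAILURE);
    return ret;
}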
void endpoint::connect(size_t remote_index, bool is_lf_server,
                       std::function<void(endpoint *)> post_recvs) {
    struct cm_con_data_t local_cm_data, remote_cm_data;
    memset(&local_cm_data, 0, sizeof(local_cm_data));
    memset(&remote_cm_data, 0, sizeof(remote_cm_data));
    // (local_cm_data is populated with this node's passive-endpoint address,
    //  with length fields converted to network byte order, before the exchange)

    FAIL_IF_ZERO(
            rdmc_connections->exchange(remote_index, local_cm_data, remote_cm_data),
            "exchange connection management info", CRASH_ON_FAILURE);

    struct fi_eq_cm_entry entry;
    uint32_t event;
    ssize_t nRead;

    if(is_lf_server) {
        // Server side: wait for the client's connection request on the passive
        // endpoint's event queue, initialize an endpoint from it, and accept.
        nRead = fi_eq_sread(g_ctxt.peq, &event, &entry, sizeof(entry), -1, 0);
        if(nRead != sizeof(entry)) {
            CRASH_WITH_MESSAGE("failed to read connection request from event queue\n");
        }
        if(init(entry.info)) {
            fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0);
            fi_freeinfo(entry.info);
            CRASH_WITH_MESSAGE("failed to initialize server endpoint\n");
        }
        if(fi_accept(ep.get(), NULL, 0)) {
            fi_reject(g_ctxt.pep, entry.info->handle, NULL, 0);
            fi_freeinfo(entry.info);
            CRASH_WITH_MESSAGE("failed to accept connection\n");
        }
        fi_freeinfo(entry.info);
    } else {
        // Client side: point the hints at the server's passive-endpoint
        // address, resolve them with fi_getinfo(), and connect.
        struct fi_info* client_hints = fi_dupinfo(g_ctxt.hints);
        struct fi_info* client_info = NULL;

        FAIL_IF_ZERO(
                client_hints->dest_addr = malloc(remote_cm_data.pep_addr_len),
                "malloc destination address buffer", CRASH_ON_FAILURE);
        memcpy((void*)client_hints->dest_addr,
               (void*)remote_cm_data.pep_addr,
               remote_cm_data.pep_addr_len);
        client_hints->dest_addrlen = remote_cm_data.pep_addr_len;

        FAIL_IF_NONZERO_RETRY_EAGAIN(
                fi_getinfo(LF_VERSION, NULL, NULL, 0, client_hints, &client_info),
                "fi_getinfo() for remote passive endpoint", CRASH_ON_FAILURE);
        if(init(client_info)) {
            fi_freeinfo(client_hints);
            fi_freeinfo(client_info);
            CRASH_WITH_MESSAGE("failed to initialize client endpoint\n");
        }

        FAIL_IF_NONZERO_RETRY_EAGAIN(
                fi_connect(ep.get(), remote_cm_data.pep_addr, NULL, 0),
                "fi_connect()", CRASH_ON_FAILURE);

        // Wait for the FI_CONNECTED event on this endpoint's event queue.
        nRead = fi_eq_sread(this->eq.get(), &event, &entry, sizeof(entry), -1, 0);
        if(nRead != sizeof(entry)) {
            CRASH_WITH_MESSAGE("failed to read connection event from event queue\n");
        }
        if(event != FI_CONNECTED || entry.fid != &(ep->fid)) {
            fi_freeinfo(client_hints);
            fi_freeinfo(client_info);
            CRASH_WITH_MESSAGE("unexpected connection management event\n");
        }
        fi_freeinfo(client_hints);
        fi_freeinfo(client_info);
    }

    // Let the caller post its initial receives, then synchronize with the
    // remote side so neither node sends before both endpoints are ready.
    post_recvs(this);
    int tmp = -1;
    if(!rdmc_connections->exchange(remote_index, 0, tmp) || tmp != 0)
        CRASH_WITH_MESSAGE("failed to synchronize after endpoint creation\n");
}
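// Illustrative sketch (hypothetical usage, not part of the original file):
// creating a connected endpoint; the lambda posts whatever receives the
// protocol needs before the final synchronization barrier in connect().
void example_create_endpoint(size_t remote_index, bool is_server,
                             memory_region& mr, uint64_t wr_id,
                             const message_type& type) {
    endpoint ep(remote_index, is_server, [&](endpoint* e) {
        e->post_recv(mr, /*offset=*/0, mr.size, wr_id, type);
    });
}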
bool endpoint::post_send(const memory_region& mr, size_t offset, size_t size,
                         uint64_t wr_id, uint32_t immediate,
                         const message_type& type) {
    struct iovec msg_iov;
    struct fi_msg msg;

    msg_iov.iov_base = mr.buffer + offset;
    msg_iov.iov_len = size;

    msg.msg_iov = &msg_iov;
    msg.desc = (void**)&mr.mr->key;
    msg.iov_count = 1;
    // (msg.context carries wr_id with the message_type tag in its upper bits)
    msg.data = immediate;

    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_sendmsg(ep.get(), &msg, FI_COMPLETION | FI_REMOTE_CQ_DATA),
            "fi_sendmsg()", REPORT_ON_FAILURE);
    return true;
}
bool endpoint::post_recv(const memory_region& mr, size_t offset, size_t size,
                         uint64_t wr_id, const message_type& type) {
    struct iovec msg_iov;
    struct fi_msg msg;

    msg_iov.iov_base = mr.buffer + offset;
    msg_iov.iov_len = size;

    msg.msg_iov = &msg_iov;
    msg.desc = (void**)&mr.mr->key;
    msg.iov_count = 1;
    // (msg.context again carries wr_id and the message_type tag)

    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_recvmsg(ep.get(), &msg, FI_COMPLETION),
            "fi_recvmsg()", REPORT_ON_FAILURE);
    return true;
}
bool endpoint::post_empty_send(uint64_t wr_id, uint32_t immediate,
                               const message_type& type) {
    struct fi_msg msg;
    memset(&msg, 0, sizeof(msg));
    // (msg.context carries wr_id and the message_type tag)
    msg.data = immediate;

    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_sendmsg(ep.get(), &msg, FI_COMPLETION | FI_REMOTE_CQ_DATA),
            "fi_sendmsg()", REPORT_ON_FAILURE);
    return true;
}
bool endpoint::post_empty_recv(uint64_t wr_id, const message_type& type) {
    struct fi_msg msg;
    memset(&msg, 0, sizeof(msg));
    // (msg.context carries wr_id and the message_type tag)

    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_recvmsg(ep.get(), &msg, FI_COMPLETION),
            "fi_recvmsg()", REPORT_ON_FAILURE);
    return true;
}
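// Illustrative sketch (hypothetical usage, not part of the original file):
// a zero-byte notification whose payload travels entirely in the 32-bit
// immediate field; the peer is assumed to have posted a matching
// post_empty_recv() already.
void example_notify_peer(endpoint& ep_to_peer, const message_type& type) {
    ep_to_peer.post_empty_send(/*wr_id=*/1, /*immediate=*/0xCAFE, type);
}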
bool endpoint::post_write(const memory_region& mr, size_t offset, size_t size,
                          uint64_t wr_id, remote_memory_region remote_mr,
                          size_t remote_offset, const message_type& type,
                          bool signaled, bool send_inline) {
    if(mr.size < offset + size || remote_mr.size < remote_offset + size) {
        cout << "mr.size = " << mr.size << " offset = " << offset
             << " length = " << size << " remote_mr.size = " << remote_mr.size
             << " remote_offset = " << remote_offset;
        return false;
    }

    struct iovec msg_iov;
    struct fi_rma_iov rma_iov;
    struct fi_msg_rma msg;

    msg_iov.iov_base = mr.buffer + offset;
    msg_iov.iov_len = size;

    // (rma_iov.addr and rma_iov.len select the remote target region; with
    //  FI_MR_VIRT_ADDR-style providers the remote virtual address is used)
    rma_iov.key = remote_mr.rkey;

    msg.msg_iov = &msg_iov;
    msg.desc = (void**)&mr.mr->key;
    msg.iov_count = 1;
    // (msg.context carries wr_id and the message_type tag)
    msg.rma_iov = &rma_iov;
    msg.rma_iov_count = 1;

    FAIL_IF_NONZERO_RETRY_EAGAIN(
            fi_writemsg(ep.get(), &msg, FI_COMPLETION),
            "fi_writemsg()", REPORT_ON_FAILURE);
    return true;
}
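// Illustrative sketch (hypothetical usage, not part of the original file):
// a one-sided RDMA write that copies the first 4 KiB of a local region into a
// remote region whose size and key were learned via lf_exchange_memory_regions().
void example_remote_write(endpoint& ep, const memory_region& local_mr,
                          const remote_memory_region& remote_mr,
                          const message_type& type) {
    ep.post_write(local_mr, /*offset=*/0, /*size=*/4096, /*wr_id=*/7,
                  remote_mr, /*remote_offset=*/0, type,
                  /*signaled=*/true, /*send_inline=*/false);
}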
// message_type constructor: registers the caller's completion handlers under a
// freshly allocated tag. (The signature is reconstructed; parameter names are
// approximate.)
message_type::message_type(const std::string& name, completion_handler send_handler,
                           completion_handler recv_handler,
                           completion_handler write_handler) {
    std::lock_guard<std::mutex> l(completion_handlers_mutex);

    tag = completion_handlers.size();

    completion_handler_set set;
    set.send = send_handler;
    set.recv = recv_handler;
    set.write = write_handler;
    completion_handlers.push_back(set);
}
message_type message_type::ignored() {
    // A reserved tag value marks completions that the polling loop should skip.
    static message_type m(std::numeric_limits<tag_type>::max());
    return m;
}
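// Illustrative sketch (hypothetical usage, not part of the original file):
// registering completion handlers for one class of messages, using the
// constructor signature sketched above. Each handler receives
// (wr_id, immediate, length), following the completion_handler alias.
message_type example_register_handlers() {
    return message_type(
            "example.traffic",
            [](uint64_t wr_id, uint32_t imm, size_t len) { /* send completed */ },
            [](uint64_t wr_id, uint32_t imm, size_t len) { /* recv completed */ },
            [](uint64_t wr_id, uint32_t imm, size_t len) { /* write completed */ });
}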
// task helper methods (bodies elided in this listing):
void task::append_send(const managed_endpoint& ep, const memory_region& mr,
                       size_t offset, size_t length, uint32_t immediate) {
    // ...
}

void task::append_recv(const managed_endpoint& ep, const memory_region& mr,
                       size_t offset, size_t length) {
    // ...
}

bool lf_add_connection(uint32_t new_id,
                       const std::pair<ip_addr_t, uint16_t>& new_ip_addr_and_port) {
    return rdmc_connections->add_node(new_id, new_ip_addr_and_port);
}
static atomic<bool> interrupt_mode;
static atomic<bool> polling_loop_shutdown_flag;

static void polling_loop() {
    pthread_setname_np(pthread_self(), "rdmc_poll");

    const int max_cq_entries = 1024;
    unique_ptr<fi_cq_data_entry[]> cq_entries(new fi_cq_data_entry[max_cq_entries]);

    while(true) {
        int num_completions = 0;
        while(num_completions == 0 || num_completions == -FI_EAGAIN) {
            if(polling_loop_shutdown_flag) return;
            // Busy-poll the completion queue for up to 50 ms (or not at all in
            // interrupt mode) before falling back to blocking on its fd.
            uint64_t poll_end = get_time() + (interrupt_mode ? 0L : 50000000L);
            do {
                if(polling_loop_shutdown_flag) return;
                num_completions = fi_cq_read(g_ctxt.cq, cq_entries.get(), max_cq_entries);
            } while((num_completions == 0 || num_completions == -FI_EAGAIN)
                    && get_time() < poll_end);

            if(num_completions == 0 || num_completions == -FI_EAGAIN) {
                num_completions = fi_cq_read(g_ctxt.cq, cq_entries.get(), max_cq_entries);

                if(num_completions == 0 || num_completions == -FI_EAGAIN) {
                    // Retrieve the CQ's wait file descriptor (FI_WAIT_FD) and
                    // block on it until a completion or shutdown arrives.
                    pollfd file_descriptor;
                    fi_control(&g_ctxt.cq->fid, FI_GETWAIT, &file_descriptor);
                    int rc = 0;
                    while(rc == 0 && !polling_loop_shutdown_flag) {
                        if(polling_loop_shutdown_flag) return;
                        file_descriptor.events = POLLIN | POLLERR | POLLHUP;
                        file_descriptor.revents = 0;
                        rc = poll(&file_descriptor, 1, 50);
                    }

                    num_completions = fi_cq_read(g_ctxt.cq, cq_entries.get(), max_cq_entries);
                }
            }
        }

        if(num_completions < 0) {
            cout << "Failed to read from completion queue, fi_cq_read returned "
                 << num_completions << std::endl;
            // (error handling elided in this listing)
        }

        std::lock_guard<std::mutex> l(completion_handlers_mutex);
        for(int i = 0; i < num_completions; i++) {
            fi_cq_data_entry& cq_entry = cq_entries[i];

            // The message_type tag lives in the upper bits of op_context; the
            // reserved tag_type::max() value marks completions to ignore.
            message_type::tag_type type
                    = (uint64_t)cq_entry.op_context >> message_type::shift_bits;
            if(type == std::numeric_limits<message_type::tag_type>::max())
                continue;

            uint64_t masked_wr_id = (uint64_t)cq_entry.op_context & 0x0000ffffffffffffull;
            uint32_t immediate = cq_entry.data;
            if(type >= completion_handlers.size()) {
                // No handler set was registered under this tag.
            } else if(cq_entry.flags & FI_SEND) {
                completion_handlers[type].send(masked_wr_id, immediate, cq_entry.len);
            } else if(cq_entry.flags & FI_RECV) {
                completion_handlers[type].recv(masked_wr_id, immediate, cq_entry.len);
            } else if(cq_entry.flags & FI_WRITE) {
                completion_handlers[type].write(masked_wr_id, immediate, cq_entry.len);
            } else {
                puts("Sent unrecognized completion type?!");
            }
        }
    }
}
693 "failed to open the event queue for passive endpoint",
698 "failed to bind event queue to passive endpoint",
701 "failed to prepare passive endpoint for incoming connections",
707 "local name is too big to fit in local buffer",
718 polling_thread.detach();
map<uint32_t, remote_memory_region> lf_exchange_memory_regions(
        const std::vector<uint32_t>& members, uint32_t node_rank,
        const memory_region& mr) {
    map<uint32_t, remote_memory_region> remote_mrs;
    for(uint32_t m : members) {
        if(m == node_rank) {
            continue;
        }
        // The local region's address, length, and key are exchanged with node m
        // over its TCP connection (elided here); a failed exchange is reported:
        //     fprintf(stderr, "WARNING: lost connection to node %u\n", m);
    }
    return remote_mrs;
}
bool set_interrupt_mode(bool enabled) {
    interrupt_mode = enabled;
    return true;
}
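// Illustrative sketch (hypothetical usage, not part of the original file):
// the typical start-up sequence a caller would follow with this module's
// public entry points.
void example_startup(const std::map<uint32_t, std::pair<ip_addr_t, uint16_t>>& addrs,
                     uint32_t my_rank, const std::vector<uint32_t>& members) {
    lf_initialize(addrs, my_rank);   // global fabric/domain/CQ setup + polling thread
    set_interrupt_mode(false);       // keep the default busy-polling behavior

    memory_region mr(1 << 20);       // register a 1 MiB buffer with the domain
    auto remote_regions = lf_exchange_memory_regions(members, my_rank, mr);
}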