34 map<uint16_t, shared_ptr<group>>
groups;
37 bool initialize(
const map<uint32_t, std::pair<ip_addr_t, uint16_t>>& ip_addrs_and_ports, uint32_t _node_rank) {
38 if(shutdown_flag)
return false;
40 node_rank = _node_rank;
52 void add_address(uint32_t index,
const std::pair<ip_addr_t, uint16_t>& address) {
60 bool create_group(uint16_t group_number, std::vector<uint32_t> members,
65 if(shutdown_flag)
return false;
68 uint32_t member_index =
index_of(members, node_rank);
76 send_schedule =
new tree_schedule(members.size(), member_index);
78 puts(
"Unsupported group type?!");
83 unique_lock<mutex> lock(groups_lock);
84 auto g = make_shared<polling_group>(group_number, block_size, members,
85 member_index, incoming_upcall, callback,
86 unique_ptr<schedule>(send_schedule));
87 auto p = groups.emplace(group_number, std::move(g));
92 if(shutdown_flag)
return;
94 unique_lock<mutex> lock(groups_lock);
95 LOG_EVENT(group_number, -1, -1,
"destroy_group");
96 groups.erase(group_number);
99 bool send(uint16_t group_number, shared_ptr<memory_region> mr,
size_t offset,
101 if(shutdown_flag)
return false;
105 unique_lock<mutex> lock(groups_lock);
106 auto it = groups.find(group_number);
107 if(it == groups.end())
return false;
110 LOG_EVENT(group_number, -1, -1,
"preparing_to_send_message");
111 g->send_message(mr, offset, length);
119 barrier_group::barrier_group(vector<uint32_t> members) {
120 member_index =
index_of(members, node_rank);
121 group_size = members.size();
123 if(group_size <= 1 || member_index >= members.size())
126 total_steps = ceil(log2(group_size));
127 for(
unsigned int m = 0; m < total_steps; m++) steps[m] = -1;
129 steps_mr = make_unique<memory_region>((
char*)&steps[0],
130 total_steps *
sizeof(int64_t));
131 number_mr = make_unique<memory_region>((
char*)&number,
sizeof(number));
133 set<uint32_t> targets;
134 for(
unsigned int m = 0; m < total_steps; m++) {
135 auto target = (member_index + (1 << m)) % group_size;
136 auto target2 = (group_size * (1 << m) + member_index - (1 << m)) % group_size;
137 targets.insert(target);
138 targets.insert(target2);
142 map<uint32_t, queue_pair> qps;
143 for(
auto target : targets) {
144 qps.emplace(target,
queue_pair(members[target]));
148 members, node_rank, *steps_mr.get());
149 for(
unsigned int m = 0; m < total_steps; m++) {
150 auto target = (member_index + (1 << m)) % group_size;
152 remote_memory_regions.push_back(remote_mrs.find(target)->second);
154 auto qp_it = qps.find(target);
155 endpoints.push_back(std::move(qp_it->second));
159 for(
auto it = qps.begin(); it != qps.end(); it++) {
160 extra_endpoints.push_back(std::move(it->second));
164 map<uint32_t, endpoint> eps;
165 for(
auto target : targets) {
167 bool is_lf_server = members[member_index] < members[target];
168 eps.emplace(target,
endpoint(members[target], is_lf_server));
172 members, node_rank, *steps_mr.get());
173 for(
unsigned int m = 0; m < total_steps; m++) {
174 auto target = (member_index + (1 << m)) % group_size;
176 remote_memory_regions.push_back(remote_mrs.find(target)->second);
178 auto ep_it = eps.find(target);
179 endpoints.push_back(std::move(ep_it->second));
182 for(
auto it = eps.begin(); it != eps.end(); it++) {
183 extra_endpoints.push_back(std::move(it->second));
188 void barrier_group::barrier_wait() {
192 unique_lock<mutex> l(lock);
196 for(
unsigned int m = 0; m < total_steps; m++) {
198 if(!queue_pairs[m].post_write(
200 if(!endpoints[m].post_write(
202 *number_mr.get(), 0, 8,
203 form_tag(0, (node_rank + (1 << m)) % group_size),
204 remote_memory_regions[m], m * 8, message_type::ignored(),
209 while(steps[m] < number)
static void initialize_message_types()
void add_address(uint32_t index, const std::pair< ip_addr_t, uint16_t > &address)
bool initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &addresses, uint32_t node_rank) __attribute__((warn_unused_result))
std::map< uint32_t, remote_memory_region > verbs_exchange_memory_regions(const std::vector< uint32_t > &members, uint32_t node_rank, const memory_region &mr)
bool verbs_add_connection(uint32_t index, const std::string &address, uint32_t node_rank)
#define LOG_EVENT(group_number, message_number, block_number, event_name)
atomic< bool > shutdown_flag
std::function< void(std::optional< uint32_t > suspected_victim)> failure_callback_t
A C++ wrapper for the IB Verbs ibv_qp struct and its associated functions.
void destroy_group(uint16_t group_number)
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and queue pairs.
bool verbs_initialize(const std::map< uint32_t, std::string > &node_addresses, uint32_t node_rank)
uint64_t form_tag(uint16_t group_number, uint32_t target)
bool send(uint16_t group_number, shared_ptr< memory_region > mr, size_t offset, size_t length)
std::map< uint32_t, remote_memory_region > lf_exchange_memory_regions(const std::vector< uint32_t > &members, uint32_t node_rank, const memory_region &mr)
bool lf_add_connection(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a node to the group via TCP.
std::function< receive_destination(size_t size)> incoming_message_callback_t
A C++ wrapper for the libfabric fid_ep struct and its associated functions.
std::function< void(char *buffer, size_t size)> completion_callback_t
bool create_group(uint16_t group_number, std::vector< uint32_t > members, size_t block_size, send_algorithm algorithm, incoming_message_callback_t incoming_receive, completion_callback_t send_callback, failure_callback_t failure_callback) __attribute__((warn_unused_result))
Creates a new RDMC group.
size_t index_of(T container, U elem)
bool lf_initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
Initialize the global context.
map< uint16_t, shared_ptr< group > > groups