Derecho  0.9
Distributed systems toolkit for RDMA
rdmc.cpp
Go to the documentation of this file.
1 #include <derecho/rdmc/rdmc.hpp>
6 #ifdef USE_VERBS_API
8 #else
10 #endif
11 
12 #include <atomic>
13 #include <cmath>
14 #include <cstdint>
15 #include <cstdio>
16 #include <map>
17 #include <memory>
18 #include <mutex>
19 #include <set>
20 #include <string>
21 #include <utility>
22 #include <vector>
23 
25 
26 using namespace std;
27 using namespace rdma;
28 
29 namespace rdmc {
30 uint32_t node_rank;
31 atomic<bool> shutdown_flag;
32 
33 // map from group number to group
34 map<uint16_t, shared_ptr<group>> groups;
36 
37  bool initialize(const map<uint32_t, std::pair<ip_addr_t, uint16_t>>& ip_addrs_and_ports, uint32_t _node_rank) {
38  if(shutdown_flag) return false;
39 
40  node_rank = _node_rank;
41 #ifdef USE_VERBS_API
42  if(!::rdma::impl::verbs_initialize(ip_addrs_and_ports, node_rank)) {
43 #else
44  if (!::rdma::impl::lf_initialize(ip_addrs_and_ports, node_rank)) {
45 #endif
46  return false;
47  }
48 
50  return true;
51 }
52 void add_address(uint32_t index, const std::pair<ip_addr_t, uint16_t>& address) {
53 #ifdef USE_VERBS_API
54  ::rdma::impl::verbs_add_connection(index, address);
55 #else
56  ::rdma::impl::lf_add_connection(index, address);
57 #endif
58 }
59 
60 bool create_group(uint16_t group_number, std::vector<uint32_t> members,
61  size_t block_size, send_algorithm algorithm,
62  incoming_message_callback_t incoming_upcall,
63  completion_callback_t callback,
64  failure_callback_t failure_callback) {
65  if(shutdown_flag) return false;
66 
67  schedule* send_schedule;
68  uint32_t member_index = index_of(members, node_rank);
69  if(algorithm == BINOMIAL_SEND) {
70  send_schedule = new binomial_schedule(members.size(), member_index);
71  } else if(algorithm == SEQUENTIAL_SEND) {
72  send_schedule = new sequential_schedule(members.size(), member_index);
73  } else if(algorithm == CHAIN_SEND) {
74  send_schedule = new chain_schedule(members.size(), member_index);
75  } else if(algorithm == TREE_SEND) {
76  send_schedule = new tree_schedule(members.size(), member_index);
77  } else {
78  puts("Unsupported group type?!");
79  fflush(stdout);
80  return false;
81  }
82 
83  unique_lock<mutex> lock(groups_lock);
84  auto g = make_shared<polling_group>(group_number, block_size, members,
85  member_index, incoming_upcall, callback,
86  unique_ptr<schedule>(send_schedule));
87  auto p = groups.emplace(group_number, std::move(g));
88  return p.second;
89 }
90 
91 void destroy_group(uint16_t group_number) {
92  if(shutdown_flag) return;
93 
94  unique_lock<mutex> lock(groups_lock);
95  LOG_EVENT(group_number, -1, -1, "destroy_group");
96  groups.erase(group_number);
97 }
98 void shutdown() { shutdown_flag = true; }
99 bool send(uint16_t group_number, shared_ptr<memory_region> mr, size_t offset,
100  size_t length) {
101  if(shutdown_flag) return false;
102 
103  shared_ptr<group> g;
104  {
105  unique_lock<mutex> lock(groups_lock);
106  auto it = groups.find(group_number);
107  if(it == groups.end()) return false;
108  g = it->second;
109  }
110  LOG_EVENT(group_number, -1, -1, "preparing_to_send_message");
111  g->send_message(mr, offset, length);
112  return true;
113 }
114 // void query_addresses(std::map<uint32_t, std::string>& addresses,
115 // uint32_t& node_rank) {
116 // query_peer_addresses(addresses, node_rank);
117 // }
118 
119 barrier_group::barrier_group(vector<uint32_t> members) {
120  member_index = index_of(members, node_rank);
121  group_size = members.size();
122 
123  if(group_size <= 1 || member_index >= members.size())
124  throw rdmc::invalid_args();
125 
126  total_steps = ceil(log2(group_size));
127  for(unsigned int m = 0; m < total_steps; m++) steps[m] = -1;
128 
129  steps_mr = make_unique<memory_region>((char*)&steps[0],
130  total_steps * sizeof(int64_t));
131  number_mr = make_unique<memory_region>((char*)&number, sizeof(number));
132 
133  set<uint32_t> targets;
134  for(unsigned int m = 0; m < total_steps; m++) {
135  auto target = (member_index + (1 << m)) % group_size;
136  auto target2 = (group_size * (1 << m) + member_index - (1 << m)) % group_size;
137  targets.insert(target);
138  targets.insert(target2);
139  }
140 
141 #ifdef USE_VERBS_API
142  map<uint32_t, queue_pair> qps;
143  for(auto target : targets) {
144  qps.emplace(target, queue_pair(members[target]));
145  }
146 
148  members, node_rank, *steps_mr.get());
149  for(unsigned int m = 0; m < total_steps; m++) {
150  auto target = (member_index + (1 << m)) % group_size;
151 
152  remote_memory_regions.push_back(remote_mrs.find(target)->second);
153 
154  auto qp_it = qps.find(target);
155  endpoints.push_back(std::move(qp_it->second));
156  qps.erase(qp_it);
157  }
158 
159  for(auto it = qps.begin(); it != qps.end(); it++) {
160  extra_endpoints.push_back(std::move(it->second));
161  }
162  qps.clear();
163 #else
164  map<uint32_t, endpoint> eps;
165  for(auto target : targets) {
166  // Decide whether the endpoint will act as server in the connection
167  bool is_lf_server = members[member_index] < members[target];
168  eps.emplace(target, endpoint(members[target], is_lf_server));
169  }
170 
172  members, node_rank, *steps_mr.get());
173  for(unsigned int m = 0; m < total_steps; m++) {
174  auto target = (member_index + (1 << m)) % group_size;
175 
176  remote_memory_regions.push_back(remote_mrs.find(target)->second);
177 
178  auto ep_it = eps.find(target);
179  endpoints.push_back(std::move(ep_it->second));
180  eps.erase(ep_it);
181  }
182  for(auto it = eps.begin(); it != eps.end(); it++) {
183  extra_endpoints.push_back(std::move(it->second));
184  }
185  eps.clear();
186 #endif
187 }
188 void barrier_group::barrier_wait() {
189  // See:
190  // http://mvapich.cse.ohio-state.edu/static/media/publications/abstract/kinis-euro03.pdf
191 
192  unique_lock<mutex> l(lock);
193  LOG_EVENT(-1, -1, -1, "start_barrier");
194  number++;
195 
196  for(unsigned int m = 0; m < total_steps; m++) {
197 #ifdef USE_VERBS_API
198  if(!queue_pairs[m].post_write(
199 #else
200  if(!endpoints[m].post_write(
201 #endif
202  *number_mr.get(), 0, 8,
203  form_tag(0, (node_rank + (1 << m)) % group_size),
204  remote_memory_regions[m], m * 8, message_type::ignored(),
205  false, true)) {
206  throw rdmc::connection_broken();
207  }
208 
209  while(steps[m] < number) /* do nothing*/
210  ;
211  }
212  LOG_EVENT(-1, -1, -1, "end_barrier");
213 }
214 } // namespace rdmc
static void initialize_message_types()
Definition: group_send.cpp:40
void add_address(uint32_t index, const std::pair< ip_addr_t, uint16_t > &address)
Definition: rdmc.cpp:52
bool initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &addresses, uint32_t node_rank) __attribute__((warn_unused_result))
Definition: rdmc.cpp:37
Definition: rdmc.hpp:20
std::map< uint32_t, remote_memory_region > verbs_exchange_memory_regions(const std::vector< uint32_t > &members, uint32_t node_rank, const memory_region &mr)
mutex groups_lock
Definition: rdmc.cpp:35
bool verbs_add_connection(uint32_t index, const std::string &address, uint32_t node_rank)
#define LOG_EVENT(group_number, message_number, block_number, event_name)
Definition: util.hpp:64
STL namespace.
atomic< bool > shutdown_flag
Definition: rdmc.cpp:31
std::function< void(std::optional< uint32_t > suspected_victim)> failure_callback_t
Definition: rdmc.hpp:44
uint32_t node_rank
Definition: experiment.cpp:45
A C++ wrapper for the IB Verbs ibv_qp struct and its associated functions.
void destroy_group(uint16_t group_number)
Definition: rdmc.cpp:91
Contains functions and classes for low-level RDMA operations, such as setting up memory regions and queue pairs.
Definition: lf_helper.hpp:28
bool verbs_initialize(const std::map< uint32_t, std::string > &node_addresses, uint32_t node_rank)
uint64_t form_tag(uint16_t group_number, uint32_t target)
Definition: message.hpp:18
bool send(uint16_t group_number, shared_ptr< memory_region > mr, size_t offset, size_t length)
Definition: rdmc.cpp:99
send_algorithm
Definition: rdmc.hpp:28
void shutdown()
Definition: rdmc.cpp:98
std::map< uint32_t, remote_memory_region > lf_exchange_memory_regions(const std::vector< uint32_t > &members, uint32_t node_rank, const memory_region &mr)
Definition: lf_helper.cpp:727
bool lf_add_connection(uint32_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a node to the group via TCP.
Definition: lf_helper.cpp:569
std::function< receive_destination(size_t size)> incoming_message_callback_t
Definition: rdmc.hpp:41
A C++ wrapper for the libfabric fid_ep struct and its associated functions.
Definition: lf_helper.hpp:157
std::function< void(char *buffer, size_t size)> completion_callback_t
Definition: rdmc.hpp:42
bool create_group(uint16_t group_number, std::vector< uint32_t > members, size_t block_size, send_algorithm algorithm, incoming_message_callback_t incoming_receive, completion_callback_t send_callback, failure_callback_t failure_callback) __attribute__((warn_unused_result))
Creates a new RDMC group.
Definition: rdmc.cpp:60
size_t index_of(T container, U elem)
Definition: util.hpp:13
bool lf_initialize(const std::map< uint32_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports, uint32_t node_rank)
Initialize the global context.
Definition: lf_helper.cpp:660
map< uint16_t, shared_ptr< group > > groups
Definition: rdmc.cpp:34