Derecho  0.9
Distributed systems toolkit for RDMA
p2p_connections.cpp
Go to the documentation of this file.
1 #include <map>
2 
3 #include <cassert>
4 #include <cstring>
5 #include <sstream>
6 #include <sys/time.h>
7 
8 #include <derecho/conf/conf.hpp>
11 
12 namespace sst {
14  : members(params.members),
15  num_members(members.size()),
16  my_node_id(params.my_node_id),
17  incoming_p2p_buffers(num_members),
18  outgoing_p2p_buffers(num_members),
19  res_vec(num_members),
20  prev_mode(num_members) {
21  //Figure out my SST index
22  my_index = (uint32_t)-1;
23  for(uint32_t i = 0; i < num_members; ++i) {
24  if(members[i] == my_node_id) {
25  my_index = i;
26  }
27  node_id_to_rank[members[i]] = i;
28  }
29 
30  // HARD-CODED. Adding another request type will break this
37 
38  p2p_buf_size = 0;
39  for(uint8_t i = 0; i < num_request_types; ++i) {
40  offsets[i] = p2p_buf_size;
42  }
43  p2p_buf_size += sizeof(bool);
44 
45  for(auto type : p2p_request_types) {
46  incoming_seq_nums_map.try_emplace(type,std::vector<std::atomic<uint64_t>>(num_members));
47  outgoing_seq_nums_map.try_emplace(type,std::vector<std::atomic<uint64_t>>(num_members));
48  }
49 
50  for(uint i = 0; i < num_members; ++i) {
51  incoming_p2p_buffers[i] = std::make_unique<volatile char[]>(p2p_buf_size);
52  outgoing_p2p_buffers[i] = std::make_unique<volatile char[]>(p2p_buf_size);
53  if(i != my_index) {
54 #ifdef USE_VERBS_API
55  res_vec[i] = std::make_unique<resources>(i, const_cast<char*>(incoming_p2p_buffers[i].get()),
56  const_cast<char*>(outgoing_p2p_buffers[i].get()),
58 #else
59  res_vec[i] = std::make_unique<resources>(i, const_cast<char*>(incoming_p2p_buffers[i].get()),
60  const_cast<char*>(outgoing_p2p_buffers[i].get()),
62 #endif
63  }
64  }
65 
67 }
68 
// Builds a P2PConnections for the membership `new_members`, reusing state from
// `old_connections` where possible:
//  - members whose node ID is found in old_connections.node_id_to_rank take over
//    the old incoming/outgoing buffers, the old per-type sequence numbers and
//    (for every rank except my_index) the old `resources` object;
//  - members not found there get freshly allocated buffers of p2p_buf_size bytes
//    and (for every rank except my_index) a newly constructed `resources` object.
// Window sizes, maximum message sizes and region offsets are copied unchanged
// from old_connections.
//
// NOTE(review): this listing is an extract; listing lines 73-75, 77, 112 and 128
// are not present here, so part of the member-initializer list, the opening
// brace of the body, the tail of the make_unique<resources>(...) call and one
// statement before the closing brace are not visible. Confirm against the real file.
69 P2PConnections::P2PConnections(P2PConnections&& old_connections, const std::vector<uint32_t> new_members)
70  : members(new_members),
71  num_members(members.size()),
72  my_node_id(old_connections.my_node_id),
76  p2p_buf_size(old_connections.p2p_buf_size),
 // The old object's failure-checking thread is shut down before any of its state is taken over.
78  old_connections.shutdown_failures_thread();
79  //Figure out my SST index
 // my_index stays at (uint32_t)-1 if my_node_id does not appear in members.
80  my_index = (uint32_t)-1;
81  for(uint32_t i = 0; i < num_members; ++i) {
82  if(members[i] == my_node_id) {
83  my_index = i;
84  }
85  node_id_to_rank[members[i]] = i;
86  }
87 
88  // HARD-CODED. Adding another request type will break this
89  window_sizes[P2P_REPLY] = old_connections.window_sizes[P2P_REPLY];
90  window_sizes[P2P_REQUEST] = old_connections.window_sizes[P2P_REQUEST];
91  window_sizes[RPC_REPLY] = old_connections.window_sizes[RPC_REPLY];
92  max_msg_sizes[P2P_REPLY] = old_connections.max_msg_sizes[P2P_REPLY];
93  max_msg_sizes[P2P_REQUEST] = old_connections.max_msg_sizes[P2P_REQUEST];
94  max_msg_sizes[RPC_REPLY] = old_connections.max_msg_sizes[RPC_REPLY];
95 
96  for(uint8_t i = 0; i < num_request_types; ++i) {
97  offsets[i] = old_connections.offsets[i];
98  }
99 
 // One sequence-number vector with num_members entries per request type, in both directions.
100  for(auto type : p2p_request_types) {
101  incoming_seq_nums_map.try_emplace(type,std::vector<std::atomic<uint64_t>>(num_members));
102  outgoing_seq_nums_map.try_emplace(type,std::vector<std::atomic<uint64_t>>(num_members));
103  }
104 
105  for(uint i = 0; i < num_members; ++i) {
 // Member unknown to the old object: allocate new buffers and a new connection.
106  if(old_connections.node_id_to_rank.find(members[i]) == old_connections.node_id_to_rank.end()) {
107  incoming_p2p_buffers[i] = std::make_unique<volatile char[]>(p2p_buf_size);
108  outgoing_p2p_buffers[i] = std::make_unique<volatile char[]>(p2p_buf_size);
109  if(i != my_index) {
110  res_vec[i] = std::make_unique<resources>(members[i], const_cast<char*>(incoming_p2p_buffers[i].get()),
111  const_cast<char*>(outgoing_p2p_buffers[i].get()),
113  }
114  } else {
 // Member already known: move its buffers from the old rank to the new rank
 // and carry the old sequence numbers over.
115  auto old_rank = old_connections.node_id_to_rank[members[i]];
116  incoming_p2p_buffers[i] = std::move(old_connections.incoming_p2p_buffers[old_rank]);
117  outgoing_p2p_buffers[i] = std::move(old_connections.outgoing_p2p_buffers[old_rank]);
118  for(auto type : p2p_request_types) {
119  incoming_seq_nums_map[type][i].store(old_connections.incoming_seq_nums_map[type][old_rank]);
120  outgoing_seq_nums_map[type][i].store(old_connections.outgoing_seq_nums_map[type][old_rank]);
121  }
122  if(i != my_index) {
123  res_vec[i] = std::move(old_connections.res_vec[old_rank]);
124  }
125  }
126  }
127 
129 }
130 
133 }
134 
136  thread_shutdown = true;
137  if(timeout_thread.joinable()) {
138  timeout_thread.join();
139  }
140 }
141 
142 uint32_t P2PConnections::get_node_rank(uint32_t node_id) {
143  return node_id_to_rank.at(node_id);
144 }
145 
147  return max_msg_sizes[P2P_REPLY] - sizeof(uint64_t);
148 }
149 
150 uint64_t P2PConnections::getOffsetSeqNum(REQUEST_TYPE type, uint64_t seq_num) {
151  return offsets[type] + max_msg_sizes[type] * ((seq_num % window_sizes[type]) + 1) - sizeof(uint64_t);
152  // return max_msg_size * (type * window_size + (seq_num % window_size) + 1) - sizeof(uint64_t);
153 }
154 
155 uint64_t P2PConnections::getOffsetBuf(REQUEST_TYPE type, uint64_t seq_num) {
156  return offsets[type] + max_msg_sizes[type] * (seq_num % window_sizes[type]);
157  // return max_msg_size * (type * window_size + (seq_num % window_size));
158 }
159 
160 // check if there's a new request from some node
161 char* P2PConnections::probe(uint32_t rank) {
162  for(auto type : p2p_request_types) {
163  if((uint64_t&)incoming_p2p_buffers[rank][getOffsetSeqNum(type, incoming_seq_nums_map[type][rank])]
164  == incoming_seq_nums_map[type][rank] + 1) {
165  last_type = type;
166  last_rank = rank;
167  return const_cast<char*>(incoming_p2p_buffers[rank].get())
168  + getOffsetBuf(type, incoming_seq_nums_map[type][rank]);
169  }
170  }
171  return nullptr;
172 }
173 
176 }
177 
178 // check if there's a new request from any node
// Calls probe(rank) for each rank in ascending order and returns on the first
// rank whose probe yields a buffer with a non-zero first byte.
// Returns: a pair of (node ID taken from members[rank], pointer returned by
// probe), or an empty optional when no rank has such a buffer.
// NOTE(review): listing line 188 (the statement inside the `else if(buf)`
// branch) is absent from this extract; per the comment that precedes it, it is
// presumably the sequence-number increment — confirm against the real file.
179 std::optional<std::pair<uint32_t, char*>> P2PConnections::probe_all() {
180  for(uint rank = 0; rank < num_members; ++rank) {
181  auto buf = probe(rank);
 // A non-null buffer whose first byte is non-zero is handed back to the caller.
182  if(buf && buf[0]) {
183  return std::pair<uint32_t, char*>(members[rank], buf);
184  }
185  else if(buf) {
186  // this means that we have a null reply
187  // we don't need to process it, but we still want to increment the seq num
189  }
190  }
191  return {};
192 }
193 
195  prev_mode[rank] = type;
196  if(type != REQUEST_TYPE::P2P_REQUEST
197  || static_cast<int32_t>(incoming_seq_nums_map[REQUEST_TYPE::P2P_REPLY][rank])
198  > static_cast<int32_t>(outgoing_seq_nums_map[REQUEST_TYPE::P2P_REQUEST][rank] - window_sizes[P2P_REQUEST])) {
199  (uint64_t&)outgoing_p2p_buffers[rank][getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank])]
200  = outgoing_seq_nums_map[type][rank] + 1;
201  return const_cast<char*>(outgoing_p2p_buffers[rank].get())
202  + getOffsetBuf(type, outgoing_seq_nums_map[type][rank]);
203  }
204  return nullptr;
205 }
206 
207 void P2PConnections::send(uint32_t rank) {
208  auto type = prev_mode[rank];
209  if(rank == my_index) {
210  // there's no reason why memcpy shouldn't also copy guard and data separately
211  std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetBuf(type, outgoing_seq_nums_map[type][rank]),
212  const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetBuf(type, outgoing_seq_nums_map[type][rank]),
213  max_msg_sizes[type] - sizeof(uint64_t));
214  std::memcpy(const_cast<char*>(incoming_p2p_buffers[rank].get()) + getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank]),
215  const_cast<char*>(outgoing_p2p_buffers[rank].get()) + getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank]),
216  sizeof(uint64_t));
217  } else {
218  res_vec[rank]->post_remote_write(getOffsetBuf(type, outgoing_seq_nums_map[type][rank]),
219  max_msg_sizes[type] - sizeof(uint64_t));
220  res_vec[rank]->post_remote_write(getOffsetSeqNum(type, outgoing_seq_nums_map[type][rank]),
221  sizeof(uint64_t));
222  num_rdma_writes++;
223  }
224  outgoing_seq_nums_map[type][rank]++;
225 }
226 
228  pthread_setname_np(pthread_self(), "p2p_timeout");
229 
230  uint32_t heartbeat_ms = derecho::getConfUInt32(CONF_DERECHO_HEARTBEAT_MS);
231  const auto tid = std::this_thread::get_id();
232  // get id first
233  uint32_t ce_idx = util::polling_data.get_index(tid);
234  while(!thread_shutdown) {
235  std::this_thread::sleep_for(std::chrono::milliseconds(heartbeat_ms));
236  if(num_rdma_writes < 1000) {
237  continue;
238  }
239  num_rdma_writes = 0;
240 
242 #ifdef USE_VERBS_API
243  struct verbs_sender_ctxt sctxt[num_members];
244 #else
245  struct lf_sender_ctxt sctxt[num_members];
246 #endif
247 
248  for(uint rank = 0; rank < num_members; ++rank) {
249  if(rank == my_index) {
250  continue;
251  }
252 
253  sctxt[rank].remote_id = rank;
254  sctxt[rank].ce_idx = ce_idx;
255 
256  res_vec[rank]->post_remote_write_with_completion(&sctxt[rank], p2p_buf_size - sizeof(bool), sizeof(bool));
257  }
258 
260  const int MAX_POLL_CQ_TIMEOUT = 2000;
261  unsigned long start_time_msec;
262  unsigned long cur_time_msec;
263  struct timeval cur_time;
264 
265  // wait for completion for a while before giving up of doing it ..
266  gettimeofday(&cur_time, NULL);
267  start_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
268 
269  uint32_t num_completions = 0;
270  while(num_completions < num_members - 1) {
271  std::optional<std::pair<int32_t, int32_t>> ce;
272  while(true) {
273  // check if polling result is available
275  if(ce) {
276  break;
277  }
278  gettimeofday(&cur_time, NULL);
279  cur_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
280  if((cur_time_msec - start_time_msec) >= MAX_POLL_CQ_TIMEOUT) {
281  break;
282  }
283  }
284  if(!ce) {
285  break;
286  }
287  num_completions++;
288  }
290  }
291 }
292 
294  std::cout << "Members: " << std::endl;
295  for(auto m : members) {
296  std::cout << m << " ";
297  }
298  std::cout << std::endl;
299 
300  for(const auto& type : p2p_request_types) {
301  std::cout << "P2PConnections: Request type " << type << std::endl;
302  for(uint32_t node = 0; node < num_members; ++node) {
303  std::cout << "Node " << node << std::endl;
304  std::cout << "incoming seq_nums:";
305  for(uint32_t i = 0; i < window_sizes[type]; ++i) {
306  uint64_t offset = max_msg_sizes[type] * (type * window_sizes[type] + i + 1) - sizeof(uint64_t);
307  std::cout << " " << (uint64_t&)incoming_p2p_buffers[node][offset];
308  }
309  std::cout << std::endl
310  << "outgoing seq_nums:";
311  for(uint32_t i = 0; i < window_sizes[type]; ++i) {
312  uint64_t offset = max_msg_sizes[type] * (type * window_sizes[type] + i + 1) - sizeof(uint64_t);
313  std::cout << " " << (uint64_t&)outgoing_p2p_buffers[node][offset];
314  }
315  std::cout << std::endl;
316  }
317  }
318 }
319 } // namespace sst
uint32_t remote_id
Definition: lf.hpp:24
uint32_t get_index(const std::thread::id id)
Definition: poll_utils.cpp:39
uint64_t offsets[num_request_types]
uint32_t ce_idx
Definition: lf.hpp:23
uint64_t getOffsetSeqNum(REQUEST_TYPE type, uint64_t seq_num)
std::optional< std::pair< uint32_t, char * > > probe_all()
char * get_sendbuffer_ptr(uint32_t rank, REQUEST_TYPE type)
std::map< REQUEST_TYPE, std::vector< std::atomic< uint64_t > > > outgoing_seq_nums_map
char * probe(uint32_t rank)
const std::uint32_t num_members
const uint32_t getConfUInt32(const std::string &key)
Definition: conf.cpp:118
std::map< REQUEST_TYPE, std::vector< std::atomic< uint64_t > > > incoming_seq_nums_map
uint64_t get_max_p2p_reply_size()
std::thread timeout_thread
PollingData polling_data
Definition: poll_utils.cpp:17
void set_waiting(const std::thread::id id)
Definition: poll_utils.cpp:49
uint32_t p2p_window_size
uint32_t window_sizes[num_request_types]
std::vector< REQUEST_TYPE > prev_mode
const std::vector< uint32_t > members
void send(uint32_t rank)
void reset_waiting(const std::thread::id id)
Definition: poll_utils.cpp:56
std::vector< std::unique_ptr< volatile char[]> > incoming_p2p_buffers
uint32_t max_msg_sizes[num_request_types]
#define CONF_DERECHO_HEARTBEAT_MS
Definition: conf.hpp:30
std::vector< std::unique_ptr< resources > > res_vec
std::atomic< bool > thread_shutdown
std::map< uint32_t, uint32_t > node_id_to_rank
const uint32_t my_node_id
uint32_t get_node_rank(uint32_t node_id)
P2PConnections(const P2PParams params)
uint64_t getOffsetBuf(REQUEST_TYPE type, uint64_t seq_num)
std::optional< std::pair< int32_t, int32_t > > get_completion_entry(const std::thread::id id)
Definition: poll_utils.cpp:28
std::vector< std::unique_ptr< volatile char[]> > outgoing_p2p_buffers
uint64_t max_p2p_reply_size
uint32_t rpc_window_size
uint64_t max_rpc_reply_size
uint64_t max_p2p_request_size