Derecho  0.9
Distributed systems toolkit for RDMA
p2p_connections.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <iostream>
5 #include <map>
6 #include <memory>
7 #include <optional>
8 #include <thread>
9 #include <vector>
10 
11 #ifdef USE_VERBS_API
13 #else
15 #endif
16 
17 namespace sst {
/**
 * Parameter bundle accepted by the P2PConnections constructor.
 *
 * NOTE(review): the three max_*_size fields were dropped from this extracted
 * listing (listing lines 23-25 are absent) and are restored here from the
 * member index at the end of the listing. Their relative order is assumed;
 * confirm against the upstream header.
 */
struct P2PParams {
    /** ID of the local node (per the name; confirm against the constructor). */
    uint32_t my_node_id;
    /** Member list; presumably node IDs of all peers, including this node. */
    std::vector<uint32_t> members;
    /** Window size for P2P traffic (unit not visible here; presumably a message count). */
    uint32_t p2p_window_size;
    /** Window size for RPC traffic (unit not visible here; presumably a message count). */
    uint32_t rpc_window_size;
    /** Maximum size of a P2P reply (presumably bytes; confirm). */
    uint64_t max_p2p_reply_size;
    /** Maximum size of a P2P request (presumably bytes; confirm). */
    uint64_t max_p2p_request_size;
    /** Maximum size of an RPC reply (presumably bytes; confirm). */
    uint64_t max_rpc_reply_size;
};
27 
/**
 * Kinds of request distinguished by the P2P layer.
 *
 * NOTE(review): the enum header and the middle enumerator were dropped from
 * this extracted listing (listing lines 28 and 30-31 are absent). The type
 * name comes from its uses below; P2P_REQUEST is inferred from the matching
 * max_p2p_request_size parameter. Confirm against the upstream header.
 */
enum REQUEST_TYPE {
    P2P_REPLY = 0,
    P2P_REQUEST,
    RPC_REPLY
};

/** Every request type, listed in enumerator order. */
static const REQUEST_TYPE p2p_request_types[] = {P2P_REPLY,
                                                 P2P_REQUEST,
                                                 RPC_REPLY};

/** Number of request types; used as the bound of per-type arrays. */
static const uint8_t num_request_types = 3;

// Keep the hard-coded count and the table above from drifting apart.
static_assert(sizeof(p2p_request_types) / sizeof(p2p_request_types[0]) == num_request_types,
              "num_request_types must match the length of p2p_request_types");
37 
// Member declarations of P2PConnections (the constructor and destructor
// declared further down name the type).
// NOTE(review): this extracted listing is missing the line that opens the
// definition (listing line 38, presumably `class P2PConnections {`) and
// listing line 59; restore both from the upstream header before compiling.
//
// Member list and its size; presumably node IDs and their count — confirm.
39  const std::vector<uint32_t> members;
40  const std::uint32_t num_members;
// ID of the local node and, presumably, its position within `members`.
41  const uint32_t my_node_id;
42  uint32_t my_index;
// Lookup from node ID to rank (per the name); presumably the index used for
// the per-member vectors below — confirm against the implementation.
43  std::map<uint32_t, uint32_t> node_id_to_rank;
44  // one element per member for P2P
45  std::vector<std::unique_ptr<volatile char[]>> incoming_p2p_buffers;
46  std::vector<std::unique_ptr<volatile char[]>> outgoing_p2p_buffers;
// Owning pointers to `resources` objects (type declared elsewhere).
47  std::vector<std::unique_ptr<resources>> res_vec;
48  uint64_t p2p_buf_size;
// For each request type, a vector of atomic 64-bit counters; one map for
// incoming and one for outgoing sequence numbers.
49  std::map<REQUEST_TYPE, std::vector<std::atomic<uint64_t>>> incoming_seq_nums_map, outgoing_seq_nums_map;
50  std::vector<REQUEST_TYPE> prev_mode;
// Atomic flag, initially false; presumably tells timeout_thread to stop —
// confirm in check_failures_loop / shutdown_failures_thread.
51  std::atomic<bool> thread_shutdown{false};
52  std::thread timeout_thread;
// Offset helpers keyed by request type and sequence number (definitions not
// visible here).
53  uint64_t getOffsetSeqNum(REQUEST_TYPE type, uint64_t seq_num);
54  uint64_t getOffsetBuf(REQUEST_TYPE type, uint64_t seq_num);
// Fixed-size arrays with one slot per request type (num_request_types).
55  uint32_t window_sizes[num_request_types];
56  uint32_t max_msg_sizes[num_request_types];
57  uint64_t offsets[num_request_types];
// Per-rank probe returning a raw char pointer; semantics of the result are
// defined in the implementation file.
58  char* probe(uint32_t rank);
60  uint32_t last_rank;
// Counter that starts at zero.
61  uint32_t num_rdma_writes = 0;
// Presumably the body run by timeout_thread — confirm in the implementation.
62  void check_failures_loop();
63 
64 public:
// Constructs from a parameter bundle taken by value.
65  P2PConnections(const P2PParams params);
// Constructs from an rvalue reference to existing connections plus a new
// member list.
66  P2PConnections(P2PConnections&& old_connections, const std::vector<uint32_t> new_members);
67  ~P2PConnections();
68  void shutdown_failures_thread();
// Rank lookup for a node ID; presumably backed by node_id_to_rank.
69  uint32_t get_node_rank(uint32_t node_id);
70  uint64_t get_max_p2p_reply_size();
// Returns either nothing or a (uint32_t, char*) pair; the uint32_t is
// presumably the rank whose buffer the pointer refers to — confirm.
71  std::optional<std::pair<uint32_t, char*>> probe_all();
72  void update_incoming_seq_num();
// Raw pointer into a send buffer selected by rank and request type.
73  char* get_sendbuffer_ptr(uint32_t rank, REQUEST_TYPE type);
74  void send(uint32_t rank);
75  void debug_print();
76 };
77 } // namespace sst
std::map< REQUEST_TYPE, std::vector< std::atomic< uint64_t > > > outgoing_seq_nums_map
const std::uint32_t num_members
std::thread timeout_thread
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
uint32_t p2p_window_size
std::vector< REQUEST_TYPE > prev_mode
const std::vector< uint32_t > members
std::vector< std::unique_ptr< volatile char[]> > incoming_p2p_buffers
std::vector< std::unique_ptr< resources > > res_vec
std::map< uint32_t, uint32_t > node_id_to_rank
const uint32_t my_node_id
std::vector< std::unique_ptr< volatile char[]> > outgoing_p2p_buffers
uint64_t max_p2p_reply_size
uint32_t rpc_window_size
uint64_t max_rpc_reply_size
std::vector< uint32_t > members
uint64_t max_p2p_request_size