Derecho  0.9
Distributed systems toolkit for RDMA
group_send.hpp
Go to the documentation of this file.
1 #ifndef GROUP_SEND_HPP
2 #define GROUP_SEND_HPP
3 
4 #include <derecho/rdmc/rdmc.hpp>
5 #include "detail/schedule.hpp"
6 
7 #ifdef USE_VERBS_API
8  #include "detail/verbs_helper.hpp"
9 #else
10  #include "detail/lf_helper.hpp"
11 #endif
12 
13 #include <optional>
14 #include <map>
15 #include <memory>
16 #include <mutex>
17 #include <set>
18 #include <vector>
19 
22 using std::map;
23 using std::unique_ptr;
24 using std::vector;
25 using std::optional;
26 
27 class group { // Abstract base class: declares four pure virtual operations, each of which polling_group re-declares.
28 protected:
29  const vector<uint32_t> members; // first element is the sender
30  const uint16_t group_number;
31  const size_t block_size;
32  const uint32_t num_members;
33  const uint32_t member_index; // our index in the members list
34 
35  const unique_ptr<schedule> transfer_schedule; // exclusively owned; handed in through the constructor
36 
37  std::mutex monitor; // NOTE(review): presumably guards the mutable state below -- locking discipline is not visible in this header
38 
39  std::shared_ptr<rdma::memory_region> mr; // NOTE(review): presumably the region of the message in flight -- confirm in group_send.cpp
40  size_t mr_offset;
41  size_t message_size;
42  size_t num_blocks;
43  // NOTE(review): listing lines 44-45 are not shown on this page; the cross-reference index lists completion_callback (line 44) and incoming_message_upcall (line 45) as defined there.
46 
47  group(uint16_t group_number, size_t block_size, // declared under protected: -- only derived classes can construct a group
48  vector<uint32_t> members, uint32_t member_index, // NOTE(review): listing line 49 is not shown; the index gives the full signature with an incoming_message_callback_t upcall parameter between member_index and callback
50  completion_callback_t callback,
51  unique_ptr<schedule> transfer_schedule); // taken by value: the caller gives up ownership of the schedule
52 
53 public:
54  virtual ~group(); // virtual, so a derived object can be destroyed through a group pointer
55 
56  virtual void receive_block(uint32_t send_imm, size_t size) = 0; // pure virtual
57  virtual void receive_ready_for_block(uint32_t step, uint32_t sender) = 0; // pure virtual
58  virtual void complete_block_send() = 0; // pure virtual
59  virtual void send_message(std::shared_ptr<rdma::memory_region> message_mr, // pure virtual
60  size_t offset, size_t length) // NOTE(review): offset/length presumably delimit the message inside message_mr -- confirm
61  = 0;
62 };
63 
64 class polling_group : public group { // Concrete group: re-declares all four pure virtual operations of the base class.
65 private:
66  // Set of receivers who are ready to receive the next block from us.
67  std::set<uint32_t> receivers_ready;
68 
69  unique_ptr<rdma::memory_region> first_block_mr;
70  optional<size_t> first_block_number; // optional: may hold no block number at all
71  unique_ptr<char[]> first_block_buffer;
72  // NOTE(review): listing line 73 is not shown on this page; the cross-reference index lists size_t incoming_block as defined there.
74  size_t message_number = 0;
75  // NOTE(review): listing line 76 is not shown on this page; the cross-reference index lists size_t outgoing_block as defined there.
77  bool sending = false; // Whether a block send is in progress
78  size_t send_step = 0; // Number of blocks sent/stalls so far
79 
80  // Total number of blocks received and the number of chunks
81  // received for each block, respectively.
82  size_t num_received_blocks = 0;
83  size_t receive_step = 0;
84  vector<bool> received_blocks;
85 
86  // maps from member_indices to the queue pairs
87 #ifdef USE_VERBS_API
88  map<size_t, rdma::queue_pair> queue_pairs; // compiled only when USE_VERBS_API is defined
89  map<size_t, rdma::queue_pair> rfb_queue_pairs; // NOTE(review): "rfb" presumably stands for ready-for-block -- confirm
90 #else
91  map<size_t, rdma::endpoint> endpoints; // used instead of queue_pairs when USE_VERBS_API is not defined
92  map<size_t, rdma::endpoint> rfb_endpoints;
93 #endif
94  static struct { // one shared instance for all polling_group objects (static member)
95  rdma::message_type data_block;
96  rdma::message_type ready_for_block;
97  } message_types;
98 
99 public:
100  static void initialize_message_types(); // NOTE(review): presumably populates message_types -- confirm in group_send.cpp
101 
102  polling_group(uint16_t group_number, size_t block_size, // same visible parameter list as the group constructor
103  vector<uint32_t> members, uint32_t member_index, // NOTE(review): listing line 104 is not shown; presumably the same upcall parameter the index reports for group's constructor -- confirm
105  completion_callback_t callback,
106  unique_ptr<schedule> transfer_schedule);
107 
108  virtual void receive_block(uint32_t send_imm, size_t size); // matches group::receive_block; declared without the override keyword
109  virtual void receive_ready_for_block(uint32_t step, uint32_t sender); // matches group::receive_ready_for_block
110  virtual void complete_block_send(); // matches group::complete_block_send
111 
112  virtual void send_message(std::shared_ptr<rdma::memory_region> message_mr, // matches group::send_message
113  size_t offset, size_t length);
114 
115 private: // Internal helpers; only their declarations appear in this header.
116  void post_recv(schedule::block_transfer transfer);
117  void send_next_block();
118  void complete_message();
119  void prepare_for_next_message();
120  void send_ready_for_block(uint32_t neighbor);
121  void connect(uint32_t neighbor);
122 };
123 
124 #endif /* GROUP_SEND_HPP */
const uint16_t group_number
Definition: group_send.hpp:30
size_t num_blocks
Definition: group_send.hpp:42
const size_t block_size
Definition: group_send.hpp:31
const unique_ptr< schedule > transfer_schedule
Definition: group_send.hpp:35
virtual void receive_block(uint32_t send_imm, size_t size)=0
const uint32_t member_index
Definition: group_send.hpp:33
unique_ptr< rdma::memory_region > first_block_mr
Definition: group_send.hpp:69
std::mutex monitor
Definition: group_send.hpp:37
virtual void send_message(std::shared_ptr< rdma::memory_region > message_mr, size_t offset, size_t length)=0
size_t outgoing_block
Definition: group_send.hpp:76
map< size_t, rdma::endpoint > rfb_endpoints
Definition: group_send.hpp:92
unique_ptr< char[]> first_block_buffer
Definition: group_send.hpp:71
size_t message_size
Definition: group_send.hpp:41
size_t incoming_block
Definition: group_send.hpp:73
virtual ~group()
Definition: group_send.cpp:38
map< size_t, rdma::endpoint > endpoints
Definition: group_send.hpp:91
vector< bool > received_blocks
Definition: group_send.hpp:84
const uint32_t num_members
Definition: group_send.hpp:32
virtual void receive_ready_for_block(uint32_t step, uint32_t sender)=0
virtual void complete_block_send()=0
group(uint16_t group_number, size_t block_size, vector< uint32_t > members, uint32_t member_index, incoming_message_callback_t upcall, completion_callback_t callback, unique_ptr< schedule > transfer_schedule)
Definition: group_send.cpp:25
std::function< receive_destination(size_t size)> incoming_message_callback_t
Definition: rdmc.hpp:41
std::set< uint32_t > receivers_ready
Definition: group_send.hpp:67
std::function< void(char *buffer, size_t size)> completion_callback_t
Definition: rdmc.hpp:42
std::shared_ptr< rdma::memory_region > mr
Definition: group_send.hpp:39
optional< size_t > first_block_number
Definition: group_send.hpp:70
size_t mr_offset
Definition: group_send.hpp:40
incoming_message_callback_t incoming_message_upcall
Definition: group_send.hpp:45
completion_callback_t completion_callback
Definition: group_send.hpp:44
const vector< uint32_t > members
Definition: group_send.hpp:29