Derecho  0.9
Distributed systems toolkit for RDMA
multicast.hpp
Go to the documentation of this file.
1 #include <cassert>
2 #include <chrono>
3 #include <cstdlib>
4 #include <ctime>
5 #include <functional>
6 #include <iostream>
7 #include <map>
8 #include <memory>
9 #include <thread>
10 #include <vector>
11 
12 #include "sst.hpp"
13 
14 namespace sst {
15 template <typename sstType>
17  // index of the most recent message handed out by get_buffer(); -1 until the first call succeeds
18  long long int queued_num = -1;
19  // number of times send() has been called; send() uses it (mod window_size) to pick the slot to push
20  uint64_t num_sent = 0;
21  // smallest num_received_sst entry for this sender over my_row and all row_indices, as last refreshed by get_buffer(); -1 initially
22  long long int finished_multicasts_num = -1;
23  // row of the local node in the SST (sst->get_local_index())
24  const uint32_t my_row;
25  // position of my_row within row_indices; assigned in the constructor body
26  uint32_t my_member_index;
27  // position of this node among the members flagged in is_sender, or -1 if it is not a sender
28  int32_t my_sender_index;
29  // held by get_buffer() for its whole body; send() does not take it in the code shown here
30  std::mutex msg_send_mutex;
31 
32  // shared handle to the SST whose num_received_sst and slots fields this group uses
33  std::shared_ptr<sstType> sst;
34 
35  // SST row index of each member, and a parallel per-member flag (non-zero = sender)
36  const std::vector<uint32_t> row_indices;
37  const std::vector<int> is_sender;
38 
39  // starting indices of the SST fields this group uses:
40  // num_received_offset is the first num_received_sst column, slots_offset the first slots byte
41  const uint32_t num_received_offset;
42  const uint32_t slots_offset;
43 
44  // number of members (row_indices.size())
45  const uint32_t num_members;
46  // number of members flagged as senders; counted in the constructor body
47  uint32_t num_senders;
48  // number of message slots per row
49  const uint32_t window_size;
50  // per-slot stride in bytes: the constructor's max_msg_size argument plus 2 * sizeof(uint64_t) of trailing bookkeeping
51  const uint64_t max_msg_size;
52 
53  std::thread timeout_thread;  // NOTE(review): never started or joined in the code visible here
54 
55  void initialize() {
56  for(auto i : row_indices) {
57  for(uint j = num_received_offset; j < num_received_offset + num_senders; ++j) {
58  sst->num_received_sst[i][j] = -1;
59  }
60  for(uint j = 0; j < window_size; ++j) {
61  sst->slots[i][slots_offset + max_msg_size * j] = 0;
62  (uint64_t&)sst->slots[i][slots_offset + (max_msg_size * (j + 1)) - sizeof(uint64_t)] = 0;
63  }
64  }
65  sst->sync_with_members(row_indices);
66  }
67 
68 public:
69  multicast_group(std::shared_ptr<sstType> sst,
70  std::vector<uint32_t> row_indices,
71  uint32_t window_size,
72  uint64_t max_msg_size,
73  std::vector<int> is_sender = {},
74  uint32_t num_received_offset = 0,
75  uint32_t slots_offset = 0)
76  : my_row(sst->get_local_index()),
77  sst(sst),
78  row_indices(row_indices),
79  is_sender([is_sender, row_indices]() {
80  if(is_sender.size() == 0) {
81  return std::vector<int32_t>(row_indices.size(), 1);
82  } else {
83  return is_sender;
84  }
85  }()),
86  num_received_offset(num_received_offset),
87  slots_offset(slots_offset),
88  num_members(row_indices.size()),
89  window_size(window_size),
90  max_msg_size(max_msg_size + 2 * sizeof(uint64_t)) {
91  // find my_member_index
92  for(uint i = 0; i < num_members; ++i) {
93  if(row_indices[i] == my_row) {
94  my_member_index = i;
95  }
96  }
97  int j = 0;
98  for(uint i = 0; i < num_members; ++i) {
99  if(i == my_member_index) {
100  my_sender_index = j;
101  }
102  if(this->is_sender[i]) {
103  j++;
104  }
105  }
106  num_senders = j;
107 
108  if(!this->is_sender[my_member_index]) {
109  my_sender_index = -1;
110  }
111  initialize();
112  }
113 
114  volatile char* get_buffer(uint64_t msg_size) {
115  assert(my_sender_index >= 0);
116  std::lock_guard<std::mutex> lock(msg_send_mutex);
117  assert(msg_size <= max_msg_size);
118  while(true) {
119  if(queued_num - finished_multicasts_num < window_size) {
120  queued_num++;
121  uint32_t slot = queued_num % window_size;
122  // set size appropriately
123  (uint64_t&)sst->slots[my_row][slots_offset + (max_msg_size * (slot + 1)) - 2 * sizeof(uint64_t)] = msg_size;
124  return &sst->slots[my_row][slots_offset + (max_msg_size * slot)];
125  } else {
126  long long int min_multicast_num = sst->num_received_sst[my_row][num_received_offset + my_sender_index];
127  for(auto i : row_indices) {
128  long long int num_received_sst_copy = sst->num_received_sst[i][num_received_offset + my_sender_index];
129  min_multicast_num = std::min(min_multicast_num, num_received_sst_copy);
130  }
131  if(finished_multicasts_num == min_multicast_num) {
132  return nullptr;
133  } else {
134  finished_multicasts_num = min_multicast_num;
135  }
136  }
137  }
138  }
139 
140  void send() {
141  uint32_t slot = num_sent % window_size;
142  num_sent++;
143  ((uint64_t&)sst->slots[my_row][slots_offset + max_msg_size * (slot + 1) - sizeof(uint64_t)])++;
144  sst->put(
145  (char*)std::addressof(sst->slots[0][slots_offset + max_msg_size * slot]) - sst->getBaseAddress(),
146  max_msg_size - sizeof(uint64_t));
147  sst->put(
148  (char*)std::addressof(sst->slots[0][slots_offset + slot * max_msg_size]) - sst->getBaseAddress() + max_msg_size - sizeof(uint64_t),
149  sizeof(uint64_t));
150  }
151 
152  void debug_print() {
153  using std::cout;
154  using std::endl;
155  cout << "Printing slots::next_seq" << endl;
156  for(auto i : row_indices) {
157  for(uint j = 0; j < window_size; ++j) {
158  cout << (uint64_t&)sst->slots[i][slots_offset + (max_msg_size * (j + 1)) - sizeof(uint64_t)] << " ";
159  }
160  cout << endl;
161  }
162  cout << "Printing num_received_sst" << endl;
163  for(auto i : row_indices) {
164  for(uint j = num_received_offset; j < num_received_offset + num_senders; ++j) {
165  cout << sst->num_received_sst[i][j] << " ";
166  }
167  cout << endl;
168  }
169  cout << endl;
170  }
171 };
172 } // namespace sst
const uint32_t my_row
Definition: multicast.hpp:24
const uint32_t window_size
Definition: multicast.hpp:49
std::thread timeout_thread
Definition: multicast.hpp:53
long long int queued_num
Definition: multicast.hpp:18
const uint32_t slots_offset
Definition: multicast.hpp:42
const std::vector< uint32_t > row_indices
Definition: multicast.hpp:36
long long int finished_multicasts_num
Definition: multicast.hpp:22
const uint32_t num_received_offset
Definition: multicast.hpp:41
std::shared_ptr< sstType > sst
Definition: multicast.hpp:33
const uint64_t max_msg_size
Definition: multicast.hpp:51
volatile char * get_buffer(uint64_t msg_size)
Definition: multicast.hpp:114
std::mutex msg_send_mutex
Definition: multicast.hpp:30
const std::vector< int > is_sender
Definition: multicast.hpp:37
multicast_group(std::shared_ptr< sstType > sst, std::vector< uint32_t > row_indices, uint32_t window_size, uint64_t max_msg_size, std::vector< int > is_sender={}, uint32_t num_received_offset=0, uint32_t slots_offset=0)
Definition: multicast.hpp:69
const uint32_t num_members
Definition: multicast.hpp:45
uint32_t my_member_index
Definition: multicast.hpp:26