15 template <
typename sstType>
33 std::shared_ptr<sstType>
sst;
56 for(
auto i : row_indices) {
57 for(uint j = num_received_offset; j < num_received_offset +
num_senders; ++j) {
58 sst->num_received_sst[i][j] = -1;
61 sst->slots[i][slots_offset + max_msg_size * j] = 0;
62 (uint64_t&)sst->slots[i][slots_offset + (max_msg_size * (j + 1)) -
sizeof(uint64_t)] = 0;
65 sst->sync_with_members(row_indices);
70 std::vector<uint32_t> row_indices,
72 uint64_t max_msg_size,
73 std::vector<int> is_sender = {},
74 uint32_t num_received_offset = 0,
75 uint32_t slots_offset = 0)
76 :
my_row(sst->get_local_index()),
80 if(is_sender.size() == 0) {
81 return std::vector<int32_t>(row_indices.size(), 1);
93 if(row_indices[i] == my_row) {
99 if(i == my_member_index) {
102 if(this->is_sender[i]) {
108 if(!this->is_sender[my_member_index]) {
109 my_sender_index = -1;
115 assert(my_sender_index >= 0);
116 std::lock_guard<std::mutex> lock(msg_send_mutex);
117 assert(msg_size <= max_msg_size);
119 if(queued_num - finished_multicasts_num < window_size) {
123 (uint64_t&)sst->slots[my_row][slots_offset + (max_msg_size * (slot + 1)) - 2 *
sizeof(uint64_t)] = msg_size;
124 return &sst->slots[
my_row][slots_offset + (max_msg_size * slot)];
126 long long int min_multicast_num = sst->num_received_sst[
my_row][num_received_offset +
my_sender_index];
127 for(
auto i : row_indices) {
128 long long int num_received_sst_copy = sst->num_received_sst[i][num_received_offset +
my_sender_index];
129 min_multicast_num = std::min(min_multicast_num, num_received_sst_copy);
131 if(finished_multicasts_num == min_multicast_num) {
134 finished_multicasts_num = min_multicast_num;
143 ((uint64_t&)sst->slots[my_row][slots_offset + max_msg_size * (slot + 1) -
sizeof(uint64_t)])++;
145 (
char*)std::addressof(sst->slots[0][slots_offset + max_msg_size * slot]) - sst->getBaseAddress(),
146 max_msg_size -
sizeof(uint64_t));
148 (
char*)std::addressof(sst->slots[0][slots_offset + slot * max_msg_size]) - sst->getBaseAddress() + max_msg_size -
sizeof(uint64_t),
155 cout <<
"Printing slots::next_seq" << endl;
156 for(
auto i : row_indices) {
158 cout << (uint64_t&)sst->slots[i][slots_offset + (max_msg_size * (j + 1)) -
sizeof(uint64_t)] <<
" ";
162 cout <<
"Printing num_received_sst" << endl;
163 for(
auto i : row_indices) {
164 for(uint j = num_received_offset; j < num_received_offset +
num_senders; ++j) {
165 cout << sst->num_received_sst[i][j] <<
" ";
const uint32_t window_size
std::thread timeout_thread
const uint32_t slots_offset
const std::vector< uint32_t > row_indices
long long int finished_multicasts_num
const uint32_t num_received_offset
std::shared_ptr< sstType > sst
const uint64_t max_msg_size
volatile char * get_buffer(uint64_t msg_size)
std::mutex msg_send_mutex
const std::vector< int > is_sender
multicast_group(std::shared_ptr< sstType > sst, std::vector< uint32_t > row_indices, uint32_t window_size, uint64_t max_msg_size, std::vector< int > is_sender={}, uint32_t num_received_offset=0, uint32_t slots_offset=0)
const uint32_t num_members