14 : members(params.members),
15 num_members(members.size()),
16 my_node_id(params.my_node_id),
17 incoming_p2p_buffers(num_members),
18 outgoing_p2p_buffers(num_members),
20 prev_mode(num_members) {
39 for(uint8_t i = 0; i < num_request_types; ++i) {
45 for(
auto type : p2p_request_types) {
56 const_cast<char*>(outgoing_p2p_buffers[i].
get()),
60 const_cast<char*>(outgoing_p2p_buffers[i].
get()),
78 old_connections.shutdown_failures_thread();
96 for(uint8_t i = 0; i < num_request_types; ++i) {
97 offsets[i] = old_connections.offsets[i];
100 for(
auto type : p2p_request_types) {
106 if(old_connections.node_id_to_rank.find(
members[i]) == old_connections.node_id_to_rank.end()) {
111 const_cast<char*>(outgoing_p2p_buffers[i].
get()),
115 auto old_rank = old_connections.node_id_to_rank[
members[i]];
118 for(
auto type : p2p_request_types) {
123 res_vec[i] = std::move(old_connections.res_vec[old_rank]);
162 for(
auto type : p2p_request_types) {
181 auto buf =
probe(rank);
183 return std::pair<uint32_t, char*>(
members[rank], buf);
228 pthread_setname_np(pthread_self(),
"p2p_timeout");
231 const auto tid = std::this_thread::get_id();
235 std::this_thread::sleep_for(std::chrono::milliseconds(heartbeat_ms));
254 sctxt[rank].
ce_idx = ce_idx;
256 res_vec[rank]->post_remote_write_with_completion(&sctxt[rank],
p2p_buf_size -
sizeof(
bool),
sizeof(
bool));
260 const int MAX_POLL_CQ_TIMEOUT = 2000;
261 unsigned long start_time_msec;
262 unsigned long cur_time_msec;
263 struct timeval cur_time;
266 gettimeofday(&cur_time, NULL);
267 start_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
269 uint32_t num_completions = 0;
270 while(num_completions < num_members - 1) {
271 std::optional<std::pair<int32_t, int32_t>> ce;
278 gettimeofday(&cur_time, NULL);
279 cur_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
280 if((cur_time_msec - start_time_msec) >= MAX_POLL_CQ_TIMEOUT) {
294 std::cout <<
"Members: " << std::endl;
296 std::cout << m <<
" ";
298 std::cout << std::endl;
300 for(
const auto& type : p2p_request_types) {
301 std::cout <<
"P2PConnections: Request type " << type << std::endl;
302 for(uint32_t node = 0; node <
num_members; ++node) {
303 std::cout <<
"Node " << node << std::endl;
304 std::cout <<
"incoming seq_nums:";
306 uint64_t offset =
max_msg_sizes[type] * (type * window_sizes[type] + i + 1) -
sizeof(uint64_t);
309 std::cout << std::endl
310 <<
"outgoing seq_nums:";
311 for(uint32_t i = 0; i < window_sizes[type]; ++i) {
312 uint64_t offset =
max_msg_sizes[type] * (type * window_sizes[type] + i + 1) -
sizeof(uint64_t);
315 std::cout << std::endl;
uint32_t get_index(const std::thread::id id)
uint64_t offsets[num_request_types]
uint64_t getOffsetSeqNum(REQUEST_TYPE type, uint64_t seq_num)
std::optional< std::pair< uint32_t, char * > > probe_all()
void shutdown_failures_thread()
char * get_sendbuffer_ptr(uint32_t rank, REQUEST_TYPE type)
std::map< REQUEST_TYPE, std::vector< std::atomic< uint64_t > > > outgoing_seq_nums_map
char * probe(uint32_t rank)
const std::uint32_t num_members
const uint32_t getConfUInt32(const std::string &key)
std::map< REQUEST_TYPE, std::vector< std::atomic< uint64_t > > > incoming_seq_nums_map
uint64_t get_max_p2p_reply_size()
std::thread timeout_thread
void set_waiting(const std::thread::id id)
uint32_t window_sizes[num_request_types]
std::vector< REQUEST_TYPE > prev_mode
const std::vector< uint32_t > members
void reset_waiting(const std::thread::id id)
void check_failures_loop()
std::vector< std::unique_ptr< volatile char[]> > incoming_p2p_buffers
uint32_t max_msg_sizes[num_request_types]
#define CONF_DERECHO_HEARTBEAT_MS
std::vector< std::unique_ptr< resources > > res_vec
std::atomic< bool > thread_shutdown
std::map< uint32_t, uint32_t > node_id_to_rank
const uint32_t my_node_id
uint32_t get_node_rank(uint32_t node_id)
P2PConnections(const P2PParams params)
uint64_t getOffsetBuf(REQUEST_TYPE type, uint64_t seq_num)
std::optional< std::pair< int32_t, int32_t > > get_completion_entry(const std::thread::id id)
void update_incoming_seq_num()
std::vector< std::unique_ptr< volatile char[]> > outgoing_p2p_buffers
uint64_t max_p2p_reply_size
uint64_t max_rpc_reply_size
uint64_t max_p2p_request_size