9 const std::pair<ip_addr_t, uint16_t>& other_ip_and_port) {
10 if(other_id <
my_id) {
12 sockets[other_id] =
socket(other_ip_and_port.first, other_ip_and_port.second);
14 std::cerr <<
"WARNING: failed to connect to node " << other_id <<
" at " 15 << other_ip_and_port.first <<
":" << other_ip_and_port.second << std::endl;
21 std::cerr <<
"WARNING: failed to exchange rank with node " 22 << other_id <<
" at " << other_ip_and_port.first <<
":" << other_ip_and_port.second
26 }
else if(remote_id != other_id) {
27 std::cerr <<
"WARNING: node at " << other_ip_and_port.first <<
":" << other_ip_and_port.second
28 <<
" replied with wrong id (expected " << other_id
29 <<
" but got " << remote_id <<
")" << std::endl;
35 }
else if(other_id >
my_id) {
42 std::cerr <<
"WARNING: failed to exchange id with node" << other_id
46 sockets[remote_id] = std::move(s);
50 if(remote_id == other_id)
54 std::cerr <<
"Got error while attempting to listen on port" 65 conn_listener = std::make_unique<connection_listener>(ip_addrs_and_ports.at(
my_id).second);
67 for(
auto it = ip_addrs_and_ports.begin(); it != ip_addrs_and_ports.end(); it++) {
70 if(it->first !=
my_id &&
sockets.count(it->first) == 0) {
72 std::cerr <<
"WARNING: failed to connect to node " << it->first
73 <<
" at " << it->second.first
74 <<
":" << it->second.second << std::endl;
81 const std::map<
node_id_t, std::pair<ip_addr_t, uint16_t>> ip_addrs_and_ports)
95 const auto it =
sockets.find(node_id);
97 return it->second.write(buffer, size);
104 if(p.first ==
my_id) {
107 success = success && p.second.write(buffer, size);
115 const auto it =
sockets.find(node_id);
117 return it->second.read(buffer, size);
122 assert(new_id !=
my_id);
131 return (
sockets.erase(remove_id) > 0);
142 bool new_data_available = p.second.probe();
143 if(new_data_available ==
true) {
151 std::vector<node_id_t> sorted_nodes_list(live_nodes_list.size());
153 std::partial_sort_copy(live_nodes_list.begin(), live_nodes_list.end(),
154 sorted_nodes_list.begin(), sorted_nodes_list.end());
156 for(
auto socket_map_iter =
sockets.begin(); socket_map_iter !=
sockets.end();) {
157 if(!std::binary_search(sorted_nodes_list.begin(),
158 sorted_nodes_list.end(),
159 socket_map_iter->first)) {
161 socket_map_iter =
sockets.erase(socket_map_iter);
bool contains_node(node_id_t node_id)
Checks whether this connection manager currently has a socket connected to the node with the specified ID.
bool add_node(node_id_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a TCP connection to a new node.
A little helper class that wraps together a reference and a lock on a related mutex.
bool write_all(char const *buffer, size_t size)
Writes size bytes from a buffer to all the other nodes currently connected, in ascending order of node ID.
bool exchange(node_id_t node_id, T local, T &remote)
std::unique_ptr< connection_listener > conn_listener
bool read(node_id_t node_id, char *buffer, size_t size)
Receives size bytes from the node with ID node_id, over the TCP socket connected to that node...
tcp_connections(node_id_t my_id, const std::map< node_id_t, std::pair< ip_addr_t, uint16_t >> ip_addrs_and_ports)
Creates a TCP connection manager for a set of connections to all of the initial set of addresses...
void establish_node_connections(const std::map< node_id_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports)
derecho::LockedReference< std::unique_lock< std::mutex >, socket > get_socket(node_id_t node_id)
Gets a locked reference to the TCP socket connected to a particular node.
bool write(node_id_t node_id, char const *buffer, size_t size)
Writes size bytes from a buffer to the node with ID node_id, using the TCP socket connected to that node.
bool delete_node(node_id_t remove_id)
Removes a node from the managed set of TCP connections, closing the socket connected to it...
void filter_to(const std::vector< node_id_t > &live_nodes_list)
Compares the set of TCP connections to a list of known live nodes and removes any connections to node...
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
std::map< node_id_t, socket > sockets
int32_t probe_all()
Checks all of the TCP connections managed by this object for new incoming data, and returns the ID of...
bool add_connection(const node_id_t other_id, const std::pair< ip_addr_t, uint16_t > &other_ip_and_port)
bool exchange(T local, T &remote)