Derecho 0.9
Distributed systems toolkit for RDMA
connection_manager.cpp
Go to the documentation of this file.
2 
3 #include <cassert>
4 #include <iostream>
5 #include <set>
6 
7 namespace tcp {
9  const std::pair<ip_addr_t, uint16_t>& other_ip_and_port) {
10  if(other_id < my_id) {
11  try {
12  sockets[other_id] = socket(other_ip_and_port.first, other_ip_and_port.second);
13  } catch(exception) {
14  std::cerr << "WARNING: failed to connect to node " << other_id << " at "
15  << other_ip_and_port.first << ":" << other_ip_and_port.second << std::endl;
16  return false;
17  }
18 
19  node_id_t remote_id = 0;
20  if(!sockets[other_id].exchange(my_id, remote_id)) {
21  std::cerr << "WARNING: failed to exchange rank with node "
22  << other_id << " at " << other_ip_and_port.first << ":" << other_ip_and_port.second
23  << std::endl;
24  sockets.erase(other_id);
25  return false;
26  } else if(remote_id != other_id) {
27  std::cerr << "WARNING: node at " << other_ip_and_port.first << ":" << other_ip_and_port.second
28  << " replied with wrong id (expected " << other_id
29  << " but got " << remote_id << ")" << std::endl;
30 
31  sockets.erase(other_id);
32  return false;
33  }
34  return true;
35  } else if(other_id > my_id) {
36  while(true) {
37  try {
38  socket s = conn_listener->accept();
39 
40  node_id_t remote_id = 0;
41  if(!s.exchange(my_id, remote_id)) {
42  std::cerr << "WARNING: failed to exchange id with node" << other_id
43  << std::endl;
44  return false;
45  } else {
46  sockets[remote_id] = std::move(s);
47  //If the connection we got wasn't the intended node, keep
48  //looping and try again; there must be multiple nodes connecting
49  //simultaneously
50  if(remote_id == other_id)
51  return true;
52  }
53  } catch(exception&) {
54  std::cerr << "Got error while attempting to listen on port"
55  << std::endl;
56  return false;
57  }
58  }
59  }
60 
61  return false;
62 }
63 
64 void tcp_connections::establish_node_connections(const std::map<node_id_t, std::pair<ip_addr_t, uint16_t>>& ip_addrs_and_ports) {
65  conn_listener = std::make_unique<connection_listener>(ip_addrs_and_ports.at(my_id).second);
66 
67  for(auto it = ip_addrs_and_ports.begin(); it != ip_addrs_and_ports.end(); it++) {
68  //Check that there isn't already a connection to this ID,
69  //since an earlier add_connection could have connected to it by "mistake"
70  if(it->first != my_id && sockets.count(it->first) == 0) {
71  if(!add_connection(it->first, it->second)) {
72  std::cerr << "WARNING: failed to connect to node " << it->first
73  << " at " << it->second.first
74  << ":" << it->second.second << std::endl;
75  }
76  }
77  }
78 }
79 
81  const std::map<node_id_t, std::pair<ip_addr_t, uint16_t>> ip_addrs_and_ports)
82  : my_id(my_id) {
// Constructor body: records this node's ID, then opens the listener and
// connects to every node in ip_addrs_and_ports via establish_node_connections().
// NOTE(review): ip_addrs_and_ports is taken by value, so the whole map is
// copied on construction; a const reference would avoid that copy (the
// declaration in the header would need to change too).
83  establish_node_connections(ip_addrs_and_ports);
84 }
85 
// Body of what is presumably ~tcp_connections() (its signature line is missing
// from this listing): under sockets_mutex, drops every socket in `sockets`
// and then destroys the connection listener.
87  std::lock_guard<std::mutex> lock(sockets_mutex);
88  sockets.clear();
89  conn_listener.reset();
90 }
91 
92 bool tcp_connections::write(node_id_t node_id, char const* buffer,
93  size_t size) {
94  std::lock_guard<std::mutex> lock(sockets_mutex);
95  const auto it = sockets.find(node_id);
96  assert(it != sockets.end());
97  return it->second.write(buffer, size);
98 }
99 
100 bool tcp_connections::write_all(char const* buffer, size_t size) {
101  std::lock_guard<std::mutex> lock(sockets_mutex);
102  bool success = true;
103  for(auto& p : sockets) {
104  if(p.first == my_id) {
105  continue;
106  }
107  success = success && p.second.write(buffer, size);
108  }
109  return success;
110 }
111 
112 bool tcp_connections::read(node_id_t node_id, char* buffer,
113  size_t size) {
114  std::lock_guard<std::mutex> lock(sockets_mutex);
115  const auto it = sockets.find(node_id);
116  assert(it != sockets.end());
117  return it->second.read(buffer, size);
118 }
119 
120 bool tcp_connections::add_node(node_id_t new_id, const std::pair<ip_addr_t, uint16_t>& new_ip_addr_and_port) {
121  std::lock_guard<std::mutex> lock(sockets_mutex);
122  assert(new_id != my_id);
123  //If there's already a connection to this ID, just return "success"
124  if(sockets.count(new_id) > 0)
125  return true;
126  return add_connection(new_id, new_ip_addr_and_port);
127 }
128 
// Body of delete_node(remove_id) (signature line missing from this listing):
// under sockets_mutex, erases the socket for remove_id, closing that
// connection. Returns true if such a socket existed and was removed,
// false if there was none.
130  std::lock_guard<std::mutex> lock(sockets_mutex);
131  return (sockets.erase(remove_id) > 0);
132 }
133 
// Body of contains_node(node_id) (signature line missing from this listing):
// under sockets_mutex, returns true if `sockets` currently holds a socket
// for node_id.
135  std::lock_guard<std::mutex> lock(sockets_mutex);
136  return (sockets.find(node_id) != sockets.end());
137 }
138 
// Body of probe_all() (signature line missing from this listing): under
// sockets_mutex, calls probe() on each socket in ascending node-ID order and
// returns the ID of the first one that reports new data, or -1 if none does.
// NOTE(review): node_id_t is uint32_t but the return type is int32_t, so an
// ID above INT32_MAX is converted to a negative value and could even collide
// with the -1 "no data" sentinel.
140  std::lock_guard<std::mutex> lock(sockets_mutex);
141  for(auto& p : sockets) {
142  bool new_data_available = p.second.probe();
143  if(new_data_available == true) {
144  return p.first;
145  }
146  }
147  return -1;
148 }
149 
150 void tcp_connections::filter_to(const std::vector<node_id_t>& live_nodes_list) {
151  std::vector<node_id_t> sorted_nodes_list(live_nodes_list.size());
152  //There's nothing "partial" about this. Make a sorted copy of live_nodes_list.
153  std::partial_sort_copy(live_nodes_list.begin(), live_nodes_list.end(),
154  sorted_nodes_list.begin(), sorted_nodes_list.end());
155  std::lock_guard<std::mutex> lock(sockets_mutex);
156  for(auto socket_map_iter = sockets.begin(); socket_map_iter != sockets.end();) {
157  if(!std::binary_search(sorted_nodes_list.begin(),
158  sorted_nodes_list.end(),
159  socket_map_iter->first)) {
160  //If the node ID is not in the list, delete the socket
161  socket_map_iter = sockets.erase(socket_map_iter);
162  } else {
163  socket_map_iter++;
164  }
165  }
166 }
167 
// NOTE(review): listing lines 168-169 are missing from this extract; only the
// closing brace of that definition survives. It is presumably
// tcp_connections::get_socket() — confirm against the real source file.
170 }
171 } // namespace tcp
bool contains_node(node_id_t node_id)
Checks whether this connection manager currently has a socket connected to the node with the specified ID.
bool add_node(node_id_t new_id, const std::pair< ip_addr_t, uint16_t > &new_ip_addr_and_port)
Adds a TCP connection to a new node.
A little helper class that wraps together a reference and a lock on a related mutex.
bool write_all(char const *buffer, size_t size)
Writes size bytes from a buffer to all the other nodes currently connected, in ascending order of node ID.
bool exchange(node_id_t node_id, T local, T &remote)
std::unique_ptr< connection_listener > conn_listener
bool read(node_id_t node_id, char *buffer, size_t size)
Receives size bytes from the node with ID node_id, over the TCP socket connected to that node.
tcp_connections(node_id_t my_id, const std::map< node_id_t, std::pair< ip_addr_t, uint16_t >> ip_addrs_and_ports)
Creates a TCP connection manager and opens connections to all of the nodes in the initial set of addresses.
void establish_node_connections(const std::map< node_id_t, std::pair< ip_addr_t, uint16_t >> &ip_addrs_and_ports)
derecho::LockedReference< std::unique_lock< std::mutex >, socket > get_socket(node_id_t node_id)
Gets a locked reference to the TCP socket connected to a particular node.
bool write(node_id_t node_id, char const *buffer, size_t size)
Writes size bytes from a buffer to the node with ID node_id, using the TCP socket connected to that node.
bool delete_node(node_id_t remove_id)
Removes a node from the managed set of TCP connections, closing the socket connected to it.
void filter_to(const std::vector< node_id_t > &live_nodes_list)
Compares the set of TCP connections to a list of known live nodes and removes any connections to nodes that are not in that list.
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
std::map< node_id_t, socket > sockets
int32_t probe_all()
Checks all of the TCP connections managed by this object for new incoming data, and returns the ID of the first node that has data available, or -1 if no node does.
bool add_connection(const node_id_t other_id, const std::pair< ip_addr_t, uint16_t > &other_ip_and_port)
bool exchange(T local, T &remote)
Definition: tcp.hpp:120