Derecho  0.9
Distributed systems toolkit for RDMA
tcp.cpp
Go to the documentation of this file.
1 #include <derecho/tcp/tcp.hpp>
2 
3 #include <algorithm>
4 #include <arpa/inet.h>
5 #include <cerrno>
6 #include <cstring>
7 #include <iostream>
8 #include <netdb.h>
9 #include <sys/ioctl.h>
10 #include <sys/socket.h>
11 #include <sys/epoll.h>
12 #include <fcntl.h>
13 #include <unistd.h>
14 #include <cassert>
15 #include <linux/tcp.h>
16 
17 namespace tcp {
18 
19 using namespace std;
20 
21 socket::socket(string server_ip, uint16_t server_port) {
22  sock = ::socket(AF_INET, SOCK_STREAM, 0);
23  if(sock < 0) throw connection_failure();
24 
25  hostent *server;
26  server = gethostbyname(server_ip.c_str());
27  if(server == nullptr) throw connection_failure();
28 
29  char server_ip_cstr[INET_ADDRSTRLEN];
30  inet_ntop(AF_INET, server->h_addr, server_ip_cstr, sizeof(server_ip_cstr));
31  remote_ip = string(server_ip_cstr);
32 
33  sockaddr_in serv_addr;
34  memset(&serv_addr, 0, sizeof(serv_addr));
35  serv_addr.sin_family = AF_INET;
36  serv_addr.sin_port = htons(server_port);
37  bcopy((char *)server->h_addr, (char *)&serv_addr.sin_addr.s_addr,
38  server->h_length);
39 
40  int optval = 1;
41  if(setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval))) {
42  fprintf(stderr, "WARNING: Failed to disable Nagle's algorithm, continue without TCP_NODELAY...\n");
43  }
44 
45  while(connect(sock, (sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
46  /* do nothing*/;
47 }
48 socket::socket(socket &&s) : sock(s.sock), remote_ip(s.remote_ip) {
49  s.sock = -1;
50  s.remote_ip = std::string();
51 }
52 
54  sock = s.sock;
55  s.sock = -1;
56  remote_ip = std::move(s.remote_ip);
57  return *this;
58 }
59 
61  if(sock >= 0) close(sock);
62 }
63 
64 bool socket::is_empty() const { return sock == -1; }
65 
66 int socket::try_connect(string servername, int port, int timeout_ms) {
67  sock = ::socket(AF_INET, SOCK_STREAM, 0);
68  if(sock < 0) throw connection_failure();
69 
70  hostent *server;
71  server = gethostbyname(servername.c_str());
72  if(server == nullptr) throw connection_failure();
73 
74  char server_ip_cstr[INET_ADDRSTRLEN];
75  inet_ntop(AF_INET, server->h_addr, server_ip_cstr, sizeof(server_ip_cstr));
76  remote_ip = string(server_ip_cstr);
77 
78  sockaddr_in serv_addr;
79  memset(&serv_addr, 0, sizeof(serv_addr));
80  serv_addr.sin_family = AF_INET;
81  serv_addr.sin_port = htons(port);
82  bcopy((char *)server->h_addr, (char *)&serv_addr.sin_addr.s_addr,
83  server->h_length);
84 
85  //Temporarily set socket to nonblocking in order to connect with a timeout
86  int sock_flags = fcntl(sock, F_GETFL, 0);
87  sock_flags |= O_NONBLOCK;
88  fcntl(sock, F_SETFL, sock_flags);
89  int epoll_fd = epoll_create1(0);
90  struct epoll_event connect_event;
91  memset(&connect_event, 0, sizeof(connect_event));
92  connect_event.data.fd = sock;
93  connect_event.events = EPOLLOUT;
94  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &connect_event);
95 
96  bool connected = false;
97  int return_code = connect(sock, (sockaddr *)&serv_addr, sizeof(serv_addr));
98  if(return_code == -1 && errno != EINPROGRESS) {
99  return_code = errno;
100  } else if(return_code == 0) {
101  //Successfully connected right away
102  connected = true;
103  } else {
104  epoll_event array_of_one_event[1];
105  //Wait for the EPOLLOUT event indicating the socket is connected
106  int numfds = epoll_wait(epoll_fd, array_of_one_event, 1, timeout_ms);
107  if(numfds == 0) {
108  //Timed out
109  return_code = ETIMEDOUT;
110  } else if(numfds < 0) {
111  return_code = errno;
112  } else {
113  assert(numfds == 1);
114  assert(array_of_one_event[0].data.fd == sock);
115  if(array_of_one_event[0].events & EPOLLERR) {
116  return_code = 0;
117  socklen_t len = sizeof return_code;
118  getsockopt(sock, SOL_SOCKET, SO_ERROR, &return_code, &len);
119  } else {
120  assert(array_of_one_event[0].events & EPOLLOUT);
121  connected = true;
122  }
123  }
124  }
125  if(connected) {
126  //Connection was successful, set the socket back to blocking
127  epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sock, NULL);
128  sock_flags = fcntl(sock, F_GETFL, 0);
129  sock_flags &= ~O_NONBLOCK;
130  return_code = fcntl(sock, F_SETFL, sock_flags); //This should return 0
131  } else {
132  close(sock);
133  }
134  close(epoll_fd);
135  return return_code;
136 }
137 
138 bool socket::read(char *buffer, size_t size) {
139  if(sock < 0) {
140  fprintf(stderr, "WARNING: Attempted to read from closed socket\n");
141  return false;
142  }
143 
144  size_t total_bytes = 0;
145  while(total_bytes < size) {
146  ssize_t new_bytes = ::read(sock, buffer + total_bytes, size - total_bytes);
147  if(new_bytes > 0) {
148  total_bytes += new_bytes;
149  } else if(new_bytes == 0 || (new_bytes == -1 && errno != EINTR)) {
150  return false;
151  }
152  }
153  return true;
154 }
155 
156 ssize_t socket::read_partial(char *buffer, size_t max_size) {
157  if(sock < 0) {
158  fprintf(stderr, "WARNING: Attempted to read from closed socket\n");
159  return -1;
160  }
161 
162  ssize_t bytes_read = ::read(sock, buffer, max_size);
163  return bytes_read;
164 }
166  int count;
167  ioctl(sock, FIONREAD, &count);
168  return count > 0;
169 }
170 
171 bool socket::write(const char *buffer, size_t size) {
172  if(sock < 0) {
173  fprintf(stderr, "WARNING: Attempted to write to closed socket\n");
174  return false;
175  }
176 
177  size_t total_bytes = 0;
178  while(total_bytes < size) {
179  //MSG_NOSIGNAL makes send return a proper error code if the socket has been
180  //closed by the remote, rather than crashing the entire program with a SIGPIPE
181  ssize_t bytes_written = send(sock, buffer + total_bytes, size - total_bytes, MSG_NOSIGNAL);
182  if(bytes_written >= 0) {
183  total_bytes += bytes_written;
184  } else if(bytes_written == -1 && errno != EINTR) {
185  std::cerr << "socket::write: Error in the socket! Errno " << errno << std::endl;
186  return false;
187  }
188  }
189  return true;
190 }
191 
192 std::string socket::get_self_ip() {
193  struct sockaddr_storage my_addr_info;
194  socklen_t len = sizeof my_addr_info;
195 
196  getsockname(sock, (struct sockaddr *)&my_addr_info, &len);
197  char my_ip_cstr[INET6_ADDRSTRLEN + 1];
198  if(my_addr_info.ss_family == AF_INET) {
199  struct sockaddr_in *s = (struct sockaddr_in *)&my_addr_info;
200  inet_ntop(AF_INET, &s->sin_addr, my_ip_cstr,
201  sizeof my_ip_cstr);
202  } else {
203  struct sockaddr_in6 *s = (struct sockaddr_in6 *)&my_addr_info;
204  inet_ntop(AF_INET6, &s->sin6_addr, my_ip_cstr,
205  sizeof my_ip_cstr);
206  }
207  return std::string(my_ip_cstr);
208 }
209 
211  sockaddr_in serv_addr;
212 
213  int listenfd = ::socket(AF_INET, SOCK_STREAM, 0);
214  if(listenfd < 0) throw connection_failure();
215 
216  int reuse_addr = 1;
217  setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse_addr,
218  sizeof(reuse_addr));
219 
220  memset(&serv_addr, 0, sizeof(serv_addr));
221  serv_addr.sin_family = AF_INET;
222  serv_addr.sin_addr.s_addr = INADDR_ANY;
223  serv_addr.sin_port = htons(port);
224  if(bind(listenfd, (sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
225  fprintf(stderr,
226  "ERROR on binding to socket in ConnectionListener: %s\n",
227  strerror(errno));
228  std::cout << "Port is: " << port << std::endl;
229  }
230  listen(listenfd, 5);
231 
232  fd = unique_ptr<int, std::function<void(int *)>>(
233  new int(listenfd), [](int *fd) { close(*fd); delete fd; });
234 }
235 
237  char client_ip_cstr[INET6_ADDRSTRLEN + 1];
238  struct sockaddr_storage client_addr_info;
239  socklen_t len = sizeof client_addr_info;
240 
241  int sock = ::accept(*fd, (struct sockaddr *)&client_addr_info, &len);
242  if(sock < 0) throw connection_failure();
243 
244  if(client_addr_info.ss_family == AF_INET) {
245  // Client has an IPv4 address
246  struct sockaddr_in *s = (struct sockaddr_in *)&client_addr_info;
247  inet_ntop(AF_INET, &s->sin_addr, client_ip_cstr, sizeof client_ip_cstr);
248  } else { // AF_INET6
249  // Client has an IPv6 address
250  struct sockaddr_in6 *s = (struct sockaddr_in6 *)&client_addr_info;
251  inet_ntop(AF_INET6, &s->sin6_addr, client_ip_cstr,
252  sizeof client_ip_cstr);
253  }
254 
255  return socket(sock, std::string(client_ip_cstr));
256 }
257 
258 std::optional<socket> connection_listener::try_accept(int timeout_ms) {
259  //Temporarily set server socket to nonblocking
260  int socket_flags = fcntl(*fd, F_GETFL, 0);
261  socket_flags |= O_NONBLOCK;
262  if(fcntl(*fd, F_SETFL, socket_flags) < 0) {
263  throw connection_failure();
264  }
265 
266  int epoll_fd = epoll_create1(0);
267  struct epoll_event accept_event;
268  memset(&accept_event, 0, sizeof(accept_event));
269  accept_event.data.fd = *fd;
270  accept_event.events = EPOLLIN;
271  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, *fd, &accept_event);
272 
273  struct sockaddr_storage client_addr_info;
274  socklen_t len = sizeof client_addr_info;
275  int client_sock;
276  bool success = false;
277  epoll_event array_of_one_event[1];
278  //Wait for the EPOLLIN event indicating the socket is connected
279  int numfds = epoll_wait(epoll_fd, array_of_one_event, 1, timeout_ms);
280  if(numfds == 1) {
281  assert(array_of_one_event[0].data.fd == *fd);
282  client_sock = ::accept(*fd, (struct sockaddr *)&client_addr_info, &len);
283  if(client_sock >= 0) {
284  success = true;
285  }
286  }
287  close(epoll_fd);
288 
289  //Set server socket back to blocking
290  socket_flags = fcntl(*fd, F_GETFL, 0);
291  socket_flags &= (~O_NONBLOCK);
292  if(fcntl(*fd, F_SETFL, socket_flags) < 0) {
293  throw connection_failure();
294  }
295 
296  if(success) {
297  char client_ip_cstr[INET6_ADDRSTRLEN + 1];
298  if(client_addr_info.ss_family == AF_INET) {
299  // Client has an IPv4 address
300  struct sockaddr_in *s = (struct sockaddr_in *)&client_addr_info;
301  inet_ntop(AF_INET, &s->sin_addr, client_ip_cstr, sizeof client_ip_cstr);
302  } else { // AF_INET6
303  // Client has an IPv6 address
304  struct sockaddr_in6 *s = (struct sockaddr_in6 *)&client_addr_info;
305  inet_ntop(AF_INET6, &s->sin6_addr, client_ip_cstr,
306  sizeof client_ip_cstr);
307  }
308  return socket(client_sock, std::string(client_ip_cstr));
309  } else {
310  return std::nullopt;
311  }
312 
313 }
314 
315 } // namespace tcp
std::vector< event > events
Definition: util.cpp:118
socket accept()
Blocks until a remote client makes a connection to this connection listener, then returns a new socke...
Definition: tcp.cpp:236
std::string get_self_ip()
Definition: tcp.cpp:192
ssize_t read_partial(char *buffer, size_t max_size)
Attempts to read up to max_size bytes from socket and write them to the given buffer, but returns immediately even if fewer than max_size bytes are available to be read.
Definition: tcp.cpp:156
int sock
Definition: tcp.hpp:14
socket()
Constructs an empty, unconnected socket.
Definition: tcp.hpp:28
STL namespace.
int try_connect(std::string servername, int port, int timeout_ms=20000)
Attempts to connect the socket to the specified address and port, but returns promptly with an error ...
Definition: tcp.cpp:66
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
std::string remote_ip
Definition: tcp.hpp:21
bool probe()
Returns true if there is any data available to be read from the socket.
Definition: tcp.cpp:165
bool read(char *buffer, size_t size)
Reads size bytes from the socket and writes them to the given buffer.
Definition: tcp.cpp:138
std::optional< socket > try_accept(int timeout_ms)
Waits the specified number of milliseconds for a remote client to connect to this connection listener...
Definition: tcp.cpp:258
~socket()
Definition: tcp.cpp:60
bool write(const char *buffer, size_t size)
Writes size bytes from the given buffer to the socket.
Definition: tcp.cpp:171
bool is_empty() const
Definition: tcp.cpp:64
socket & operator=(socket &s)=delete
connection_listener(uint16_t port)
Constructs a connection listener ("server socket") that listens on the given port of this machine's T...
Definition: tcp.cpp:210