10 #include <sys/socket.h> 11 #include <sys/epoll.h> 15 #include <linux/tcp.h> 22 sock =
::socket(AF_INET, SOCK_STREAM, 0);
26 server = gethostbyname(server_ip.c_str());
29 char server_ip_cstr[INET_ADDRSTRLEN];
30 inet_ntop(AF_INET, server->h_addr, server_ip_cstr,
sizeof(server_ip_cstr));
31 remote_ip = string(server_ip_cstr);
33 sockaddr_in serv_addr;
34 memset(&serv_addr, 0,
sizeof(serv_addr));
35 serv_addr.sin_family = AF_INET;
36 serv_addr.sin_port = htons(server_port);
37 bcopy((
char *)server->h_addr, (
char *)&serv_addr.sin_addr.s_addr,
41 if(setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &optval,
sizeof(optval))) {
42 fprintf(stderr,
"WARNING: Failed to disable Nagle's algorithm, continue without TCP_NODELAY...\n");
45 while(connect(sock, (sockaddr *)&serv_addr,
sizeof(serv_addr)) < 0)
50 s.remote_ip = std::string();
71 server = gethostbyname(servername.c_str());
74 char server_ip_cstr[INET_ADDRSTRLEN];
75 inet_ntop(AF_INET, server->h_addr, server_ip_cstr,
sizeof(server_ip_cstr));
78 sockaddr_in serv_addr;
79 memset(&serv_addr, 0,
sizeof(serv_addr));
80 serv_addr.sin_family = AF_INET;
81 serv_addr.sin_port = htons(port);
82 bcopy((
char *)server->h_addr, (
char *)&serv_addr.sin_addr.s_addr,
86 int sock_flags = fcntl(
sock, F_GETFL, 0);
87 sock_flags |= O_NONBLOCK;
88 fcntl(
sock, F_SETFL, sock_flags);
89 int epoll_fd = epoll_create1(0);
90 struct epoll_event connect_event;
91 memset(&connect_event, 0,
sizeof(connect_event));
92 connect_event.data.fd =
sock;
93 connect_event.events = EPOLLOUT;
94 epoll_ctl(epoll_fd, EPOLL_CTL_ADD,
sock, &connect_event);
96 bool connected =
false;
97 int return_code = connect(
sock, (sockaddr *)&serv_addr,
sizeof(serv_addr));
98 if(return_code == -1 && errno != EINPROGRESS) {
100 }
else if(return_code == 0) {
104 epoll_event array_of_one_event[1];
106 int numfds = epoll_wait(epoll_fd, array_of_one_event, 1, timeout_ms);
109 return_code = ETIMEDOUT;
110 }
else if(numfds < 0) {
114 assert(array_of_one_event[0].data.fd ==
sock);
115 if(array_of_one_event[0].
events & EPOLLERR) {
117 socklen_t len =
sizeof return_code;
118 getsockopt(
sock, SOL_SOCKET, SO_ERROR, &return_code, &len);
120 assert(array_of_one_event[0].
events & EPOLLOUT);
127 epoll_ctl(epoll_fd, EPOLL_CTL_DEL,
sock, NULL);
128 sock_flags = fcntl(
sock, F_GETFL, 0);
129 sock_flags &= ~O_NONBLOCK;
130 return_code = fcntl(
sock, F_SETFL, sock_flags);
140 fprintf(stderr,
"WARNING: Attempted to read from closed socket\n");
144 size_t total_bytes = 0;
145 while(total_bytes < size) {
146 ssize_t new_bytes =
::read(
sock, buffer + total_bytes, size - total_bytes);
148 total_bytes += new_bytes;
149 }
else if(new_bytes == 0 || (new_bytes == -1 && errno != EINTR)) {
158 fprintf(stderr,
"WARNING: Attempted to read from closed socket\n");
162 ssize_t bytes_read =
::read(
sock, buffer, max_size);
167 ioctl(
sock, FIONREAD, &count);
173 fprintf(stderr,
"WARNING: Attempted to write to closed socket\n");
177 size_t total_bytes = 0;
178 while(total_bytes < size) {
181 ssize_t bytes_written =
send(
sock, buffer + total_bytes, size - total_bytes, MSG_NOSIGNAL);
182 if(bytes_written >= 0) {
183 total_bytes += bytes_written;
184 }
else if(bytes_written == -1 && errno != EINTR) {
185 std::cerr <<
"socket::write: Error in the socket! Errno " << errno << std::endl;
193 struct sockaddr_storage my_addr_info;
194 socklen_t len =
sizeof my_addr_info;
196 getsockname(
sock, (
struct sockaddr *)&my_addr_info, &len);
197 char my_ip_cstr[INET6_ADDRSTRLEN + 1];
198 if(my_addr_info.ss_family == AF_INET) {
199 struct sockaddr_in *s = (
struct sockaddr_in *)&my_addr_info;
200 inet_ntop(AF_INET, &s->sin_addr, my_ip_cstr,
203 struct sockaddr_in6 *s = (
struct sockaddr_in6 *)&my_addr_info;
204 inet_ntop(AF_INET6, &s->sin6_addr, my_ip_cstr,
207 return std::string(my_ip_cstr);
211 sockaddr_in serv_addr;
213 int listenfd =
::socket(AF_INET, SOCK_STREAM, 0);
217 setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (
char *)&reuse_addr,
220 memset(&serv_addr, 0,
sizeof(serv_addr));
221 serv_addr.sin_family = AF_INET;
222 serv_addr.sin_addr.s_addr = INADDR_ANY;
223 serv_addr.sin_port = htons(port);
224 if(bind(listenfd, (sockaddr *)&serv_addr,
sizeof(serv_addr)) < 0) {
226 "ERROR on binding to socket in ConnectionListener: %s\n",
228 std::cout <<
"Port is: " << port << std::endl;
232 fd = unique_ptr<int, std::function<void(int *)>>(
233 new int(listenfd), [](
int *fd) { close(*fd);
delete fd; });
237 char client_ip_cstr[INET6_ADDRSTRLEN + 1];
238 struct sockaddr_storage client_addr_info;
239 socklen_t len =
sizeof client_addr_info;
241 int sock = ::accept(*fd, (
struct sockaddr *)&client_addr_info, &len);
244 if(client_addr_info.ss_family == AF_INET) {
246 struct sockaddr_in *s = (
struct sockaddr_in *)&client_addr_info;
247 inet_ntop(AF_INET, &s->sin_addr, client_ip_cstr,
sizeof client_ip_cstr);
250 struct sockaddr_in6 *s = (
struct sockaddr_in6 *)&client_addr_info;
251 inet_ntop(AF_INET6, &s->sin6_addr, client_ip_cstr,
252 sizeof client_ip_cstr);
255 return socket(sock, std::string(client_ip_cstr));
260 int socket_flags = fcntl(*fd, F_GETFL, 0);
261 socket_flags |= O_NONBLOCK;
262 if(fcntl(*fd, F_SETFL, socket_flags) < 0) {
266 int epoll_fd = epoll_create1(0);
267 struct epoll_event accept_event;
268 memset(&accept_event, 0,
sizeof(accept_event));
269 accept_event.data.fd = *fd;
270 accept_event.events = EPOLLIN;
271 epoll_ctl(epoll_fd, EPOLL_CTL_ADD, *fd, &accept_event);
273 struct sockaddr_storage client_addr_info;
274 socklen_t len =
sizeof client_addr_info;
276 bool success =
false;
277 epoll_event array_of_one_event[1];
279 int numfds = epoll_wait(epoll_fd, array_of_one_event, 1, timeout_ms);
281 assert(array_of_one_event[0].data.fd == *fd);
282 client_sock = ::accept(*fd, (
struct sockaddr *)&client_addr_info, &len);
283 if(client_sock >= 0) {
290 socket_flags = fcntl(*fd, F_GETFL, 0);
291 socket_flags &= (~O_NONBLOCK);
292 if(fcntl(*fd, F_SETFL, socket_flags) < 0) {
297 char client_ip_cstr[INET6_ADDRSTRLEN + 1];
298 if(client_addr_info.ss_family == AF_INET) {
300 struct sockaddr_in *s = (
struct sockaddr_in *)&client_addr_info;
301 inet_ntop(AF_INET, &s->sin_addr, client_ip_cstr,
sizeof client_ip_cstr);
304 struct sockaddr_in6 *s = (
struct sockaddr_in6 *)&client_addr_info;
305 inet_ntop(AF_INET6, &s->sin6_addr, client_ip_cstr,
306 sizeof client_ip_cstr);
308 return socket(client_sock, std::string(client_ip_cstr));
std::vector< event > events
socket accept()
Blocks until a remote client makes a connection to this connection listener, then returns a new socke...
std::string get_self_ip()
ssize_t read_partial(char *buffer, size_t max_size)
Attempts to read up to max_size bytes from socket and write them to the given buffer, but returns immediately even if fewer than max_size bytes are available to be read.
socket()
Constructs an empty, unconnected socket.
int try_connect(std::string servername, int port, int timeout_ms=20000)
Attempts to connect the socket to the specified address and port, but returns promptly with an error ...
bool send(uint16_t group_number, std::shared_ptr< rdma::memory_region > mr, size_t offset, size_t length) __attribute__((warn_unused_result))
bool probe()
Returns true if there is any data available to be read from the socket.
bool read(char *buffer, size_t size)
Reads size bytes from the socket and writes them to the given buffer.
std::optional< socket > try_accept(int timeout_ms)
Waits the specified number of milliseconds for a remote client to connect to this connection listener...
bool write(const char *buffer, size_t size)
Writes size bytes from the given buffer to the socket.
socket & operator=(socket &s)=delete
connection_listener(uint16_t port)
Constructs a connection listener ("server socket") that listens on the given port of this machine's T...