summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2014-02-17 19:01:00 -0800
committerAlan Antonuk <alan.antonuk@gmail.com>2014-04-14 21:22:00 -0700
commita78aa8ab906c97dc3f4123048519a2953fa96fb2 (patch)
tree267b8a1c27ea6a229f5763abf2510bba71065da0
parentfe844e41ffad5691607982cbfe4054aacdcb81e0 (diff)
downloadrabbitmq-c-github-ask-a78aa8ab906c97dc3f4123048519a2953fa96fb2.tar.gz
Use poll(2) for timeouts on socket
Use poll(2) instead of select(2) to do timeout operations on sockets. This helps with the situation where the fd is larger than FD_MAXSIZE. Fixes #168
-rw-r--r--librabbitmq/amqp_socket.c47
-rw-r--r--librabbitmq/amqp_timer.h5
2 files changed, 35 insertions, 17 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 79a7696..6e54f7e 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -42,6 +42,7 @@
#include "amqp_timer.h"
#include <assert.h>
+#include <limits.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
@@ -64,9 +65,14 @@
# include <netdb.h>
# include <sys/uio.h>
# include <fcntl.h>
+# include <poll.h>
# include <unistd.h>
#endif
+#ifdef _WIN32
+# define poll(fdarray, nfds, timeout) WSAPoll(fdarray, nfds, timeout)
+#endif
+
static int
amqp_os_socket_init(void)
{
@@ -275,7 +281,9 @@ int amqp_open_socket_noblock(char const *hostname,
AMQP_INIT_TIMER(timer)
- if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
+ INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
+ (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) {
return AMQP_STATUS_INVALID_PARAMETER;
}
@@ -349,14 +357,12 @@ int amqp_open_socket_noblock(char const *hostname,
#endif
while(1) {
- fd_set write_fd;
- fd_set except_fd;
+ struct pollfd pfd;
+ int timeout_ms;
- FD_ZERO(&write_fd);
- FD_SET(sockfd, &write_fd);
-
- FD_ZERO(&except_fd);
- FD_SET(sockfd, &except_fd);
+ pfd.fd = sockfd;
+ pfd.events = POLLERR | POLLOUT;
+ pfd.revents = 0;
timer_error = amqp_timer_update(&timer, timeout);
@@ -365,11 +371,13 @@ int amqp_open_socket_noblock(char const *hostname,
break;
}
+ timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S +
+ timer.tv.tv_usec / AMQP_US_PER_MS;
/* Win32 requires except_fds to be passed to detect connection
* failure. Other platforms only need write_fds, passing except_fds
* seems to be harmless otherwise
*/
- res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
+ res = poll(&pfd, 1, timeout_ms);
if (res > 0) {
int result;
@@ -547,22 +555,29 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru
if (timeout) {
int fd;
- fd_set read_fd;
- fd_set except_fd;
fd = amqp_get_sockfd(state);
if (-1 == fd) {
return AMQP_STATUS_CONNECTION_CLOSED;
}
+ if (INT_MAX < (uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
+ (uint64_t)timeout->tv_usec / AMQP_US_PER_MS) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
while (1) {
- FD_ZERO(&read_fd);
- FD_SET(fd, &read_fd);
+ struct pollfd pfd;
+ int timeout_ms;
+
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
- FD_ZERO(&except_fd);
- FD_SET(fd, &except_fd);
+ timeout_ms = timeout->tv_sec * AMQP_MS_PER_S +
+ timeout->tv_usec * AMQP_US_PER_MS;
- res = select(fd + 1, &read_fd, NULL, &except_fd, timeout);
+ res = poll(&pfd, 1, timeout_ms);
if (0 < res) {
break;
diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h
index 8ac3de9..10254a5 100644
--- a/librabbitmq/amqp_timer.h
+++ b/librabbitmq/amqp_timer.h
@@ -37,7 +37,10 @@
# include <sys/time.h>
#endif
-#define AMQP_NS_PER_S 1000000000
+#define AMQP_MS_PER_S 1000
+#define AMQP_US_PER_MS 1000
+#define AMQP_NS_PER_S 1000000000
+#define AMQP_NS_PER_MS 1000000
#define AMQP_NS_PER_US 1000
#define AMQP_INIT_TIMER(structure) { \