summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c74
1 files changed, 54 insertions, 20 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 79a7696..29a7389 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -3,7 +3,7 @@
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
- * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2014
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
@@ -42,6 +42,7 @@
#include "amqp_timer.h"
#include <assert.h>
+#include <limits.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
@@ -64,9 +65,14 @@
# include <netdb.h>
# include <sys/uio.h>
# include <fcntl.h>
+# include <poll.h>
# include <unistd.h>
#endif
+#ifdef _WIN32
+# define poll(fdarray, nfds, timeout) WSAPoll(fdarray, nfds, timeout)
+#endif
+
static int
amqp_os_socket_init(void)
{
@@ -275,7 +281,9 @@ int amqp_open_socket_noblock(char const *hostname,
AMQP_INIT_TIMER(timer)
- if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
+ INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
+ (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) {
return AMQP_STATUS_INVALID_PARAMETER;
}
@@ -300,7 +308,6 @@ int amqp_open_socket_noblock(char const *hostname,
for (addr = address_list; addr; addr = addr->ai_next) {
if (-1 != sockfd) {
amqp_os_socket_close(sockfd);
- sockfd = -1;
}
sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
@@ -349,14 +356,12 @@ int amqp_open_socket_noblock(char const *hostname,
#endif
while(1) {
- fd_set write_fd;
- fd_set except_fd;
+ struct pollfd pfd;
+ int timeout_ms;
- FD_ZERO(&write_fd);
- FD_SET(sockfd, &write_fd);
-
- FD_ZERO(&except_fd);
- FD_SET(sockfd, &except_fd);
+ pfd.fd = sockfd;
+ pfd.events = POLLERR | POLLOUT;
+ pfd.revents = 0;
timer_error = amqp_timer_update(&timer, timeout);
@@ -365,11 +370,13 @@ int amqp_open_socket_noblock(char const *hostname,
break;
}
+ timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S +
+ timer.tv.tv_usec / AMQP_US_PER_MS;
/* Win32 requires except_fds to be passed to detect connection
* failure. Other platforms only need write_fds, passing except_fds
* seems to be harmless otherwise
*/
- res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
+ res = poll(&pfd, 1, timeout_ms);
if (res > 0) {
int result;
@@ -547,22 +554,29 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru
if (timeout) {
int fd;
- fd_set read_fd;
- fd_set except_fd;
fd = amqp_get_sockfd(state);
if (-1 == fd) {
return AMQP_STATUS_CONNECTION_CLOSED;
}
+ if (INT_MAX < (uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
+ (uint64_t)timeout->tv_usec / AMQP_US_PER_MS) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
while (1) {
- FD_ZERO(&read_fd);
- FD_SET(fd, &read_fd);
+ struct pollfd pfd;
+ int timeout_ms;
+
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
- FD_ZERO(&except_fd);
- FD_SET(fd, &except_fd);
+ timeout_ms = timeout->tv_sec * AMQP_MS_PER_S +
+ timeout->tv_usec / AMQP_US_PER_MS;
- res = select(fd + 1, &read_fd, NULL, &except_fd, timeout);
+ res = poll(&pfd, 1, timeout_ms);
if (0 < res) {
break;
@@ -1144,7 +1158,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
}
{
- amqp_table_entry_t default_properties[2];
+ amqp_table_entry_t default_properties[5];
amqp_table_t default_table;
amqp_connection_start_ok_t s;
amqp_pool_t *channel_pool;
@@ -1168,9 +1182,27 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
default_properties[0].value.value.bytes =
amqp_cstring_bytes("rabbitmq-c");
- default_properties[1].key = amqp_cstring_bytes("information");
+ /* version */
+ default_properties[1].key = amqp_cstring_bytes("version");
default_properties[1].value.kind = AMQP_FIELD_KIND_UTF8;
default_properties[1].value.value.bytes =
+ amqp_cstring_bytes(AMQP_VERSION_STRING);
+
+ /* platform */
+ default_properties[2].key = amqp_cstring_bytes("platform");
+ default_properties[2].value.kind = AMQP_FIELD_KIND_UTF8;
+ default_properties[2].value.value.bytes =
+ amqp_cstring_bytes(AMQ_PLATFORM);
+
+ /* copyright */
+ default_properties[3].key = amqp_cstring_bytes("copyright");
+ default_properties[3].value.kind = AMQP_FIELD_KIND_UTF8;
+ default_properties[3].value.value.bytes =
+ amqp_cstring_bytes(AMQ_COPYRIGHT);
+
+ default_properties[4].key = amqp_cstring_bytes("information");
+ default_properties[4].value.kind = AMQP_FIELD_KIND_UTF8;
+ default_properties[4].value.value.bytes =
amqp_cstring_bytes("See https://github.com/alanxz/rabbitmq-c");
default_table.entries = default_properties;
@@ -1245,6 +1277,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
if (server_channel_max != 0 && server_channel_max < channel_max) {
channel_max = server_channel_max;
+ } else if (server_channel_max == 0 && channel_max == 0) {
+ channel_max = UINT16_MAX;
}
if (server_frame_max != 0 && server_frame_max < frame_max) {