diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 111 |
1 files changed, 56 insertions, 55 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 17805fa..13f6376 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -52,47 +52,47 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> #include <stdarg.h> +#include <assert.h> #include "amqp.h" #include "amqp_framing.h" #include "amqp_private.h" -#include <sys/types.h> -#include <sys/uio.h> -#include <unistd.h> -#include <sys/socket.h> -#include <netdb.h> -#include <netinet/in.h> +#include "socket.h" -#include <assert.h> int amqp_open_socket(char const *hostname, int portnumber) { - int sockfd; + int sockfd, res; struct sockaddr_in addr; struct hostent *he; + int one = 1; /* used as a buffer by setsockopt below */ + + res = amqp_socket_init(); + if (res) + return res; he = gethostbyname(hostname); - if (he == NULL) { - return -ENOENT; - } + if (he == NULL) + return -ERROR_GETHOSTBYNAME_FAILED; addr.sin_family = AF_INET; addr.sin_port = htons(portnumber); addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0]; sockfd = socket(PF_INET, SOCK_STREAM, 0); - if (sockfd == -1) { - return -errno; - } + if (sockfd == -1) + return -amqp_socket_error(); - if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { - int result = -errno; - close(sockfd); - return result; + if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, + sizeof(one)) < 0 + || connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) + { + res = -amqp_socket_error(); + amqp_socket_close(sockfd); + return res; } return sockfd; @@ -104,22 +104,15 @@ static char *header() { header[1] = 'M'; header[2] = 'Q'; header[3] = 'P'; -#ifndef USE_MODERN_AMQP_PROTOCOL_HEADER - header[4] = 1; - header[5] = 1; - header[6] = AMQP_PROTOCOL_VERSION_MAJOR; - header[7] = AMQP_PROTOCOL_VERSION_MINOR; -#else header[4] = 0; header[5] = AMQP_PROTOCOL_VERSION_MAJOR; header[6] = AMQP_PROTOCOL_VERSION_MINOR; header[7] = AMQP_PROTOCOL_VERSION_REVISION; -#endif return header; } int amqp_send_header(amqp_connection_state_t state) { - return write(state->sockfd, header(), 8); + return send(state->sockfd, header(), 8, 0); } int amqp_send_header_to(amqp_connection_state_t state, @@ -194,24 +187,21 @@ static int wait_frame_inner(amqp_connection_state_t state, AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame))); state->sock_inbound_offset += result; - if (decoded_frame->frame_type != 0) { + if (decoded_frame->frame_type != 0) /* Complete frame was read. Return it. */ - return 1; - } + return 0; /* Incomplete or ignored frame. Keep processing input. */ assert(result != 0); } - result = read(state->sockfd, - state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len); - if (result < 0) { - return -errno; - } - if (result == 0) { - /* EOF. */ - return 0; + result = recv(state->sockfd, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0); + if (result <= 0) { + if (result == 0) + return -ERROR_CONNECTION_CLOSED; + else + return -amqp_socket_error(); } state->sock_inbound_limit = result; @@ -229,7 +219,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state, state->last_queued_frame = NULL; } *decoded_frame = *f; - return 1; + return 0; } else { return wait_frame_inner(state, decoded_frame); } @@ -241,8 +231,10 @@ int amqp_simple_wait_method(amqp_connection_state_t state, amqp_method_t *output) { amqp_frame_t frame; - - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame)); + int res = amqp_simple_wait_frame(state, &frame); + if (res < 0) + return res; + amqp_assert(frame.channel == expected_channel, "Expected 0x%08X method frame on channel %d, got frame on channel %d", expected_method, @@ -259,7 +251,7 @@ int amqp_simple_wait_method(amqp_connection_state_t state, expected_channel, frame.payload.method.id); *output = frame.payload.method; - return 1; + return 0; } int amqp_send_method(amqp_connection_state_t state, @@ -299,7 +291,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, status = amqp_send_method(state, channel, request_id, decoded_request_method); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -308,9 +300,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, retry: status = wait_frame_inner(state, &frame); - if (status <= 0) { + if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -335,7 +327,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, if (frame_copy == NULL || link == NULL) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_errno = ENOMEM; + result.library_error = ERROR_NO_MEMORY; return result; } @@ -370,6 +362,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_sasl_method_enum sasl_method, va_list vl) { + int res; amqp_method_t method; uint32_t server_frame_max; uint16_t server_channel_max; @@ -377,12 +370,16 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_send_header(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method)); + res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, + &method); + if (res < 0) + return res; + { amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { - return -EPROTOTYPE; + return -ERROR_INCOMPATIBLE_AMQP_VERSION; } /* TODO: check that our chosen SASL mechanism is in the list of @@ -394,7 +391,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl); amqp_connection_start_ok_t s; if (response_bytes.bytes == NULL) { - return -ENOMEM; + return -ERROR_NO_MEMORY; } s = (amqp_connection_start_ok_t) { @@ -408,7 +405,11 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_release_buffers(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method)); + res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, + &method); + if (res < 0) + return res; + { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; server_channel_max = s->channel_max; @@ -442,7 +443,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_release_buffers(state); - return 1; + return 0; } amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, @@ -460,11 +461,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, va_start(vl, sasl_method); status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl); - if (status <= 0) { + if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; result.reply.id = 0; result.reply.decoded = NULL; - result.library_errno = -status; + result.library_error = -status; return result; } @@ -492,6 +493,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, result.reply_type = AMQP_RESPONSE_NORMAL; result.reply.id = 0; result.reply.decoded = NULL; - result.library_errno = 0; + result.library_error = 0; return result; } |