summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c111
1 files changed, 56 insertions, 55 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 17805fa..13f6376 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -52,47 +52,47 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include <stdarg.h>
+#include <assert.h>
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/in.h>
+#include "socket.h"
-#include <assert.h>
int amqp_open_socket(char const *hostname,
int portnumber)
{
- int sockfd;
+ int sockfd, res;
struct sockaddr_in addr;
struct hostent *he;
+ int one = 1; /* used as a buffer by setsockopt below */
+
+ res = amqp_socket_init();
+ if (res)
+ return res;
he = gethostbyname(hostname);
- if (he == NULL) {
- return -ENOENT;
- }
+ if (he == NULL)
+ return -ERROR_GETHOSTBYNAME_FAILED;
addr.sin_family = AF_INET;
addr.sin_port = htons(portnumber);
addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
sockfd = socket(PF_INET, SOCK_STREAM, 0);
- if (sockfd == -1) {
- return -errno;
- }
+ if (sockfd == -1)
+ return -amqp_socket_error();
- if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
- int result = -errno;
- close(sockfd);
- return result;
+ if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one,
+ sizeof(one)) < 0
+ || connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
+ {
+ res = -amqp_socket_error();
+ amqp_socket_close(sockfd);
+ return res;
}
return sockfd;
@@ -104,22 +104,15 @@ static char *header() {
header[1] = 'M';
header[2] = 'Q';
header[3] = 'P';
-#ifndef USE_MODERN_AMQP_PROTOCOL_HEADER
- header[4] = 1;
- header[5] = 1;
- header[6] = AMQP_PROTOCOL_VERSION_MAJOR;
- header[7] = AMQP_PROTOCOL_VERSION_MINOR;
-#else
header[4] = 0;
header[5] = AMQP_PROTOCOL_VERSION_MAJOR;
header[6] = AMQP_PROTOCOL_VERSION_MINOR;
header[7] = AMQP_PROTOCOL_VERSION_REVISION;
-#endif
return header;
}
int amqp_send_header(amqp_connection_state_t state) {
- return write(state->sockfd, header(), 8);
+ return send(state->sockfd, header(), 8, 0);
}
int amqp_send_header_to(amqp_connection_state_t state,
@@ -194,24 +187,21 @@ static int wait_frame_inner(amqp_connection_state_t state,
AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame)));
state->sock_inbound_offset += result;
- if (decoded_frame->frame_type != 0) {
+ if (decoded_frame->frame_type != 0)
/* Complete frame was read. Return it. */
- return 1;
- }
+ return 0;
/* Incomplete or ignored frame. Keep processing input. */
assert(result != 0);
}
- result = read(state->sockfd,
- state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len);
- if (result < 0) {
- return -errno;
- }
- if (result == 0) {
- /* EOF. */
- return 0;
+ result = recv(state->sockfd, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0);
+ if (result <= 0) {
+ if (result == 0)
+ return -ERROR_CONNECTION_CLOSED;
+ else
+ return -amqp_socket_error();
}
state->sock_inbound_limit = result;
@@ -229,7 +219,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
state->last_queued_frame = NULL;
}
*decoded_frame = *f;
- return 1;
+ return 0;
} else {
return wait_frame_inner(state, decoded_frame);
}
@@ -241,8 +231,10 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
amqp_method_t *output)
{
amqp_frame_t frame;
-
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame));
+ int res = amqp_simple_wait_frame(state, &frame);
+ if (res < 0)
+ return res;
+
amqp_assert(frame.channel == expected_channel,
"Expected 0x%08X method frame on channel %d, got frame on channel %d",
expected_method,
@@ -259,7 +251,7 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
expected_channel,
frame.payload.method.id);
*output = frame.payload.method;
- return 1;
+ return 0;
}
int amqp_send_method(amqp_connection_state_t state,
@@ -299,7 +291,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
status = amqp_send_method(state, channel, request_id, decoded_request_method);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -308,9 +300,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
retry:
status = wait_frame_inner(state, &frame);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -335,7 +327,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = ENOMEM;
+ result.library_error = ERROR_NO_MEMORY;
return result;
}
@@ -370,6 +362,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_sasl_method_enum sasl_method,
va_list vl)
{
+ int res;
amqp_method_t method;
uint32_t server_frame_max;
uint16_t server_channel_max;
@@ -377,12 +370,16 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_send_header(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
(s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
- return -EPROTOTYPE;
+ return -ERROR_INCOMPATIBLE_AMQP_VERSION;
}
/* TODO: check that our chosen SASL mechanism is in the list of
@@ -394,7 +391,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl);
amqp_connection_start_ok_t s;
if (response_bytes.bytes == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
s =
(amqp_connection_start_ok_t) {
@@ -408,7 +405,11 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
server_channel_max = s->channel_max;
@@ -442,7 +443,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- return 1;
+ return 0;
}
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
@@ -460,11 +461,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
va_start(vl, sasl_method);
status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -492,6 +493,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
result.reply_type = AMQP_RESPONSE_NORMAL;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = 0;
+ result.library_error = 0;
return result;
}