summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c98
1 files changed, 52 insertions, 46 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index d16c319..6425c34 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -52,43 +52,40 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include <stdarg.h>
+#include <assert.h>
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/in.h>
+#include "socket.h"
-#include <assert.h>
int amqp_open_socket(char const *hostname,
int portnumber)
{
- int sockfd;
+ int sockfd, res;
struct sockaddr_in addr;
struct hostent *he;
+ res = socket_init();
+ if (res)
+ return res;
+
he = gethostbyname(hostname);
- if (he == NULL) {
- return -ENOENT;
- }
+ if (he == NULL)
+ return -ERROR_HOST_NOT_FOUND;
addr.sin_family = AF_INET;
addr.sin_port = htons(portnumber);
addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
- sockfd = socket(PF_INET, SOCK_STREAM, 0);
- if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
- int result = -errno;
- close(sockfd);
- return result;
+ sockfd = socket_socket(PF_INET, SOCK_STREAM, 0);
+ if (socket_connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
+ res = -encoded_socket_errno();
+ socket_close(sockfd);
+ return res;
}
return sockfd;
@@ -108,7 +105,7 @@ static char *header() {
}
int amqp_send_header(amqp_connection_state_t state) {
- return write(state->sockfd, header(), 8);
+ return socket_write(state->sockfd, header(), 8);
}
int amqp_send_header_to(amqp_connection_state_t state,
@@ -183,24 +180,22 @@ static int wait_frame_inner(amqp_connection_state_t state,
AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame)));
state->sock_inbound_offset += result;
- if (decoded_frame->frame_type != 0) {
+ if (decoded_frame->frame_type != 0)
/* Complete frame was read. Return it. */
- return 1;
- }
+ return 0;
/* Incomplete or ignored frame. Keep processing input. */
assert(result != 0);
}
- result = read(state->sockfd,
- state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len);
- if (result < 0) {
- return -errno;
- }
- if (result == 0) {
- /* EOF. */
- return 0;
+ result = socket_read(state->sockfd,
+ state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len);
+ if (result <= 0) {
+ if (result == 0)
+ return -ERROR_CONNECTION_CLOSED;
+ else
+ return -encoded_socket_errno();
}
state->sock_inbound_limit = result;
@@ -218,7 +213,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
state->last_queued_frame = NULL;
}
*decoded_frame = *f;
- return 1;
+ return 0;
} else {
return wait_frame_inner(state, decoded_frame);
}
@@ -230,8 +225,10 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
amqp_method_t *output)
{
amqp_frame_t frame;
-
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame));
+ int res = amqp_simple_wait_frame(state, &frame);
+ if (res < 0)
+ return res;
+
amqp_assert(frame.channel == expected_channel,
"Expected 0x%08X method frame on channel %d, got frame on channel %d",
expected_method,
@@ -248,7 +245,7 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
expected_channel,
frame.payload.method.id);
*output = frame.payload.method;
- return 1;
+ return 0;
}
int amqp_send_method(amqp_connection_state_t state,
@@ -288,7 +285,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
status = amqp_send_method(state, channel, request_id, decoded_request_method);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -297,9 +294,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
retry:
status = wait_frame_inner(state, &frame);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -324,7 +321,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = ENOMEM;
+ result.library_error = ERROR_NO_MEMORY;
return result;
}
@@ -359,6 +356,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_sasl_method_enum sasl_method,
va_list vl)
{
+ int res;
amqp_method_t method;
uint32_t server_frame_max;
uint16_t server_channel_max;
@@ -366,12 +364,16 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_send_header(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
(s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
- return -EPROTOTYPE;
+ return -ERROR_INCOMPATIBLE_AMQP_VERSION;
}
/* TODO: check that our chosen SASL mechanism is in the list of
@@ -383,7 +385,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl);
amqp_connection_start_ok_t s;
if (response_bytes.bytes == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
s =
(amqp_connection_start_ok_t) {
@@ -397,7 +399,11 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
server_channel_max = s->channel_max;
@@ -431,7 +437,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- return 1;
+ return 0;
}
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
@@ -449,11 +455,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
va_start(vl, sasl_method);
status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -481,6 +487,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
result.reply_type = AMQP_RESPONSE_NORMAL;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = 0;
+ result.library_error = 0;
return result;
}