summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/Makefile.am12
-rw-r--r--librabbitmq/amqp.h17
-rw-r--r--librabbitmq/amqp_api.c42
-rw-r--r--librabbitmq/amqp_connection.c50
-rw-r--r--librabbitmq/amqp_mem.c20
-rw-r--r--librabbitmq/amqp_private.h32
-rw-r--r--librabbitmq/amqp_socket.c111
-rw-r--r--librabbitmq/amqp_table.c18
-rw-r--r--librabbitmq/codegen.py15
-rw-r--r--librabbitmq/unix/socket.c85
-rw-r--r--librabbitmq/unix/socket.h79
-rw-r--r--librabbitmq/windows/socket.c88
-rw-r--r--librabbitmq/windows/socket.h89
13 files changed, 530 insertions, 128 deletions
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am
index b4c8843..82b9f30 100644
--- a/librabbitmq/Makefile.am
+++ b/librabbitmq/Makefile.am
@@ -1,12 +1,18 @@
lib_LTLIBRARIES = librabbitmq.la
-librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c
+AM_CFLAGS = -I$(srcdir)/$(PLATFORM_DIR)
+librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c $(PLATFORM_DIR)/socket.c
+librabbitmq_la_LDFLAGS = -no-undefined
+librabbitmq_la_LIBADD = $(EXTRA_LIBS)
nodist_librabbitmq_la_SOURCES = amqp_framing.c
include_HEADERS = amqp_framing.h amqp.h
-noinst_HEADERS = amqp_private.h
+noinst_HEADERS = amqp_private.h $(PLATFORM_DIR)/socket.h
BUILT_SOURCES = amqp_framing.h amqp_framing.c
CLEANFILES = amqp_framing.h amqp_framing.c
-EXTRA_DIST = codegen.py
+EXTRA_DIST = \
+ codegen.py \
+ unix/socket.c unix/socket.h \
+ windows/socket.c windows/socket.h
CODEGEN_PY=$(srcdir)/codegen.py
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index ad9b3e4..40c8292 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -263,7 +263,7 @@ typedef enum amqp_response_type_enum_ {
typedef struct amqp_rpc_reply_t_ {
amqp_response_type_enum reply_type;
amqp_method_t reply;
- int library_errno; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */
+ int library_error; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */
} amqp_rpc_reply_t;
typedef enum amqp_sasl_method_enum_ {
@@ -308,8 +308,8 @@ extern int amqp_tune_connection(amqp_connection_state_t state,
int channel_max,
int frame_max,
int heartbeat);
-int amqp_get_channel_max(amqp_connection_state_t state);
-extern void amqp_destroy_connection(amqp_connection_state_t state);
+extern int amqp_get_channel_max(amqp_connection_state_t state);
+extern int amqp_destroy_connection(amqp_connection_state_t state);
extern int amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
@@ -412,7 +412,6 @@ extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection
amqp_bytes_t type,
amqp_boolean_t passive,
amqp_boolean_t durable,
- amqp_boolean_t auto_delete,
amqp_table_t arguments);
extern struct amqp_queue_declare_ok_t_ *amqp_queue_declare(amqp_connection_state_t state,
@@ -441,7 +440,7 @@ extern struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind(amqp_connection_state_t
amqp_channel_t channel,
amqp_bytes_t queue,
amqp_bytes_t exchange,
- amqp_bytes_t binding_key,
+ amqp_bytes_t routing_key,
amqp_table_t arguments);
extern struct amqp_basic_consume_ok_t_ *amqp_basic_consume(amqp_connection_state_t state,
@@ -501,6 +500,14 @@ extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
*/
extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state);
+/*
+ * Get the error string for the given error code.
+ *
+ * The returned string resides on the heap; the caller is responsible
+ * for freeing it.
+ */
+extern char *amqp_error_string(int err);
+
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 592ab58..b2793ff 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -52,7 +52,6 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include "amqp.h"
#include "amqp_framing.h"
@@ -60,6 +59,40 @@
#include <assert.h>
+static const char *client_error_strings[ERROR_MAX] = {
+ "could not allocate memory", /* ERROR_NO_MEMORY */
+ "received bad AMQP data", /* ERROR_BAD_AQMP_DATA */
+ "unknown AMQP class id", /* ERROR_UNKOWN_CLASS */
+ "unknown AMQP method id", /* ERROR_UNKOWN_METHOD */
+ "unknown host", /* ERROR_GETHOSTBYNAME_FAILED */
+ "incompatible AMQP version", /* ERROR_INCOMPATIBLE_AMQP_VERSION */
+ "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */
+};
+
+char *amqp_error_string(int err)
+{
+ const char *str;
+ int category = (err & ERROR_CATEGORY_MASK);
+ err = (err & ~ERROR_CATEGORY_MASK);
+
+ switch (category) {
+ case ERROR_CATEGORY_CLIENT:
+ if (err < 1 || err > ERROR_MAX)
+ str = "(undefined librabbitmq error)";
+ else
+ str = client_error_strings[err - 1];
+ break;
+
+ case ERROR_CATEGORY_OS:
+ return amqp_os_error_string(err);
+
+ default:
+ str = "(undefined error category)";
+ }
+
+ return strdup(str);
+}
+
#define RPC_REPLY(replytype) \
(state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \
? (replytype *) state->most_recent_api_result.reply.decoded \
@@ -163,13 +196,12 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
amqp_bytes_t type,
amqp_boolean_t passive,
amqp_boolean_t durable,
- amqp_boolean_t auto_delete,
amqp_table_t arguments)
{
state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK,
amqp_exchange_declare_t,
- 0, exchange, type, passive, durable, auto_delete, 0, 0, arguments);
+ 0, exchange, type, passive, durable, 0, 0, 0, arguments);
return RPC_REPLY(amqp_exchange_declare_ok_t);
}
@@ -220,13 +252,13 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_bytes_t queue,
amqp_bytes_t exchange,
- amqp_bytes_t binding_key,
+ amqp_bytes_t routing_key,
amqp_table_t arguments)
{
state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK,
amqp_queue_unbind_t,
- 0, queue, exchange, binding_key, arguments);
+ 0, queue, exchange, routing_key, arguments);
return RPC_REPLY(amqp_queue_unbind_ok_t);
}
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 8623eed..3d95e98 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -52,17 +52,13 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
-
-#include <unistd.h>
-#include <sys/uio.h>
-#include <sys/types.h>
+#include <assert.h>
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#include <assert.h>
+#include "socket.h"
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
@@ -151,7 +147,7 @@ int amqp_tune_connection(amqp_connection_state_t state,
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
if (newbuf == NULL) {
amqp_destroy_connection(state);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
state->outbound_buffer.bytes = newbuf;
@@ -162,12 +158,19 @@ int amqp_get_channel_max(amqp_connection_state_t state) {
return state->channel_max;
}
-void amqp_destroy_connection(amqp_connection_state_t state) {
+int amqp_destroy_connection(amqp_connection_state_t state) {
+ int s = state->sockfd;
+
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
free(state);
+
+ if (s >= 0 && amqp_socket_close(s) < 0)
+ return -amqp_socket_error();
+ else
+ return 0;
}
static void return_to_idle(amqp_connection_state_t state) {
@@ -199,7 +202,7 @@ int amqp_handle_input(amqp_connection_state_t state,
/* state->inbound_buffer.len is always nonzero, because it
corresponds to frame_max, which is not permitted to be less
than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
}
@@ -246,7 +249,7 @@ int amqp_handle_input(amqp_connection_state_t state,
/* Check frame end marker (footer) */
if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
- return -EINVAL;
+ return -ERROR_BAD_AMQP_DATA;
}
decoded_frame->channel = D_16(state->inbound_buffer, 1);
@@ -392,7 +395,7 @@ static int inner_send_frame(amqp_connection_state_t state,
break;
default:
- return -EINVAL;
+ abort();
}
E_32(state->outbound_buffer, 3, *payload_len);
@@ -419,16 +422,14 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_frame_t const *frame)
{
amqp_bytes_t encoded;
- int payload_len;
- int separate_body;
+ int payload_len, res;
- separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
- switch (separate_body) {
+ res = inner_send_frame(state, frame, &encoded, &payload_len);
+ switch (res) {
case 0:
- AMQP_CHECK_RESULT(write(state->sockfd,
- state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE)));
- return 0;
+ res = send(state->sockfd, state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE), 0);
+ break;
case 1: {
struct iovec iov[3];
@@ -440,13 +441,18 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
assert(FOOTER_SIZE == 1);
iov[2].iov_len = FOOTER_SIZE;
- AMQP_CHECK_RESULT(writev(state->sockfd, &iov[0], 3));
- return 0;
+ res = amqp_socket_writev(state->sockfd, &iov[0], 3);
+ break;
}
default:
- return separate_body;
+ return res;
}
+
+ if (res < 0)
+ return -amqp_socket_error();
+ else
+ return 0;
}
int amqp_send_frame_to(amqp_connection_state_t state,
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 6e52dc8..021151a 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -53,7 +53,6 @@
#include <string.h>
#include <stdint.h>
#include <sys/types.h>
-#include <errno.h>
#include <assert.h>
#include "amqp.h"
@@ -102,25 +101,24 @@ void empty_amqp_pool(amqp_pool_t *pool) {
empty_blocklist(&pool->pages);
}
+/* Returns 1 on success, 0 on failure */
static int record_pool_block(amqp_pool_blocklist_t *x, void *block) {
size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1);
if (x->blocklist == NULL) {
x->blocklist = malloc(blocklistlength);
- if (x->blocklist == NULL) {
- return -ENOMEM;
- }
+ if (x->blocklist == NULL)
+ return 0;
} else {
void *newbl = realloc(x->blocklist, blocklistlength);
- if (newbl == NULL) {
- return -ENOMEM;
- }
+ if (newbl == NULL)
+ return 0;
x->blocklist = newbl;
}
x->blocklist[x->num_blocks] = block;
x->num_blocks++;
- return 0;
+ return 1;
}
void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
@@ -135,9 +133,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (result == NULL) {
return NULL;
}
- if (record_pool_block(&pool->large_blocks, result) != 0) {
+ if (!record_pool_block(&pool->large_blocks, result))
return NULL;
- }
return result;
}
@@ -156,9 +153,8 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (pool->alloc_block == NULL) {
return NULL;
}
- if (record_pool_block(&pool->pages, pool->alloc_block) != 0) {
+ if (!record_pool_block(&pool->pages, pool->alloc_block))
return NULL;
- }
pool->next_page = pool->pages.num_blocks;
} else {
pool->alloc_block = pool->pages.blocklist[pool->next_page];
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 3985619..c30663a 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -55,7 +55,28 @@
extern "C" {
#endif
-#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */
+/* Error numbering: Because of differences in error numbering on
+ * different platforms, we want to keep error numbers opaque for
+ * client code. Internally, we encode the category of an error
+ * (i.e. where its number comes from) in the top bits of the number
+ * (assuming that an int has at least 32 bits).
+ */
+#define ERROR_CATEGORY_MASK (1 << 29)
+
+#define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */
+#define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */
+
+/* librabbitmq error codes */
+#define ERROR_NO_MEMORY 1
+#define ERROR_BAD_AMQP_DATA 2
+#define ERROR_UNKNOWN_CLASS 3
+#define ERROR_UNKNOWN_METHOD 4
+#define ERROR_GETHOSTBYNAME_FAILED 5
+#define ERROR_INCOMPATIBLE_AMQP_VERSION 6
+#define ERROR_CONNECTION_CLOSED 7
+#define ERROR_MAX 7
+
+extern char *amqp_os_error_string(int err);
/*
* Connection states:
@@ -125,7 +146,7 @@ struct amqp_connection_state_t_ {
amqp_rpc_reply_t most_recent_api_result;
};
-#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); })
+#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -ERROR_BAD_AMQP_DATA; } (v); })
#define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o]))
#define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o))
@@ -176,13 +197,6 @@ extern int amqp_encode_table(amqp_bytes_t encoded,
#define AMQP_CHECK_RESULT(expr) AMQP_CHECK_RESULT_CLEANUP(expr, )
-#define AMQP_CHECK_EOF_RESULT(expr) \
- ({ \
- int _result = (expr); \
- if (_result <= 0) return _result; \
- _result; \
- })
-
#ifndef NDEBUG
extern void amqp_dump(void const *buffer, size_t len);
#else
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 17805fa..13f6376 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -52,47 +52,47 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include <stdarg.h>
+#include <assert.h>
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/in.h>
+#include "socket.h"
-#include <assert.h>
int amqp_open_socket(char const *hostname,
int portnumber)
{
- int sockfd;
+ int sockfd, res;
struct sockaddr_in addr;
struct hostent *he;
+ int one = 1; /* used as a buffer by setsockopt below */
+
+ res = amqp_socket_init();
+ if (res)
+ return res;
he = gethostbyname(hostname);
- if (he == NULL) {
- return -ENOENT;
- }
+ if (he == NULL)
+ return -ERROR_GETHOSTBYNAME_FAILED;
addr.sin_family = AF_INET;
addr.sin_port = htons(portnumber);
addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
sockfd = socket(PF_INET, SOCK_STREAM, 0);
- if (sockfd == -1) {
- return -errno;
- }
+ if (sockfd == -1)
+ return -amqp_socket_error();
- if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
- int result = -errno;
- close(sockfd);
- return result;
+ if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one,
+ sizeof(one)) < 0
+ || connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
+ {
+ res = -amqp_socket_error();
+ amqp_socket_close(sockfd);
+ return res;
}
return sockfd;
@@ -104,22 +104,15 @@ static char *header() {
header[1] = 'M';
header[2] = 'Q';
header[3] = 'P';
-#ifndef USE_MODERN_AMQP_PROTOCOL_HEADER
- header[4] = 1;
- header[5] = 1;
- header[6] = AMQP_PROTOCOL_VERSION_MAJOR;
- header[7] = AMQP_PROTOCOL_VERSION_MINOR;
-#else
header[4] = 0;
header[5] = AMQP_PROTOCOL_VERSION_MAJOR;
header[6] = AMQP_PROTOCOL_VERSION_MINOR;
header[7] = AMQP_PROTOCOL_VERSION_REVISION;
-#endif
return header;
}
int amqp_send_header(amqp_connection_state_t state) {
- return write(state->sockfd, header(), 8);
+ return send(state->sockfd, header(), 8, 0);
}
int amqp_send_header_to(amqp_connection_state_t state,
@@ -194,24 +187,21 @@ static int wait_frame_inner(amqp_connection_state_t state,
AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame)));
state->sock_inbound_offset += result;
- if (decoded_frame->frame_type != 0) {
+ if (decoded_frame->frame_type != 0)
/* Complete frame was read. Return it. */
- return 1;
- }
+ return 0;
/* Incomplete or ignored frame. Keep processing input. */
assert(result != 0);
}
- result = read(state->sockfd,
- state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len);
- if (result < 0) {
- return -errno;
- }
- if (result == 0) {
- /* EOF. */
- return 0;
+ result = recv(state->sockfd, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0);
+ if (result <= 0) {
+ if (result == 0)
+ return -ERROR_CONNECTION_CLOSED;
+ else
+ return -amqp_socket_error();
}
state->sock_inbound_limit = result;
@@ -229,7 +219,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
state->last_queued_frame = NULL;
}
*decoded_frame = *f;
- return 1;
+ return 0;
} else {
return wait_frame_inner(state, decoded_frame);
}
@@ -241,8 +231,10 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
amqp_method_t *output)
{
amqp_frame_t frame;
-
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame));
+ int res = amqp_simple_wait_frame(state, &frame);
+ if (res < 0)
+ return res;
+
amqp_assert(frame.channel == expected_channel,
"Expected 0x%08X method frame on channel %d, got frame on channel %d",
expected_method,
@@ -259,7 +251,7 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
expected_channel,
frame.payload.method.id);
*output = frame.payload.method;
- return 1;
+ return 0;
}
int amqp_send_method(amqp_connection_state_t state,
@@ -299,7 +291,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
status = amqp_send_method(state, channel, request_id, decoded_request_method);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -308,9 +300,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
retry:
status = wait_frame_inner(state, &frame);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -335,7 +327,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_errno = ENOMEM;
+ result.library_error = ERROR_NO_MEMORY;
return result;
}
@@ -370,6 +362,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_sasl_method_enum sasl_method,
va_list vl)
{
+ int res;
amqp_method_t method;
uint32_t server_frame_max;
uint16_t server_channel_max;
@@ -377,12 +370,16 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_send_header(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
(s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
- return -EPROTOTYPE;
+ return -ERROR_INCOMPATIBLE_AMQP_VERSION;
}
/* TODO: check that our chosen SASL mechanism is in the list of
@@ -394,7 +391,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl);
amqp_connection_start_ok_t s;
if (response_bytes.bytes == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
s =
(amqp_connection_start_ok_t) {
@@ -408,7 +405,11 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method));
+ res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
+ &method);
+ if (res < 0)
+ return res;
+
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
server_channel_max = s->channel_max;
@@ -442,7 +443,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- return 1;
+ return 0;
}
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
@@ -460,11 +461,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
va_start(vl, sasl_method);
status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
- if (status <= 0) {
+ if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = -status;
+ result.library_error = -status;
return result;
}
@@ -492,6 +493,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
result.reply_type = AMQP_RESPONSE_NORMAL;
result.reply.id = 0;
result.reply.decoded = NULL;
- result.library_errno = 0;
+ result.library_error = 0;
return result;
}
diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c
index 25c5932..3f5eb61 100644
--- a/librabbitmq/amqp_table.c
+++ b/librabbitmq/amqp_table.c
@@ -52,10 +52,10 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
#include "amqp.h"
#include "amqp_private.h"
+#include "socket.h"
#include <assert.h>
@@ -86,7 +86,7 @@ static int amqp_decode_array(amqp_bytes_t encoded,
int limit;
if (entries == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
offset += 4;
@@ -99,7 +99,7 @@ static int amqp_decode_array(amqp_bytes_t encoded,
newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t));
if (newentries == NULL) {
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
entries = newentries;
}
@@ -117,7 +117,7 @@ static int amqp_decode_array(amqp_bytes_t encoded,
if (output->entries == NULL && num_entries > 0) {
/* NULL is legitimate if we requested a zero-length block. */
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t));
@@ -140,7 +140,7 @@ int amqp_decode_table(amqp_bytes_t encoded,
int limit;
if (entries == NULL) {
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
offset += 4;
@@ -159,7 +159,7 @@ int amqp_decode_table(amqp_bytes_t encoded,
newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t));
if (newentries == NULL) {
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
entries = newentries;
}
@@ -182,7 +182,7 @@ int amqp_decode_table(amqp_bytes_t encoded,
if (output->entries == NULL && num_entries > 0) {
/* NULL is legitimate if we requested a zero-length block. */
free(entries);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t));
@@ -274,7 +274,7 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_VOID:
break;
default:
- return -EINVAL;
+ return -ERROR_BAD_AMQP_DATA;
}
*offsetptr = offset;
@@ -410,7 +410,7 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_VOID:
break;
default:
- return -EINVAL;
+ abort();
}
*offsetptr = offset;
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index 792a6cf..f911966 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -170,7 +170,7 @@ def genErl(spec):
if m.arguments:
print " %s *m = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \
(m.structName(), m.structName(), m.structName())
- print " if (m == NULL) { return -ENOMEM; }"
+ print " if (m == NULL) { return -ERROR_NO_MEMORY; }"
else:
print " %s *m = NULL; /* no fields */" % (m.structName(),)
bitindex = None
@@ -197,7 +197,7 @@ def genErl(spec):
print " case %d: {" % (c.index,)
print " %s *p = (%s *) amqp_pool_alloc(pool, sizeof(%s));" % \
(c.structName(), c.structName(), c.structName())
- print " if (p == NULL) { return -ENOMEM; }"
+ print " if (p == NULL) { return -ERROR_NO_MEMORY; }"
print " p->_flags = flags;"
for f in c.fields:
if spec.resolveDomain(f.domain) == 'bit':
@@ -261,12 +261,11 @@ def genErl(spec):
print '#include <stdint.h>'
print '#include <string.h>'
print '#include <stdio.h>'
- print '#include <errno.h>'
- print '#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */'
print
print '#include "amqp.h"'
print '#include "amqp_framing.h"'
print '#include "amqp_private.h"'
+ print '#include "socket.h"'
print """
char const *amqp_constant_name(int constantNumber) {
@@ -317,7 +316,7 @@ int amqp_decode_method(amqp_method_number_t methodNumber,
switch (methodNumber) {"""
for m in methods: genDecodeMethodFields(m)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_METHOD;
}
}"""
@@ -343,7 +342,7 @@ int amqp_decode_properties(uint16_t class_id,
switch (class_id) {"""
for c in spec.allClasses(): genDecodeProperties(c)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_CLASS;
}
}"""
@@ -358,7 +357,7 @@ int amqp_encode_method(amqp_method_number_t methodNumber,
switch (methodNumber) {"""
for m in methods: genEncodeMethodFields(m)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_METHOD;
}
}"""
@@ -390,7 +389,7 @@ int amqp_encode_properties(uint16_t class_id,
switch (class_id) {"""
for c in spec.allClasses(): genEncodeProperties(c)
- print """ default: return -ENOENT;
+ print """ default: return -ERROR_UNKNOWN_CLASS;
}
}"""
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
new file mode 100644
index 0000000..9d37dfc
--- /dev/null
+++ b/librabbitmq/unix/socket.c
@@ -0,0 +1,85 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdint.h>
+#include <string.h>
+
+#include "amqp.h"
+#include "amqp_private.h"
+#include "socket.h"
+
+int amqp_socket_socket(int domain, int type, int proto)
+{
+ int flags;
+
+ int s = socket(domain, type, proto);
+ if (s < 0)
+ return s;
+
+ /* Always enable CLOEXEC on the socket */
+ flags = fcntl(s, F_GETFD);
+ if (flags == -1
+ || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
+ int e = errno;
+ close(s);
+ errno = e;
+ return -1;
+ }
+
+ return s;
+}
+
+char *amqp_os_error_string(int err)
+{
+ return strdup(strerror(err));
+}
diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h
new file mode 100644
index 0000000..5cb37f1
--- /dev/null
+++ b/librabbitmq/unix/socket.h
@@ -0,0 +1,79 @@
+#ifndef librabbitmq_unix_socket_h
+#define librabbitmq_unix_socket_h
+
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+static inline int amqp_socket_init(void)
+{
+ return 0;
+}
+
+extern int amqp_socket_socket(int domain, int type, int proto);
+
+#define amqp_socket_setsockopt setsockopt
+#define amqp_socket_close close
+#define amqp_socket_writev writev
+
+static inline int amqp_socket_error()
+{
+ return errno | ERROR_CATEGORY_OS;
+}
+
+#endif
diff --git a/librabbitmq/windows/socket.c b/librabbitmq/windows/socket.c
new file mode 100644
index 0000000..9c026bd
--- /dev/null
+++ b/librabbitmq/windows/socket.c
@@ -0,0 +1,88 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <windows.h>
+#include <stdint.h>
+
+#include "amqp.h"
+#include "amqp_private.h"
+#include "socket.h"
+
+static int called_wsastartup;
+
+int amqp_socket_init(void)
+{
+ if (!called_wsastartup) {
+ WSADATA data;
+ int res = WSAStartup(0x0202, &data);
+ if (res)
+ return -res;
+
+ called_wsastartup = 1;
+ }
+
+ return 0;
+}
+
+char *amqp_os_error_string(int err)
+{
+ char *msg, *copy;
+
+ if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, err,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL))
+ return strdup("(error retrieving Windows error message)");
+
+ copy = strdup(msg);
+ LocalFree(msg);
+ return copy;
+}
diff --git a/librabbitmq/windows/socket.h b/librabbitmq/windows/socket.h
new file mode 100644
index 0000000..3e0a378
--- /dev/null
+++ b/librabbitmq/windows/socket.h
@@ -0,0 +1,89 @@
+#ifndef librabbitmq_windows_socket_h
+#define librabbitmq_windows_socket_h
+
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2010 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2010 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <winsock2.h>
+
+extern int amqp_socket_init(void);
+
+#define amqp_socket_socket socket
+#define amqp_socket_close closesocket
+
+static inline int amqp_socket_setsockopt(int sock, int level, int optname,
+ const void *optval, size_t optlen)
+{
+ /* the winsock setsockopt function has its 4th argument as a
+ const char * */
+ return setsockopt(sock, level, optname, (const char *)optval, optlen);
+}
+
+/* same as WSABUF */
+struct iovec {
+ u_long iov_len;
+ char *iov_base;
+};
+
+static inline int amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
+{
+ DWORD ret;
+ if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0)
+ return ret;
+ else
+ return -1;
+}
+
+static inline int amqp_socket_error()
+{
+ return WSAGetLastError() | ERROR_CATEGORY_OS;
+}
+
+#endif