summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-04-08 14:52:53 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-04-08 14:52:53 -0700
commitad2b116059e22d393b7e44ad54f345a3fb4e267b (patch)
treecfd98876757f85d9fe7bfe56d179bd5fb63a8ff0 /librabbitmq
parentf3074030723b840690458983b63c746549aa3bd5 (diff)
downloadrabbitmq-c-github-ask-ad2b116059e22d393b7e44ad54f345a3fb4e267b.tar.gz
Formatted source code with astyle utilty
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/amqp.h98
-rw-r--r--librabbitmq/amqp_api.c86
-rw-r--r--librabbitmq/amqp_connection.c147
-rw-r--r--librabbitmq/amqp_mem.c48
-rw-r--r--librabbitmq/amqp_private.h160
-rw-r--r--librabbitmq/amqp_socket.c294
-rw-r--r--librabbitmq/amqp_table.c143
-rw-r--r--librabbitmq/amqp_url.c298
-rw-r--r--librabbitmq/unix/socket.c35
-rw-r--r--librabbitmq/win32/socket.c59
-rw-r--r--librabbitmq/win32/socket.h6
11 files changed, 740 insertions, 634 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 5de39d5..c379125 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -152,28 +152,28 @@ typedef struct amqp_array_t_ {
t t Boolean
b b Signed 8-bit
B Unsigned 8-bit
- U s Signed 16-bit (A1)
+ U s Signed 16-bit (A1)
u Unsigned 16-bit
- I I I Signed 32-bit
- i Unsigned 32-bit
- L l Signed 64-bit (B)
- l Unsigned 64-bit
- f f 32-bit float
- d d 64-bit float
- D D D Decimal
- s Short string (A2)
- S S S Long string
- A Nested Array
- T T T Timestamp (u64)
- F F F Nested Table
- V V V Void
- x Byte array
+ I I I Signed 32-bit
+ i Unsigned 32-bit
+ L l Signed 64-bit (B)
+ l Unsigned 64-bit
+ f f 32-bit float
+ d d 64-bit float
+ D D D Decimal
+ s Short string (A2)
+ S S S Long string
+ A Nested Array
+ T T T Timestamp (u64)
+ F F F Nested Table
+ V V V Void
+ x Byte array
Remarks:
A1, A2: Notice how the types **CONFLICT** here. In Qpid and Rabbit,
's' means a signed 16-bit integer; in 0-9-1, it means a
- short string.
+ short string.
B: Notice how the signednesses **CONFLICT** here. In Qpid and Rabbit,
'l' means a signed 64-bit integer; in 0-9-1, it means an unsigned
@@ -360,9 +360,9 @@ AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_tune_connection(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat);
+ int channel_max,
+ int frame_max,
+ int heartbeat);
AMQP_PUBLIC_FUNCTION
int
@@ -375,8 +375,8 @@ AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_handle_input(amqp_connection_state_t state,
- amqp_bytes_t received_data,
- amqp_frame_t *decoded_frame);
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame);
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
@@ -413,37 +413,37 @@ AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame);
+ amqp_frame_t *decoded_frame);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_channel_t expected_channel,
- amqp_method_number_t expected_method,
- amqp_method_t *output);
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_method_t *output);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded);
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method);
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method);
AMQP_PUBLIC_FUNCTION
void *
AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method);
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method);
/*
* The API methods corresponding to most synchronous AMQP methods
@@ -465,23 +465,23 @@ AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost,
- int channel_max, int frame_max, int heartbeat,
- amqp_sasl_method_enum sasl_method, ...);
+ int channel_max, int frame_max, int heartbeat,
+ amqp_sasl_method_enum sasl_method, ...);
struct amqp_basic_properties_t_;
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
- amqp_bytes_t exchange, amqp_bytes_t routing_key,
- amqp_boolean_t mandatory, amqp_boolean_t immediate,
- struct amqp_basic_properties_t_ const *properties,
- amqp_bytes_t body);
+ amqp_bytes_t exchange, amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory, amqp_boolean_t immediate,
+ struct amqp_basic_properties_t_ const *properties,
+ amqp_bytes_t body);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel,
- int code);
+ int code);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
@@ -490,17 +490,17 @@ AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
- uint64_t delivery_tag, amqp_boolean_t multiple);
+ uint64_t delivery_tag, amqp_boolean_t multiple);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel,
- amqp_bytes_t queue, amqp_boolean_t no_ack);
+ amqp_bytes_t queue, amqp_boolean_t no_ack);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
- uint64_t delivery_tag, amqp_boolean_t requeue);
+ uint64_t delivery_tag, amqp_boolean_t requeue);
/*
* Can be used to see if there is data still in the buffer, if so
@@ -526,7 +526,7 @@ AMQP_CALL amqp_error_string(int err);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool,
- amqp_table_t *output, size_t *offset);
+ amqp_table_t *output, size_t *offset);
AMQP_PUBLIC_FUNCTION
int
@@ -541,11 +541,11 @@ struct amqp_connection_info {
};
AMQP_PUBLIC_FUNCTION
-void
+void
AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed);
AMQP_PUBLIC_FUNCTION
-int
+int
AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed);
AMQP_END_DECLS
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 6faabbb..aacaf91 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -62,10 +62,11 @@ char *amqp_error_string(int err)
switch (category) {
case ERROR_CATEGORY_CLIENT:
- if (err < 1 || err > ERROR_MAX)
+ if (err < 1 || err > ERROR_MAX) {
str = "(undefined librabbitmq error)";
- else
+ } else {
str = client_error_strings[err - 1];
+ }
break;
case ERROR_CATEGORY_OS:
@@ -80,31 +81,31 @@ char *amqp_error_string(int err)
void amqp_abort(const char *fmt, ...)
{
- va_list ap;
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fputc('\n', stderr);
- abort();
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fputc('\n', stderr);
+ abort();
}
const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
const amqp_table_t amqp_empty_table = { 0, NULL };
const amqp_array_t amqp_empty_array = { 0, NULL };
-#define RPC_REPLY(replytype) \
- (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \
- ? (replytype *) state->most_recent_api_result.reply.decoded \
+#define RPC_REPLY(replytype)\
+ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL\
+ ? (replytype *) state->most_recent_api_result.reply.decoded\
: NULL)
int amqp_basic_publish(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_boolean_t mandatory,
- amqp_boolean_t immediate,
- amqp_basic_properties_t const *properties,
- amqp_bytes_t body)
+ amqp_channel_t channel,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory,
+ amqp_boolean_t immediate,
+ amqp_basic_properties_t const *properties,
+ amqp_bytes_t body)
{
amqp_frame_t f;
size_t body_offset;
@@ -121,8 +122,9 @@ int amqp_basic_publish(amqp_connection_state_t state,
m.ticket = 0;
res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
if (properties == NULL) {
memset(&default_properties, 0, sizeof(default_properties));
@@ -136,15 +138,17 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.properties.decoded = (void *) properties;
res = amqp_send_frame(state, &f);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
body_offset = 0;
while (body_offset < body.len) {
size_t remaining = body.len - body_offset;
- if (remaining == 0)
+ if (remaining == 0) {
break;
+ }
f.frame_type = AMQP_FRAME_BODY;
f.channel = channel;
@@ -157,16 +161,17 @@ int amqp_basic_publish(amqp_connection_state_t state,
body_offset += f.payload.body_fragment.len;
res = amqp_send_frame(state, &f);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
}
return 0;
}
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
- amqp_channel_t channel,
- int code)
+ amqp_channel_t channel,
+ int code)
{
char codestr[13];
amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
@@ -179,11 +184,11 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
req.method_id = 0;
return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD,
- replies, &req);
+ replies, &req);
}
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
- int code)
+ int code)
{
char codestr[13];
amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
@@ -196,13 +201,13 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
req.method_id = 0;
return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD,
- replies, &req);
+ replies, &req);
}
int amqp_basic_ack(amqp_connection_state_t state,
- amqp_channel_t channel,
- uint64_t delivery_tag,
- amqp_boolean_t multiple)
+ amqp_channel_t channel,
+ uint64_t delivery_tag,
+ amqp_boolean_t multiple)
{
amqp_basic_ack_t m;
m.delivery_tag = delivery_tag;
@@ -211,28 +216,29 @@ int amqp_basic_ack(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_ack)
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack)
{
amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
- AMQP_BASIC_GET_EMPTY_METHOD,
- 0 };
+ AMQP_BASIC_GET_EMPTY_METHOD,
+ 0
+ };
amqp_basic_get_t req;
req.ticket = 0;
req.queue = queue;
req.no_ack = no_ack;
state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_BASIC_GET_METHOD,
- replies, &req);
+ AMQP_BASIC_GET_METHOD,
+ replies, &req);
return state->most_recent_api_result;
}
int amqp_basic_reject(amqp_connection_state_t state,
- amqp_channel_t channel,
- uint64_t delivery_tag,
- amqp_boolean_t requeue)
+ amqp_channel_t channel,
+ uint64_t delivery_tag,
+ amqp_boolean_t requeue)
{
amqp_basic_reject_t req;
req.delivery_tag = delivery_tag;
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 421c745..be7b1a8 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -46,36 +46,40 @@
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
-#define ENFORCE_STATE(statevec, statenum) \
- { \
- amqp_connection_state_t _check_state = (statevec); \
- size_t _wanted_state = (statenum); \
- if (_check_state->state != _wanted_state) \
+#define ENFORCE_STATE(statevec, statenum) \
+ { \
+ amqp_connection_state_t _check_state = (statevec); \
+ size_t _wanted_state = (statenum); \
+ if (_check_state->state != _wanted_state) \
amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \
- _wanted_state, \
- _check_state->state); \
+ _wanted_state, \
+ _check_state->state); \
}
-amqp_connection_state_t amqp_new_connection(void) {
+amqp_connection_state_t amqp_new_connection(void)
+{
int res;
amqp_connection_state_t state =
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
- if (state == NULL)
+ if (state == NULL) {
return NULL;
+ }
init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0);
- if (-ERROR_NO_MEMORY == res)
+ if (-ERROR_NO_MEMORY == res) {
return NULL;
- else if (0 != res)
+ } else if (0 != res) {
goto out_nomem;
+ }
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
- if (state->inbound_buffer.bytes == NULL)
+ if (state->inbound_buffer.bytes == NULL) {
goto out_nomem;
+ }
state->state = CONNECTION_STATE_INITIAL;
/* the server protocol version response is 8 bytes, which conveniently
@@ -85,12 +89,13 @@ amqp_connection_state_t amqp_new_connection(void) {
state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
- if (state->sock_inbound_buffer.bytes == NULL)
+ if (state->sock_inbound_buffer.bytes == NULL) {
goto out_nomem;
+ }
return state;
- out_nomem:
+out_nomem:
free(state->sock_inbound_buffer.bytes);
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
@@ -98,20 +103,21 @@ amqp_connection_state_t amqp_new_connection(void) {
return NULL;
}
-int amqp_get_sockfd(amqp_connection_state_t state) {
+int amqp_get_sockfd(amqp_connection_state_t state)
+{
return state->sockfd;
}
void amqp_set_sockfd(amqp_connection_state_t state,
- int sockfd)
+ int sockfd)
{
state->sockfd = sockfd;
}
int amqp_tune_connection(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat)
+ int channel_max,
+ int frame_max,
+ int heartbeat)
{
void *newbuf;
@@ -136,11 +142,13 @@ int amqp_tune_connection(amqp_connection_state_t state,
return 0;
}
-int amqp_get_channel_max(amqp_connection_state_t state) {
+int amqp_get_channel_max(amqp_connection_state_t state)
+{
return state->channel_max;
}
-int amqp_destroy_connection(amqp_connection_state_t state) {
+int amqp_destroy_connection(amqp_connection_state_t state)
+{
int s = state->sockfd;
empty_amqp_pool(&state->frame_pool);
@@ -149,13 +157,15 @@ int amqp_destroy_connection(amqp_connection_state_t state) {
free(state->sock_inbound_buffer.bytes);
free(state);
- if (s >= 0 && amqp_socket_close(s) < 0)
+ if (s >= 0 && amqp_socket_close(s) < 0) {
return -amqp_socket_error();
- else
+ } else {
return 0;
+ }
}
-static void return_to_idle(amqp_connection_state_t state) {
+static void return_to_idle(amqp_connection_state_t state)
+{
state->inbound_buffer.bytes = NULL;
state->inbound_offset = 0;
state->target_size = HEADER_SIZE;
@@ -163,15 +173,16 @@ static void return_to_idle(amqp_connection_state_t state) {
}
static size_t consume_data(amqp_connection_state_t state,
- amqp_bytes_t *received_data)
+ amqp_bytes_t *received_data)
{
/* how much data is available and will fit? */
size_t bytes_consumed = state->target_size - state->inbound_offset;
- if (received_data->len < bytes_consumed)
+ if (received_data->len < bytes_consumed) {
bytes_consumed = received_data->len;
+ }
memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset),
- received_data->bytes, bytes_consumed);
+ received_data->bytes, bytes_consumed);
state->inbound_offset += bytes_consumed;
received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed);
received_data->len -= bytes_consumed;
@@ -180,8 +191,8 @@ static size_t consume_data(amqp_connection_state_t state,
}
int amqp_handle_input(amqp_connection_state_t state,
- amqp_bytes_t received_data,
- amqp_frame_t *decoded_frame)
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame)
{
size_t bytes_consumed;
void *raw_frame;
@@ -190,17 +201,20 @@ int amqp_handle_input(amqp_connection_state_t state,
or a complete, ignored frame was read. */
decoded_frame->frame_type = 0;
- if (received_data.len == 0)
+ if (received_data.len == 0) {
return 0;
+ }
if (state->state == CONNECTION_STATE_IDLE) {
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
- state->inbound_buffer.len);
+ state->inbound_buffer.len);
if (state->inbound_buffer.bytes == NULL)
/* state->inbound_buffer.len is always nonzero, because it
- corresponds to frame_max, which is not permitted to be less
- than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
+ corresponds to frame_max, which is not permitted to be less
+ than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
+ {
return -ERROR_NO_MEMORY;
+ }
state->state = CONNECTION_STATE_HEADER;
}
@@ -209,8 +223,9 @@ int amqp_handle_input(amqp_connection_state_t state,
/* do we have target_size data yet? if not, return with the
expectation that more will arrive */
- if (state->inbound_offset < state->target_size)
+ if (state->inbound_offset < state->target_size) {
return bytes_consumed;
+ }
raw_frame = state->inbound_buffer.bytes;
@@ -222,13 +237,13 @@ int amqp_handle_input(amqp_connection_state_t state,
decoded_frame->channel = 0;
decoded_frame->payload.protocol_header.transport_high
- = amqp_d8(raw_frame, 4);
+ = amqp_d8(raw_frame, 4);
decoded_frame->payload.protocol_header.transport_low
- = amqp_d8(raw_frame, 5);
+ = amqp_d8(raw_frame, 5);
decoded_frame->payload.protocol_header.protocol_version_major
- = amqp_d8(raw_frame, 6);
+ = amqp_d8(raw_frame, 6);
decoded_frame->payload.protocol_header.protocol_version_minor
- = amqp_d8(raw_frame, 7);
+ = amqp_d8(raw_frame, 7);
return_to_idle(state);
return bytes_consumed;
@@ -240,15 +255,16 @@ int amqp_handle_input(amqp_connection_state_t state,
case CONNECTION_STATE_HEADER:
/* frame length is 3 bytes in */
state->target_size
- = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
+ = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
state->state = CONNECTION_STATE_BODY;
bytes_consumed += consume_data(state, &received_data);
/* do we have target_size data yet? if not, return with the
expectation that more will arrive */
- if (state->inbound_offset < state->target_size)
+ if (state->inbound_offset < state->target_size) {
return bytes_consumed;
+ }
/* fall through to process body */
@@ -257,8 +273,9 @@ int amqp_handle_input(amqp_connection_state_t state,
int res;
/* Check frame end marker (footer) */
- if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END)
+ if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) {
return -ERROR_BAD_AMQP_DATA;
+ }
decoded_frame->frame_type = amqp_d8(raw_frame, 0);
decoded_frame->channel = amqp_d16(raw_frame, 1);
@@ -270,19 +287,20 @@ int amqp_handle_input(amqp_connection_state_t state,
encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
res = amqp_decode_method(decoded_frame->payload.method.id,
- &state->decoding_pool, encoded,
- &decoded_frame->payload.method.decoded);
- if (res < 0)
- return res;
+ &state->decoding_pool, encoded,
+ &decoded_frame->payload.method.decoded);
+ if (res < 0) {
+ return res;
+ }
break;
case AMQP_FRAME_HEADER:
decoded_frame->payload.properties.class_id
- = amqp_d16(raw_frame, HEADER_SIZE);
+ = amqp_d16(raw_frame, HEADER_SIZE);
/* unused 2-byte weight field goes here */
decoded_frame->payload.properties.body_size
- = amqp_d64(raw_frame, HEADER_SIZE + 4);
+ = amqp_d64(raw_frame, HEADER_SIZE + 4);
encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12);
encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
decoded_frame->payload.properties.raw = encoded;
@@ -290,16 +308,17 @@ int amqp_handle_input(amqp_connection_state_t state,
res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
&state->decoding_pool, encoded,
&decoded_frame->payload.properties.decoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
break;
case AMQP_FRAME_BODY:
decoded_frame->payload.body_fragment.len
- = state->target_size - HEADER_SIZE - FOOTER_SIZE;
+ = state->target_size - HEADER_SIZE - FOOTER_SIZE;
decoded_frame->payload.body_fragment.bytes
- = amqp_offset(raw_frame, HEADER_SIZE);
+ = amqp_offset(raw_frame, HEADER_SIZE);
break;
case AMQP_FRAME_HEARTBEAT:
@@ -321,28 +340,32 @@ int amqp_handle_input(amqp_connection_state_t state,
}
}
-amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
+amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state)
+{
return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
}
-void amqp_release_buffers(amqp_connection_state_t state) {
+void amqp_release_buffers(amqp_connection_state_t state)
+{
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
- if (state->first_queued_frame)
+ if (state->first_queued_frame) {
amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued");
+ }
recycle_amqp_pool(&state->frame_pool);
recycle_amqp_pool(&state->decoding_pool);
}
-void amqp_maybe_release_buffers(amqp_connection_state_t state) {
+void amqp_maybe_release_buffers(amqp_connection_state_t state)
+{
if (amqp_release_buffers_ok(state)) {
amqp_release_buffers(state);
}
}
int amqp_send_frame(amqp_connection_state_t state,
- const amqp_frame_t *frame)
+ const amqp_frame_t *frame)
{
void *out_frame = state->outbound_buffer.bytes;
int res;
@@ -367,8 +390,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_len = FOOTER_SIZE;
res = amqp_socket_writev(state->sockfd, iov, 3);
- }
- else {
+ } else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -381,8 +403,9 @@ int amqp_send_frame(amqp_connection_state_t state,
res = amqp_encode_method(frame->payload.method.id,
frame->payload.method.decoded, encoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
out_frame_len = res + 4;
break;
@@ -397,8 +420,9 @@ int amqp_send_frame(amqp_connection_state_t state,
res = amqp_encode_properties(frame->payload.properties.class_id,
frame->payload.properties.decoded, encoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
out_frame_len = res + 12;
break;
@@ -417,8 +441,9 @@ int amqp_send_frame(amqp_connection_state_t state,
out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL);
}
- if (res < 0)
+ if (res < 0) {
return -amqp_socket_error();
- else
+ } else {
return 0;
+ }
}
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 6915763..0f49df2 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -43,11 +43,13 @@
#include <string.h>
#include <sys/types.h>
-char const *amqp_version(void) {
+char const *amqp_version(void)
+{
return VERSION; /* defined in config.h */
}
-void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) {
+void init_amqp_pool(amqp_pool_t *pool, size_t pagesize)
+{
pool->pagesize = pagesize ? pagesize : 4096;
pool->pages.num_blocks = 0;
@@ -61,7 +63,8 @@ void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) {
pool->alloc_used = 0;
}
-static void empty_blocklist(amqp_pool_blocklist_t *x) {
+static void empty_blocklist(amqp_pool_blocklist_t *x)
+{
int i;
for (i = 0; i < x->num_blocks; i++) {
@@ -74,30 +77,35 @@ static void empty_blocklist(amqp_pool_blocklist_t *x) {
x->blocklist = NULL;
}
-void recycle_amqp_pool(amqp_pool_t *pool) {
+void recycle_amqp_pool(amqp_pool_t *pool)
+{
empty_blocklist(&pool->large_blocks);
pool->next_page = 0;
pool->alloc_block = NULL;
pool->alloc_used = 0;
}
-void empty_amqp_pool(amqp_pool_t *pool) {
+void empty_amqp_pool(amqp_pool_t *pool)
+{
recycle_amqp_pool(pool);
empty_blocklist(&pool->pages);
}
/* Returns 1 on success, 0 on failure */
-static int record_pool_block(amqp_pool_blocklist_t *x, void *block) {
+static int record_pool_block(amqp_pool_blocklist_t *x, void *block)
+{
size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1);
if (x->blocklist == NULL) {
x->blocklist = malloc(blocklistlength);
- if (x->blocklist == NULL)
+ if (x->blocklist == NULL) {
return 0;
+ }
} else {
void *newbl = realloc(x->blocklist, blocklistlength);
- if (newbl == NULL)
+ if (newbl == NULL) {
return 0;
+ }
x->blocklist = newbl;
}
@@ -106,7 +114,8 @@ static int record_pool_block(amqp_pool_blocklist_t *x, void *block) {
return 1;
}
-void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
+void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount)
+{
if (amount == 0) {
return NULL;
}
@@ -118,8 +127,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (result == NULL) {
return NULL;
}
- if (!record_pool_block(&pool->large_blocks, result))
+ if (!record_pool_block(&pool->large_blocks, result)) {
return NULL;
+ }
return result;
}
@@ -138,8 +148,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (pool->alloc_block == NULL) {
return NULL;
}
- if (!record_pool_block(&pool->pages, pool->alloc_block))
+ if (!record_pool_block(&pool->pages, pool->alloc_block)) {
return NULL;
+ }
pool->next_page = pool->pages.num_blocks;
} else {
pool->alloc_block = pool->pages.blocklist[pool->next_page];
@@ -151,19 +162,22 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
return pool->alloc_block;
}
-void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) {
+void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output)
+{
output->len = amount;
output->bytes = amqp_pool_alloc(pool, amount);
}
-amqp_bytes_t amqp_cstring_bytes(char const *cstr) {
+amqp_bytes_t amqp_cstring_bytes(char const *cstr)
+{
amqp_bytes_t result;
result.len = strlen(cstr);
result.bytes = (void *) cstr;
return result;
}
-amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) {
+amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src)
+{
amqp_bytes_t result;
result.len = src.len;
result.bytes = malloc(src.len);
@@ -173,13 +187,15 @@ amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) {
return result;
}
-amqp_bytes_t amqp_bytes_malloc(size_t amount) {
+amqp_bytes_t amqp_bytes_malloc(size_t amount)
+{
amqp_bytes_t result;
result.len = amount;
result.bytes = malloc(amount); /* will return NULL if it fails */
return result;
}
-void amqp_bytes_free(amqp_bytes_t bytes) {
+void amqp_bytes_free(amqp_bytes_t bytes)
+{
free(bytes.bytes);
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 725e3c5..191155e 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -159,53 +159,53 @@ static inline void *amqp_offset(void *data, size_t offset)
/* This macro defines the encoding and decoding functions associated with a
simple type. */
-#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
- \
-static inline void amqp_e##bits(void *data, size_t offset, \
- uint##bits##_t val) \
-{ \
- /* The AMQP data might be unaligned. So we encode and then copy the \
- result into place. */ \
- uint##bits##_t res = htonx(val); \
- memcpy(amqp_offset(data, offset), &res, bits/8); \
-} \
- \
-static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
-{ \
- /* The AMQP data might be unaligned. So we copy the source value \
- into a variable and then decode it. */ \
- uint##bits##_t val; \
- memcpy(&val, amqp_offset(data, offset), bits/8); \
- return ntohx(val); \
-} \
- \
-static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
- uint##bits##_t input) \
- \
-{ \
- size_t o = *offset; \
- if ((*offset = o + bits / 8) <= encoded.len) { \
- amqp_e##bits(encoded.bytes, o, input); \
- return 1; \
- } \
- else { \
- return 0; \
- } \
-} \
- \
-static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
- uint##bits##_t *output) \
- \
-{ \
- size_t o = *offset; \
- if ((*offset = o + bits / 8) <= encoded.len) { \
- *output = amqp_d##bits(encoded.bytes, o); \
- return 1; \
- } \
- else { \
- return 0; \
- } \
-}
+#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
+ \
+ static inline void amqp_e##bits(void *data, size_t offset, \
+ uint##bits##_t val) \
+ { \
+ /* The AMQP data might be unaligned. So we encode and then copy the \
+ result into place. */ \
+ uint##bits##_t res = htonx(val); \
+ memcpy(amqp_offset(data, offset), &res, bits/8); \
+ } \
+ \
+ static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
+ { \
+ /* The AMQP data might be unaligned. So we copy the source value \
+ into a variable and then decode it. */ \
+ uint##bits##_t val; \
+ memcpy(&val, amqp_offset(data, offset), bits/8); \
+ return ntohx(val); \
+ } \
+ \
+ static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t input) \
+ \
+ { \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ amqp_e##bits(encoded.bytes, o, input); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
+ } \
+ \
+ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t *output) \
+ \
+ { \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ *output = amqp_d##bits(encoded.bytes, o); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
+ }
/* Determine byte order */
#if defined(__GLIBC__)
@@ -215,7 +215,7 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
# elif (__BYTE_ORDER == __BIG_ENDIAN)
# define AMQP_BIG_ENDIAN
# else
- /* Don't define anything */
+/* Don't define anything */
# endif
#elif defined(_BIG_ENDIAN) && !defined(_LITTLE_ENDIAN) || \
defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__)
@@ -235,40 +235,40 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
defined(__i386__) || defined(_M_IX86)
# define AMQP_LITTLE_ENDIAN
#else
- /* Don't define anything */
+/* Don't define anything */
#endif
#if defined(AMQP_LITTLE_ENDIAN)
-#define DECLARE_XTOXLL(func) \
-static inline uint64_t func##ll(uint64_t val) \
-{ \
- union { \
- uint64_t whole; \
- uint32_t halves[2]; \
- } u; \
- uint32_t t; \
- u.whole = val; \
- t = u.halves[0]; \
- u.halves[0] = func##l(u.halves[1]); \
- u.halves[1] = func##l(t); \
- return u.whole; \
-}
+#define DECLARE_XTOXLL(func) \
+ static inline uint64_t func##ll(uint64_t val) \
+ { \
+ union { \
+ uint64_t whole; \
+ uint32_t halves[2]; \
+ } u; \
+ uint32_t t; \
+ u.whole = val; \
+ t = u.halves[0]; \
+ u.halves[0] = func##l(u.halves[1]); \
+ u.halves[1] = func##l(t); \
+ return u.whole; \
+ }
#elif defined(AMQP_BIG_ENDIAN)
-#define DECLARE_XTOXLL(func) \
-static inline uint64_t func##ll(uint64_t val) \
-{ \
- union { \
- uint64_t whole; \
- uint32_t halves[2]; \
- } u; \
- u.whole = val; \
- u.halves[0] = func##l(u.halves[0]); \
- u.halves[1] = func##l(u.halves[1]); \
- return u.whole; \
-}
+#define DECLARE_XTOXLL(func) \
+ static inline uint64_t func##ll(uint64_t val) \
+ { \
+ union { \
+ uint64_t whole; \
+ uint32_t halves[2]; \
+ } u; \
+ u.whole = val; \
+ u.halves[0] = func##l(u.halves[0]); \
+ u.halves[1] = func##l(u.halves[1]); \
+ return u.whole; \
+ }
#else
# error Endianness not known
@@ -285,28 +285,26 @@ DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl)
DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll)
static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset,
- amqp_bytes_t input)
+ amqp_bytes_t input)
{
size_t o = *offset;
if ((*offset = o + input.len) <= encoded.len) {
memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len);
return 1;
- }
- else {
+ } else {
return 0;
}
}
static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
- amqp_bytes_t *output, size_t len)
+ amqp_bytes_t *output, size_t len)
{
size_t o = *offset;
if ((*offset = o + len) <= encoded.len) {
output->bytes = amqp_offset(encoded.bytes, o);
output->len = len;
return 1;
- }
- else {
+ } else {
return 0;
}
}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index c323792..8454d45 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -44,7 +44,7 @@
#include <assert.h>
int amqp_open_socket(char const *hostname,
- int portnumber)
+ int portnumber)
{
struct addrinfo hint;
struct addrinfo *address_list;
@@ -54,8 +54,9 @@ int amqp_open_socket(char const *hostname,
int last_error = 0;
int one = 1; /* for setsockopt */
- if (0 != (last_error = amqp_socket_init()))
+ if (0 != (last_error = amqp_socket_init())) {
return last_error;
+ }
memset(&hint, 0, sizeof(hint));
hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */
@@ -66,63 +67,58 @@ int amqp_open_socket(char const *hostname,
last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list);
- if (last_error != 0)
- {
+ if (last_error != 0) {
return -ERROR_GETHOSTBYNAME_FAILED;
}
- for (addr = address_list; addr; addr = addr->ai_next)
- {
+ for (addr = address_list; addr; addr = addr->ai_next) {
/*
This cast is to squash warnings on Win64, see:
http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
*/
sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
- if (-1 == sockfd)
- {
+ if (-1 == sockfd) {
last_error = -amqp_socket_error();
continue;
}
#ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT
- if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)))
- {
+ if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = -amqp_socket_error();
amqp_socket_close(sockfd);
continue;
}
#endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */
if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen))
- {
+ || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
last_error = -amqp_socket_error();
amqp_socket_close(sockfd);
continue;
- }
- else
- {
+ } else {
last_error = 0;
break;
}
}
freeaddrinfo(address_list);
- if (last_error != 0)
- {
+ if (last_error != 0) {
return last_error;
}
return sockfd;
}
-int amqp_send_header(amqp_connection_state_t state) {
+int amqp_send_header(amqp_connection_state_t state)
+{
static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
- AMQP_PROTOCOL_VERSION_MAJOR,
- AMQP_PROTOCOL_VERSION_MINOR,
- AMQP_PROTOCOL_VERSION_REVISION };
+ AMQP_PROTOCOL_VERSION_MAJOR,
+ AMQP_PROTOCOL_VERSION_MINOR,
+ AMQP_PROTOCOL_VERSION_REVISION
+ };
return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL);
}
-static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
+static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
+{
amqp_bytes_t res;
switch (method) {
@@ -139,40 +135,43 @@ static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
}
static amqp_bytes_t sasl_response(amqp_pool_t *pool,
- amqp_sasl_method_enum method,
- va_list args)
+ amqp_sasl_method_enum method,
+ va_list args)
{
amqp_bytes_t response;
switch (method) {
- case AMQP_SASL_METHOD_PLAIN: {
- char *username = va_arg(args, char *);
- size_t username_len = strlen(username);
- char *password = va_arg(args, char *);
- size_t password_len = strlen(password);
- char *response_buf;
-
- amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
- if (response.bytes == NULL)
- /* We never request a zero-length block, because of the +2
- above, so a NULL here really is ENOMEM. */
- return response;
-
- response_buf = response.bytes;
- response_buf[0] = 0;
- memcpy(response_buf + 1, username, username_len);
- response_buf[username_len + 1] = 0;
- memcpy(response_buf + username_len + 2, password, password_len);
- break;
+ case AMQP_SASL_METHOD_PLAIN: {
+ char *username = va_arg(args, char *);
+ size_t username_len = strlen(username);
+ char *password = va_arg(args, char *);
+ size_t password_len = strlen(password);
+ char *response_buf;
+
+ amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
+ if (response.bytes == NULL)
+ /* We never request a zero-length block, because of the +2
+ above, so a NULL here really is ENOMEM. */
+ {
+ return response;
}
- default:
- amqp_abort("Invalid SASL method: %d", (int) method);
+
+ response_buf = response.bytes;
+ response_buf[0] = 0;
+ memcpy(response_buf + 1, username, username_len);
+ response_buf[username_len + 1] = 0;
+ memcpy(response_buf + username_len + 2, password, password_len);
+ break;
+ }
+ default:
+ amqp_abort("Invalid SASL method: %d", (int) method);
}
return response;
}
-amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
+amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state)
+{
return (state->first_queued_frame != NULL);
}
@@ -180,12 +179,13 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
* Check to see if we have data in our buffer. If this returns 1, we
* will avoid an immediate blocking read in amqp_simple_wait_frame.
*/
-amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
+amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state)
+{
return (state->sock_inbound_offset < state->sock_inbound_limit);
}
static int wait_frame_inner(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame)
+ amqp_frame_t *decoded_frame)
{
while (1) {
int res;
@@ -196,26 +196,29 @@ static int wait_frame_inner(amqp_connection_state_t state,
buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
res = amqp_handle_input(state, buffer, decoded_frame);
- if (res < 0)
- return res;
+ if (res < 0) {
+ return res;
+ }
state->sock_inbound_offset += res;
- if (decoded_frame->frame_type != 0)
- /* Complete frame was read. Return it. */
- return 0;
+ if (decoded_frame->frame_type != 0) {
+ /* Complete frame was read. Return it. */
+ return 0;
+ }
/* Incomplete or ignored frame. Keep processing input. */
assert(res != 0);
}
res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
+ state->sock_inbound_buffer.len, 0);
if (res <= 0) {
- if (res == 0)
- return -ERROR_CONNECTION_CLOSED;
- else
- return -amqp_socket_error();
+ if (res == 0) {
+ return -ERROR_CONNECTION_CLOSED;
+ } else {
+ return -amqp_socket_error();
+ }
}
state->sock_inbound_limit = res;
@@ -224,7 +227,7 @@ static int wait_frame_inner(amqp_connection_state_t state,
}
int amqp_simple_wait_frame(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame)
+ amqp_frame_t *decoded_frame)
{
if (state->first_queued_frame != NULL) {
amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
@@ -240,38 +243,42 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
}
int amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_channel_t expected_channel,
- amqp_method_number_t expected_method,
- amqp_method_t *output)
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_method_t *output)
{
amqp_frame_t frame;
int res = amqp_simple_wait_frame(state, &frame);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
- if (frame.channel != expected_channel)
+ if (frame.channel != expected_channel) {
amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d",
- expected_method,
- expected_channel,
- frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
+ expected_method,
+ expected_channel,
+ frame.channel);
+ }
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d",
- expected_method,
- expected_channel,
- frame.frame_type);
- if (frame.payload.method.id != expected_method)
+ expected_method,
+ expected_channel,
+ frame.frame_type);
+ }
+ if (frame.payload.method.id != expected_method) {
amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X",
- expected_method,
- expected_channel,
- frame.payload.method.id);
+ expected_method,
+ expected_channel,
+ frame.payload.method.id);
+ }
*output = frame.payload.method;
return 0;
}
int amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded)
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded)
{
amqp_frame_t frame;
@@ -285,17 +292,19 @@ int amqp_send_method(amqp_connection_state_t state,
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
{
while ( *list != 0 ) {
- if ( *list == expected ) return 1;
+ if ( *list == expected ) {
+ return 1;
+ }
list++;
}
return 0;
}
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method)
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method)
{
int status;
amqp_rpc_reply_t result;
@@ -312,7 +321,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
{
amqp_frame_t frame;
- retry:
+retry:
status = wait_frame_inner(state, &frame);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
@@ -328,21 +337,23 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
* - on the channel we want, and a channel.close frame, or
* - on channel zero, and a connection.close frame.
*/
- if (!( (frame.frame_type == AMQP_FRAME_METHOD) &&
- ( ((frame.channel == channel) &&
- ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ||
- (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
- ||
- ((frame.channel == 0) &&
- (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) ))
- {
+ if (!((frame.frame_type == AMQP_FRAME_METHOD)
+ && (
+ ((frame.channel == channel)
+ && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)
+ || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
+ ||
+ ((frame.channel == 0)
+ && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
+ )
+ )) {
amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
if (frame_copy == NULL || link == NULL) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_error = ERROR_NO_MEMORY;
- return result;
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.library_error = ERROR_NO_MEMORY;
+ return result;
}
*frame_copy = frame;
@@ -351,9 +362,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
link->data = frame_copy;
if (state->last_queued_frame == NULL) {
- state->first_queued_frame = link;
+ state->first_queued_frame = link;
} else {
- state->last_queued_frame->next = link;
+ state->last_queued_frame->next = link;
}
state->last_queued_frame = link;
@@ -361,8 +372,8 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
}
result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
- ? AMQP_RESPONSE_NORMAL
- : AMQP_RESPONSE_SERVER_EXCEPTION;
+ ? AMQP_RESPONSE_NORMAL
+ : AMQP_RESPONSE_SERVER_EXCEPTION;
result.reply = frame.payload.method;
return result;
@@ -370,10 +381,10 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
}
void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method)
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method)
{
amqp_method_number_t replies[2];
@@ -381,12 +392,13 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
replies[1] = 0;
state->most_recent_api_result = amqp_simple_rpc(state, channel,
- request_id, replies,
- decoded_request_method);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ request_id, replies,
+ decoded_request_method);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
return state->most_recent_api_result.reply.decoded;
- else
+ } else {
return NULL;
+ }
}
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
@@ -396,11 +408,11 @@ amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
static int amqp_login_inner(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ amqp_sasl_method_enum sasl_method,
+ va_list vl)
{
int res;
amqp_method_t method;
@@ -411,14 +423,15 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_send_header(state);
res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
- &method);
- if (res < 0)
+ &method);
+ if (res < 0) {
return res;
+ }
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
- (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
+ (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
return -ERROR_INCOMPATIBLE_AMQP_VERSION;
}
@@ -431,10 +444,11 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_table_entry_t properties[2];
amqp_connection_start_ok_t s;
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
+ sasl_method, vl);
- if (response_bytes.bytes == NULL)
+ if (response_bytes.bytes == NULL) {
return -ERROR_NO_MEMORY;
+ }
properties[0].key = amqp_cstring_bytes("product");
properties[0].value.kind = AMQP_FIELD_KIND_UTF8;
@@ -454,16 +468,18 @@ static int amqp_login_inner(amqp_connection_state_t state,
s.locale.len = 5;
res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
}
amqp_release_buffers(state);
res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
- &method);
- if (res < 0)
+ &method);
+ if (res < 0) {
return res;
+ }
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
@@ -472,18 +488,22 @@ static int amqp_login_inner(amqp_connection_state_t state,
server_heartbeat = s->heartbeat;
}
- if (server_channel_max != 0 && server_channel_max < channel_max)
+ if (server_channel_max != 0 && server_channel_max < channel_max) {
channel_max = server_channel_max;
+ }
- if (server_frame_max != 0 && server_frame_max < frame_max)
+ if (server_frame_max != 0 && server_frame_max < frame_max) {
frame_max = server_frame_max;
+ }
- if (server_heartbeat != 0 && server_heartbeat < heartbeat)
+ if (server_heartbeat != 0 && server_heartbeat < heartbeat) {
heartbeat = server_heartbeat;
+ }
res = amqp_tune_connection(state, channel_max, frame_max, heartbeat);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
{
amqp_connection_tune_ok_t s;
@@ -492,8 +512,9 @@ static int amqp_login_inner(amqp_connection_state_t state,
s.heartbeat = heartbeat;
res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
}
amqp_release_buffers(state);
@@ -502,12 +523,12 @@ static int amqp_login_inner(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- ...)
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ amqp_sasl_method_enum sasl_method,
+ ...)
{
va_list vl;
amqp_rpc_reply_t result;
@@ -533,12 +554,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
s.insist = 1;
result = amqp_simple_rpc(state,
- 0,
- AMQP_CONNECTION_OPEN_METHOD,
- (amqp_method_number_t *) &replies,
- &s);
- if (result.reply_type != AMQP_RESPONSE_NORMAL)
+ 0,
+ AMQP_CONNECTION_OPEN_METHOD,
+ (amqp_method_number_t *) &replies,
+ &s);
+ if (result.reply_type != AMQP_RESPONSE_NORMAL) {
return result;
+ }
}
amqp_maybe_release_buffers(state);
diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c
index d82c946..d9383d6 100644
--- a/librabbitmq/amqp_table.c
+++ b/librabbitmq/amqp_table.c
@@ -46,20 +46,20 @@
#define INITIAL_TABLE_SIZE 16
static int amqp_decode_field_value(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_field_value_t *entry,
- size_t *offset);
+ amqp_pool_t *pool,
+ amqp_field_value_t *entry,
+ size_t *offset);
static int amqp_encode_field_value(amqp_bytes_t encoded,
- amqp_field_value_t *entry,
- size_t *offset);
+ amqp_field_value_t *entry,
+ size_t *offset);
/*---------------------------------------------------------------------------*/
static int amqp_decode_array(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_array_t *output,
- size_t *offset)
+ amqp_pool_t *pool,
+ amqp_array_t *output,
+ size_t *offset)
{
uint32_t arraysize;
int num_entries = 0;
@@ -68,12 +68,14 @@ static int amqp_decode_array(amqp_bytes_t encoded,
size_t limit;
int res;
- if (!amqp_decode_32(encoded, offset, &arraysize))
+ if (!amqp_decode_32(encoded, offset, &arraysize)) {
return -ERROR_BAD_AMQP_DATA;
+ }
entries = malloc(allocated_entries * sizeof(amqp_field_value_t));
- if (entries == NULL)
+ if (entries == NULL) {
return -ERROR_NO_MEMORY;
+ }
limit = *offset + arraysize;
while (*offset < limit) {
@@ -82,16 +84,18 @@ static int amqp_decode_array(amqp_bytes_t encoded,
allocated_entries = allocated_entries * 2;
newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t));
res = -ERROR_NO_MEMORY;
- if (newentries == NULL)
- goto out;
+ if (newentries == NULL) {
+ goto out;
+ }
entries = newentries;
}
res = amqp_decode_field_value(encoded, pool, &entries[num_entries],
- offset);
- if (res < 0)
+ offset);
+ if (res < 0) {
goto out;
+ }
num_entries++;
}
@@ -100,21 +104,22 @@ static int amqp_decode_array(amqp_bytes_t encoded,
output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_field_value_t));
res = -ERROR_NO_MEMORY;
/* NULL is legitimate if we requested a zero-length block. */
- if (output->entries == NULL && num_entries > 0)
+ if (output->entries == NULL && num_entries > 0) {
goto out;
+ }
memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t));
res = 0;
- out:
+out:
free(entries);
return res;
}
int amqp_decode_table(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_table_t *output,
- size_t *offset)
+ amqp_pool_t *pool,
+ amqp_table_t *output,
+ size_t *offset)
{
uint32_t tablesize;
int num_entries = 0;
@@ -123,40 +128,46 @@ int amqp_decode_table(amqp_bytes_t encoded,
size_t limit;
int res;
- if (!amqp_decode_32(encoded, offset, &tablesize))
+ if (!amqp_decode_32(encoded, offset, &tablesize)) {
return -ERROR_BAD_AMQP_DATA;
+ }
entries = malloc(allocated_entries * sizeof(amqp_table_entry_t));
- if (entries == NULL)
+ if (entries == NULL) {
return -ERROR_NO_MEMORY;
+ }
limit = *offset + tablesize;
while (*offset < limit) {
uint8_t keylen;
res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_decode_8(encoded, offset, &keylen))
+ if (!amqp_decode_8(encoded, offset, &keylen)) {
goto out;
+ }
if (num_entries >= allocated_entries) {
void *newentries;
allocated_entries = allocated_entries * 2;
newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t));
res = -ERROR_NO_MEMORY;
- if (newentries == NULL)
- goto out;
+ if (newentries == NULL) {
+ goto out;
+ }
entries = newentries;
}
res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen))
+ if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) {
goto out;
+ }
res = amqp_decode_field_value(encoded, pool, &entries[num_entries].value,
- offset);
- if (res < 0)
+ offset);
+ if (res < 0) {
goto out;
+ }
num_entries++;
}
@@ -165,26 +176,28 @@ int amqp_decode_table(amqp_bytes_t encoded,
output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t));
res = -ERROR_NO_MEMORY;
/* NULL is legitimate if we requested a zero-length block. */
- if (output->entries == NULL && num_entries > 0)
+ if (output->entries == NULL && num_entries > 0) {
goto out;
+ }
memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t));
res = 0;
- out:
+out:
free(entries);
return res;
}
static int amqp_decode_field_value(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_field_value_t *entry,
- size_t *offset)
+ amqp_pool_t *pool,
+ amqp_field_value_t *entry,
+ size_t *offset)
{
int res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_decode_8(encoded, offset, &entry->kind))
+ if (!amqp_decode_8(encoded, offset, &entry->kind)) {
goto out;
+ }
#define TRIVIAL_FIELD_DECODER(bits) if (!amqp_decode_##bits(encoded, offset, &entry->value.u##bits)) goto out; break
#define SIMPLE_FIELD_DECODER(bits, dest, how) { uint##bits##_t val; if (!amqp_decode_##bits(encoded, offset, &val)) goto out; entry->value.dest = how; } break
@@ -223,8 +236,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_DECIMAL:
if (!amqp_decode_8(encoded, offset, &entry->value.decimal.decimals)
- || !amqp_decode_32(encoded, offset, &entry->value.decimal.value))
+ || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) {
goto out;
+ }
break;
case AMQP_FIELD_KIND_UTF8:
@@ -234,8 +248,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_BYTES: {
uint32_t len;
if (!amqp_decode_32(encoded, offset, &len)
- || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len))
+ || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) {
goto out;
+ }
break;
}
@@ -259,15 +274,15 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
res = 0;
- out:
+out:
return res;
}
/*---------------------------------------------------------------------------*/
static int amqp_encode_array(amqp_bytes_t encoded,
- amqp_array_t *input,
- size_t *offset)
+ amqp_array_t *input,
+ size_t *offset)
{
size_t start = *offset;
int i, res;
@@ -276,22 +291,24 @@ static int amqp_encode_array(amqp_bytes_t encoded,
for (i = 0; i < input->num_entries; i++) {
res = amqp_encode_field_value(encoded, &input->entries[i], offset);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
}
- if (amqp_encode_32(encoded, &start, *offset - start - 4))
+ if (amqp_encode_32(encoded, &start, *offset - start - 4)) {
res = 0;
- else
+ } else {
res = -ERROR_BAD_AMQP_DATA;
+ }
- out:
+out:
return res;
}
int amqp_encode_table(amqp_bytes_t encoded,
- amqp_table_t *input,
- size_t *offset)
+ amqp_table_t *input,
+ size_t *offset)
{
size_t start = *offset;
int i, res;
@@ -300,35 +317,40 @@ int amqp_encode_table(amqp_bytes_t encoded,
for (i = 0; i < input->num_entries; i++) {
res = amqp_encode_8(encoded, offset, input->entries[i].key.len);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
res = amqp_encode_bytes(encoded, offset, input->entries[i].key);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
res = amqp_encode_field_value(encoded, &input->entries[i].value, offset);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
}
- if (amqp_encode_32(encoded, &start, *offset - start - 4))
+ if (amqp_encode_32(encoded, &start, *offset - start - 4)) {
res = 0;
- else
+ } else {
res = -ERROR_BAD_AMQP_DATA;
+ }
- out:
+out:
return res;
}
static int amqp_encode_field_value(amqp_bytes_t encoded,
- amqp_field_value_t *entry,
- size_t *offset)
+ amqp_field_value_t *entry,
+ size_t *offset)
{
int res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_encode_8(encoded, offset, entry->kind))
+ if (!amqp_encode_8(encoded, offset, entry->kind)) {
goto out;
+ }
#define FIELD_ENCODER(bits, val) if (!amqp_encode_##bits(encoded, offset, val)) goto out; break
@@ -366,8 +388,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_DECIMAL:
if (!amqp_encode_8(encoded, offset, entry->value.decimal.decimals)
- || !amqp_encode_32(encoded, offset, entry->value.decimal.value))
+ || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) {
goto out;
+ }
break;
case AMQP_FIELD_KIND_UTF8:
@@ -376,8 +399,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
/* fall through */
case AMQP_FIELD_KIND_BYTES:
if (!amqp_encode_32(encoded, offset, entry->value.bytes.len)
- || !amqp_encode_bytes(encoded, offset, entry->value.bytes))
+ || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) {
goto out;
+ }
break;
case AMQP_FIELD_KIND_ARRAY:
@@ -400,13 +424,14 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
res = 0;
- out:
+out:
return res;
}
/*---------------------------------------------------------------------------*/
-int amqp_table_entry_cmp(void const *entry1, void const *entry2) {
+int amqp_table_entry_cmp(void const *entry1, void const *entry2)
+{
amqp_table_entry_t const *p1 = (amqp_table_entry_t const *) entry1;
amqp_table_entry_t const *p2 = (amqp_table_entry_t const *) entry2;
@@ -414,7 +439,9 @@ int amqp_table_entry_cmp(void const *entry1, void const *entry2) {
size_t minlen;
minlen = p1->key.len;
- if (p2->key.len < minlen) minlen = p2->key.len;
+ if (p2->key.len < minlen) {
+ minlen = p2->key.len;
+ }
d = memcmp(p1->key.bytes, p2->key.bytes, minlen);
if (d != 0) {
diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c
index 367376d..d3e6724 100644
--- a/librabbitmq/amqp_url.c
+++ b/librabbitmq/amqp_url.c
@@ -43,159 +43,167 @@
void amqp_default_connection_info(struct amqp_connection_info *ci)
{
- /* Apply defaults */
- ci->user = "guest";
- ci->password = "guest";
- ci->host = "localhost";
- ci->port = 5672;
- ci->vhost = "/";
+ /* Apply defaults */
+ ci->user = "guest";
+ ci->password = "guest";
+ ci->host = "localhost";
+ ci->port = 5672;
+ ci->vhost = "/";
}
/* Scan for the next delimiter, handling percent-encodings on the way. */
static char find_delim(char **pp, int colon_and_at_sign_are_delims)
{
- char *from = *pp;
- char *to = from;
-
- for (;;) {
- char ch = *from++;
-
- switch (ch) {
- case ':':
- case '@':
- if (!colon_and_at_sign_are_delims) {
- *to++ = ch;
- break;
- }
-
- /* fall through */
- case 0:
- case '/':
- case '?':
- case '#':
- case '[':
- case ']':
- *to = 0;
- *pp = from;
- return ch;
-
- case '%': {
- unsigned int val;
- int chars;
- int res = sscanf(from, "%2x%n", &val, &chars);
-
- if (res == EOF || res < 1 || chars != 2)
- /* Return a surprising delimiter to
- force an error. */
- return '%';
-
- *to++ = val;
- from += 2;
- break;
- }
-
- default:
- *to++ = ch;
- break;
- }
- }
+ char *from = *pp;
+ char *to = from;
+
+ for (;;) {
+ char ch = *from++;
+
+ switch (ch) {
+ case ':':
+ case '@':
+ if (!colon_and_at_sign_are_delims) {
+ *to++ = ch;
+ break;
+ }
+
+ /* fall through */
+ case 0:
+ case '/':
+ case '?':
+ case '#':
+ case '[':
+ case ']':
+ *to = 0;
+ *pp = from;
+ return ch;
+
+ case '%': {
+ unsigned int val;
+ int chars;
+ int res = sscanf(from, "%2x%n", &val, &chars);
+
+ if (res == EOF || res < 1 || chars != 2)
+ /* Return a surprising delimiter to
+ force an error. */
+ {
+ return '%';
+ }
+
+ *to++ = val;
+ from += 2;
+ break;
+ }
+
+ default:
+ *to++ = ch;
+ break;
+ }
+ }
}
/* Parse an AMQP URL into its component parts. */
int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
{
- int res = -ERROR_BAD_AMQP_URL;
- char delim;
- char *start;
- char *host;
- char *port = NULL;
-
- /* check the prefix */
- if (strncmp(url, "amqp://", 7))
- goto out;
-
- host = start = url += 7;
- delim = find_delim(&url, 1);
-
- if (delim == ':') {
- /* The colon could be introducing the port or the
- password part of the userinfo. We don't know yet,
- so stash the preceding component. */
- port = start = url;
- delim = find_delim(&url, 1);
- }
-
- if (delim == '@') {
- /* What might have been the host and port were in fact
- the username and password */
- parsed->user = host;
- if (port)
- parsed->password = port;
-
- port = NULL;
- host = start = url;
- delim = find_delim(&url, 1);
- }
-
- if (delim == '[') {
- /* IPv6 address. The bracket should be the first
- character in the host. */
- if (host != start || *host != 0)
- goto out;
-
- start = url;
- delim = find_delim(&url, 0);
-
- if (delim != ']')
- goto out;
-
- parsed->host = start;
- start = url;
- delim = find_delim(&url, 1);
-
- /* Closing bracket should be the last character in the
- host. */
- if (*start != 0)
- goto out;
- }
- else {
- /* If we haven't seen the host yet, this is it. */
- if (*host != 0)
- parsed->host = host;
- }
-
- if (delim == ':') {
- port = start = url;
- delim = find_delim(&url, 1);
- }
-
- if (port) {
- char *end;
- long portnum = strtol(port, &end, 10);
-
- if (port == end || *end != 0 || portnum < 0 || portnum > 65535)
- goto out;
-
- parsed->port = portnum;
- }
-
- if (delim == '/') {
- start = url;
- delim = find_delim(&url, 1);
-
- if (delim != 0)
- goto out;
-
- parsed->vhost = start;
- res = 0;
- }
- else if (delim == 0) {
- res = 0;
- }
-
- /* Any other delimiter is bad, and we will return
- ERROR_BAD_AMQP_URL. */
-
- out:
- return res;
+ int res = -ERROR_BAD_AMQP_URL;
+ char delim;
+ char *start;
+ char *host;
+ char *port = NULL;
+
+ /* check the prefix */
+ if (strncmp(url, "amqp://", 7)) {
+ goto out;
+ }
+
+ host = start = url += 7;
+ delim = find_delim(&url, 1);
+
+ if (delim == ':') {
+ /* The colon could be introducing the port or the
+ password part of the userinfo. We don't know yet,
+ so stash the preceding component. */
+ port = start = url;
+ delim = find_delim(&url, 1);
+ }
+
+ if (delim == '@') {
+ /* What might have been the host and port were in fact
+ the username and password */
+ parsed->user = host;
+ if (port) {
+ parsed->password = port;
+ }
+
+ port = NULL;
+ host = start = url;
+ delim = find_delim(&url, 1);
+ }
+
+ if (delim == '[') {
+ /* IPv6 address. The bracket should be the first
+ character in the host. */
+ if (host != start || *host != 0) {
+ goto out;
+ }
+
+ start = url;
+ delim = find_delim(&url, 0);
+
+ if (delim != ']') {
+ goto out;
+ }
+
+ parsed->host = start;
+ start = url;
+ delim = find_delim(&url, 1);
+
+ /* Closing bracket should be the last character in the
+ host. */
+ if (*start != 0) {
+ goto out;
+ }
+ } else {
+ /* If we haven't seen the host yet, this is it. */
+ if (*host != 0) {
+ parsed->host = host;
+ }
+ }
+
+ if (delim == ':') {
+ port = start = url;
+ delim = find_delim(&url, 1);
+ }
+
+ if (port) {
+ char *end;
+ long portnum = strtol(port, &end, 10);
+
+ if (port == end || *end != 0 || portnum < 0 || portnum > 65535) {
+ goto out;
+ }
+
+ parsed->port = portnum;
+ }
+
+ if (delim == '/') {
+ start = url;
+ delim = find_delim(&url, 1);
+
+ if (delim != 0) {
+ goto out;
+ }
+
+ parsed->vhost = start;
+ res = 0;
+ } else if (delim == 0) {
+ res = 0;
+ }
+
+ /* Any other delimiter is bad, and we will return
+ ERROR_BAD_AMQP_URL. */
+
+out:
+ return res;
}
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index 69bb036..8c8ba6b 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -45,37 +45,38 @@
int
amqp_socket_init(void)
{
- return 0;
+ return 0;
}
int
amqp_socket_error(void)
{
- return errno | ERROR_CATEGORY_OS;
+ return errno | ERROR_CATEGORY_OS;
}
int amqp_socket_socket(int domain, int type, int proto)
{
- int flags;
+ int flags;
- int s = socket(domain, type, proto);
- if (s < 0)
- return s;
+ int s = socket(domain, type, proto);
+ if (s < 0) {
+ return s;
+ }
- /* Always enable CLOEXEC on the socket */
- flags = fcntl(s, F_GETFD);
- if (flags == -1
- || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
- int e = errno;
- close(s);
- errno = e;
- return -1;
- }
+ /* Always enable CLOEXEC on the socket */
+ flags = fcntl(s, F_GETFD);
+ if (flags == -1
+ || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
+ int e = errno;
+ close(s);
+ errno = e;
+ return -1;
+ }
- return s;
+ return s;
}
char *amqp_os_error_string(int err)
{
- return strdup(strerror(err));
+ return strdup(strerror(err));
}
diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c
index 9919b6b..f3c8fc7 100644
--- a/librabbitmq/win32/socket.c
+++ b/librabbitmq/win32/socket.c
@@ -48,55 +48,58 @@ static int called_wsastartup;
int amqp_socket_init(void)
{
- if (!called_wsastartup) {
- WSADATA data;
- int res = WSAStartup(0x0202, &data);
- if (res)
- return -res;
+ if (!called_wsastartup) {
+ WSADATA data;
+ int res = WSAStartup(0x0202, &data);
+ if (res) {
+ return -res;
+ }
- called_wsastartup = 1;
- }
+ called_wsastartup = 1;
+ }
- return 0;
+ return 0;
}
char *amqp_os_error_string(int err)
{
- char *msg, *copy;
+ char *msg, *copy;
- if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM
- | FORMAT_MESSAGE_ALLOCATE_BUFFER,
- NULL, err,
- MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPSTR)&msg, 0, NULL))
- return strdup("(error retrieving Windows error message)");
+ if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, err,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL)) {
+ return strdup("(error retrieving Windows error message)");
+ }
- copy = strdup(msg);
- LocalFree(msg);
- return copy;
+ copy = strdup(msg);
+ LocalFree(msg);
+ return copy;
}
int
amqp_socket_setsockopt(int sock, int level, int optname,
- const void *optval, size_t optlen)
+ const void *optval, size_t optlen)
{
- /* the winsock setsockopt function has its 4th argument as a
- const char * */
- return setsockopt(sock, level, optname, (const char *)optval, optlen);
+ /* the winsock setsockopt function has its 4th argument as a
+ const char * */
+ return setsockopt(sock, level, optname, (const char *)optval, optlen);
}
int
amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
{
- DWORD ret;
- if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0)
- return ret;
- else
- return -1;
+ DWORD ret;
+ if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) {
+ return ret;
+ } else {
+ return -1;
+ }
}
int
amqp_socket_error(void)
{
- return WSAGetLastError() | ERROR_CATEGORY_OS;
+ return WSAGetLastError() | ERROR_CATEGORY_OS;
}
diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h
index 9351b1f..43e2a13 100644
--- a/librabbitmq/win32/socket.h
+++ b/librabbitmq/win32/socket.h
@@ -39,8 +39,8 @@
/* same as WSABUF */
struct iovec {
- u_long iov_len;
- void *iov_base;
+ u_long iov_len;
+ void *iov_base;
};
int
@@ -51,7 +51,7 @@ amqp_socket_init(void);
int
amqp_socket_setsockopt(int sock, int level, int optname, const void *optval,
- size_t optlen);
+ size_t optlen);
int
amqp_socket_writev(int sock, struct iovec *iov, int nvecs);