summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r--librabbitmq/amqp_connection.c147
1 files changed, 86 insertions, 61 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 421c745..be7b1a8 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -46,36 +46,40 @@
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
-#define ENFORCE_STATE(statevec, statenum) \
- { \
- amqp_connection_state_t _check_state = (statevec); \
- size_t _wanted_state = (statenum); \
- if (_check_state->state != _wanted_state) \
+#define ENFORCE_STATE(statevec, statenum) \
+ { \
+ amqp_connection_state_t _check_state = (statevec); \
+ size_t _wanted_state = (statenum); \
+ if (_check_state->state != _wanted_state) \
amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \
- _wanted_state, \
- _check_state->state); \
+ _wanted_state, \
+ _check_state->state); \
}
-amqp_connection_state_t amqp_new_connection(void) {
+amqp_connection_state_t amqp_new_connection(void)
+{
int res;
amqp_connection_state_t state =
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
- if (state == NULL)
+ if (state == NULL) {
return NULL;
+ }
init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0);
- if (-ERROR_NO_MEMORY == res)
+ if (-ERROR_NO_MEMORY == res) {
return NULL;
- else if (0 != res)
+ } else if (0 != res) {
goto out_nomem;
+ }
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
- if (state->inbound_buffer.bytes == NULL)
+ if (state->inbound_buffer.bytes == NULL) {
goto out_nomem;
+ }
state->state = CONNECTION_STATE_INITIAL;
/* the server protocol version response is 8 bytes, which conveniently
@@ -85,12 +89,13 @@ amqp_connection_state_t amqp_new_connection(void) {
state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
- if (state->sock_inbound_buffer.bytes == NULL)
+ if (state->sock_inbound_buffer.bytes == NULL) {
goto out_nomem;
+ }
return state;
- out_nomem:
+out_nomem:
free(state->sock_inbound_buffer.bytes);
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
@@ -98,20 +103,21 @@ amqp_connection_state_t amqp_new_connection(void) {
return NULL;
}
-int amqp_get_sockfd(amqp_connection_state_t state) {
+int amqp_get_sockfd(amqp_connection_state_t state)
+{
return state->sockfd;
}
void amqp_set_sockfd(amqp_connection_state_t state,
- int sockfd)
+ int sockfd)
{
state->sockfd = sockfd;
}
int amqp_tune_connection(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat)
+ int channel_max,
+ int frame_max,
+ int heartbeat)
{
void *newbuf;
@@ -136,11 +142,13 @@ int amqp_tune_connection(amqp_connection_state_t state,
return 0;
}
-int amqp_get_channel_max(amqp_connection_state_t state) {
+int amqp_get_channel_max(amqp_connection_state_t state)
+{
return state->channel_max;
}
-int amqp_destroy_connection(amqp_connection_state_t state) {
+int amqp_destroy_connection(amqp_connection_state_t state)
+{
int s = state->sockfd;
empty_amqp_pool(&state->frame_pool);
@@ -149,13 +157,15 @@ int amqp_destroy_connection(amqp_connection_state_t state) {
free(state->sock_inbound_buffer.bytes);
free(state);
- if (s >= 0 && amqp_socket_close(s) < 0)
+ if (s >= 0 && amqp_socket_close(s) < 0) {
return -amqp_socket_error();
- else
+ } else {
return 0;
+ }
}
-static void return_to_idle(amqp_connection_state_t state) {
+static void return_to_idle(amqp_connection_state_t state)
+{
state->inbound_buffer.bytes = NULL;
state->inbound_offset = 0;
state->target_size = HEADER_SIZE;
@@ -163,15 +173,16 @@ static void return_to_idle(amqp_connection_state_t state) {
}
static size_t consume_data(amqp_connection_state_t state,
- amqp_bytes_t *received_data)
+ amqp_bytes_t *received_data)
{
/* how much data is available and will fit? */
size_t bytes_consumed = state->target_size - state->inbound_offset;
- if (received_data->len < bytes_consumed)
+ if (received_data->len < bytes_consumed) {
bytes_consumed = received_data->len;
+ }
memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset),
- received_data->bytes, bytes_consumed);
+ received_data->bytes, bytes_consumed);
state->inbound_offset += bytes_consumed;
received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed);
received_data->len -= bytes_consumed;
@@ -180,8 +191,8 @@ static size_t consume_data(amqp_connection_state_t state,
}
int amqp_handle_input(amqp_connection_state_t state,
- amqp_bytes_t received_data,
- amqp_frame_t *decoded_frame)
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame)
{
size_t bytes_consumed;
void *raw_frame;
@@ -190,17 +201,20 @@ int amqp_handle_input(amqp_connection_state_t state,
or a complete, ignored frame was read. */
decoded_frame->frame_type = 0;
- if (received_data.len == 0)
+ if (received_data.len == 0) {
return 0;
+ }
if (state->state == CONNECTION_STATE_IDLE) {
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
- state->inbound_buffer.len);
+ state->inbound_buffer.len);
if (state->inbound_buffer.bytes == NULL)
/* state->inbound_buffer.len is always nonzero, because it
- corresponds to frame_max, which is not permitted to be less
- than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
+ corresponds to frame_max, which is not permitted to be less
+ than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
+ {
return -ERROR_NO_MEMORY;
+ }
state->state = CONNECTION_STATE_HEADER;
}
@@ -209,8 +223,9 @@ int amqp_handle_input(amqp_connection_state_t state,
/* do we have target_size data yet? if not, return with the
expectation that more will arrive */
- if (state->inbound_offset < state->target_size)
+ if (state->inbound_offset < state->target_size) {
return bytes_consumed;
+ }
raw_frame = state->inbound_buffer.bytes;
@@ -222,13 +237,13 @@ int amqp_handle_input(amqp_connection_state_t state,
decoded_frame->channel = 0;
decoded_frame->payload.protocol_header.transport_high
- = amqp_d8(raw_frame, 4);
+ = amqp_d8(raw_frame, 4);
decoded_frame->payload.protocol_header.transport_low
- = amqp_d8(raw_frame, 5);
+ = amqp_d8(raw_frame, 5);
decoded_frame->payload.protocol_header.protocol_version_major
- = amqp_d8(raw_frame, 6);
+ = amqp_d8(raw_frame, 6);
decoded_frame->payload.protocol_header.protocol_version_minor
- = amqp_d8(raw_frame, 7);
+ = amqp_d8(raw_frame, 7);
return_to_idle(state);
return bytes_consumed;
@@ -240,15 +255,16 @@ int amqp_handle_input(amqp_connection_state_t state,
case CONNECTION_STATE_HEADER:
/* frame length is 3 bytes in */
state->target_size
- = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
+ = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
state->state = CONNECTION_STATE_BODY;
bytes_consumed += consume_data(state, &received_data);
/* do we have target_size data yet? if not, return with the
expectation that more will arrive */
- if (state->inbound_offset < state->target_size)
+ if (state->inbound_offset < state->target_size) {
return bytes_consumed;
+ }
/* fall through to process body */
@@ -257,8 +273,9 @@ int amqp_handle_input(amqp_connection_state_t state,
int res;
/* Check frame end marker (footer) */
- if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END)
+ if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) {
return -ERROR_BAD_AMQP_DATA;
+ }
decoded_frame->frame_type = amqp_d8(raw_frame, 0);
decoded_frame->channel = amqp_d16(raw_frame, 1);
@@ -270,19 +287,20 @@ int amqp_handle_input(amqp_connection_state_t state,
encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
res = amqp_decode_method(decoded_frame->payload.method.id,
- &state->decoding_pool, encoded,
- &decoded_frame->payload.method.decoded);
- if (res < 0)
- return res;
+ &state->decoding_pool, encoded,
+ &decoded_frame->payload.method.decoded);
+ if (res < 0) {
+ return res;
+ }
break;
case AMQP_FRAME_HEADER:
decoded_frame->payload.properties.class_id
- = amqp_d16(raw_frame, HEADER_SIZE);
+ = amqp_d16(raw_frame, HEADER_SIZE);
/* unused 2-byte weight field goes here */
decoded_frame->payload.properties.body_size
- = amqp_d64(raw_frame, HEADER_SIZE + 4);
+ = amqp_d64(raw_frame, HEADER_SIZE + 4);
encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12);
encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
decoded_frame->payload.properties.raw = encoded;
@@ -290,16 +308,17 @@ int amqp_handle_input(amqp_connection_state_t state,
res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
&state->decoding_pool, encoded,
&decoded_frame->payload.properties.decoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
break;
case AMQP_FRAME_BODY:
decoded_frame->payload.body_fragment.len
- = state->target_size - HEADER_SIZE - FOOTER_SIZE;
+ = state->target_size - HEADER_SIZE - FOOTER_SIZE;
decoded_frame->payload.body_fragment.bytes
- = amqp_offset(raw_frame, HEADER_SIZE);
+ = amqp_offset(raw_frame, HEADER_SIZE);
break;
case AMQP_FRAME_HEARTBEAT:
@@ -321,28 +340,32 @@ int amqp_handle_input(amqp_connection_state_t state,
}
}
-amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
+amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state)
+{
return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
}
-void amqp_release_buffers(amqp_connection_state_t state) {
+void amqp_release_buffers(amqp_connection_state_t state)
+{
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
- if (state->first_queued_frame)
+ if (state->first_queued_frame) {
amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued");
+ }
recycle_amqp_pool(&state->frame_pool);
recycle_amqp_pool(&state->decoding_pool);
}
-void amqp_maybe_release_buffers(amqp_connection_state_t state) {
+void amqp_maybe_release_buffers(amqp_connection_state_t state)
+{
if (amqp_release_buffers_ok(state)) {
amqp_release_buffers(state);
}
}
int amqp_send_frame(amqp_connection_state_t state,
- const amqp_frame_t *frame)
+ const amqp_frame_t *frame)
{
void *out_frame = state->outbound_buffer.bytes;
int res;
@@ -367,8 +390,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_len = FOOTER_SIZE;
res = amqp_socket_writev(state->sockfd, iov, 3);
- }
- else {
+ } else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -381,8 +403,9 @@ int amqp_send_frame(amqp_connection_state_t state,
res = amqp_encode_method(frame->payload.method.id,
frame->payload.method.decoded, encoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
out_frame_len = res + 4;
break;
@@ -397,8 +420,9 @@ int amqp_send_frame(amqp_connection_state_t state,
res = amqp_encode_properties(frame->payload.properties.class_id,
frame->payload.properties.decoded, encoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
out_frame_len = res + 12;
break;
@@ -417,8 +441,9 @@ int amqp_send_frame(amqp_connection_state_t state,
out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL);
}
- if (res < 0)
+ if (res < 0) {
return -amqp_socket_error();
- else
+ } else {
return 0;
+ }
}