diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-04-08 14:52:53 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-04-08 14:52:53 -0700 |
commit | ad2b116059e22d393b7e44ad54f345a3fb4e267b (patch) | |
tree | cfd98876757f85d9fe7bfe56d179bd5fb63a8ff0 /librabbitmq | |
parent | f3074030723b840690458983b63c746549aa3bd5 (diff) | |
download | rabbitmq-c-github-ask-ad2b116059e22d393b7e44ad54f345a3fb4e267b.tar.gz |
Formatted source code with astyle utilty
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 98 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 86 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 147 | ||||
-rw-r--r-- | librabbitmq/amqp_mem.c | 48 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 160 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 294 | ||||
-rw-r--r-- | librabbitmq/amqp_table.c | 143 | ||||
-rw-r--r-- | librabbitmq/amqp_url.c | 298 | ||||
-rw-r--r-- | librabbitmq/unix/socket.c | 35 | ||||
-rw-r--r-- | librabbitmq/win32/socket.c | 59 | ||||
-rw-r--r-- | librabbitmq/win32/socket.h | 6 |
11 files changed, 740 insertions, 634 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 5de39d5..c379125 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -152,28 +152,28 @@ typedef struct amqp_array_t_ { t t Boolean b b Signed 8-bit B Unsigned 8-bit - U s Signed 16-bit (A1) + U s Signed 16-bit (A1) u Unsigned 16-bit - I I I Signed 32-bit - i Unsigned 32-bit - L l Signed 64-bit (B) - l Unsigned 64-bit - f f 32-bit float - d d 64-bit float - D D D Decimal - s Short string (A2) - S S S Long string - A Nested Array - T T T Timestamp (u64) - F F F Nested Table - V V V Void - x Byte array + I I I Signed 32-bit + i Unsigned 32-bit + L l Signed 64-bit (B) + l Unsigned 64-bit + f f 32-bit float + d d 64-bit float + D D D Decimal + s Short string (A2) + S S S Long string + A Nested Array + T T T Timestamp (u64) + F F F Nested Table + V V V Void + x Byte array Remarks: A1, A2: Notice how the types **CONFLICT** here. In Qpid and Rabbit, 's' means a signed 16-bit integer; in 0-9-1, it means a - short string. + short string. B: Notice how the signednesses **CONFLICT** here. In Qpid and Rabbit, 'l' means a signed 64-bit integer; in 0-9-1, it means an unsigned @@ -360,9 +360,9 @@ AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat); + int channel_max, + int frame_max, + int heartbeat); AMQP_PUBLIC_FUNCTION int @@ -375,8 +375,8 @@ AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame); + amqp_bytes_t received_data, + amqp_frame_t *decoded_frame); AMQP_PUBLIC_FUNCTION amqp_boolean_t @@ -413,37 +413,37 @@ AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame); + amqp_frame_t *decoded_frame); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output); + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_method_t *output); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded); + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method); + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method); AMQP_PUBLIC_FUNCTION void * AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method); + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method); /* * The API methods corresponding to most synchronous AMQP methods @@ -465,23 +465,23 @@ AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost, - int channel_max, int frame_max, int heartbeat, - amqp_sasl_method_enum sasl_method, ...); + int channel_max, int frame_max, int heartbeat, + amqp_sasl_method_enum sasl_method, ...); struct amqp_basic_properties_t_; AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t exchange, amqp_bytes_t routing_key, - amqp_boolean_t mandatory, amqp_boolean_t immediate, - struct amqp_basic_properties_t_ const *properties, - amqp_bytes_t body); + amqp_bytes_t exchange, amqp_bytes_t routing_key, + amqp_boolean_t mandatory, amqp_boolean_t immediate, + struct amqp_basic_properties_t_ const *properties, + amqp_bytes_t body); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, - int code); + int code); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t @@ -490,17 +490,17 @@ AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, - uint64_t delivery_tag, amqp_boolean_t multiple); + uint64_t delivery_tag, amqp_boolean_t multiple); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t queue, amqp_boolean_t no_ack); + amqp_bytes_t queue, amqp_boolean_t no_ack); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, - uint64_t delivery_tag, amqp_boolean_t requeue); + uint64_t delivery_tag, amqp_boolean_t requeue); /* * Can be used to see if there is data still in the buffer, if so @@ -526,7 +526,7 @@ AMQP_CALL amqp_error_string(int err); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool, - amqp_table_t *output, size_t *offset); + amqp_table_t *output, size_t *offset); AMQP_PUBLIC_FUNCTION int @@ -541,11 +541,11 @@ struct amqp_connection_info { }; AMQP_PUBLIC_FUNCTION -void +void AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed); AMQP_PUBLIC_FUNCTION -int +int AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed); AMQP_END_DECLS diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 6faabbb..aacaf91 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -62,10 +62,11 @@ char *amqp_error_string(int err) switch (category) { case ERROR_CATEGORY_CLIENT: - if (err < 1 || err > ERROR_MAX) + if (err < 1 || err > ERROR_MAX) { str = "(undefined librabbitmq error)"; - else + } else { str = client_error_strings[err - 1]; + } break; case ERROR_CATEGORY_OS: @@ -80,31 +81,31 @@ char *amqp_error_string(int err) void amqp_abort(const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fputc('\n', stderr); - abort(); + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fputc('\n', stderr); + abort(); } const amqp_bytes_t amqp_empty_bytes = { 0, NULL }; const amqp_table_t amqp_empty_table = { 0, NULL }; const amqp_array_t amqp_empty_array = { 0, NULL }; -#define RPC_REPLY(replytype) \ - (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ - ? (replytype *) state->most_recent_api_result.reply.decoded \ +#define RPC_REPLY(replytype)\ + (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL\ + ? (replytype *) state->most_recent_api_result.reply.decoded\ : NULL) int amqp_basic_publish(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_boolean_t mandatory, - amqp_boolean_t immediate, - amqp_basic_properties_t const *properties, - amqp_bytes_t body) + amqp_channel_t channel, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + amqp_basic_properties_t const *properties, + amqp_bytes_t body) { amqp_frame_t f; size_t body_offset; @@ -121,8 +122,9 @@ int amqp_basic_publish(amqp_connection_state_t state, m.ticket = 0; res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); - if (res < 0) + if (res < 0) { return res; + } if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); @@ -136,15 +138,17 @@ int amqp_basic_publish(amqp_connection_state_t state, f.payload.properties.decoded = (void *) properties; res = amqp_send_frame(state, &f); - if (res < 0) + if (res < 0) { return res; + } body_offset = 0; while (body_offset < body.len) { size_t remaining = body.len - body_offset; - if (remaining == 0) + if (remaining == 0) { break; + } f.frame_type = AMQP_FRAME_BODY; f.channel = channel; @@ -157,16 +161,17 @@ int amqp_basic_publish(amqp_connection_state_t state, body_offset += f.payload.body_fragment.len; res = amqp_send_frame(state, &f); - if (res < 0) + if (res < 0) { return res; + } } return 0; } amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, - amqp_channel_t channel, - int code) + amqp_channel_t channel, + int code) { char codestr[13]; amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; @@ -179,11 +184,11 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, req.method_id = 0; return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, - replies, &req); + replies, &req); } amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, - int code) + int code) { char codestr[13]; amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; @@ -196,13 +201,13 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, req.method_id = 0; return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, - replies, &req); + replies, &req); } int amqp_basic_ack(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t multiple) + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t multiple) { amqp_basic_ack_t m; m.delivery_tag = delivery_tag; @@ -211,28 +216,29 @@ int amqp_basic_ack(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_ack) + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_ack) { amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, - AMQP_BASIC_GET_EMPTY_METHOD, - 0 }; + AMQP_BASIC_GET_EMPTY_METHOD, + 0 + }; amqp_basic_get_t req; req.ticket = 0; req.queue = queue; req.no_ack = no_ack; state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_GET_METHOD, - replies, &req); + AMQP_BASIC_GET_METHOD, + replies, &req); return state->most_recent_api_result; } int amqp_basic_reject(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t requeue) + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t requeue) { amqp_basic_reject_t req; req.delivery_tag = delivery_tag; diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 421c745..be7b1a8 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -46,36 +46,40 @@ #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 #define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 -#define ENFORCE_STATE(statevec, statenum) \ - { \ - amqp_connection_state_t _check_state = (statevec); \ - size_t _wanted_state = (statenum); \ - if (_check_state->state != _wanted_state) \ +#define ENFORCE_STATE(statevec, statenum) \ + { \ + amqp_connection_state_t _check_state = (statevec); \ + size_t _wanted_state = (statenum); \ + if (_check_state->state != _wanted_state) \ amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \ - _wanted_state, \ - _check_state->state); \ + _wanted_state, \ + _check_state->state); \ } -amqp_connection_state_t amqp_new_connection(void) { +amqp_connection_state_t amqp_new_connection(void) +{ int res; amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); - if (state == NULL) + if (state == NULL) { return NULL; + } init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0); - if (-ERROR_NO_MEMORY == res) + if (-ERROR_NO_MEMORY == res) { return NULL; - else if (0 != res) + } else if (0 != res) { goto out_nomem; + } state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) + if (state->inbound_buffer.bytes == NULL) { goto out_nomem; + } state->state = CONNECTION_STATE_INITIAL; /* the server protocol version response is 8 bytes, which conveniently @@ -85,12 +89,13 @@ amqp_connection_state_t amqp_new_connection(void) { state->sockfd = -1; state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); - if (state->sock_inbound_buffer.bytes == NULL) + if (state->sock_inbound_buffer.bytes == NULL) { goto out_nomem; + } return state; - out_nomem: +out_nomem: free(state->sock_inbound_buffer.bytes); empty_amqp_pool(&state->frame_pool); empty_amqp_pool(&state->decoding_pool); @@ -98,20 +103,21 @@ amqp_connection_state_t amqp_new_connection(void) { return NULL; } -int amqp_get_sockfd(amqp_connection_state_t state) { +int amqp_get_sockfd(amqp_connection_state_t state) +{ return state->sockfd; } void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd) + int sockfd) { state->sockfd = sockfd; } int amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat) + int channel_max, + int frame_max, + int heartbeat) { void *newbuf; @@ -136,11 +142,13 @@ int amqp_tune_connection(amqp_connection_state_t state, return 0; } -int amqp_get_channel_max(amqp_connection_state_t state) { +int amqp_get_channel_max(amqp_connection_state_t state) +{ return state->channel_max; } -int amqp_destroy_connection(amqp_connection_state_t state) { +int amqp_destroy_connection(amqp_connection_state_t state) +{ int s = state->sockfd; empty_amqp_pool(&state->frame_pool); @@ -149,13 +157,15 @@ int amqp_destroy_connection(amqp_connection_state_t state) { free(state->sock_inbound_buffer.bytes); free(state); - if (s >= 0 && amqp_socket_close(s) < 0) + if (s >= 0 && amqp_socket_close(s) < 0) { return -amqp_socket_error(); - else + } else { return 0; + } } -static void return_to_idle(amqp_connection_state_t state) { +static void return_to_idle(amqp_connection_state_t state) +{ state->inbound_buffer.bytes = NULL; state->inbound_offset = 0; state->target_size = HEADER_SIZE; @@ -163,15 +173,16 @@ static void return_to_idle(amqp_connection_state_t state) { } static size_t consume_data(amqp_connection_state_t state, - amqp_bytes_t *received_data) + amqp_bytes_t *received_data) { /* how much data is available and will fit? */ size_t bytes_consumed = state->target_size - state->inbound_offset; - if (received_data->len < bytes_consumed) + if (received_data->len < bytes_consumed) { bytes_consumed = received_data->len; + } memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset), - received_data->bytes, bytes_consumed); + received_data->bytes, bytes_consumed); state->inbound_offset += bytes_consumed; received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed); received_data->len -= bytes_consumed; @@ -180,8 +191,8 @@ static size_t consume_data(amqp_connection_state_t state, } int amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame) + amqp_bytes_t received_data, + amqp_frame_t *decoded_frame) { size_t bytes_consumed; void *raw_frame; @@ -190,17 +201,20 @@ int amqp_handle_input(amqp_connection_state_t state, or a complete, ignored frame was read. */ decoded_frame->frame_type = 0; - if (received_data.len == 0) + if (received_data.len == 0) { return 0; + } if (state->state == CONNECTION_STATE_IDLE) { state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, - state->inbound_buffer.len); + state->inbound_buffer.len); if (state->inbound_buffer.bytes == NULL) /* state->inbound_buffer.len is always nonzero, because it - corresponds to frame_max, which is not permitted to be less - than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ + corresponds to frame_max, which is not permitted to be less + than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ + { return -ERROR_NO_MEMORY; + } state->state = CONNECTION_STATE_HEADER; } @@ -209,8 +223,9 @@ int amqp_handle_input(amqp_connection_state_t state, /* do we have target_size data yet? if not, return with the expectation that more will arrive */ - if (state->inbound_offset < state->target_size) + if (state->inbound_offset < state->target_size) { return bytes_consumed; + } raw_frame = state->inbound_buffer.bytes; @@ -222,13 +237,13 @@ int amqp_handle_input(amqp_connection_state_t state, decoded_frame->channel = 0; decoded_frame->payload.protocol_header.transport_high - = amqp_d8(raw_frame, 4); + = amqp_d8(raw_frame, 4); decoded_frame->payload.protocol_header.transport_low - = amqp_d8(raw_frame, 5); + = amqp_d8(raw_frame, 5); decoded_frame->payload.protocol_header.protocol_version_major - = amqp_d8(raw_frame, 6); + = amqp_d8(raw_frame, 6); decoded_frame->payload.protocol_header.protocol_version_minor - = amqp_d8(raw_frame, 7); + = amqp_d8(raw_frame, 7); return_to_idle(state); return bytes_consumed; @@ -240,15 +255,16 @@ int amqp_handle_input(amqp_connection_state_t state, case CONNECTION_STATE_HEADER: /* frame length is 3 bytes in */ state->target_size - = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; + = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; state->state = CONNECTION_STATE_BODY; bytes_consumed += consume_data(state, &received_data); /* do we have target_size data yet? if not, return with the expectation that more will arrive */ - if (state->inbound_offset < state->target_size) + if (state->inbound_offset < state->target_size) { return bytes_consumed; + } /* fall through to process body */ @@ -257,8 +273,9 @@ int amqp_handle_input(amqp_connection_state_t state, int res; /* Check frame end marker (footer) */ - if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) + if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) { return -ERROR_BAD_AMQP_DATA; + } decoded_frame->frame_type = amqp_d8(raw_frame, 0); decoded_frame->channel = amqp_d16(raw_frame, 1); @@ -270,19 +287,20 @@ int amqp_handle_input(amqp_connection_state_t state, encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; res = amqp_decode_method(decoded_frame->payload.method.id, - &state->decoding_pool, encoded, - &decoded_frame->payload.method.decoded); - if (res < 0) - return res; + &state->decoding_pool, encoded, + &decoded_frame->payload.method.decoded); + if (res < 0) { + return res; + } break; case AMQP_FRAME_HEADER: decoded_frame->payload.properties.class_id - = amqp_d16(raw_frame, HEADER_SIZE); + = amqp_d16(raw_frame, HEADER_SIZE); /* unused 2-byte weight field goes here */ decoded_frame->payload.properties.body_size - = amqp_d64(raw_frame, HEADER_SIZE + 4); + = amqp_d64(raw_frame, HEADER_SIZE + 4); encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12); encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; decoded_frame->payload.properties.raw = encoded; @@ -290,16 +308,17 @@ int amqp_handle_input(amqp_connection_state_t state, res = amqp_decode_properties(decoded_frame->payload.properties.class_id, &state->decoding_pool, encoded, &decoded_frame->payload.properties.decoded); - if (res < 0) + if (res < 0) { return res; + } break; case AMQP_FRAME_BODY: decoded_frame->payload.body_fragment.len - = state->target_size - HEADER_SIZE - FOOTER_SIZE; + = state->target_size - HEADER_SIZE - FOOTER_SIZE; decoded_frame->payload.body_fragment.bytes - = amqp_offset(raw_frame, HEADER_SIZE); + = amqp_offset(raw_frame, HEADER_SIZE); break; case AMQP_FRAME_HEARTBEAT: @@ -321,28 +340,32 @@ int amqp_handle_input(amqp_connection_state_t state, } } -amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) { +amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) +{ return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL); } -void amqp_release_buffers(amqp_connection_state_t state) { +void amqp_release_buffers(amqp_connection_state_t state) +{ ENFORCE_STATE(state, CONNECTION_STATE_IDLE); - if (state->first_queued_frame) + if (state->first_queued_frame) { amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued"); + } recycle_amqp_pool(&state->frame_pool); recycle_amqp_pool(&state->decoding_pool); } -void amqp_maybe_release_buffers(amqp_connection_state_t state) { +void amqp_maybe_release_buffers(amqp_connection_state_t state) +{ if (amqp_release_buffers_ok(state)) { amqp_release_buffers(state); } } int amqp_send_frame(amqp_connection_state_t state, - const amqp_frame_t *frame) + const amqp_frame_t *frame) { void *out_frame = state->outbound_buffer.bytes; int res; @@ -367,8 +390,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_len = FOOTER_SIZE; res = amqp_socket_writev(state->sockfd, iov, 3); - } - else { + } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -381,8 +403,9 @@ int amqp_send_frame(amqp_connection_state_t state, res = amqp_encode_method(frame->payload.method.id, frame->payload.method.decoded, encoded); - if (res < 0) + if (res < 0) { return res; + } out_frame_len = res + 4; break; @@ -397,8 +420,9 @@ int amqp_send_frame(amqp_connection_state_t state, res = amqp_encode_properties(frame->payload.properties.class_id, frame->payload.properties.decoded, encoded); - if (res < 0) + if (res < 0) { return res; + } out_frame_len = res + 12; break; @@ -417,8 +441,9 @@ int amqp_send_frame(amqp_connection_state_t state, out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL); } - if (res < 0) + if (res < 0) { return -amqp_socket_error(); - else + } else { return 0; + } } diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 6915763..0f49df2 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -43,11 +43,13 @@ #include <string.h> #include <sys/types.h> -char const *amqp_version(void) { +char const *amqp_version(void) +{ return VERSION; /* defined in config.h */ } -void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) { +void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) +{ pool->pagesize = pagesize ? pagesize : 4096; pool->pages.num_blocks = 0; @@ -61,7 +63,8 @@ void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) { pool->alloc_used = 0; } -static void empty_blocklist(amqp_pool_blocklist_t *x) { +static void empty_blocklist(amqp_pool_blocklist_t *x) +{ int i; for (i = 0; i < x->num_blocks; i++) { @@ -74,30 +77,35 @@ static void empty_blocklist(amqp_pool_blocklist_t *x) { x->blocklist = NULL; } -void recycle_amqp_pool(amqp_pool_t *pool) { +void recycle_amqp_pool(amqp_pool_t *pool) +{ empty_blocklist(&pool->large_blocks); pool->next_page = 0; pool->alloc_block = NULL; pool->alloc_used = 0; } -void empty_amqp_pool(amqp_pool_t *pool) { +void empty_amqp_pool(amqp_pool_t *pool) +{ recycle_amqp_pool(pool); empty_blocklist(&pool->pages); } /* Returns 1 on success, 0 on failure */ -static int record_pool_block(amqp_pool_blocklist_t *x, void *block) { +static int record_pool_block(amqp_pool_blocklist_t *x, void *block) +{ size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1); if (x->blocklist == NULL) { x->blocklist = malloc(blocklistlength); - if (x->blocklist == NULL) + if (x->blocklist == NULL) { return 0; + } } else { void *newbl = realloc(x->blocklist, blocklistlength); - if (newbl == NULL) + if (newbl == NULL) { return 0; + } x->blocklist = newbl; } @@ -106,7 +114,8 @@ static int record_pool_block(amqp_pool_blocklist_t *x, void *block) { return 1; } -void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { +void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) +{ if (amount == 0) { return NULL; } @@ -118,8 +127,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (result == NULL) { return NULL; } - if (!record_pool_block(&pool->large_blocks, result)) + if (!record_pool_block(&pool->large_blocks, result)) { return NULL; + } return result; } @@ -138,8 +148,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (pool->alloc_block == NULL) { return NULL; } - if (!record_pool_block(&pool->pages, pool->alloc_block)) + if (!record_pool_block(&pool->pages, pool->alloc_block)) { return NULL; + } pool->next_page = pool->pages.num_blocks; } else { pool->alloc_block = pool->pages.blocklist[pool->next_page]; @@ -151,19 +162,22 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { return pool->alloc_block; } -void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) { +void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) +{ output->len = amount; output->bytes = amqp_pool_alloc(pool, amount); } -amqp_bytes_t amqp_cstring_bytes(char const *cstr) { +amqp_bytes_t amqp_cstring_bytes(char const *cstr) +{ amqp_bytes_t result; result.len = strlen(cstr); result.bytes = (void *) cstr; return result; } -amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) { +amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) +{ amqp_bytes_t result; result.len = src.len; result.bytes = malloc(src.len); @@ -173,13 +187,15 @@ amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) { return result; } -amqp_bytes_t amqp_bytes_malloc(size_t amount) { +amqp_bytes_t amqp_bytes_malloc(size_t amount) +{ amqp_bytes_t result; result.len = amount; result.bytes = malloc(amount); /* will return NULL if it fails */ return result; } -void amqp_bytes_free(amqp_bytes_t bytes) { +void amqp_bytes_free(amqp_bytes_t bytes) +{ free(bytes.bytes); } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 725e3c5..191155e 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -159,53 +159,53 @@ static inline void *amqp_offset(void *data, size_t offset) /* This macro defines the encoding and decoding functions associated with a simple type. */ -#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \ - \ -static inline void amqp_e##bits(void *data, size_t offset, \ - uint##bits##_t val) \ -{ \ - /* The AMQP data might be unaligned. So we encode and then copy the \ - result into place. */ \ - uint##bits##_t res = htonx(val); \ - memcpy(amqp_offset(data, offset), &res, bits/8); \ -} \ - \ -static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \ -{ \ - /* The AMQP data might be unaligned. So we copy the source value \ - into a variable and then decode it. */ \ - uint##bits##_t val; \ - memcpy(&val, amqp_offset(data, offset), bits/8); \ - return ntohx(val); \ -} \ - \ -static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \ - uint##bits##_t input) \ - \ -{ \ - size_t o = *offset; \ - if ((*offset = o + bits / 8) <= encoded.len) { \ - amqp_e##bits(encoded.bytes, o, input); \ - return 1; \ - } \ - else { \ - return 0; \ - } \ -} \ - \ -static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ - uint##bits##_t *output) \ - \ -{ \ - size_t o = *offset; \ - if ((*offset = o + bits / 8) <= encoded.len) { \ - *output = amqp_d##bits(encoded.bytes, o); \ - return 1; \ - } \ - else { \ - return 0; \ - } \ -} +#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \ + \ + static inline void amqp_e##bits(void *data, size_t offset, \ + uint##bits##_t val) \ + { \ + /* The AMQP data might be unaligned. So we encode and then copy the \ + result into place. */ \ + uint##bits##_t res = htonx(val); \ + memcpy(amqp_offset(data, offset), &res, bits/8); \ + } \ + \ + static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \ + { \ + /* The AMQP data might be unaligned. So we copy the source value \ + into a variable and then decode it. */ \ + uint##bits##_t val; \ + memcpy(&val, amqp_offset(data, offset), bits/8); \ + return ntohx(val); \ + } \ + \ + static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \ + uint##bits##_t input) \ + \ + { \ + size_t o = *offset; \ + if ((*offset = o + bits / 8) <= encoded.len) { \ + amqp_e##bits(encoded.bytes, o, input); \ + return 1; \ + } \ + else { \ + return 0; \ + } \ + } \ + \ + static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ + uint##bits##_t *output) \ + \ + { \ + size_t o = *offset; \ + if ((*offset = o + bits / 8) <= encoded.len) { \ + *output = amqp_d##bits(encoded.bytes, o); \ + return 1; \ + } \ + else { \ + return 0; \ + } \ + } /* Determine byte order */ #if defined(__GLIBC__) @@ -215,7 +215,7 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ # elif (__BYTE_ORDER == __BIG_ENDIAN) # define AMQP_BIG_ENDIAN # else - /* Don't define anything */ +/* Don't define anything */ # endif #elif defined(_BIG_ENDIAN) && !defined(_LITTLE_ENDIAN) || \ defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) @@ -235,40 +235,40 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ defined(__i386__) || defined(_M_IX86) # define AMQP_LITTLE_ENDIAN #else - /* Don't define anything */ +/* Don't define anything */ #endif #if defined(AMQP_LITTLE_ENDIAN) -#define DECLARE_XTOXLL(func) \ -static inline uint64_t func##ll(uint64_t val) \ -{ \ - union { \ - uint64_t whole; \ - uint32_t halves[2]; \ - } u; \ - uint32_t t; \ - u.whole = val; \ - t = u.halves[0]; \ - u.halves[0] = func##l(u.halves[1]); \ - u.halves[1] = func##l(t); \ - return u.whole; \ -} +#define DECLARE_XTOXLL(func) \ + static inline uint64_t func##ll(uint64_t val) \ + { \ + union { \ + uint64_t whole; \ + uint32_t halves[2]; \ + } u; \ + uint32_t t; \ + u.whole = val; \ + t = u.halves[0]; \ + u.halves[0] = func##l(u.halves[1]); \ + u.halves[1] = func##l(t); \ + return u.whole; \ + } #elif defined(AMQP_BIG_ENDIAN) -#define DECLARE_XTOXLL(func) \ -static inline uint64_t func##ll(uint64_t val) \ -{ \ - union { \ - uint64_t whole; \ - uint32_t halves[2]; \ - } u; \ - u.whole = val; \ - u.halves[0] = func##l(u.halves[0]); \ - u.halves[1] = func##l(u.halves[1]); \ - return u.whole; \ -} +#define DECLARE_XTOXLL(func) \ + static inline uint64_t func##ll(uint64_t val) \ + { \ + union { \ + uint64_t whole; \ + uint32_t halves[2]; \ + } u; \ + u.whole = val; \ + u.halves[0] = func##l(u.halves[0]); \ + u.halves[1] = func##l(u.halves[1]); \ + return u.whole; \ + } #else # error Endianness not known @@ -285,28 +285,26 @@ DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl) DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll) static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset, - amqp_bytes_t input) + amqp_bytes_t input) { size_t o = *offset; if ((*offset = o + input.len) <= encoded.len) { memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len); return 1; - } - else { + } else { return 0; } } static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset, - amqp_bytes_t *output, size_t len) + amqp_bytes_t *output, size_t len) { size_t o = *offset; if ((*offset = o + len) <= encoded.len) { output->bytes = amqp_offset(encoded.bytes, o); output->len = len; return 1; - } - else { + } else { return 0; } } diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index c323792..8454d45 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -44,7 +44,7 @@ #include <assert.h> int amqp_open_socket(char const *hostname, - int portnumber) + int portnumber) { struct addrinfo hint; struct addrinfo *address_list; @@ -54,8 +54,9 @@ int amqp_open_socket(char const *hostname, int last_error = 0; int one = 1; /* for setsockopt */ - if (0 != (last_error = amqp_socket_init())) + if (0 != (last_error = amqp_socket_init())) { return last_error; + } memset(&hint, 0, sizeof(hint)); hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */ @@ -66,63 +67,58 @@ int amqp_open_socket(char const *hostname, last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list); - if (last_error != 0) - { + if (last_error != 0) { return -ERROR_GETHOSTBYNAME_FAILED; } - for (addr = address_list; addr; addr = addr->ai_next) - { + for (addr = address_list; addr; addr = addr->ai_next) { /* This cast is to squash warnings on Win64, see: http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64 */ sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (-1 == sockfd) - { + if (-1 == sockfd) { last_error = -amqp_socket_error(); continue; } #ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT - if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) - { + if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = -amqp_socket_error(); amqp_socket_close(sockfd); continue; } #endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */ if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) - { + || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { last_error = -amqp_socket_error(); amqp_socket_close(sockfd); continue; - } - else - { + } else { last_error = 0; break; } } freeaddrinfo(address_list); - if (last_error != 0) - { + if (last_error != 0) { return last_error; } return sockfd; } -int amqp_send_header(amqp_connection_state_t state) { +int amqp_send_header(amqp_connection_state_t state) +{ static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, - AMQP_PROTOCOL_VERSION_MAJOR, - AMQP_PROTOCOL_VERSION_MINOR, - AMQP_PROTOCOL_VERSION_REVISION }; + AMQP_PROTOCOL_VERSION_MAJOR, + AMQP_PROTOCOL_VERSION_MINOR, + AMQP_PROTOCOL_VERSION_REVISION + }; return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL); } -static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { +static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) +{ amqp_bytes_t res; switch (method) { @@ -139,40 +135,43 @@ static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { } static amqp_bytes_t sasl_response(amqp_pool_t *pool, - amqp_sasl_method_enum method, - va_list args) + amqp_sasl_method_enum method, + va_list args) { amqp_bytes_t response; switch (method) { - case AMQP_SASL_METHOD_PLAIN: { - char *username = va_arg(args, char *); - size_t username_len = strlen(username); - char *password = va_arg(args, char *); - size_t password_len = strlen(password); - char *response_buf; - - amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); - if (response.bytes == NULL) - /* We never request a zero-length block, because of the +2 - above, so a NULL here really is ENOMEM. */ - return response; - - response_buf = response.bytes; - response_buf[0] = 0; - memcpy(response_buf + 1, username, username_len); - response_buf[username_len + 1] = 0; - memcpy(response_buf + username_len + 2, password, password_len); - break; + case AMQP_SASL_METHOD_PLAIN: { + char *username = va_arg(args, char *); + size_t username_len = strlen(username); + char *password = va_arg(args, char *); + size_t password_len = strlen(password); + char *response_buf; + + amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); + if (response.bytes == NULL) + /* We never request a zero-length block, because of the +2 + above, so a NULL here really is ENOMEM. */ + { + return response; } - default: - amqp_abort("Invalid SASL method: %d", (int) method); + + response_buf = response.bytes; + response_buf[0] = 0; + memcpy(response_buf + 1, username, username_len); + response_buf[username_len + 1] = 0; + memcpy(response_buf + username_len + 2, password, password_len); + break; + } + default: + amqp_abort("Invalid SASL method: %d", (int) method); } return response; } -amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { +amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) +{ return (state->first_queued_frame != NULL); } @@ -180,12 +179,13 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { * Check to see if we have data in our buffer. If this returns 1, we * will avoid an immediate blocking read in amqp_simple_wait_frame. */ -amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { +amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) +{ return (state->sock_inbound_offset < state->sock_inbound_limit); } static int wait_frame_inner(amqp_connection_state_t state, - amqp_frame_t *decoded_frame) + amqp_frame_t *decoded_frame) { while (1) { int res; @@ -196,26 +196,29 @@ static int wait_frame_inner(amqp_connection_state_t state, buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; res = amqp_handle_input(state, buffer, decoded_frame); - if (res < 0) - return res; + if (res < 0) { + return res; + } state->sock_inbound_offset += res; - if (decoded_frame->frame_type != 0) - /* Complete frame was read. Return it. */ - return 0; + if (decoded_frame->frame_type != 0) { + /* Complete frame was read. Return it. */ + return 0; + } /* Incomplete or ignored frame. Keep processing input. */ assert(res != 0); } res = recv(state->sockfd, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); + state->sock_inbound_buffer.len, 0); if (res <= 0) { - if (res == 0) - return -ERROR_CONNECTION_CLOSED; - else - return -amqp_socket_error(); + if (res == 0) { + return -ERROR_CONNECTION_CLOSED; + } else { + return -amqp_socket_error(); + } } state->sock_inbound_limit = res; @@ -224,7 +227,7 @@ static int wait_frame_inner(amqp_connection_state_t state, } int amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame) + amqp_frame_t *decoded_frame) { if (state->first_queued_frame != NULL) { amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data; @@ -240,38 +243,42 @@ int amqp_simple_wait_frame(amqp_connection_state_t state, } int amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output) + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_method_t *output) { amqp_frame_t frame; int res = amqp_simple_wait_frame(state, &frame); - if (res < 0) + if (res < 0) { return res; + } - if (frame.channel != expected_channel) + if (frame.channel != expected_channel) { amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d", - expected_method, - expected_channel, - frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) + expected_method, + expected_channel, + frame.channel); + } + if (frame.frame_type != AMQP_FRAME_METHOD) { amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d", - expected_method, - expected_channel, - frame.frame_type); - if (frame.payload.method.id != expected_method) + expected_method, + expected_channel, + frame.frame_type); + } + if (frame.payload.method.id != expected_method) { amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X", - expected_method, - expected_channel, - frame.payload.method.id); + expected_method, + expected_channel, + frame.payload.method.id); + } *output = frame.payload.method; return 0; } int amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded) + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded) { amqp_frame_t frame; @@ -285,17 +292,19 @@ int amqp_send_method(amqp_connection_state_t state, static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) { while ( *list != 0 ) { - if ( *list == expected ) return 1; + if ( *list == expected ) { + return 1; + } list++; } return 0; } amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method) + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method) { int status; amqp_rpc_reply_t result; @@ -312,7 +321,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, { amqp_frame_t frame; - retry: +retry: status = wait_frame_inner(state, &frame); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; @@ -328,21 +337,23 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, * - on the channel we want, and a channel.close frame, or * - on channel zero, and a connection.close frame. */ - if (!( (frame.frame_type == AMQP_FRAME_METHOD) && - ( ((frame.channel == channel) && - ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) || - (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) - || - ((frame.channel == 0) && - (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) )) - { + if (!((frame.frame_type == AMQP_FRAME_METHOD) + && ( + ((frame.channel == channel) + && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids) + || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) + || + ((frame.channel == 0) + && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) + ) + )) { amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); if (frame_copy == NULL || link == NULL) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_error = ERROR_NO_MEMORY; - return result; + result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + result.library_error = ERROR_NO_MEMORY; + return result; } *frame_copy = frame; @@ -351,9 +362,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, link->data = frame_copy; if (state->last_queued_frame == NULL) { - state->first_queued_frame = link; + state->first_queued_frame = link; } else { - state->last_queued_frame->next = link; + state->last_queued_frame->next = link; } state->last_queued_frame = link; @@ -361,8 +372,8 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, } result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) - ? AMQP_RESPONSE_NORMAL - : AMQP_RESPONSE_SERVER_EXCEPTION; + ? AMQP_RESPONSE_NORMAL + : AMQP_RESPONSE_SERVER_EXCEPTION; result.reply = frame.payload.method; return result; @@ -370,10 +381,10 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, } void *amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method) + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method) { amqp_method_number_t replies[2]; @@ -381,12 +392,13 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state, replies[1] = 0; state->most_recent_api_result = amqp_simple_rpc(state, channel, - request_id, replies, - decoded_request_method); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + request_id, replies, + decoded_request_method); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) { return state->most_recent_api_result.reply.decoded; - else + } else { return NULL; + } } amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) @@ -396,11 +408,11 @@ amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) static int amqp_login_inner(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - va_list vl) + int channel_max, + int frame_max, + int heartbeat, + amqp_sasl_method_enum sasl_method, + va_list vl) { int res; amqp_method_t method; @@ -411,14 +423,15 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_send_header(state); res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, - &method); - if (res < 0) + &method); + if (res < 0) { return res; + } { amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || - (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { + (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { return -ERROR_INCOMPATIBLE_AMQP_VERSION; } @@ -431,10 +444,11 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_table_entry_t properties[2]; amqp_connection_start_ok_t s; amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); + sasl_method, vl); - if (response_bytes.bytes == NULL) + if (response_bytes.bytes == NULL) { return -ERROR_NO_MEMORY; + } properties[0].key = amqp_cstring_bytes("product"); properties[0].value.kind = AMQP_FIELD_KIND_UTF8; @@ -454,16 +468,18 @@ static int amqp_login_inner(amqp_connection_state_t state, s.locale.len = 5; res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); - if (res < 0) + if (res < 0) { return res; + } } amqp_release_buffers(state); res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, - &method); - if (res < 0) + &method); + if (res < 0) { return res; + } { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; @@ -472,18 +488,22 @@ static int amqp_login_inner(amqp_connection_state_t state, server_heartbeat = s->heartbeat; } - if (server_channel_max != 0 && server_channel_max < channel_max) + if (server_channel_max != 0 && server_channel_max < channel_max) { channel_max = server_channel_max; + } - if (server_frame_max != 0 && server_frame_max < frame_max) + if (server_frame_max != 0 && server_frame_max < frame_max) { frame_max = server_frame_max; + } - if (server_heartbeat != 0 && server_heartbeat < heartbeat) + if (server_heartbeat != 0 && server_heartbeat < heartbeat) { heartbeat = server_heartbeat; + } res = amqp_tune_connection(state, channel_max, frame_max, heartbeat); - if (res < 0) + if (res < 0) { return res; + } { amqp_connection_tune_ok_t s; @@ -492,8 +512,9 @@ static int amqp_login_inner(amqp_connection_state_t state, s.heartbeat = heartbeat; res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); - if (res < 0) + if (res < 0) { return res; + } } amqp_release_buffers(state); @@ -502,12 +523,12 @@ static int amqp_login_inner(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - ...) + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + amqp_sasl_method_enum sasl_method, + ...) { va_list vl; amqp_rpc_reply_t result; @@ -533,12 +554,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, s.insist = 1; result = amqp_simple_rpc(state, - 0, - AMQP_CONNECTION_OPEN_METHOD, - (amqp_method_number_t *) &replies, - &s); - if (result.reply_type != AMQP_RESPONSE_NORMAL) + 0, + AMQP_CONNECTION_OPEN_METHOD, + (amqp_method_number_t *) &replies, + &s); + if (result.reply_type != AMQP_RESPONSE_NORMAL) { return result; + } } amqp_maybe_release_buffers(state); diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c index d82c946..d9383d6 100644 --- a/librabbitmq/amqp_table.c +++ b/librabbitmq/amqp_table.c @@ -46,20 +46,20 @@ #define INITIAL_TABLE_SIZE 16 static int amqp_decode_field_value(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_field_value_t *entry, - size_t *offset); + amqp_pool_t *pool, + amqp_field_value_t *entry, + size_t *offset); static int amqp_encode_field_value(amqp_bytes_t encoded, - amqp_field_value_t *entry, - size_t *offset); + amqp_field_value_t *entry, + size_t *offset); /*---------------------------------------------------------------------------*/ static int amqp_decode_array(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_array_t *output, - size_t *offset) + amqp_pool_t *pool, + amqp_array_t *output, + size_t *offset) { uint32_t arraysize; int num_entries = 0; @@ -68,12 +68,14 @@ static int amqp_decode_array(amqp_bytes_t encoded, size_t limit; int res; - if (!amqp_decode_32(encoded, offset, &arraysize)) + if (!amqp_decode_32(encoded, offset, &arraysize)) { return -ERROR_BAD_AMQP_DATA; + } entries = malloc(allocated_entries * sizeof(amqp_field_value_t)); - if (entries == NULL) + if (entries == NULL) { return -ERROR_NO_MEMORY; + } limit = *offset + arraysize; while (*offset < limit) { @@ -82,16 +84,18 @@ static int amqp_decode_array(amqp_bytes_t encoded, allocated_entries = allocated_entries * 2; newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t)); res = -ERROR_NO_MEMORY; - if (newentries == NULL) - goto out; + if (newentries == NULL) { + goto out; + } entries = newentries; } res = amqp_decode_field_value(encoded, pool, &entries[num_entries], - offset); - if (res < 0) + offset); + if (res < 0) { goto out; + } num_entries++; } @@ -100,21 +104,22 @@ static int amqp_decode_array(amqp_bytes_t encoded, output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_field_value_t)); res = -ERROR_NO_MEMORY; /* NULL is legitimate if we requested a zero-length block. */ - if (output->entries == NULL && num_entries > 0) + if (output->entries == NULL && num_entries > 0) { goto out; + } memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t)); res = 0; - out: +out: free(entries); return res; } int amqp_decode_table(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_table_t *output, - size_t *offset) + amqp_pool_t *pool, + amqp_table_t *output, + size_t *offset) { uint32_t tablesize; int num_entries = 0; @@ -123,40 +128,46 @@ int amqp_decode_table(amqp_bytes_t encoded, size_t limit; int res; - if (!amqp_decode_32(encoded, offset, &tablesize)) + if (!amqp_decode_32(encoded, offset, &tablesize)) { return -ERROR_BAD_AMQP_DATA; + } entries = malloc(allocated_entries * sizeof(amqp_table_entry_t)); - if (entries == NULL) + if (entries == NULL) { return -ERROR_NO_MEMORY; + } limit = *offset + tablesize; while (*offset < limit) { uint8_t keylen; res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_8(encoded, offset, &keylen)) + if (!amqp_decode_8(encoded, offset, &keylen)) { goto out; + } if (num_entries >= allocated_entries) { void *newentries; allocated_entries = allocated_entries * 2; newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t)); res = -ERROR_NO_MEMORY; - if (newentries == NULL) - goto out; + if (newentries == NULL) { + goto out; + } entries = newentries; } res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) + if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) { goto out; + } res = amqp_decode_field_value(encoded, pool, &entries[num_entries].value, - offset); - if (res < 0) + offset); + if (res < 0) { goto out; + } num_entries++; } @@ -165,26 +176,28 @@ int amqp_decode_table(amqp_bytes_t encoded, output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t)); res = -ERROR_NO_MEMORY; /* NULL is legitimate if we requested a zero-length block. */ - if (output->entries == NULL && num_entries > 0) + if (output->entries == NULL && num_entries > 0) { goto out; + } memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t)); res = 0; - out: +out: free(entries); return res; } static int amqp_decode_field_value(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_field_value_t *entry, - size_t *offset) + amqp_pool_t *pool, + amqp_field_value_t *entry, + size_t *offset) { int res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_8(encoded, offset, &entry->kind)) + if (!amqp_decode_8(encoded, offset, &entry->kind)) { goto out; + } #define TRIVIAL_FIELD_DECODER(bits) if (!amqp_decode_##bits(encoded, offset, &entry->value.u##bits)) goto out; break #define SIMPLE_FIELD_DECODER(bits, dest, how) { uint##bits##_t val; if (!amqp_decode_##bits(encoded, offset, &val)) goto out; entry->value.dest = how; } break @@ -223,8 +236,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_DECIMAL: if (!amqp_decode_8(encoded, offset, &entry->value.decimal.decimals) - || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) + || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) { goto out; + } break; case AMQP_FIELD_KIND_UTF8: @@ -234,8 +248,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_BYTES: { uint32_t len; if (!amqp_decode_32(encoded, offset, &len) - || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) + || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) { goto out; + } break; } @@ -259,15 +274,15 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, res = 0; - out: +out: return res; } /*---------------------------------------------------------------------------*/ static int amqp_encode_array(amqp_bytes_t encoded, - amqp_array_t *input, - size_t *offset) + amqp_array_t *input, + size_t *offset) { size_t start = *offset; int i, res; @@ -276,22 +291,24 @@ static int amqp_encode_array(amqp_bytes_t encoded, for (i = 0; i < input->num_entries; i++) { res = amqp_encode_field_value(encoded, &input->entries[i], offset); - if (res < 0) + if (res < 0) { goto out; + } } - if (amqp_encode_32(encoded, &start, *offset - start - 4)) + if (amqp_encode_32(encoded, &start, *offset - start - 4)) { res = 0; - else + } else { res = -ERROR_BAD_AMQP_DATA; + } - out: +out: return res; } int amqp_encode_table(amqp_bytes_t encoded, - amqp_table_t *input, - size_t *offset) + amqp_table_t *input, + size_t *offset) { size_t start = *offset; int i, res; @@ -300,35 +317,40 @@ int amqp_encode_table(amqp_bytes_t encoded, for (i = 0; i < input->num_entries; i++) { res = amqp_encode_8(encoded, offset, input->entries[i].key.len); - if (res < 0) + if (res < 0) { goto out; + } res = amqp_encode_bytes(encoded, offset, input->entries[i].key); - if (res < 0) + if (res < 0) { goto out; + } res = amqp_encode_field_value(encoded, &input->entries[i].value, offset); - if (res < 0) + if (res < 0) { goto out; + } } - if (amqp_encode_32(encoded, &start, *offset - start - 4)) + if (amqp_encode_32(encoded, &start, *offset - start - 4)) { res = 0; - else + } else { res = -ERROR_BAD_AMQP_DATA; + } - out: +out: return res; } static int amqp_encode_field_value(amqp_bytes_t encoded, - amqp_field_value_t *entry, - size_t *offset) + amqp_field_value_t *entry, + size_t *offset) { int res = -ERROR_BAD_AMQP_DATA; - if (!amqp_encode_8(encoded, offset, entry->kind)) + if (!amqp_encode_8(encoded, offset, entry->kind)) { goto out; + } #define FIELD_ENCODER(bits, val) if (!amqp_encode_##bits(encoded, offset, val)) goto out; break @@ -366,8 +388,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_DECIMAL: if (!amqp_encode_8(encoded, offset, entry->value.decimal.decimals) - || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) + || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) { goto out; + } break; case AMQP_FIELD_KIND_UTF8: @@ -376,8 +399,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, /* fall through */ case AMQP_FIELD_KIND_BYTES: if (!amqp_encode_32(encoded, offset, entry->value.bytes.len) - || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) + || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) { goto out; + } break; case AMQP_FIELD_KIND_ARRAY: @@ -400,13 +424,14 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, res = 0; - out: +out: return res; } /*---------------------------------------------------------------------------*/ -int amqp_table_entry_cmp(void const *entry1, void const *entry2) { +int amqp_table_entry_cmp(void const *entry1, void const *entry2) +{ amqp_table_entry_t const *p1 = (amqp_table_entry_t const *) entry1; amqp_table_entry_t const *p2 = (amqp_table_entry_t const *) entry2; @@ -414,7 +439,9 @@ int amqp_table_entry_cmp(void const *entry1, void const *entry2) { size_t minlen; minlen = p1->key.len; - if (p2->key.len < minlen) minlen = p2->key.len; + if (p2->key.len < minlen) { + minlen = p2->key.len; + } d = memcmp(p1->key.bytes, p2->key.bytes, minlen); if (d != 0) { diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c index 367376d..d3e6724 100644 --- a/librabbitmq/amqp_url.c +++ b/librabbitmq/amqp_url.c @@ -43,159 +43,167 @@ void amqp_default_connection_info(struct amqp_connection_info *ci) { - /* Apply defaults */ - ci->user = "guest"; - ci->password = "guest"; - ci->host = "localhost"; - ci->port = 5672; - ci->vhost = "/"; + /* Apply defaults */ + ci->user = "guest"; + ci->password = "guest"; + ci->host = "localhost"; + ci->port = 5672; + ci->vhost = "/"; } /* Scan for the next delimiter, handling percent-encodings on the way. */ static char find_delim(char **pp, int colon_and_at_sign_are_delims) { - char *from = *pp; - char *to = from; - - for (;;) { - char ch = *from++; - - switch (ch) { - case ':': - case '@': - if (!colon_and_at_sign_are_delims) { - *to++ = ch; - break; - } - - /* fall through */ - case 0: - case '/': - case '?': - case '#': - case '[': - case ']': - *to = 0; - *pp = from; - return ch; - - case '%': { - unsigned int val; - int chars; - int res = sscanf(from, "%2x%n", &val, &chars); - - if (res == EOF || res < 1 || chars != 2) - /* Return a surprising delimiter to - force an error. */ - return '%'; - - *to++ = val; - from += 2; - break; - } - - default: - *to++ = ch; - break; - } - } + char *from = *pp; + char *to = from; + + for (;;) { + char ch = *from++; + + switch (ch) { + case ':': + case '@': + if (!colon_and_at_sign_are_delims) { + *to++ = ch; + break; + } + + /* fall through */ + case 0: + case '/': + case '?': + case '#': + case '[': + case ']': + *to = 0; + *pp = from; + return ch; + + case '%': { + unsigned int val; + int chars; + int res = sscanf(from, "%2x%n", &val, &chars); + + if (res == EOF || res < 1 || chars != 2) + /* Return a surprising delimiter to + force an error. */ + { + return '%'; + } + + *to++ = val; + from += 2; + break; + } + + default: + *to++ = ch; + break; + } + } } /* Parse an AMQP URL into its component parts. */ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) { - int res = -ERROR_BAD_AMQP_URL; - char delim; - char *start; - char *host; - char *port = NULL; - - /* check the prefix */ - if (strncmp(url, "amqp://", 7)) - goto out; - - host = start = url += 7; - delim = find_delim(&url, 1); - - if (delim == ':') { - /* The colon could be introducing the port or the - password part of the userinfo. We don't know yet, - so stash the preceding component. */ - port = start = url; - delim = find_delim(&url, 1); - } - - if (delim == '@') { - /* What might have been the host and port were in fact - the username and password */ - parsed->user = host; - if (port) - parsed->password = port; - - port = NULL; - host = start = url; - delim = find_delim(&url, 1); - } - - if (delim == '[') { - /* IPv6 address. The bracket should be the first - character in the host. */ - if (host != start || *host != 0) - goto out; - - start = url; - delim = find_delim(&url, 0); - - if (delim != ']') - goto out; - - parsed->host = start; - start = url; - delim = find_delim(&url, 1); - - /* Closing bracket should be the last character in the - host. */ - if (*start != 0) - goto out; - } - else { - /* If we haven't seen the host yet, this is it. */ - if (*host != 0) - parsed->host = host; - } - - if (delim == ':') { - port = start = url; - delim = find_delim(&url, 1); - } - - if (port) { - char *end; - long portnum = strtol(port, &end, 10); - - if (port == end || *end != 0 || portnum < 0 || portnum > 65535) - goto out; - - parsed->port = portnum; - } - - if (delim == '/') { - start = url; - delim = find_delim(&url, 1); - - if (delim != 0) - goto out; - - parsed->vhost = start; - res = 0; - } - else if (delim == 0) { - res = 0; - } - - /* Any other delimiter is bad, and we will return - ERROR_BAD_AMQP_URL. */ - - out: - return res; + int res = -ERROR_BAD_AMQP_URL; + char delim; + char *start; + char *host; + char *port = NULL; + + /* check the prefix */ + if (strncmp(url, "amqp://", 7)) { + goto out; + } + + host = start = url += 7; + delim = find_delim(&url, 1); + + if (delim == ':') { + /* The colon could be introducing the port or the + password part of the userinfo. We don't know yet, + so stash the preceding component. */ + port = start = url; + delim = find_delim(&url, 1); + } + + if (delim == '@') { + /* What might have been the host and port were in fact + the username and password */ + parsed->user = host; + if (port) { + parsed->password = port; + } + + port = NULL; + host = start = url; + delim = find_delim(&url, 1); + } + + if (delim == '[') { + /* IPv6 address. The bracket should be the first + character in the host. */ + if (host != start || *host != 0) { + goto out; + } + + start = url; + delim = find_delim(&url, 0); + + if (delim != ']') { + goto out; + } + + parsed->host = start; + start = url; + delim = find_delim(&url, 1); + + /* Closing bracket should be the last character in the + host. */ + if (*start != 0) { + goto out; + } + } else { + /* If we haven't seen the host yet, this is it. */ + if (*host != 0) { + parsed->host = host; + } + } + + if (delim == ':') { + port = start = url; + delim = find_delim(&url, 1); + } + + if (port) { + char *end; + long portnum = strtol(port, &end, 10); + + if (port == end || *end != 0 || portnum < 0 || portnum > 65535) { + goto out; + } + + parsed->port = portnum; + } + + if (delim == '/') { + start = url; + delim = find_delim(&url, 1); + + if (delim != 0) { + goto out; + } + + parsed->vhost = start; + res = 0; + } else if (delim == 0) { + res = 0; + } + + /* Any other delimiter is bad, and we will return + ERROR_BAD_AMQP_URL. */ + +out: + return res; } diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index 69bb036..8c8ba6b 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -45,37 +45,38 @@ int amqp_socket_init(void) { - return 0; + return 0; } int amqp_socket_error(void) { - return errno | ERROR_CATEGORY_OS; + return errno | ERROR_CATEGORY_OS; } int amqp_socket_socket(int domain, int type, int proto) { - int flags; + int flags; - int s = socket(domain, type, proto); - if (s < 0) - return s; + int s = socket(domain, type, proto); + if (s < 0) { + return s; + } - /* Always enable CLOEXEC on the socket */ - flags = fcntl(s, F_GETFD); - if (flags == -1 - || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { - int e = errno; - close(s); - errno = e; - return -1; - } + /* Always enable CLOEXEC on the socket */ + flags = fcntl(s, F_GETFD); + if (flags == -1 + || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { + int e = errno; + close(s); + errno = e; + return -1; + } - return s; + return s; } char *amqp_os_error_string(int err) { - return strdup(strerror(err)); + return strdup(strerror(err)); } diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c index 9919b6b..f3c8fc7 100644 --- a/librabbitmq/win32/socket.c +++ b/librabbitmq/win32/socket.c @@ -48,55 +48,58 @@ static int called_wsastartup; int amqp_socket_init(void) { - if (!called_wsastartup) { - WSADATA data; - int res = WSAStartup(0x0202, &data); - if (res) - return -res; + if (!called_wsastartup) { + WSADATA data; + int res = WSAStartup(0x0202, &data); + if (res) { + return -res; + } - called_wsastartup = 1; - } + called_wsastartup = 1; + } - return 0; + return 0; } char *amqp_os_error_string(int err) { - char *msg, *copy; + char *msg, *copy; - if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM - | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, err, - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR)&msg, 0, NULL)) - return strdup("(error retrieving Windows error message)"); + if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, err, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) { + return strdup("(error retrieving Windows error message)"); + } - copy = strdup(msg); - LocalFree(msg); - return copy; + copy = strdup(msg); + LocalFree(msg); + return copy; } int amqp_socket_setsockopt(int sock, int level, int optname, - const void *optval, size_t optlen) + const void *optval, size_t optlen) { - /* the winsock setsockopt function has its 4th argument as a - const char * */ - return setsockopt(sock, level, optname, (const char *)optval, optlen); + /* the winsock setsockopt function has its 4th argument as a + const char * */ + return setsockopt(sock, level, optname, (const char *)optval, optlen); } int amqp_socket_writev(int sock, struct iovec *iov, int nvecs) { - DWORD ret; - if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) - return ret; - else - return -1; + DWORD ret; + if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) { + return ret; + } else { + return -1; + } } int amqp_socket_error(void) { - return WSAGetLastError() | ERROR_CATEGORY_OS; + return WSAGetLastError() | ERROR_CATEGORY_OS; } diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h index 9351b1f..43e2a13 100644 --- a/librabbitmq/win32/socket.h +++ b/librabbitmq/win32/socket.h @@ -39,8 +39,8 @@ /* same as WSABUF */ struct iovec { - u_long iov_len; - void *iov_base; + u_long iov_len; + void *iov_base; }; int @@ -51,7 +51,7 @@ amqp_socket_init(void); int amqp_socket_setsockopt(int sock, int level, int optname, const void *optval, - size_t optlen); + size_t optlen); int amqp_socket_writev(int sock, struct iovec *iov, int nvecs); |