diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-04-26 23:21:23 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-04-26 23:21:23 +0100 |
commit | e1dec49d13caa05af05984af12f3e8354387f33f (patch) | |
tree | 8e83237174c0db0c7fd4e1b42ce7c8b3afb8b3ee /librabbitmq | |
parent | af9315c575b8788b28bb3c5d959a9462b12b9a15 (diff) | |
download | rabbitmq-c-github-ask-e1dec49d13caa05af05984af12f3e8354387f33f.tar.gz |
Publication works!
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/Makefile.am | 2 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 80 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 299 | ||||
-rw-r--r-- | librabbitmq/amqp_debug.c | 83 | ||||
-rw-r--r-- | librabbitmq/amqp_mem.c | 13 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 94 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 376 | ||||
-rw-r--r-- | librabbitmq/amqp_table.c | 15 | ||||
-rw-r--r-- | librabbitmq/codegen.py | 36 |
9 files changed, 824 insertions, 174 deletions
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am index 6fae6d3..5ecd370 100644 --- a/librabbitmq/Makefile.am +++ b/librabbitmq/Makefile.am @@ -1,6 +1,6 @@ lib_LTLIBRARIES = librabbitmq.la -librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c +librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c nodist_librabbitmq_la_SOURCES = amqp_framing.c librabbitmq_la_INCLUDES = amqp_framing.h amqp.h noinst_librabbitmq_la_INCLUDES = amqp_private.h diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 314e2c9..18a7c76 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -5,8 +5,6 @@ extern "C" { #endif -typedef int (*amqp_writer_fun_t)(int context, void const *buf, size_t len); - typedef int amqp_boolean_t; typedef uint32_t amqp_method_number_t; typedef uint32_t amqp_flags_t; @@ -55,28 +53,41 @@ typedef struct amqp_pool_t_ { size_t alloc_used; } amqp_pool_t; +typedef struct amqp_method_t_ { + amqp_method_number_t id; + void *decoded; +} amqp_method_t; + typedef struct amqp_frame_t_ { uint8_t frame_type; /* 0 means no event */ amqp_channel_t channel; union { - struct { - amqp_method_number_t id; - void *decoded; - } method; + amqp_method_t method; struct { uint16_t class_id; + uint64_t body_size; void *decoded; } properties; amqp_bytes_t body_fragment; } payload; } amqp_frame_t; -#define AMQP_EXCEPTION_CATEGORY_UNKNOWN 0 -#define AMQP_EXCEPTION_CATEGORY_CONNECTION 1 -#define AMQP_EXCEPTION_CATEGORY_CHANNEL 2 +typedef enum amqp_response_type_enum_ { + AMQP_RESPONSE_NONE = 0, + AMQP_RESPONSE_NORMAL, + AMQP_RESPONSE_LIBRARY_EXCEPTION, + AMQP_RESPONSE_SERVER_EXCEPTION +} amqp_response_type_enum; + +typedef struct amqp_rpc_reply_t_ { + amqp_response_type_enum reply_type; + amqp_method_t reply; + int library_errno; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */ +} amqp_rpc_reply_t; -#define AMQP_FRAME_COMPLETE 0 -#define AMQP_FRAME_INCOMPLETE 1 +typedef enum amqp_sasl_method_enum_ { + AMQP_SASL_METHOD_PLAIN = 0 +} amqp_sasl_method_enum; /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; @@ -88,8 +99,13 @@ extern void recycle_amqp_pool(amqp_pool_t *pool); extern void empty_amqp_pool(amqp_pool_t *pool); extern void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount); +extern void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output); + +extern amqp_bytes_t amqp_cstring_bytes(char const *cstr); extern amqp_connection_state_t amqp_new_connection(void); +extern void amqp_set_sockfd(amqp_connection_state_t state, + int sockfd); extern void amqp_tune_connection(amqp_connection_state_t state, int frame_max); extern void amqp_destroy_connection(amqp_connection_state_t state); @@ -98,21 +114,55 @@ extern int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, amqp_frame_t *decoded_frame); +extern amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state); + +extern void amqp_release_buffers(amqp_connection_state_t state); + +extern void amqp_maybe_release_buffers(amqp_connection_state_t state); + extern int amqp_send_frame(amqp_connection_state_t state, - amqp_writer_fun_t writer, - int context, amqp_frame_t const *frame); extern int amqp_table_entry_cmp(void const *entry1, void const *entry2); extern int amqp_open_socket(char const *hostname, int portnumber); -extern int amqp_send_header(amqp_writer_fun_t writer, int context); +extern int amqp_send_header(amqp_connection_state_t state); + +extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state); extern int amqp_simple_wait_frame(amqp_connection_state_t state, - int sockfd, amqp_frame_t *decoded_frame); +extern int amqp_simple_wait_method(amqp_connection_state_t state, + amqp_method_number_t expected_or_zero, + amqp_method_t *output); + +extern int amqp_send_method(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded); + +extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t expected_reply_id, + void *decoded_request_method); + +extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, + char const *vhost, + int frame_max, + amqp_sasl_method_enum sasl_method, ...); + +struct amqp_basic_properties_t_; +extern int amqp_basic_publish(amqp_connection_state_t state, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + struct amqp_basic_properties_t_ const *properties, + amqp_bytes_t body); + #ifdef __cplusplus } #endif diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 3100f7e..57db2c3 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -4,40 +4,83 @@ #include <stdint.h> #include <errno.h> +#include <unistd.h> +#include <sys/uio.h> + #include "amqp.h" #include "amqp_framing.h" #include "amqp_private.h" -#define INITIAL_FRAME_MAX 65536 +#include <assert.h> + +#define INITIAL_FRAME_POOL_PAGE_SIZE 65536 +#define INITIAL_DECODING_POOL_PAGE_SIZE 131072 +#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 + +#define ENFORCE_STATE(statevec, statenum) \ + { \ + amqp_connection_state_t _check_state = (statevec); \ + int _wanted_state = (statenum); \ + amqp_assert(_check_state->state == _wanted_state, \ + "Programming error: invalid AMQP connection state: expected %d, got %d", \ + _wanted_state, \ + _check_state->state); \ + } amqp_connection_state_t amqp_new_connection(void) { amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); - init_amqp_pool(&state->pool, INITIAL_FRAME_MAX); - state->inbound_buffer.len = INITIAL_FRAME_MAX; - state->inbound_buffer.bytes = malloc(INITIAL_FRAME_MAX); - state->outbound_buffer.len = INITIAL_FRAME_MAX; - state->outbound_buffer.bytes = malloc(INITIAL_FRAME_MAX); - state->frame_size_target = 0; - state->reset_required = 0; + + init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); + init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); + + state->state = CONNECTION_STATE_IDLE; + + state->inbound_buffer.bytes = NULL; + state->outbound_buffer.bytes = NULL; + amqp_tune_connection(state, INITIAL_FRAME_POOL_PAGE_SIZE); + + state->inbound_offset = 0; + state->target_size = HEADER_SIZE; + + state->sockfd = -1; + state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; + state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); + state->sock_inbound_offset = 0; + state->sock_inbound_limit = 0; + + state->first_queued_frame = NULL; + state->last_queued_frame = NULL; + return state; } +void amqp_set_sockfd(amqp_connection_state_t state, + int sockfd) +{ + state->sockfd = sockfd; +} + void amqp_tune_connection(amqp_connection_state_t state, int frame_max) { - empty_amqp_pool(&state->pool); - init_amqp_pool(&state->pool, frame_max); + ENFORCE_STATE(state, CONNECTION_STATE_IDLE); + + state->frame_max = frame_max; + + empty_amqp_pool(&state->frame_pool); + init_amqp_pool(&state->frame_pool, frame_max); + state->inbound_buffer.len = frame_max; - realloc(state->inbound_buffer.bytes, frame_max); state->outbound_buffer.len = frame_max; - realloc(state->outbound_buffer.bytes, frame_max); + state->outbound_buffer.bytes = realloc(state->outbound_buffer.bytes, frame_max); } void amqp_destroy_connection(amqp_connection_state_t state) { - empty_amqp_pool(&state->pool); - free(state->inbound_buffer.bytes); + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); free(state->outbound_buffer.bytes); + free(state->sock_inbound_buffer.bytes); free(state); } @@ -45,90 +88,140 @@ int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, amqp_frame_t *decoded_frame) { - if (state->reset_required) { - size_t unconsumed_byte_count = state->frame_offset - state->frame_size_target; - recycle_amqp_pool(&state->pool); - memmove(state->inbound_buffer.bytes, - D_BYTES(state->inbound_buffer, - state->frame_size_target, - unconsumed_byte_count), - unconsumed_byte_count); - state->frame_offset = unconsumed_byte_count; - state->frame_size_target = 0; - state->reset_required = 0; - } - - E_BYTES(state->inbound_buffer, state->frame_offset, received_data.len, received_data.bytes); - state->frame_offset += received_data.len; + int total_bytes_consumed = 0; + int bytes_consumed; - if (state->frame_size_target == 0) { - if (state->frame_offset < 7) { - return AMQP_FRAME_INCOMPLETE; - } + read_more: + if (received_data.len == 0) { + return total_bytes_consumed; + } - state->frame_size_target = D_32(state->inbound_buffer, 3) + 8; /* 7 for header, 1 for footer */ + if (state->state == CONNECTION_STATE_IDLE) { + state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); + state->state = CONNECTION_STATE_WAITING_FOR_HEADER; } - if (state->frame_offset < state->frame_size_target) { - return AMQP_FRAME_INCOMPLETE; + bytes_consumed = state->target_size - state->inbound_offset; + if (received_data.len < bytes_consumed) { + bytes_consumed = received_data.len; } - if (D_8(state->inbound_buffer, state->frame_size_target - 1) != AMQP_FRAME_END) { - return -EINVAL; + E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes); + state->inbound_offset += bytes_consumed; + total_bytes_consumed += bytes_consumed; + + assert(state->inbound_offset <= state->target_size); + + if (state->inbound_offset < state->target_size) { + return total_bytes_consumed; } - state->reset_required = 1; + switch (state->state) { + case CONNECTION_STATE_WAITING_FOR_HEADER: + state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE; + state->state = CONNECTION_STATE_WAITING_FOR_BODY; - decoded_frame->frame_type = D_8(state->inbound_buffer, 0); - decoded_frame->channel = D_16(state->inbound_buffer, 1); - switch (decoded_frame->frame_type) { - case AMQP_FRAME_METHOD: { - amqp_bytes_t encoded; - int decode_result; + /* Wind buffer forward, and try to read some body out of it. */ + received_data.len -= bytes_consumed; + received_data.bytes = ((char *) received_data.bytes) + bytes_consumed; + goto read_more; - encoded.len = state->frame_size_target - 12; /* 7 for header, 4 for method id, 1 for footer */ - encoded.bytes = D_BYTES(state->inbound_buffer, 11, encoded.len); + case CONNECTION_STATE_WAITING_FOR_BODY: { + int frame_type = D_8(state->inbound_buffer, 0); - decoded_frame->payload.method.id = D_32(state->inbound_buffer, 7); - decode_result = amqp_decode_method(decoded_frame->payload.method.id, - &state->pool, - encoded, - &decoded_frame->payload.method.decoded); - if (decode_result != 0) return decode_result; + printf("recving:\n"); + amqp_dump(state->inbound_buffer.bytes, state->target_size); - return AMQP_FRAME_COMPLETE; - } - case AMQP_FRAME_HEADER: { - amqp_bytes_t encoded; - int decode_result; + /* Check frame end marker (footer) */ + if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) { + return -EINVAL; + } - encoded.len = state->frame_size_target - 20; /* 7 for header, 12 for prop hdr, 1 for footer */ - encoded.bytes = D_BYTES(state->inbound_buffer, 19, encoded.len); + decoded_frame->channel = D_16(state->inbound_buffer, 1); - decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, 7); - decode_result = amqp_decode_properties(decoded_frame->payload.properties.class_id, - &state->pool, - encoded, - &decoded_frame->payload.properties.decoded); - if (decode_result != 0) return decode_result; + switch (frame_type) { + case AMQP_FRAME_METHOD: { + amqp_bytes_t encoded; - return AMQP_FRAME_COMPLETE; - } - case AMQP_FRAME_BODY: { - size_t fragment_len = state->frame_size_target - 8; /* 7 for header, 1 for footer */ - decoded_frame->payload.body_fragment.len = fragment_len; - decoded_frame->payload.body_fragment.bytes = D_BYTES(state->inbound_buffer, 7, fragment_len); - return AMQP_FRAME_COMPLETE; + /* Four bytes of method ID before the method args. */ + encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE); + encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len); + + decoded_frame->frame_type = AMQP_FRAME_METHOD; + decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE); + AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id, + &state->decoding_pool, + encoded, + &decoded_frame->payload.method.decoded)); + break; + } + + case AMQP_FRAME_HEADER: { + amqp_bytes_t encoded; + + /* 12 bytes for properties header. */ + encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE); + encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len); + + decoded_frame->frame_type = AMQP_FRAME_HEADER; + decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE); + decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4); + AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id, + &state->decoding_pool, + encoded, + &decoded_frame->payload.properties.decoded)); + break; + } + + case AMQP_FRAME_BODY: { + size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE); + + decoded_frame->frame_type = AMQP_FRAME_BODY; + decoded_frame->payload.body_fragment.len = fragment_len; + decoded_frame->payload.body_fragment.bytes = + D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len); + break; + } + + default: + /* Ignore the frame */ + decoded_frame->frame_type = 0; + break; + } + + state->inbound_buffer.bytes = NULL; + state->inbound_offset = 0; + state->target_size = HEADER_SIZE; + state->state = CONNECTION_STATE_IDLE; + return total_bytes_consumed; } + default: - /* Ignore the frame */ - return AMQP_FRAME_INCOMPLETE; + amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state); + } +} + +amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) { + return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL); +} + +void amqp_release_buffers(amqp_connection_state_t state) { + ENFORCE_STATE(state, CONNECTION_STATE_IDLE); + + amqp_assert(state->first_queued_frame == NULL, + "Programming error: attempt to amqp_release_buffers while waiting events enqueued"); + + recycle_amqp_pool(&state->frame_pool); + recycle_amqp_pool(&state->decoding_pool); +} + +void amqp_maybe_release_buffers(amqp_connection_state_t state) { + if (amqp_release_buffers_ok(state)) { + amqp_release_buffers(state); } } int amqp_send_frame(amqp_connection_state_t state, - amqp_writer_fun_t writer, - int context, amqp_frame_t const *frame) { amqp_bytes_t encoded; @@ -139,20 +232,24 @@ int amqp_send_frame(amqp_connection_state_t state, E_16(state->outbound_buffer, 1, frame->channel); switch (frame->frame_type) { case AMQP_FRAME_METHOD: { - encoded.len = state->outbound_buffer.len - 8; - encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len); - payload_len = amqp_encode_method(frame->payload.method.id, - frame->payload.method.decoded, - encoded); + E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id); + encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE); + encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded.len); + payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id, + frame->payload.method.decoded, + encoded)) + 4; separate_body = 0; break; } case AMQP_FRAME_HEADER: { - encoded.len = state->outbound_buffer.len - 8; - encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len); - payload_len = amqp_encode_properties(frame->payload.properties.class_id, - frame->payload.properties.decoded, - encoded); + E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id); + E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */ + E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size); + encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE); + encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded.len); + payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id, + frame->payload.properties.decoded, + encoded)) + 12; separate_body = 0; break; } @@ -166,21 +263,27 @@ int amqp_send_frame(amqp_connection_state_t state, return -EINVAL; } - if (payload_len < 0) { - return payload_len; - } - E_32(state->outbound_buffer, 3, payload_len); if (separate_body) { - int result = writer(context, state->outbound_buffer.bytes, 7); char frame_end_byte = AMQP_FRAME_END; - if (result < 0) return result; - result = writer(context, encoded.bytes, payload_len); - if (result < 0) return result; - return writer(context, &frame_end_byte, 1); + printf("sending body frame (header):\n"); + amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE); + AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE)); + printf("sending body frame (payload):\n"); + amqp_dump(encoded.bytes, payload_len); + AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len)); + printf("sending body frame (footer).\n"); + assert(FOOTER_SIZE == 1); + AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE)); } else { - E_8(state->outbound_buffer, payload_len + 7, AMQP_FRAME_END); - return writer(context, state->outbound_buffer.bytes, payload_len + 8); + E_8(state->outbound_buffer, payload_len + HEADER_SIZE, AMQP_FRAME_END); + printf("sending:\n"); + amqp_dump(state->outbound_buffer.bytes, payload_len + HEADER_SIZE + FOOTER_SIZE); + AMQP_CHECK_RESULT(write(state->sockfd, + state->outbound_buffer.bytes, + payload_len + (HEADER_SIZE + FOOTER_SIZE))); } + + return 0; } diff --git a/librabbitmq/amqp_debug.c b/librabbitmq/amqp_debug.c new file mode 100644 index 0000000..64db9ec --- /dev/null +++ b/librabbitmq/amqp_debug.c @@ -0,0 +1,83 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <ctype.h> + +static void dump_row(long count, int numinrow, int *chs) { + int i; + + printf("%08lX:", count - numinrow); + + if (numinrow > 0) { + for (i = 0; i < numinrow; i++) { + if (i == 8) + printf(" :"); + printf(" %02X", chs[i]); + } + for (i = numinrow; i < 16; i++) { + if (i == 8) + printf(" :"); + printf(" "); + } + printf(" "); + for (i = 0; i < numinrow; i++) { + if (isprint(chs[i])) + printf("%c", chs[i]); + else + printf("."); + } + } + printf("\n"); +} + +static int rows_eq(int *a, int *b) { + int i; + + for (i=0; i<16; i++) + if (a[i] != b[i]) + return 0; + + return 1; +} + +void amqp_dump(void const *buffer, size_t len) { + unsigned char *buf = (unsigned char *) buffer; + long count = 0; + int numinrow = 0; + int chs[16]; + int oldchs[16]; + int showed_dots = 0; + int i; + + for (i = 0; i < len; i++) { + int ch = buf[i]; + + if (numinrow == 16) { + int i; + + if (rows_eq(oldchs, chs)) { + if (!showed_dots) { + showed_dots = 1; + printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); + } + } else { + showed_dots = 0; + dump_row(count, numinrow, chs); + } + + for (i=0; i<16; i++) + oldchs[i] = chs[i]; + + numinrow = 0; + } + + count++; + chs[numinrow++] = ch; + } + + dump_row(count, numinrow, chs); + + if (numinrow != 0) + printf("%08lX:\n", count); +} diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 64d06de..e0795d2 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -100,3 +100,16 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { return pool->alloc_block; } + +void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) { + output->len = amount; + output->bytes = amqp_pool_alloc(pool, amount); +} + +amqp_bytes_t amqp_cstring_bytes(char const *cstr) { + amqp_bytes_t result; + result.len = strlen(cstr); + result.bytes = (void *) cstr; + return result; +} + diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 7a874e2..6f9d508 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -5,13 +5,62 @@ extern "C" { #endif +/* + * Connection states: + * + * - CONNECTION_STATE_IDLE: initial state, and entered again after + * each frame is completed. Means that no bytes of the next frame + * have been seen yet. Connections may only be reconfigured, and the + * connection's pools recycled, when in this state. Whenever we're + * in this state, the inbound_buffer's bytes pointer must be NULL; + * any other state, and it must point to a block of memory allocated + * from the frame_pool. + * + * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming + * frame have been seen, but not a complete frame header's worth. + * + * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has + * been seen, but the frame is not yet complete. When it is + * completed, it will be returned, and the connection will return to + * IDLE state. + * + */ +typedef enum amqp_connection_state_enum_ { + CONNECTION_STATE_IDLE = 0, + CONNECTION_STATE_WAITING_FOR_HEADER, + CONNECTION_STATE_WAITING_FOR_BODY +} amqp_connection_state_enum; + +/* 7 bytes up front, then payload, then 1 byte footer */ +#define HEADER_SIZE 7 +#define FOOTER_SIZE 1 + +typedef struct amqp_link_t_ { + struct amqp_link_t_ *next; + void *data; +} amqp_link_t; + struct amqp_connection_state_t_ { - amqp_pool_t pool; + amqp_pool_t frame_pool; + amqp_pool_t decoding_pool; + + amqp_connection_state_enum state; + + int frame_max; amqp_bytes_t inbound_buffer; + + size_t inbound_offset; + size_t target_size; + amqp_bytes_t outbound_buffer; - size_t frame_offset; - size_t frame_size_target; /* 0 for unknown, still waiting for header */ - amqp_boolean_t reset_required; + + int sockfd; + amqp_bytes_t sock_inbound_buffer; + size_t sock_inbound_offset; + size_t sock_inbound_limit; + + amqp_link_t *first_queued_frame; + amqp_link_t *last_queued_frame; }; #define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); }) @@ -31,10 +80,10 @@ struct amqp_connection_state_t_ { #define E_8(b, o, v) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o) = (v)) #define E_16(b, o, v) CHECK_LIMIT(b, o, 2, ({uint16_t vv = htons(v); memcpy(BUF_AT(b, o), &vv, 2);})) #define E_32(b, o, v) CHECK_LIMIT(b, o, 4, ({uint32_t vv = htonl(v); memcpy(BUF_AT(b, o), &vv, 4);})) -#define E_64(b, o, v) ({ \ - E_32(b, o, (uint32_t) (v >> 32)); \ - E_32(b, o, (uint32_t) (v & 0xFFFFFFFF)); \ -}) +#define E_64(b, o, v) ({ \ + E_32(b, o, (uint32_t) (((uint64_t) v) >> 32)); \ + E_32(b, o + 4, (uint32_t) (((uint64_t) v) & 0xFFFFFFFF)); \ + }) #define E_BYTES(b, o, l, v) CHECK_LIMIT(b, o, l, memcpy(BUF_AT(b, o), (v), (l))) @@ -47,6 +96,35 @@ extern int amqp_encode_table(amqp_bytes_t encoded, amqp_table_t *input, int *offsetptr); +#define amqp_assert(condition, ...) \ + ({ \ + if (!(condition)) { \ + fprintf(stderr, __VA_ARGS__); \ + fputc('\n', stderr); \ + abort(); \ + } \ + }) + +#define AMQP_CHECK_RESULT(expr) \ + ({ \ + int _result = (expr); \ + if (_result < 0) return _result; \ + _result; \ + }) + +#define AMQP_CHECK_EOF_RESULT(expr) \ + ({ \ + int _result = (expr); \ + if (_result <= 0) return _result; \ + _result; \ + }) + +#ifndef NDEBUG +extern void amqp_dump(void const *buffer, size_t len); +#else +#define amqp_dump(buffer, len) ((void) 0) +#endif + #ifdef __cplusplus } #endif diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 32af3e9..20686c4 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -3,9 +3,11 @@ #include <string.h> #include <stdint.h> #include <errno.h> +#include <stdarg.h> #include "amqp.h" #include "amqp_framing.h" +#include "amqp_private.h" #include <sys/types.h> #include <sys/uio.h> @@ -14,6 +16,8 @@ #include <netdb.h> #include <netinet/in.h> +#include <assert.h> + int amqp_open_socket(char const *hostname, int portnumber) { @@ -40,7 +44,7 @@ int amqp_open_socket(char const *hostname, return sockfd; } -int amqp_send_header(amqp_writer_fun_t writer, int context) { +int amqp_send_header(amqp_connection_state_t state) { char header[8]; header[0] = 'A'; header[1] = 'M'; @@ -50,29 +54,367 @@ int amqp_send_header(amqp_writer_fun_t writer, int context) { header[5] = 1; header[6] = AMQP_PROTOCOL_VERSION_MAJOR; header[7] = AMQP_PROTOCOL_VERSION_MINOR; - return writer(context, &header[0], 8); + return write(state->sockfd, &header[0], 8); } -int amqp_simple_wait_frame(amqp_connection_state_t state, - int sockfd, - amqp_frame_t *decoded_frame) +static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { + switch (method) { + case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"}; + default: + amqp_assert(0, "Invalid SASL method: %d", (int) method); + } + abort(); // unreachable +} + +static amqp_bytes_t sasl_response(amqp_pool_t *pool, + amqp_sasl_method_enum method, + va_list args) +{ + amqp_bytes_t response; + + switch (method) { + case AMQP_SASL_METHOD_PLAIN: { + char *username = va_arg(args, char *); + size_t username_len = strlen(username); + char *password = va_arg(args, char *); + size_t password_len = strlen(password); + amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); + *BUF_AT(response, 0) = 0; + memcpy(((char *) response.bytes) + 1, username, username_len); + *BUF_AT(response, username_len + 1) = 0; + memcpy(((char *) response.bytes) + username_len + 2, password, password_len); + break; + } + default: + amqp_assert(0, "Invalid SASL method: %d", (int) method); + } + + return response; +} + +amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { + return (state->first_queued_frame != NULL); +} + +static int wait_frame_inner(amqp_connection_state_t state, + amqp_frame_t *decoded_frame) { - amqp_bytes_t buffer; - char buffer_bytes[4096]; - buffer.bytes = buffer_bytes; while (1) { - int result = read(sockfd, buffer_bytes, sizeof(buffer_bytes)); + int result; + + while (state->sock_inbound_offset < state->sock_inbound_limit) { + amqp_bytes_t buffer; + buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; + buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; + AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame))); + state->sock_inbound_offset += result; + + if (decoded_frame->frame_type != 0) { + /* Complete frame was read. Return it. */ + return 1; + } + + /* Incomplete or ignored frame. Keep processing input. */ + assert(result != 0); + } + + result = read(state->sockfd, + state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len); if (result < 0) { return -errno; } - buffer.len = result; - switch ((result = amqp_handle_input(state, buffer, decoded_frame))) { - case AMQP_FRAME_COMPLETE: - return AMQP_FRAME_COMPLETE; - case AMQP_FRAME_INCOMPLETE: - break; - default: - return result; + if (result == 0) { + /* EOF. */ + return 0; + } + + state->sock_inbound_limit = result; + state->sock_inbound_offset = 0; + } +} + +int amqp_simple_wait_frame(amqp_connection_state_t state, + amqp_frame_t *decoded_frame) +{ + if (state->first_queued_frame != NULL) { + amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data; + state->first_queued_frame = state->first_queued_frame->next; + if (state->first_queued_frame == NULL) { + state->last_queued_frame = NULL; + } + *decoded_frame = *f; + return 1; + } else { + return wait_frame_inner(state, decoded_frame); + } +} + +int amqp_simple_wait_method(amqp_connection_state_t state, + amqp_method_number_t expected_or_zero, + amqp_method_t *output) +{ + amqp_frame_t frame; + + AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame)); + amqp_assert(frame.frame_type == AMQP_FRAME_METHOD, + "Expected 0x%08X method frame", expected_or_zero); + amqp_assert((expected_or_zero == 0) || (frame.payload.method.id == expected_or_zero), + "Expected method ID 0x%08X", expected_or_zero); + *output = frame.payload.method; + return 1; +} + +int amqp_send_method(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded) +{ + amqp_frame_t frame; + + frame.frame_type = AMQP_FRAME_METHOD; + frame.channel = channel; + frame.payload.method.id = id; + frame.payload.method.decoded = decoded; + return amqp_send_frame(state, &frame); +} + +amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t expected_reply_id, + void *decoded_request_method) +{ + int status; + amqp_rpc_reply_t result; + + memset(&result, 0, sizeof(result)); + + status = amqp_send_method(state, channel, request_id, decoded_request_method); + if (status < 0) { + result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + result.library_errno = -status; + return result; + } + + { + amqp_frame_t frame; + + retry: + status = wait_frame_inner(state, &frame); + if (status <= 0) { + result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + result.library_errno = -status; + return result; + } + + if (!((frame.frame_type == AMQP_FRAME_METHOD) && + (frame.channel == channel) && + ((frame.payload.method.id == expected_reply_id) || + (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD) || + (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))) + { + amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); + amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); + + *frame_copy = frame; + + link->next = NULL; + link->data = frame_copy; + + if (state->last_queued_frame == NULL) { + state->first_queued_frame = link; + } else { + state->last_queued_frame->next = link; + } + state->last_queued_frame = link; + + goto retry; + } + + result.reply_type = (frame.payload.method.id == expected_reply_id) + ? AMQP_RESPONSE_NORMAL + : AMQP_RESPONSE_SERVER_EXCEPTION; + + result.reply = frame.payload.method; + return result; + } +} + +static int amqp_login_inner(amqp_connection_state_t state, + int frame_max, + amqp_sasl_method_enum sasl_method, + va_list vl) +{ + amqp_method_t method; + uint32_t server_frame_max; + + amqp_send_header(state); + + AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_START_METHOD, &method)); + { + amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; + if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || + (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { + return -EPROTOTYPE; + } + + /* TODO: check that our chosen SASL mechanism is in the list of + acceptable mechanisms. Or even let the application choose from + the list! */ + } + + { + amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl); + amqp_connection_start_ok_t s = + (amqp_connection_start_ok_t) { + .client_properties = {.num_entries = 0, .entries = NULL}, + .mechanism = sasl_method_name(sasl_method), + .response = response_bytes, + .locale = {.len = 5, .bytes = "en_US"} + }; + AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s)); + } + + amqp_release_buffers(state); + + AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_TUNE_METHOD, &method)); + { + amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; + server_frame_max = s->frame_max; + } + + if (server_frame_max != 0 && server_frame_max < frame_max) { + frame_max = server_frame_max; + } + + { + amqp_connection_tune_ok_t s = + (amqp_connection_tune_ok_t) { + .channel_max = 1, + .frame_max = frame_max, + .heartbeat = 0 + }; + AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s)); + } + + amqp_release_buffers(state); + + return 1; +} + +amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, + char const *vhost, + int frame_max, + amqp_sasl_method_enum sasl_method, + ...) +{ + va_list vl; + amqp_rpc_reply_t result; + + va_start(vl, sasl_method); + + amqp_login_inner(state, frame_max, sasl_method, vl); + + { + amqp_connection_open_t s = + (amqp_connection_open_t) { + .virtual_host = amqp_cstring_bytes(vhost), + .capabilities = {.len = 0, .bytes = NULL}, + .insist = 1 + }; + result = amqp_simple_rpc(state, + 0, + AMQP_CONNECTION_OPEN_METHOD, + AMQP_CONNECTION_OPEN_OK_METHOD, + &s); + if (result.reply_type != AMQP_RESPONSE_NORMAL) { + return result; + } + } + amqp_maybe_release_buffers(state); + + { + amqp_channel_open_t s = + (amqp_channel_open_t) { + .out_of_band = {.len = 0, .bytes = NULL} + }; + result = amqp_simple_rpc(state, + 1, + AMQP_CHANNEL_OPEN_METHOD, + AMQP_CHANNEL_OPEN_OK_METHOD, + &s); + if (result.reply_type != AMQP_RESPONSE_NORMAL) { + return result; + } + } + amqp_maybe_release_buffers(state); + + va_end(vl); + + result.reply_type = AMQP_RESPONSE_NORMAL; + result.reply.id = 0; + result.reply.decoded = NULL; + result.library_errno = 0; + return result; +} + +int amqp_basic_publish(amqp_connection_state_t state, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + amqp_basic_properties_t const *properties, + amqp_bytes_t body) +{ + amqp_frame_t f; + size_t body_offset; + size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); + + amqp_basic_publish_t m = + (amqp_basic_publish_t) { + .exchange = exchange, + .routing_key = routing_key, + .mandatory = mandatory, + .immediate = immediate + }; + + amqp_basic_properties_t default_properties; + + AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m)); + + if (properties == NULL) { + memset(&default_properties, 0, sizeof(default_properties)); + properties = &default_properties; + } + + f.frame_type = AMQP_FRAME_HEADER; + f.channel = 1; + f.payload.properties.class_id = AMQP_BASIC_CLASS; + f.payload.properties.body_size = body.len; + f.payload.properties.decoded = (void *) properties; + AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); + + body_offset = 0; + while (1) { + int remaining = body.len - body_offset; + assert(remaining >= 0); + + if (remaining == 0) + break; + + f.frame_type = AMQP_FRAME_BODY; + f.channel = 1; + f.payload.body_fragment.bytes = BUF_AT(body, body_offset); + if (remaining >= usable_body_payload_size) { + f.payload.body_fragment.len = usable_body_payload_size; + } else { + f.payload.body_fragment.len = remaining; } + + body_offset += f.payload.body_fragment.len; + AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); } + + return 0; } diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c index fac42d9..d8200e4 100644 --- a/librabbitmq/amqp_table.c +++ b/librabbitmq/amqp_table.c @@ -5,7 +5,6 @@ #include <errno.h> #include "amqp.h" -#include "../config.h" #include "amqp_private.h" #define INITIAL_TABLE_SIZE 16 @@ -66,10 +65,9 @@ int amqp_decode_table(amqp_bytes_t encoded, entry->value.u64 = D_64(encoded, offset); offset += 8; break; - case 'F': { - int table_result = amqp_decode_table(encoded, pool, &(entry->value.table), &offset); - if (table_result != 0) return table_result; - } + case 'F': + AMQP_CHECK_RESULT(amqp_decode_table(encoded, pool, &(entry->value.table), &offset)); + break; default: return -EINVAL; } @@ -128,10 +126,9 @@ int amqp_encode_table(amqp_bytes_t encoded, E_64(encoded, offset, entry->value.u64); offset += 8; break; - case 'F': { - int table_result = amqp_encode_table(encoded, &(entry->value.table), &offset); - if (table_result != 0) return table_result; - } + case 'F': + AMQP_CHECK_RESULT(amqp_encode_table(encoded, &(entry->value.table), &offset)); + break; default: return -EINVAL; } diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index 474be78..9f6cde0 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -108,7 +108,7 @@ def genErl(spec): elif type == 'table': print prefix + "table_result = amqp_decode_table(encoded, pool, &(%s), &offset);" % \ (cLvalue,) - print prefix + "if (table_result != 0) return table_result;" + print prefix + "AMQP_CHECK_RESULT(table_result);" else: raise "Illegal domain in genSingleDecode", type @@ -144,7 +144,7 @@ def genErl(spec): elif type == 'table': print prefix + "table_result = amqp_encode_table(encoded, &(%s), &offset);" % \ (cValue,) - print prefix + "if (table_result != 0) return table_result;" + print prefix + "if (table_result < 0) return table_result;" else: raise "Illegal domain in genSingleEncode", type @@ -231,17 +231,6 @@ def genErl(spec): print " return offset;" print " }" - def genLookupException(c,v,cls): - # We do this because 0.8 uses "soft error" and 8.1 uses "soft-error". - mCls = c_ize(cls).upper() - if mCls == 'SOFT_ERROR': genLookupException1(c,'AMQP_EXCEPTION_CATEGORY_CHANNEL') - elif mCls == 'HARD_ERROR': genLookupException1(c, 'AMQP_EXCEPTION_CATEGORY_CONNECTION') - elif mCls == '': pass - else: raise 'Unknown constant class', cls - - def genLookupException1(c, cCategory): - print ' case %s: return %s;' % (cConstantName(c), cCategory) - methods = spec.allMethods() print '#include <stdlib.h>' @@ -340,14 +329,14 @@ int amqp_encode_properties(uint16_t class_id, similarity of structure between classes */ amqp_flags_t flags = * (amqp_flags_t *) decoded; /* cheating! */ - while (flags != 0) { + do { amqp_flags_t remainder = flags >> 16; uint16_t partial_flags = flags & 0xFFFE; if (remainder != 0) { partial_flags |= 1; } E_16(encoded, offset, partial_flags); offset += 2; flags = remainder; - } + } while (flags != 0); switch (class_id) {""" for c in spec.allClasses(): genEncodeProperties(c) @@ -355,14 +344,6 @@ int amqp_encode_properties(uint16_t class_id, } }""" - print """ -int amqp_exception_category(uint16_t code) { - switch (code) {""" - for (c,v,cls) in spec.constants: genLookupException(c,v,cls) - print """ default: return 0; - } -}""" - def genHrl(spec): def cType(domain): return cTypeMap[spec.resolveDomain(domain)] @@ -409,7 +390,6 @@ extern int amqp_encode_method(amqp_method_number_t methodNumber, extern int amqp_encode_properties(uint16_t class_id, void *decoded, amqp_bytes_t encoded); -extern int amqp_exception_category(uint16_t code); """ print "/* Method field records. */" @@ -425,6 +405,8 @@ extern int amqp_exception_category(uint16_t code); print "/* Class property records. */" for c in spec.allClasses(): + print "#define %s (0x%.04X) /* %d */" % \ + (cConstantName(c.name + "_class"), c.index, c.index) index = 0 for f in c.fields: if index % 16 == 15: @@ -434,8 +416,10 @@ extern int amqp_exception_category(uint16_t code); bitindex = shortnum * 16 + partialindex print '#define %s (1 << %d)' % (cFlagName(c, f), bitindex) index = index + 1 - print "typedef struct {\n amqp_flags_t _flags;\n%s} %s;\n" % \ - (fieldDeclList(c.fields), c.structName()) + print "typedef struct %s_ {\n amqp_flags_t _flags;\n%s} %s;\n" % \ + (c.structName(), + fieldDeclList(c.fields), + c.structName()) print """#ifdef __cplusplus } |