From af9315c575b8788b28bb3c5d959a9462b12b9a15 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 25 Apr 2009 22:11:44 +0100 Subject: More work --- examples/amqp_sendstring.c | 46 ++++++++++- librabbitmq/Makefile.am | 2 +- librabbitmq/amqp.h | 23 ++++-- librabbitmq/amqp_connection.c | 186 ++++++++++++++++++++++++++++++++++++++++++ librabbitmq/amqp_private.h | 6 ++ librabbitmq/amqp_socket.c | 78 ++++++++++++++++++ librabbitmq/codegen.py | 20 +++++ 7 files changed, 352 insertions(+), 9 deletions(-) create mode 100644 librabbitmq/amqp_connection.c create mode 100644 librabbitmq/amqp_socket.c diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index 9258f7f..bdf4285 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -3,10 +3,52 @@ #include #include -#include "amqp.h" +#include + +#include + +static void die_on_error(int x, char const *context) { + if (x < 0) { + fprintf(stderr, "%s: %s\n", context, strerror(-x)); + exit(1); + } +} int main(int argc, char const * const *argv) { - amqp_connection_state_t conn = amqp_new_connection(); + char const *hostname; + int port; + char const *exchange; + char const *routingkey; + char const *messagebody; + + int sockfd; + amqp_connection_state_t conn; + + if (argc < 6) { + fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + routingkey = argv[4]; + messagebody = argv[5]; + + die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); + amqp_send_header((amqp_writer_fun_t) write, sockfd); + + conn = amqp_new_connection(); + + while (1) { + amqp_frame_t frame; + printf("Result %d\n", amqp_simple_wait_frame(conn, sockfd, &frame)); + printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); + } + amqp_destroy_connection(conn); + + die_on_error(close(sockfd), "Closing socket"); + return 0; } diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am index 5fec206..6fae6d3 100644 --- a/librabbitmq/Makefile.am +++ b/librabbitmq/Makefile.am @@ -1,6 +1,6 @@ lib_LTLIBRARIES = librabbitmq.la -librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c +librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c nodist_librabbitmq_la_SOURCES = amqp_framing.c librabbitmq_la_INCLUDES = amqp_framing.h amqp.h noinst_librabbitmq_la_INCLUDES = amqp_private.h diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 2554cf8..314e2c9 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -5,6 +5,8 @@ extern "C" { #endif +typedef int (*amqp_writer_fun_t)(int context, void const *buf, size_t len); + typedef int amqp_boolean_t; typedef uint32_t amqp_method_number_t; typedef uint32_t amqp_flags_t; @@ -73,6 +75,9 @@ typedef struct amqp_frame_t_ { #define AMQP_EXCEPTION_CATEGORY_CONNECTION 1 #define AMQP_EXCEPTION_CATEGORY_CHANNEL 2 +#define AMQP_FRAME_COMPLETE 0 +#define AMQP_FRAME_INCOMPLETE 1 + /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; @@ -85,23 +90,29 @@ extern void empty_amqp_pool(amqp_pool_t *pool); extern void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount); extern amqp_connection_state_t amqp_new_connection(void); -extern int amqp_tune_connection(amqp_connection_state_t state, - int frame_max); -extern int amqp_destroy_connection(amqp_connection_state_t state); +extern void amqp_tune_connection(amqp_connection_state_t state, + int frame_max); +extern void amqp_destroy_connection(amqp_connection_state_t state); extern int amqp_handle_input(amqp_connection_state_t state, - amqp_pool_t *pool, amqp_bytes_t received_data, amqp_frame_t *decoded_frame); extern int amqp_send_frame(amqp_connection_state_t state, - amqp_pool_t *pool, - int (*writer)(int context, void const *buf, size_t len), + amqp_writer_fun_t writer, int context, amqp_frame_t const *frame); extern int amqp_table_entry_cmp(void const *entry1, void const *entry2); +extern int amqp_open_socket(char const *hostname, int portnumber); + +extern int amqp_send_header(amqp_writer_fun_t writer, int context); + +extern int amqp_simple_wait_frame(amqp_connection_state_t state, + int sockfd, + amqp_frame_t *decoded_frame); + #ifdef __cplusplus } #endif diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c new file mode 100644 index 0000000..3100f7e --- /dev/null +++ b/librabbitmq/amqp_connection.c @@ -0,0 +1,186 @@ +#include +#include +#include +#include +#include + +#include "amqp.h" +#include "amqp_framing.h" +#include "amqp_private.h" + +#define INITIAL_FRAME_MAX 65536 + +amqp_connection_state_t amqp_new_connection(void) { + amqp_connection_state_t state = + (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); + init_amqp_pool(&state->pool, INITIAL_FRAME_MAX); + state->inbound_buffer.len = INITIAL_FRAME_MAX; + state->inbound_buffer.bytes = malloc(INITIAL_FRAME_MAX); + state->outbound_buffer.len = INITIAL_FRAME_MAX; + state->outbound_buffer.bytes = malloc(INITIAL_FRAME_MAX); + state->frame_size_target = 0; + state->reset_required = 0; + return state; +} + +void amqp_tune_connection(amqp_connection_state_t state, + int frame_max) +{ + empty_amqp_pool(&state->pool); + init_amqp_pool(&state->pool, frame_max); + state->inbound_buffer.len = frame_max; + realloc(state->inbound_buffer.bytes, frame_max); + state->outbound_buffer.len = frame_max; + realloc(state->outbound_buffer.bytes, frame_max); +} + +void amqp_destroy_connection(amqp_connection_state_t state) { + empty_amqp_pool(&state->pool); + free(state->inbound_buffer.bytes); + free(state->outbound_buffer.bytes); + free(state); +} + +int amqp_handle_input(amqp_connection_state_t state, + amqp_bytes_t received_data, + amqp_frame_t *decoded_frame) +{ + if (state->reset_required) { + size_t unconsumed_byte_count = state->frame_offset - state->frame_size_target; + recycle_amqp_pool(&state->pool); + memmove(state->inbound_buffer.bytes, + D_BYTES(state->inbound_buffer, + state->frame_size_target, + unconsumed_byte_count), + unconsumed_byte_count); + state->frame_offset = unconsumed_byte_count; + state->frame_size_target = 0; + state->reset_required = 0; + } + + E_BYTES(state->inbound_buffer, state->frame_offset, received_data.len, received_data.bytes); + state->frame_offset += received_data.len; + + if (state->frame_size_target == 0) { + if (state->frame_offset < 7) { + return AMQP_FRAME_INCOMPLETE; + } + + state->frame_size_target = D_32(state->inbound_buffer, 3) + 8; /* 7 for header, 1 for footer */ + } + + if (state->frame_offset < state->frame_size_target) { + return AMQP_FRAME_INCOMPLETE; + } + + if (D_8(state->inbound_buffer, state->frame_size_target - 1) != AMQP_FRAME_END) { + return -EINVAL; + } + + state->reset_required = 1; + + decoded_frame->frame_type = D_8(state->inbound_buffer, 0); + decoded_frame->channel = D_16(state->inbound_buffer, 1); + switch (decoded_frame->frame_type) { + case AMQP_FRAME_METHOD: { + amqp_bytes_t encoded; + int decode_result; + + encoded.len = state->frame_size_target - 12; /* 7 for header, 4 for method id, 1 for footer */ + encoded.bytes = D_BYTES(state->inbound_buffer, 11, encoded.len); + + decoded_frame->payload.method.id = D_32(state->inbound_buffer, 7); + decode_result = amqp_decode_method(decoded_frame->payload.method.id, + &state->pool, + encoded, + &decoded_frame->payload.method.decoded); + if (decode_result != 0) return decode_result; + + return AMQP_FRAME_COMPLETE; + } + case AMQP_FRAME_HEADER: { + amqp_bytes_t encoded; + int decode_result; + + encoded.len = state->frame_size_target - 20; /* 7 for header, 12 for prop hdr, 1 for footer */ + encoded.bytes = D_BYTES(state->inbound_buffer, 19, encoded.len); + + decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, 7); + decode_result = amqp_decode_properties(decoded_frame->payload.properties.class_id, + &state->pool, + encoded, + &decoded_frame->payload.properties.decoded); + if (decode_result != 0) return decode_result; + + return AMQP_FRAME_COMPLETE; + } + case AMQP_FRAME_BODY: { + size_t fragment_len = state->frame_size_target - 8; /* 7 for header, 1 for footer */ + decoded_frame->payload.body_fragment.len = fragment_len; + decoded_frame->payload.body_fragment.bytes = D_BYTES(state->inbound_buffer, 7, fragment_len); + return AMQP_FRAME_COMPLETE; + } + default: + /* Ignore the frame */ + return AMQP_FRAME_INCOMPLETE; + } +} + +int amqp_send_frame(amqp_connection_state_t state, + amqp_writer_fun_t writer, + int context, + amqp_frame_t const *frame) +{ + amqp_bytes_t encoded; + int payload_len; + int separate_body; + + E_8(state->outbound_buffer, 0, frame->frame_type); + E_16(state->outbound_buffer, 1, frame->channel); + switch (frame->frame_type) { + case AMQP_FRAME_METHOD: { + encoded.len = state->outbound_buffer.len - 8; + encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len); + payload_len = amqp_encode_method(frame->payload.method.id, + frame->payload.method.decoded, + encoded); + separate_body = 0; + break; + } + case AMQP_FRAME_HEADER: { + encoded.len = state->outbound_buffer.len - 8; + encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len); + payload_len = amqp_encode_properties(frame->payload.properties.class_id, + frame->payload.properties.decoded, + encoded); + separate_body = 0; + break; + } + case AMQP_FRAME_BODY: { + encoded = frame->payload.body_fragment; + payload_len = encoded.len; + separate_body = 1; + break; + } + default: + return -EINVAL; + } + + if (payload_len < 0) { + return payload_len; + } + + E_32(state->outbound_buffer, 3, payload_len); + + if (separate_body) { + int result = writer(context, state->outbound_buffer.bytes, 7); + char frame_end_byte = AMQP_FRAME_END; + if (result < 0) return result; + result = writer(context, encoded.bytes, payload_len); + if (result < 0) return result; + return writer(context, &frame_end_byte, 1); + } else { + E_8(state->outbound_buffer, payload_len + 7, AMQP_FRAME_END); + return writer(context, state->outbound_buffer.bytes, payload_len + 8); + } +} diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 4f6628d..7a874e2 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -6,6 +6,12 @@ extern "C" { #endif struct amqp_connection_state_t_ { + amqp_pool_t pool; + amqp_bytes_t inbound_buffer; + amqp_bytes_t outbound_buffer; + size_t frame_offset; + size_t frame_size_target; /* 0 for unknown, still waiting for header */ + amqp_boolean_t reset_required; }; #define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); }) diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c new file mode 100644 index 0000000..32af3e9 --- /dev/null +++ b/librabbitmq/amqp_socket.c @@ -0,0 +1,78 @@ +#include +#include +#include +#include +#include + +#include "amqp.h" +#include "amqp_framing.h" + +#include +#include +#include +#include +#include +#include + +int amqp_open_socket(char const *hostname, + int portnumber) +{ + int sockfd; + struct sockaddr_in addr; + struct hostent *he; + + he = gethostbyname(hostname); + if (he == NULL) { + return -ENOENT; + } + + addr.sin_family = AF_INET; + addr.sin_port = htons(portnumber); + addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0]; + + sockfd = socket(PF_INET, SOCK_STREAM, 0); + if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { + int result = -errno; + close(sockfd); + return result; + } + + return sockfd; +} + +int amqp_send_header(amqp_writer_fun_t writer, int context) { + char header[8]; + header[0] = 'A'; + header[1] = 'M'; + header[2] = 'Q'; + header[3] = 'P'; + header[4] = 1; + header[5] = 1; + header[6] = AMQP_PROTOCOL_VERSION_MAJOR; + header[7] = AMQP_PROTOCOL_VERSION_MINOR; + return writer(context, &header[0], 8); +} + +int amqp_simple_wait_frame(amqp_connection_state_t state, + int sockfd, + amqp_frame_t *decoded_frame) +{ + amqp_bytes_t buffer; + char buffer_bytes[4096]; + buffer.bytes = buffer_bytes; + while (1) { + int result = read(sockfd, buffer_bytes, sizeof(buffer_bytes)); + if (result < 0) { + return -errno; + } + buffer.len = result; + switch ((result = amqp_handle_input(state, buffer, decoded_frame))) { + case AMQP_FRAME_COMPLETE: + return AMQP_FRAME_COMPLETE; + case AMQP_FRAME_INCOMPLETE: + break; + default: + return result; + } + } +} diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index 00159ae..474be78 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -392,6 +392,26 @@ extern "C" { print "#define %s %s" % (cConstantName(c), v) print + print """/* Function prototypes. */ +extern char const *amqp_method_name(amqp_method_number_t methodNumber); +extern amqp_boolean_t amqp_method_has_content(amqp_method_number_t methodNumber); +extern int amqp_decode_method(amqp_method_number_t methodNumber, + amqp_pool_t *pool, + amqp_bytes_t encoded, + void **decoded); +extern int amqp_decode_properties(uint16_t class_id, + amqp_pool_t *pool, + amqp_bytes_t encoded, + void **decoded); +extern int amqp_encode_method(amqp_method_number_t methodNumber, + void *decoded, + amqp_bytes_t encoded); +extern int amqp_encode_properties(uint16_t class_id, + void *decoded, + amqp_bytes_t encoded); +extern int amqp_exception_category(uint16_t code); +""" + print "/* Method field records. */" for m in methods: methodid = m.klass.index << 16 | m.index -- cgit v1.2.1