diff options
author | David Wragg <david@rabbitmq.com> | 2010-10-21 17:49:04 +0100 |
---|---|---|
committer | David Wragg <david@rabbitmq.com> | 2010-10-21 17:49:04 +0100 |
commit | e50a94ba2fd471c61509ed3120f42d4506d99ffb (patch) | |
tree | 78eeb2427b750d46b15ee7dc5aa19fd3d7d4f4ee | |
parent | ceda8246d3fe0ceb51f680848c8cbe45c332f71a (diff) | |
download | rabbitmq-c-github-ask-e50a94ba2fd471c61509ed3120f42d4506d99ffb.tar.gz |
Convert other librabbitmq .c files to the new helper functions
-rw-r--r-- | librabbitmq/amqp.h | 12 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 457 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 33 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 7 |
4 files changed, 219 insertions, 290 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 03072bc..810cf5b 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -271,11 +271,6 @@ typedef enum amqp_sasl_method_enum_ { AMQP_SASL_METHOD_PLAIN = 0 } amqp_sasl_method_enum; -#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER ((uint8_t) 'A') -#define AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL ((amqp_channel_t) ((((int) 'M') << 8) | ((int) 'Q'))) - -typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count); - /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; @@ -316,19 +311,12 @@ extern void amqp_maybe_release_buffers(amqp_connection_state_t state); extern int amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame); -extern int amqp_send_frame_to(amqp_connection_state_t state, - amqp_frame_t const *frame, - amqp_output_fn_t fn, - void *context); extern int amqp_table_entry_cmp(void const *entry1, void const *entry2); extern int amqp_open_socket(char const *hostname, int portnumber); extern int amqp_send_header(amqp_connection_state_t state); -extern int amqp_send_header_to(amqp_connection_state_t state, - amqp_output_fn_t fn, - void *context); extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state); diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index ee467c3..8b9ace6 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -76,42 +76,38 @@ amqp_connection_state_t amqp_new_connection(void) { amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); - if (state == NULL) { + if (state == NULL) return NULL; - } init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); - state->state = CONNECTION_STATE_IDLE; + if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) + goto out_nomem; - state->inbound_buffer.bytes = NULL; - state->outbound_buffer.bytes = NULL; - if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) { - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state); - return NULL; - } + state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); + if (state->inbound_buffer.bytes == NULL) + goto out_nomem; - state->inbound_offset = 0; - state->target_size = HEADER_SIZE; + state->state = CONNECTION_STATE_INITIAL; + /* the server protocol version response is 8 bytes, which conveniently + is also the minimum frame size */ + state->target_size = 8; state->sockfd = -1; state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); - if (state->sock_inbound_buffer.bytes == NULL) { - amqp_destroy_connection(state); - return NULL; - } - - state->sock_inbound_offset = 0; - state->sock_inbound_limit = 0; - - state->first_queued_frame = NULL; - state->last_queued_frame = NULL; + if (state->sock_inbound_buffer.bytes == NULL) + goto out_nomem; return state; + + out_nomem: + free(state->sock_inbound_buffer.bytes); + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state); + return NULL; } int amqp_get_sockfd(amqp_connection_state_t state) { @@ -178,154 +174,162 @@ static void return_to_idle(amqp_connection_state_t state) { state->state = CONNECTION_STATE_IDLE; } +static size_t consume_data(amqp_connection_state_t state, + amqp_bytes_t *received_data) +{ + /* how much data is available and will fit? */ + size_t bytes_consumed = state->target_size - state->inbound_offset; + if (received_data->len < bytes_consumed) + bytes_consumed = received_data->len; + + memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset), + received_data->bytes, bytes_consumed); + state->inbound_offset += bytes_consumed; + received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed); + received_data->len -= bytes_consumed; + + return bytes_consumed; +} + int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, amqp_frame_t *decoded_frame) { - int total_bytes_consumed = 0; - int bytes_consumed; + size_t bytes_consumed; + void *raw_frame; /* Returning frame_type of zero indicates either insufficient input, or a complete, ignored frame was read. */ decoded_frame->frame_type = 0; - read_more: - if (received_data.len == 0) { - return total_bytes_consumed; - } + if (received_data.len == 0) + return 0; if (state->state == CONNECTION_STATE_IDLE) { - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) { + state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, + state->inbound_buffer.len); + if (state->inbound_buffer.bytes == NULL) /* state->inbound_buffer.len is always nonzero, because it corresponds to frame_max, which is not permitted to be less than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ return -ERROR_NO_MEMORY; - } - state->state = CONNECTION_STATE_WAITING_FOR_HEADER; - } - bytes_consumed = state->target_size - state->inbound_offset; - if (received_data.len < bytes_consumed) { - bytes_consumed = received_data.len; + state->state = CONNECTION_STATE_HEADER; } - E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes); - state->inbound_offset += bytes_consumed; - total_bytes_consumed += bytes_consumed; + bytes_consumed = consume_data(state, &received_data); - assert(state->inbound_offset <= state->target_size); + /* do we have target_size data yet? if not, return with the + expectation that more will arrive */ + if (state->inbound_offset < state->target_size) + return bytes_consumed; - if (state->inbound_offset < state->target_size) { - return total_bytes_consumed; - } + raw_frame = state->inbound_buffer.bytes; switch (state->state) { - case CONNECTION_STATE_WAITING_FOR_HEADER: - if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER && - D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL) - { - state->target_size = 8; - state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER; - } else { - state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE; - state->state = CONNECTION_STATE_WAITING_FOR_BODY; - } - - /* Wind buffer forward, and try to read some body out of it. */ - received_data.len -= bytes_consumed; - received_data.bytes = ((char *) received_data.bytes) + bytes_consumed; - goto read_more; - - case CONNECTION_STATE_WAITING_FOR_BODY: { - int frame_type = D_8(state->inbound_buffer, 0); - -#if 0 - printf("recving:\n"); - amqp_dump(state->inbound_buffer.bytes, state->target_size); -#endif - - /* Check frame end marker (footer) */ - if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) { - return -ERROR_BAD_AMQP_DATA; - } - - decoded_frame->channel = D_16(state->inbound_buffer, 1); - - switch (frame_type) { - case AMQP_FRAME_METHOD: { - amqp_bytes_t encoded; - - /* Four bytes of method ID before the method args. */ - encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE); - encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len); - - decoded_frame->frame_type = AMQP_FRAME_METHOD; - decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE); - AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id, - &state->decoding_pool, - encoded, - &decoded_frame->payload.method.decoded)); - break; - } - - case AMQP_FRAME_HEADER: { - amqp_bytes_t encoded; - - /* 12 bytes for properties header. */ - encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE); - encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len); - - decoded_frame->frame_type = AMQP_FRAME_HEADER; - decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE); - decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4); - decoded_frame->payload.properties.raw = encoded; - AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id, - &state->decoding_pool, - encoded, - &decoded_frame->payload.properties.decoded)); - break; - } - - case AMQP_FRAME_BODY: { - size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE); - - decoded_frame->frame_type = AMQP_FRAME_BODY; - decoded_frame->payload.body_fragment.len = fragment_len; - decoded_frame->payload.body_fragment.bytes = - D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len); - break; - } - - case AMQP_FRAME_HEARTBEAT: - decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT; - break; - - default: - /* Ignore the frame by not changing frame_type away from 0. */ - break; - } + case CONNECTION_STATE_INITIAL: + /* check for a protocol header from the server */ + if (memcmp(raw_frame, "AMQP", 4) == 0) { + decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; + decoded_frame->channel = 0; + + decoded_frame->payload.protocol_header.transport_high + = amqp_d8(raw_frame, 4); + decoded_frame->payload.protocol_header.transport_low + = amqp_d8(raw_frame, 5); + decoded_frame->payload.protocol_header.protocol_version_major + = amqp_d8(raw_frame, 6); + decoded_frame->payload.protocol_header.protocol_version_minor + = amqp_d8(raw_frame, 7); return_to_idle(state); - return total_bytes_consumed; + return bytes_consumed; } - case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: - decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; - decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL; - if (D_8(state->inbound_buffer, 3) != (uint8_t) 'P') - amqp_abort("Invalid protocol header received"); + /* it's not a protocol header; fall through to process it as a + regular frame header */ - decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4); - decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5); - decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6); - decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7); + case CONNECTION_STATE_HEADER: + /* frame length is 3 bytes in */ + state->target_size + = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; + state->state = CONNECTION_STATE_BODY; - return_to_idle(state); - return total_bytes_consumed; + bytes_consumed += consume_data(state, &received_data); + + /* do we have target_size data yet? if not, return with the + expectation that more will arrive */ + if (state->inbound_offset < state->target_size) + return bytes_consumed; + + /* fall through to process body */ + + case CONNECTION_STATE_BODY: { + amqp_bytes_t encoded; + int res; + + /* Check frame end marker (footer) */ + if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) + return -ERROR_BAD_AMQP_DATA; + + decoded_frame->frame_type = amqp_d8(raw_frame, 0); + decoded_frame->channel = amqp_d16(raw_frame, 1); + + switch (decoded_frame->frame_type) { + case AMQP_FRAME_METHOD: + decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE); + encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4); + encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; + + res = amqp_decode_method(decoded_frame->payload.method.id, + &state->decoding_pool, encoded, + &decoded_frame->payload.method.decoded); + if (res < 0) + return res; + + break; + + case AMQP_FRAME_HEADER: + decoded_frame->payload.properties.class_id + = amqp_d16(raw_frame, HEADER_SIZE); + /* unused 2-byte weight field goes here */ + decoded_frame->payload.properties.body_size + = amqp_d64(raw_frame, HEADER_SIZE + 4); + encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12); + encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; + decoded_frame->payload.properties.raw = encoded; + + res = amqp_decode_properties(decoded_frame->payload.properties.class_id, + &state->decoding_pool, encoded, + &decoded_frame->payload.properties.decoded); + if (res < 0) + return res; + + break; + + case AMQP_FRAME_BODY: + decoded_frame->payload.body_fragment.len + = state->target_size - HEADER_SIZE - FOOTER_SIZE; + decoded_frame->payload.body_fragment.bytes + = amqp_offset(raw_frame, HEADER_SIZE); + break; + + case AMQP_FRAME_HEARTBEAT: + break; default: - amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", state->state); + /* Ignore the frame */ + decoded_frame->frame_type = 0; + break; + } + + return_to_idle(state); + return bytes_consumed; + } + + default: + amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", state->state); + return bytes_consumed; } } @@ -349,103 +353,80 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) { } } -static int inner_send_frame(amqp_connection_state_t state, - amqp_frame_t const *frame, - amqp_bytes_t *encoded, - int *payload_len) +int amqp_send_frame(amqp_connection_state_t state, + const amqp_frame_t *frame) { - int separate_body; + void *out_frame = state->outbound_buffer.bytes; + int res; - E_8(state->outbound_buffer, 0, frame->frame_type); - E_16(state->outbound_buffer, 1, frame->channel); - switch (frame->frame_type) { - case AMQP_FRAME_METHOD: - E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id); - encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE); - encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len); - *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id, - frame->payload.method.decoded, - *encoded)) + 4; - separate_body = 0; - break; + amqp_e8(out_frame, 0, frame->frame_type); + amqp_e16(out_frame, 1, frame->channel); - case AMQP_FRAME_HEADER: - E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id); - E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */ - E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size); - encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE); - encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len); - *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id, - frame->payload.properties.decoded, - *encoded)) + 12; - separate_body = 0; - break; + if (frame->frame_type == AMQP_FRAME_BODY) { + /* For a body frame, rather than copying data around, we use + writev to compose the frame */ + struct iovec iov[3]; + char frame_end_byte = AMQP_FRAME_END; + const amqp_bytes_t *body = &frame->payload.body_fragment; - case AMQP_FRAME_BODY: - *encoded = frame->payload.body_fragment; - *payload_len = encoded->len; - separate_body = 1; - break; + amqp_e32(out_frame, 3, body->len); - case AMQP_FRAME_HEARTBEAT: - *encoded = AMQP_EMPTY_BYTES; - *payload_len = 0; - separate_body = 0; - break; + iov[0].iov_base = out_frame; + iov[0].iov_len = HEADER_SIZE; + iov[1].iov_base = body->bytes; + iov[1].iov_len = body->len; + iov[2].iov_base = &frame_end_byte; + iov[2].iov_len = FOOTER_SIZE; - default: - abort(); + res = amqp_socket_writev(state->sockfd, iov, 3); } + else { + size_t out_frame_len; + amqp_bytes_t encoded; - E_32(state->outbound_buffer, 3, *payload_len); - if (!separate_body) { - E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END); - } + switch (frame->frame_type) { + case AMQP_FRAME_METHOD: + amqp_e32(out_frame, HEADER_SIZE, frame->payload.method.id); -#if 0 - if (separate_body) { - printf("sending body frame (header):\n"); - amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE); - printf("sending body frame (payload):\n"); - amqp_dump(encoded->bytes, *payload_len); - } else { - printf("sending:\n"); - amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE); - } -#endif + encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 4); + encoded.len = state->outbound_buffer.len - HEADER_SIZE - 4 - FOOTER_SIZE; - return separate_body; -} + res = amqp_encode_method(frame->payload.method.id, + frame->payload.method.decoded, encoded); + if (res < 0) + return res; -int amqp_send_frame(amqp_connection_state_t state, - amqp_frame_t const *frame) -{ - amqp_bytes_t encoded; - int payload_len, res; - - res = inner_send_frame(state, frame, &encoded, &payload_len); - switch (res) { - case 0: - res = send(state->sockfd, state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE), 0); + out_frame_len = res + 4; break; - case 1: { - struct iovec iov[3]; - char frame_end_byte = AMQP_FRAME_END; - iov[0].iov_base = state->outbound_buffer.bytes; - iov[0].iov_len = HEADER_SIZE; - iov[1].iov_base = encoded.bytes; - iov[1].iov_len = payload_len; - iov[2].iov_base = &frame_end_byte; - assert(FOOTER_SIZE == 1); - iov[2].iov_len = FOOTER_SIZE; - res = amqp_socket_writev(state->sockfd, &iov[0], 3); + case AMQP_FRAME_HEADER: + amqp_e16(out_frame, HEADER_SIZE, frame->payload.properties.class_id); + amqp_e16(out_frame, HEADER_SIZE+2, 0); /* "weight" */ + amqp_e64(out_frame, HEADER_SIZE+4, frame->payload.properties.body_size); + + encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 12); + encoded.len = state->outbound_buffer.len - HEADER_SIZE - 12 - FOOTER_SIZE; + + res = amqp_encode_properties(frame->payload.properties.class_id, + frame->payload.properties.decoded, encoded); + if (res < 0) + return res; + + out_frame_len = res + 12; + break; + + case AMQP_FRAME_HEARTBEAT: + out_frame_len = 0; break; - } default: - return res; + abort(); + } + + amqp_e32(out_frame, 3, out_frame_len); + amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); + res = send(state->sockfd, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE, 0); } if (res < 0) @@ -453,35 +434,3 @@ int amqp_send_frame(amqp_connection_state_t state, else return 0; } - -int amqp_send_frame_to(amqp_connection_state_t state, - amqp_frame_t const *frame, - amqp_output_fn_t fn, - void *context) -{ - amqp_bytes_t encoded; - int payload_len; - int separate_body; - - separate_body = inner_send_frame(state, frame, &encoded, &payload_len); - switch (separate_body) { - case 0: - AMQP_CHECK_RESULT(fn(context, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); - return 0; - - case 1: - AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE)); - AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len)); - { - assert(FOOTER_SIZE == 1); - char frame_end_byte = AMQP_FRAME_END; - AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE)); - } - return 0; - - default: - return separate_body; - } -} diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 0ff71c7..439008a 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -79,41 +79,40 @@ extern char *amqp_os_error_string(int err); #include "socket.h" /* - * Connection states: + * Connection states: XXX FIX THIS * - * - CONNECTION_STATE_IDLE: initial state, and entered again after - * each frame is completed. Means that no bytes of the next frame - * have been seen yet. Connections may only be reconfigured, and the + * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be + * sure if the next thing we will get is the first AMQP frame, or a + * protocol header from the server. + * + * - CONNECTION_STATE_IDLE: The normal state between + * frames. Connections may only be reconfigured, and the * connection's pools recycled, when in this state. Whenever we're * in this state, the inbound_buffer's bytes pointer must be NULL; * any other state, and it must point to a block of memory allocated * from the frame_pool. * - * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming - * frame have been seen, but not a complete frame header's worth. - * - * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has - * been seen, but the frame is not yet complete. When it is - * completed, it will be returned, and the connection will return to - * IDLE state. + * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have + * been seen, but not a complete frame header's worth. * - * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a - * protocol version header has been seen, but the full eight bytes - * hasn't yet been received. When it is completed, it will be + * - CONNECTION_STATE_BODY: A complete frame header has been seen, but + * the frame is not yet complete. When it is completed, it will be * returned, and the connection will return to IDLE state. * */ typedef enum amqp_connection_state_enum_ { CONNECTION_STATE_IDLE = 0, - CONNECTION_STATE_WAITING_FOR_HEADER, - CONNECTION_STATE_WAITING_FOR_BODY, - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER + CONNECTION_STATE_INITIAL, + CONNECTION_STATE_HEADER, + CONNECTION_STATE_BODY, } amqp_connection_state_enum; /* 7 bytes up front, then payload, then 1 byte footer */ #define HEADER_SIZE 7 #define FOOTER_SIZE 1 +#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A' + typedef struct amqp_link_t_ { struct amqp_link_t_ *next; void *data; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 6f93d0c..109f60b 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -113,13 +113,6 @@ int amqp_send_header(amqp_connection_state_t state) { return send(state->sockfd, header(), 8, 0); } -int amqp_send_header_to(amqp_connection_state_t state, - amqp_output_fn_t fn, - void *context) -{ - return fn(context, header(), 8); -} - static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { switch (method) { case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"}; |