diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-10 19:30:25 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-10 19:30:25 +0100 |
commit | a5a5525d048f7a08a60debe41750cba8b5b5ccd7 (patch) | |
tree | acfafed65b5ff5895fc157f0833df74483199818 | |
parent | e6383a87a6d707051c54f879a0b9840b6189ff6c (diff) | |
download | rabbitmq-c-github-ask-a5a5525d048f7a08a60debe41750cba8b5b5ccd7.tar.gz |
Support parsing of protocol header.
-rw-r--r-- | librabbitmq/amqp.h | 9 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 36 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 8 |
3 files changed, 46 insertions, 7 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 7210ec6..69dc978 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -82,6 +82,12 @@ typedef struct amqp_frame_t_ { void *decoded; } properties; amqp_bytes_t body_fragment; + struct { + uint8_t transport_high; + uint8_t transport_low; + uint8_t protocol_version_major; + uint8_t protocol_version_minor; + } protocol_header; } payload; } amqp_frame_t; @@ -102,6 +108,9 @@ typedef enum amqp_sasl_method_enum_ { AMQP_SASL_METHOD_PLAIN = 0 } amqp_sasl_method_enum; +#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER ((uint8_t) 'A') +#define AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL ((amqp_channel_t) ((((int) 'M') << 8) | ((int) 'Q'))) + /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index e56fd1f..3d5f87b 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -107,6 +107,13 @@ void amqp_destroy_connection(amqp_connection_state_t state) { free(state); } +static void return_to_idle(amqp_connection_state_t state) { + state->inbound_buffer.bytes = NULL; + state->inbound_offset = 0; + state->target_size = HEADER_SIZE; + state->state = CONNECTION_STATE_IDLE; +} + int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, amqp_frame_t *decoded_frame) @@ -145,8 +152,15 @@ int amqp_handle_input(amqp_connection_state_t state, switch (state->state) { case CONNECTION_STATE_WAITING_FOR_HEADER: - state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE; - state->state = CONNECTION_STATE_WAITING_FOR_BODY; + if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER && + D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL) + { + state->target_size = 8; + state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER; + } else { + state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE; + state->state = CONNECTION_STATE_WAITING_FOR_BODY; + } /* Wind buffer forward, and try to read some body out of it. */ received_data.len -= bytes_consumed; @@ -217,13 +231,23 @@ int amqp_handle_input(amqp_connection_state_t state, break; } - state->inbound_buffer.bytes = NULL; - state->inbound_offset = 0; - state->target_size = HEADER_SIZE; - state->state = CONNECTION_STATE_IDLE; + return_to_idle(state); return total_bytes_consumed; } + case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: + decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; + decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL; + amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P', + "Invalid protocol header received"); + decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4); + decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5); + decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6); + decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7); + + return_to_idle(state); + return total_bytes_consumed; + default: amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state); } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 495e8f5..d4b1469 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -26,11 +26,17 @@ extern "C" { * completed, it will be returned, and the connection will return to * IDLE state. * + * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a + * protocol version header has been seen, but the full eight bytes + * hasn't yet been received. When it is completed, it will be + * returned, and the connection will return to IDLE state. + * */ typedef enum amqp_connection_state_enum_ { CONNECTION_STATE_IDLE = 0, CONNECTION_STATE_WAITING_FOR_HEADER, - CONNECTION_STATE_WAITING_FOR_BODY + CONNECTION_STATE_WAITING_FOR_BODY, + CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER } amqp_connection_state_enum; /* 7 bytes up front, then payload, then 1 byte footer */ |