summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-10 19:30:25 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-10 19:30:25 +0100
commita5a5525d048f7a08a60debe41750cba8b5b5ccd7 (patch)
treeacfafed65b5ff5895fc157f0833df74483199818
parente6383a87a6d707051c54f879a0b9840b6189ff6c (diff)
downloadrabbitmq-c-github-ask-a5a5525d048f7a08a60debe41750cba8b5b5ccd7.tar.gz
Support parsing of protocol header.
-rw-r--r--librabbitmq/amqp.h9
-rw-r--r--librabbitmq/amqp_connection.c36
-rw-r--r--librabbitmq/amqp_private.h8
3 files changed, 46 insertions, 7 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 7210ec6..69dc978 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -82,6 +82,12 @@ typedef struct amqp_frame_t_ {
void *decoded;
} properties;
amqp_bytes_t body_fragment;
+ struct {
+ uint8_t transport_high;
+ uint8_t transport_low;
+ uint8_t protocol_version_major;
+ uint8_t protocol_version_minor;
+ } protocol_header;
} payload;
} amqp_frame_t;
@@ -102,6 +108,9 @@ typedef enum amqp_sasl_method_enum_ {
AMQP_SASL_METHOD_PLAIN = 0
} amqp_sasl_method_enum;
+#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER ((uint8_t) 'A')
+#define AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL ((amqp_channel_t) ((((int) 'M') << 8) | ((int) 'Q')))
+
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index e56fd1f..3d5f87b 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -107,6 +107,13 @@ void amqp_destroy_connection(amqp_connection_state_t state) {
free(state);
}
+static void return_to_idle(amqp_connection_state_t state) {
+ state->inbound_buffer.bytes = NULL;
+ state->inbound_offset = 0;
+ state->target_size = HEADER_SIZE;
+ state->state = CONNECTION_STATE_IDLE;
+}
+
int amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
amqp_frame_t *decoded_frame)
@@ -145,8 +152,15 @@ int amqp_handle_input(amqp_connection_state_t state,
switch (state->state) {
case CONNECTION_STATE_WAITING_FOR_HEADER:
- state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
- state->state = CONNECTION_STATE_WAITING_FOR_BODY;
+ if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
+ D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
+ {
+ state->target_size = 8;
+ state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
+ } else {
+ state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
+ state->state = CONNECTION_STATE_WAITING_FOR_BODY;
+ }
/* Wind buffer forward, and try to read some body out of it. */
received_data.len -= bytes_consumed;
@@ -217,13 +231,23 @@ int amqp_handle_input(amqp_connection_state_t state,
break;
}
- state->inbound_buffer.bytes = NULL;
- state->inbound_offset = 0;
- state->target_size = HEADER_SIZE;
- state->state = CONNECTION_STATE_IDLE;
+ return_to_idle(state);
return total_bytes_consumed;
}
+ case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
+ decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
+ decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
+ amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
+ "Invalid protocol header received");
+ decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
+ decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
+ decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
+ decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);
+
+ return_to_idle(state);
+ return total_bytes_consumed;
+
default:
amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 495e8f5..d4b1469 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -26,11 +26,17 @@ extern "C" {
* completed, it will be returned, and the connection will return to
* IDLE state.
*
+ * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a
+ * protocol version header has been seen, but the full eight bytes
+ * hasn't yet been received. When it is completed, it will be
+ * returned, and the connection will return to IDLE state.
+ *
*/
typedef enum amqp_connection_state_enum_ {
CONNECTION_STATE_IDLE = 0,
CONNECTION_STATE_WAITING_FOR_HEADER,
- CONNECTION_STATE_WAITING_FOR_BODY
+ CONNECTION_STATE_WAITING_FOR_BODY,
+ CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER
} amqp_connection_state_enum;
/* 7 bytes up front, then payload, then 1 byte footer */