summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorDavid Wragg <david@rabbitmq.com>2010-10-21 17:49:04 +0100
committerDavid Wragg <david@rabbitmq.com>2010-10-21 17:49:04 +0100
commite50a94ba2fd471c61509ed3120f42d4506d99ffb (patch)
tree78eeb2427b750d46b15ee7dc5aa19fd3d7d4f4ee /librabbitmq
parentceda8246d3fe0ceb51f680848c8cbe45c332f71a (diff)
downloadrabbitmq-c-github-ask-e50a94ba2fd471c61509ed3120f42d4506d99ffb.tar.gz
Convert other librabbitmq .c files to the new helper functions
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/amqp.h12
-rw-r--r--librabbitmq/amqp_connection.c457
-rw-r--r--librabbitmq/amqp_private.h33
-rw-r--r--librabbitmq/amqp_socket.c7
4 files changed, 219 insertions, 290 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 03072bc..810cf5b 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -271,11 +271,6 @@ typedef enum amqp_sasl_method_enum_ {
AMQP_SASL_METHOD_PLAIN = 0
} amqp_sasl_method_enum;
-#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER ((uint8_t) 'A')
-#define AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL ((amqp_channel_t) ((((int) 'M') << 8) | ((int) 'Q')))
-
-typedef int (*amqp_output_fn_t)(void *context, void *buffer, size_t count);
-
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
@@ -316,19 +311,12 @@ extern void amqp_maybe_release_buffers(amqp_connection_state_t state);
extern int amqp_send_frame(amqp_connection_state_t state,
amqp_frame_t const *frame);
-extern int amqp_send_frame_to(amqp_connection_state_t state,
- amqp_frame_t const *frame,
- amqp_output_fn_t fn,
- void *context);
extern int amqp_table_entry_cmp(void const *entry1, void const *entry2);
extern int amqp_open_socket(char const *hostname, int portnumber);
extern int amqp_send_header(amqp_connection_state_t state);
-extern int amqp_send_header_to(amqp_connection_state_t state,
- amqp_output_fn_t fn,
- void *context);
extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state);
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index ee467c3..8b9ace6 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -76,42 +76,38 @@ amqp_connection_state_t amqp_new_connection(void) {
amqp_connection_state_t state =
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
- if (state == NULL) {
+ if (state == NULL)
return NULL;
- }
init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
- state->state = CONNECTION_STATE_IDLE;
+ if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0)
+ goto out_nomem;
- state->inbound_buffer.bytes = NULL;
- state->outbound_buffer.bytes = NULL;
- if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
- free(state);
- return NULL;
- }
+ state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
+ if (state->inbound_buffer.bytes == NULL)
+ goto out_nomem;
- state->inbound_offset = 0;
- state->target_size = HEADER_SIZE;
+ state->state = CONNECTION_STATE_INITIAL;
+ /* the server protocol version response is 8 bytes, which conveniently
+ is also the minimum frame size */
+ state->target_size = 8;
state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
- if (state->sock_inbound_buffer.bytes == NULL) {
- amqp_destroy_connection(state);
- return NULL;
- }
-
- state->sock_inbound_offset = 0;
- state->sock_inbound_limit = 0;
-
- state->first_queued_frame = NULL;
- state->last_queued_frame = NULL;
+ if (state->sock_inbound_buffer.bytes == NULL)
+ goto out_nomem;
return state;
+
+ out_nomem:
+ free(state->sock_inbound_buffer.bytes);
+ empty_amqp_pool(&state->frame_pool);
+ empty_amqp_pool(&state->decoding_pool);
+ free(state);
+ return NULL;
}
int amqp_get_sockfd(amqp_connection_state_t state) {
@@ -178,154 +174,162 @@ static void return_to_idle(amqp_connection_state_t state) {
state->state = CONNECTION_STATE_IDLE;
}
+static size_t consume_data(amqp_connection_state_t state,
+ amqp_bytes_t *received_data)
+{
+ /* how much data is available and will fit? */
+ size_t bytes_consumed = state->target_size - state->inbound_offset;
+ if (received_data->len < bytes_consumed)
+ bytes_consumed = received_data->len;
+
+ memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset),
+ received_data->bytes, bytes_consumed);
+ state->inbound_offset += bytes_consumed;
+ received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed);
+ received_data->len -= bytes_consumed;
+
+ return bytes_consumed;
+}
+
int amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
amqp_frame_t *decoded_frame)
{
- int total_bytes_consumed = 0;
- int bytes_consumed;
+ size_t bytes_consumed;
+ void *raw_frame;
/* Returning frame_type of zero indicates either insufficient input,
or a complete, ignored frame was read. */
decoded_frame->frame_type = 0;
- read_more:
- if (received_data.len == 0) {
- return total_bytes_consumed;
- }
+ if (received_data.len == 0)
+ return 0;
if (state->state == CONNECTION_STATE_IDLE) {
- state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
- if (state->inbound_buffer.bytes == NULL) {
+ state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
+ state->inbound_buffer.len);
+ if (state->inbound_buffer.bytes == NULL)
/* state->inbound_buffer.len is always nonzero, because it
corresponds to frame_max, which is not permitted to be less
than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
return -ERROR_NO_MEMORY;
- }
- state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
- }
- bytes_consumed = state->target_size - state->inbound_offset;
- if (received_data.len < bytes_consumed) {
- bytes_consumed = received_data.len;
+ state->state = CONNECTION_STATE_HEADER;
}
- E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
- state->inbound_offset += bytes_consumed;
- total_bytes_consumed += bytes_consumed;
+ bytes_consumed = consume_data(state, &received_data);
- assert(state->inbound_offset <= state->target_size);
+ /* do we have target_size data yet? if not, return with the
+ expectation that more will arrive */
+ if (state->inbound_offset < state->target_size)
+ return bytes_consumed;
- if (state->inbound_offset < state->target_size) {
- return total_bytes_consumed;
- }
+ raw_frame = state->inbound_buffer.bytes;
switch (state->state) {
- case CONNECTION_STATE_WAITING_FOR_HEADER:
- if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
- D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
- {
- state->target_size = 8;
- state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
- } else {
- state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
- state->state = CONNECTION_STATE_WAITING_FOR_BODY;
- }
-
- /* Wind buffer forward, and try to read some body out of it. */
- received_data.len -= bytes_consumed;
- received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
- goto read_more;
-
- case CONNECTION_STATE_WAITING_FOR_BODY: {
- int frame_type = D_8(state->inbound_buffer, 0);
-
-#if 0
- printf("recving:\n");
- amqp_dump(state->inbound_buffer.bytes, state->target_size);
-#endif
-
- /* Check frame end marker (footer) */
- if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
- return -ERROR_BAD_AMQP_DATA;
- }
-
- decoded_frame->channel = D_16(state->inbound_buffer, 1);
-
- switch (frame_type) {
- case AMQP_FRAME_METHOD: {
- amqp_bytes_t encoded;
-
- /* Four bytes of method ID before the method args. */
- encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);
-
- decoded_frame->frame_type = AMQP_FRAME_METHOD;
- decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
- AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
- &state->decoding_pool,
- encoded,
- &decoded_frame->payload.method.decoded));
- break;
- }
-
- case AMQP_FRAME_HEADER: {
- amqp_bytes_t encoded;
-
- /* 12 bytes for properties header. */
- encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);
-
- decoded_frame->frame_type = AMQP_FRAME_HEADER;
- decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
- decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
- decoded_frame->payload.properties.raw = encoded;
- AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
- &state->decoding_pool,
- encoded,
- &decoded_frame->payload.properties.decoded));
- break;
- }
-
- case AMQP_FRAME_BODY: {
- size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);
-
- decoded_frame->frame_type = AMQP_FRAME_BODY;
- decoded_frame->payload.body_fragment.len = fragment_len;
- decoded_frame->payload.body_fragment.bytes =
- D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
- break;
- }
-
- case AMQP_FRAME_HEARTBEAT:
- decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
- break;
-
- default:
- /* Ignore the frame by not changing frame_type away from 0. */
- break;
- }
+ case CONNECTION_STATE_INITIAL:
+ /* check for a protocol header from the server */
+ if (memcmp(raw_frame, "AMQP", 4) == 0) {
+ decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
+ decoded_frame->channel = 0;
+
+ decoded_frame->payload.protocol_header.transport_high
+ = amqp_d8(raw_frame, 4);
+ decoded_frame->payload.protocol_header.transport_low
+ = amqp_d8(raw_frame, 5);
+ decoded_frame->payload.protocol_header.protocol_version_major
+ = amqp_d8(raw_frame, 6);
+ decoded_frame->payload.protocol_header.protocol_version_minor
+ = amqp_d8(raw_frame, 7);
return_to_idle(state);
- return total_bytes_consumed;
+ return bytes_consumed;
}
- case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
- decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
- decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
- if (D_8(state->inbound_buffer, 3) != (uint8_t) 'P')
- amqp_abort("Invalid protocol header received");
+ /* it's not a protocol header; fall through to process it as a
+ regular frame header */
- decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
- decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
- decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
- decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);
+ case CONNECTION_STATE_HEADER:
+ /* frame length is 3 bytes in */
+ state->target_size
+ = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
+ state->state = CONNECTION_STATE_BODY;
- return_to_idle(state);
- return total_bytes_consumed;
+ bytes_consumed += consume_data(state, &received_data);
+
+ /* do we have target_size data yet? if not, return with the
+ expectation that more will arrive */
+ if (state->inbound_offset < state->target_size)
+ return bytes_consumed;
+
+ /* fall through to process body */
+
+ case CONNECTION_STATE_BODY: {
+ amqp_bytes_t encoded;
+ int res;
+
+ /* Check frame end marker (footer) */
+ if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END)
+ return -ERROR_BAD_AMQP_DATA;
+
+ decoded_frame->frame_type = amqp_d8(raw_frame, 0);
+ decoded_frame->channel = amqp_d16(raw_frame, 1);
+
+ switch (decoded_frame->frame_type) {
+ case AMQP_FRAME_METHOD:
+ decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE);
+ encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4);
+ encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
+
+ res = amqp_decode_method(decoded_frame->payload.method.id,
+ &state->decoding_pool, encoded,
+ &decoded_frame->payload.method.decoded);
+ if (res < 0)
+ return res;
+
+ break;
+
+ case AMQP_FRAME_HEADER:
+ decoded_frame->payload.properties.class_id
+ = amqp_d16(raw_frame, HEADER_SIZE);
+ /* unused 2-byte weight field goes here */
+ decoded_frame->payload.properties.body_size
+ = amqp_d64(raw_frame, HEADER_SIZE + 4);
+ encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12);
+ encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
+ decoded_frame->payload.properties.raw = encoded;
+
+ res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
+ &state->decoding_pool, encoded,
+ &decoded_frame->payload.properties.decoded);
+ if (res < 0)
+ return res;
+
+ break;
+
+ case AMQP_FRAME_BODY:
+ decoded_frame->payload.body_fragment.len
+ = state->target_size - HEADER_SIZE - FOOTER_SIZE;
+ decoded_frame->payload.body_fragment.bytes
+ = amqp_offset(raw_frame, HEADER_SIZE);
+ break;
+
+ case AMQP_FRAME_HEARTBEAT:
+ break;
default:
- amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", state->state);
+ /* Ignore the frame */
+ decoded_frame->frame_type = 0;
+ break;
+ }
+
+ return_to_idle(state);
+ return bytes_consumed;
+ }
+
+ default:
+ amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", state->state);
+ return bytes_consumed;
}
}
@@ -349,103 +353,80 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) {
}
}
-static int inner_send_frame(amqp_connection_state_t state,
- amqp_frame_t const *frame,
- amqp_bytes_t *encoded,
- int *payload_len)
+int amqp_send_frame(amqp_connection_state_t state,
+ const amqp_frame_t *frame)
{
- int separate_body;
+ void *out_frame = state->outbound_buffer.bytes;
+ int res;
- E_8(state->outbound_buffer, 0, frame->frame_type);
- E_16(state->outbound_buffer, 1, frame->channel);
- switch (frame->frame_type) {
- case AMQP_FRAME_METHOD:
- E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
- encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
- encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
- *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
- frame->payload.method.decoded,
- *encoded)) + 4;
- separate_body = 0;
- break;
+ amqp_e8(out_frame, 0, frame->frame_type);
+ amqp_e16(out_frame, 1, frame->channel);
- case AMQP_FRAME_HEADER:
- E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
- E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
- E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
- encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
- encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
- *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
- frame->payload.properties.decoded,
- *encoded)) + 12;
- separate_body = 0;
- break;
+ if (frame->frame_type == AMQP_FRAME_BODY) {
+ /* For a body frame, rather than copying data around, we use
+ writev to compose the frame */
+ struct iovec iov[3];
+ char frame_end_byte = AMQP_FRAME_END;
+ const amqp_bytes_t *body = &frame->payload.body_fragment;
- case AMQP_FRAME_BODY:
- *encoded = frame->payload.body_fragment;
- *payload_len = encoded->len;
- separate_body = 1;
- break;
+ amqp_e32(out_frame, 3, body->len);
- case AMQP_FRAME_HEARTBEAT:
- *encoded = AMQP_EMPTY_BYTES;
- *payload_len = 0;
- separate_body = 0;
- break;
+ iov[0].iov_base = out_frame;
+ iov[0].iov_len = HEADER_SIZE;
+ iov[1].iov_base = body->bytes;
+ iov[1].iov_len = body->len;
+ iov[2].iov_base = &frame_end_byte;
+ iov[2].iov_len = FOOTER_SIZE;
- default:
- abort();
+ res = amqp_socket_writev(state->sockfd, iov, 3);
}
+ else {
+ size_t out_frame_len;
+ amqp_bytes_t encoded;
- E_32(state->outbound_buffer, 3, *payload_len);
- if (!separate_body) {
- E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
- }
+ switch (frame->frame_type) {
+ case AMQP_FRAME_METHOD:
+ amqp_e32(out_frame, HEADER_SIZE, frame->payload.method.id);
-#if 0
- if (separate_body) {
- printf("sending body frame (header):\n");
- amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
- printf("sending body frame (payload):\n");
- amqp_dump(encoded->bytes, *payload_len);
- } else {
- printf("sending:\n");
- amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
- }
-#endif
+ encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 4);
+ encoded.len = state->outbound_buffer.len - HEADER_SIZE - 4 - FOOTER_SIZE;
- return separate_body;
-}
+ res = amqp_encode_method(frame->payload.method.id,
+ frame->payload.method.decoded, encoded);
+ if (res < 0)
+ return res;
-int amqp_send_frame(amqp_connection_state_t state,
- amqp_frame_t const *frame)
-{
- amqp_bytes_t encoded;
- int payload_len, res;
-
- res = inner_send_frame(state, frame, &encoded, &payload_len);
- switch (res) {
- case 0:
- res = send(state->sockfd, state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE), 0);
+ out_frame_len = res + 4;
break;
- case 1: {
- struct iovec iov[3];
- char frame_end_byte = AMQP_FRAME_END;
- iov[0].iov_base = state->outbound_buffer.bytes;
- iov[0].iov_len = HEADER_SIZE;
- iov[1].iov_base = encoded.bytes;
- iov[1].iov_len = payload_len;
- iov[2].iov_base = &frame_end_byte;
- assert(FOOTER_SIZE == 1);
- iov[2].iov_len = FOOTER_SIZE;
- res = amqp_socket_writev(state->sockfd, &iov[0], 3);
+ case AMQP_FRAME_HEADER:
+ amqp_e16(out_frame, HEADER_SIZE, frame->payload.properties.class_id);
+ amqp_e16(out_frame, HEADER_SIZE+2, 0); /* "weight" */
+ amqp_e64(out_frame, HEADER_SIZE+4, frame->payload.properties.body_size);
+
+ encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 12);
+ encoded.len = state->outbound_buffer.len - HEADER_SIZE - 12 - FOOTER_SIZE;
+
+ res = amqp_encode_properties(frame->payload.properties.class_id,
+ frame->payload.properties.decoded, encoded);
+ if (res < 0)
+ return res;
+
+ out_frame_len = res + 12;
+ break;
+
+ case AMQP_FRAME_HEARTBEAT:
+ out_frame_len = 0;
break;
- }
default:
- return res;
+ abort();
+ }
+
+ amqp_e32(out_frame, 3, out_frame_len);
+ amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
+ res = send(state->sockfd, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE, 0);
}
if (res < 0)
@@ -453,35 +434,3 @@ int amqp_send_frame(amqp_connection_state_t state,
else
return 0;
}
-
-int amqp_send_frame_to(amqp_connection_state_t state,
- amqp_frame_t const *frame,
- amqp_output_fn_t fn,
- void *context)
-{
- amqp_bytes_t encoded;
- int payload_len;
- int separate_body;
-
- separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
- switch (separate_body) {
- case 0:
- AMQP_CHECK_RESULT(fn(context,
- state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE)));
- return 0;
-
- case 1:
- AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
- AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
- {
- assert(FOOTER_SIZE == 1);
- char frame_end_byte = AMQP_FRAME_END;
- AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
- }
- return 0;
-
- default:
- return separate_body;
- }
-}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 0ff71c7..439008a 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -79,41 +79,40 @@ extern char *amqp_os_error_string(int err);
#include "socket.h"
/*
- * Connection states:
+ * Connection states: XXX FIX THIS
*
- * - CONNECTION_STATE_IDLE: initial state, and entered again after
- * each frame is completed. Means that no bytes of the next frame
- * have been seen yet. Connections may only be reconfigured, and the
+ * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be
+ * sure if the next thing we will get is the first AMQP frame, or a
+ * protocol header from the server.
+ *
+ * - CONNECTION_STATE_IDLE: The normal state between
+ * frames. Connections may only be reconfigured, and the
* connection's pools recycled, when in this state. Whenever we're
* in this state, the inbound_buffer's bytes pointer must be NULL;
* any other state, and it must point to a block of memory allocated
* from the frame_pool.
*
- * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming
- * frame have been seen, but not a complete frame header's worth.
- *
- * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has
- * been seen, but the frame is not yet complete. When it is
- * completed, it will be returned, and the connection will return to
- * IDLE state.
+ * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have
+ * been seen, but not a complete frame header's worth.
*
- * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a
- * protocol version header has been seen, but the full eight bytes
- * hasn't yet been received. When it is completed, it will be
+ * - CONNECTION_STATE_BODY: A complete frame header has been seen, but
+ * the frame is not yet complete. When it is completed, it will be
* returned, and the connection will return to IDLE state.
*
*/
typedef enum amqp_connection_state_enum_ {
CONNECTION_STATE_IDLE = 0,
- CONNECTION_STATE_WAITING_FOR_HEADER,
- CONNECTION_STATE_WAITING_FOR_BODY,
- CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER
+ CONNECTION_STATE_INITIAL,
+ CONNECTION_STATE_HEADER,
+ CONNECTION_STATE_BODY,
} amqp_connection_state_enum;
/* 7 bytes up front, then payload, then 1 byte footer */
#define HEADER_SIZE 7
#define FOOTER_SIZE 1
+#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A'
+
typedef struct amqp_link_t_ {
struct amqp_link_t_ *next;
void *data;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 6f93d0c..109f60b 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -113,13 +113,6 @@ int amqp_send_header(amqp_connection_state_t state) {
return send(state->sockfd, header(), 8, 0);
}
-int amqp_send_header_to(amqp_connection_state_t state,
- amqp_output_fn_t fn,
- void *context)
-{
- return fn(context, header(), 8);
-}
-
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
switch (method) {
case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"};