diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-10 19:50:13 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-10 19:50:13 +0100 |
commit | 9c66730ac06685622eebf5e93b002d3f64f40468 (patch) | |
tree | 14b693efd36e2f25dd319f178d892b9ac199e699 /librabbitmq | |
parent | a5a5525d048f7a08a60debe41750cba8b5b5ccd7 (diff) | |
download | rabbitmq-c-github-ask-9c66730ac06685622eebf5e93b002d3f64f40468.tar.gz |
Support generic frame transmission.
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 7 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 130 |
2 files changed, 99 insertions, 38 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 69dc978..a7160f3 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -145,12 +145,19 @@ extern void amqp_maybe_release_buffers(amqp_connection_state_t state); extern int amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame); +extern int amqp_send_frame_to(amqp_connection_state_t state, + amqp_frame_t const *frame, + int (*fn)(void *context, void *buffer, size_t count), + void *context); extern int amqp_table_entry_cmp(void const *entry1, void const *entry2); extern int amqp_open_socket(char const *hostname, int portnumber); extern int amqp_send_header(amqp_connection_state_t state); +extern int amqp_send_header_to(amqp_connection_state_t state, + int (*fn)(void *context, void *buffer, size_t count), + void *context); extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state); diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 3d5f87b..9c6eb9d 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -273,72 +273,126 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) { } } -int amqp_send_frame(amqp_connection_state_t state, - amqp_frame_t const *frame) +static int inner_send_frame(amqp_connection_state_t state, + amqp_frame_t const *frame, + amqp_bytes_t *encoded, + int *payload_len) { - amqp_bytes_t encoded; - int payload_len; int separate_body; E_8(state->outbound_buffer, 0, frame->frame_type); E_16(state->outbound_buffer, 1, frame->channel); switch (frame->frame_type) { - case AMQP_FRAME_METHOD: { + case AMQP_FRAME_METHOD: E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id); - encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE); - encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded.len); - payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id, - frame->payload.method.decoded, - encoded)) + 4; + encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE); + encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len); + *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id, + frame->payload.method.decoded, + *encoded)) + 4; separate_body = 0; break; - } - case AMQP_FRAME_HEADER: { + + case AMQP_FRAME_HEADER: E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id); E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */ E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size); - encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE); - encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded.len); - payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id, - frame->payload.properties.decoded, - encoded)) + 12; + encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE); + encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len); + *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id, + frame->payload.properties.decoded, + *encoded)) + 12; separate_body = 0; break; - } - case AMQP_FRAME_BODY: { - encoded = frame->payload.body_fragment; - payload_len = encoded.len; + + case AMQP_FRAME_BODY: + *encoded = frame->payload.body_fragment; + *payload_len = encoded->len; separate_body = 1; break; - } + default: return -EINVAL; } - E_32(state->outbound_buffer, 3, payload_len); + E_32(state->outbound_buffer, 3, *payload_len); + if (!separate_body) { + E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END); + } - if (separate_body) { - char frame_end_byte = AMQP_FRAME_END; #if 0 + if (separate_body) { printf("sending body frame (header):\n"); amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE); printf("sending body frame (payload):\n"); - amqp_dump(encoded.bytes, payload_len); -#endif - AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE)); - AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len)); - assert(FOOTER_SIZE == 1); - AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE)); + amqp_dump(encoded->bytes, *payload_len); } else { - E_8(state->outbound_buffer, payload_len + HEADER_SIZE, AMQP_FRAME_END); -#if 0 printf("sending:\n"); - amqp_dump(state->outbound_buffer.bytes, payload_len + HEADER_SIZE + FOOTER_SIZE); + amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE); + } #endif - AMQP_CHECK_RESULT(write(state->sockfd, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); + + return separate_body; +} + +int amqp_send_frame(amqp_connection_state_t state, + amqp_frame_t const *frame) +{ + amqp_bytes_t encoded; + int payload_len; + int separate_body; + + separate_body = inner_send_frame(state, frame, &encoded, &payload_len); + switch (separate_body) { + case 0: + AMQP_CHECK_RESULT(write(state->sockfd, + state->outbound_buffer.bytes, + payload_len + (HEADER_SIZE + FOOTER_SIZE))); + return 0; + + case 1: + AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE)); + AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len)); + { + assert(FOOTER_SIZE == 1); + char frame_end_byte = AMQP_FRAME_END; + AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE)); + } + return 0; + + default: + return separate_body; } +} - return 0; +int amqp_send_frame_to(amqp_connection_state_t state, + amqp_frame_t const *frame, + int (*fn)(void *context, void *buffer, size_t count), + void *context) +{ + amqp_bytes_t encoded; + int payload_len; + int separate_body; + + separate_body = inner_send_frame(state, frame, &encoded, &payload_len); + switch (separate_body) { + case 0: + AMQP_CHECK_RESULT(fn(context, + state->outbound_buffer.bytes, + payload_len + (HEADER_SIZE + FOOTER_SIZE))); + return 0; + + case 1: + AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE)); + AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len)); + { + assert(FOOTER_SIZE == 1); + char frame_end_byte = AMQP_FRAME_END; + AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE)); + } + return 0; + + default: + return separate_body; + } } |