summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-10 19:50:13 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-10 19:50:13 +0100
commit9c66730ac06685622eebf5e93b002d3f64f40468 (patch)
tree14b693efd36e2f25dd319f178d892b9ac199e699
parenta5a5525d048f7a08a60debe41750cba8b5b5ccd7 (diff)
downloadrabbitmq-c-github-ask-9c66730ac06685622eebf5e93b002d3f64f40468.tar.gz
Support generic frame transmission.
-rw-r--r--librabbitmq/amqp.h7
-rw-r--r--librabbitmq/amqp_connection.c130
2 files changed, 99 insertions, 38 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 69dc978..a7160f3 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -145,12 +145,19 @@ extern void amqp_maybe_release_buffers(amqp_connection_state_t state);
extern int amqp_send_frame(amqp_connection_state_t state,
amqp_frame_t const *frame);
+extern int amqp_send_frame_to(amqp_connection_state_t state,
+ amqp_frame_t const *frame,
+ int (*fn)(void *context, void *buffer, size_t count),
+ void *context);
extern int amqp_table_entry_cmp(void const *entry1, void const *entry2);
extern int amqp_open_socket(char const *hostname, int portnumber);
extern int amqp_send_header(amqp_connection_state_t state);
+extern int amqp_send_header_to(amqp_connection_state_t state,
+ int (*fn)(void *context, void *buffer, size_t count),
+ void *context);
extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state);
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 3d5f87b..9c6eb9d 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -273,72 +273,126 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) {
}
}
-int amqp_send_frame(amqp_connection_state_t state,
- amqp_frame_t const *frame)
+static int inner_send_frame(amqp_connection_state_t state,
+ amqp_frame_t const *frame,
+ amqp_bytes_t *encoded,
+ int *payload_len)
{
- amqp_bytes_t encoded;
- int payload_len;
int separate_body;
E_8(state->outbound_buffer, 0, frame->frame_type);
E_16(state->outbound_buffer, 1, frame->channel);
switch (frame->frame_type) {
- case AMQP_FRAME_METHOD: {
+ case AMQP_FRAME_METHOD:
E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
- encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded.len);
- payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
- frame->payload.method.decoded,
- encoded)) + 4;
+ encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
+ encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
+ *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
+ frame->payload.method.decoded,
+ *encoded)) + 4;
separate_body = 0;
break;
- }
- case AMQP_FRAME_HEADER: {
+
+ case AMQP_FRAME_HEADER:
E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
- encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded.len);
- payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
- frame->payload.properties.decoded,
- encoded)) + 12;
+ encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
+ encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
+ *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
+ frame->payload.properties.decoded,
+ *encoded)) + 12;
separate_body = 0;
break;
- }
- case AMQP_FRAME_BODY: {
- encoded = frame->payload.body_fragment;
- payload_len = encoded.len;
+
+ case AMQP_FRAME_BODY:
+ *encoded = frame->payload.body_fragment;
+ *payload_len = encoded->len;
separate_body = 1;
break;
- }
+
default:
return -EINVAL;
}
- E_32(state->outbound_buffer, 3, payload_len);
+ E_32(state->outbound_buffer, 3, *payload_len);
+ if (!separate_body) {
+ E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
+ }
- if (separate_body) {
- char frame_end_byte = AMQP_FRAME_END;
#if 0
+ if (separate_body) {
printf("sending body frame (header):\n");
amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
printf("sending body frame (payload):\n");
- amqp_dump(encoded.bytes, payload_len);
-#endif
- AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
- AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
- assert(FOOTER_SIZE == 1);
- AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
+ amqp_dump(encoded->bytes, *payload_len);
} else {
- E_8(state->outbound_buffer, payload_len + HEADER_SIZE, AMQP_FRAME_END);
-#if 0
printf("sending:\n");
- amqp_dump(state->outbound_buffer.bytes, payload_len + HEADER_SIZE + FOOTER_SIZE);
+ amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
+ }
#endif
- AMQP_CHECK_RESULT(write(state->sockfd,
- state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE)));
+
+ return separate_body;
+}
+
+int amqp_send_frame(amqp_connection_state_t state,
+ amqp_frame_t const *frame)
+{
+ amqp_bytes_t encoded;
+ int payload_len;
+ int separate_body;
+
+ separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
+ switch (separate_body) {
+ case 0:
+ AMQP_CHECK_RESULT(write(state->sockfd,
+ state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE)));
+ return 0;
+
+ case 1:
+ AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
+ AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
+ {
+ assert(FOOTER_SIZE == 1);
+ char frame_end_byte = AMQP_FRAME_END;
+ AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
+ }
+ return 0;
+
+ default:
+ return separate_body;
}
+}
- return 0;
+int amqp_send_frame_to(amqp_connection_state_t state,
+ amqp_frame_t const *frame,
+ int (*fn)(void *context, void *buffer, size_t count),
+ void *context)
+{
+ amqp_bytes_t encoded;
+ int payload_len;
+ int separate_body;
+
+ separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
+ switch (separate_body) {
+ case 0:
+ AMQP_CHECK_RESULT(fn(context,
+ state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE)));
+ return 0;
+
+ case 1:
+ AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
+ AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
+ {
+ assert(FOOTER_SIZE == 1);
+ char frame_end_byte = AMQP_FRAME_END;
+ AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
+ }
+ return 0;
+
+ default:
+ return separate_body;
+ }
}