summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r--librabbitmq/amqp_connection.c130
1 files changed, 92 insertions, 38 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 3d5f87b..9c6eb9d 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -273,72 +273,126 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) {
}
}
-int amqp_send_frame(amqp_connection_state_t state,
- amqp_frame_t const *frame)
+static int inner_send_frame(amqp_connection_state_t state,
+ amqp_frame_t const *frame,
+ amqp_bytes_t *encoded,
+ int *payload_len)
{
- amqp_bytes_t encoded;
- int payload_len;
int separate_body;
E_8(state->outbound_buffer, 0, frame->frame_type);
E_16(state->outbound_buffer, 1, frame->channel);
switch (frame->frame_type) {
- case AMQP_FRAME_METHOD: {
+ case AMQP_FRAME_METHOD:
E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
- encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded.len);
- payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
- frame->payload.method.decoded,
- encoded)) + 4;
+ encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
+ encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
+ *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
+ frame->payload.method.decoded,
+ *encoded)) + 4;
separate_body = 0;
break;
- }
- case AMQP_FRAME_HEADER: {
+
+ case AMQP_FRAME_HEADER:
E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
- encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
- encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded.len);
- payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
- frame->payload.properties.decoded,
- encoded)) + 12;
+ encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
+ encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
+ *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
+ frame->payload.properties.decoded,
+ *encoded)) + 12;
separate_body = 0;
break;
- }
- case AMQP_FRAME_BODY: {
- encoded = frame->payload.body_fragment;
- payload_len = encoded.len;
+
+ case AMQP_FRAME_BODY:
+ *encoded = frame->payload.body_fragment;
+ *payload_len = encoded->len;
separate_body = 1;
break;
- }
+
default:
return -EINVAL;
}
- E_32(state->outbound_buffer, 3, payload_len);
+ E_32(state->outbound_buffer, 3, *payload_len);
+ if (!separate_body) {
+ E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
+ }
- if (separate_body) {
- char frame_end_byte = AMQP_FRAME_END;
#if 0
+ if (separate_body) {
printf("sending body frame (header):\n");
amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
printf("sending body frame (payload):\n");
- amqp_dump(encoded.bytes, payload_len);
-#endif
- AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
- AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
- assert(FOOTER_SIZE == 1);
- AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
+ amqp_dump(encoded->bytes, *payload_len);
} else {
- E_8(state->outbound_buffer, payload_len + HEADER_SIZE, AMQP_FRAME_END);
-#if 0
printf("sending:\n");
- amqp_dump(state->outbound_buffer.bytes, payload_len + HEADER_SIZE + FOOTER_SIZE);
+ amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
+ }
#endif
- AMQP_CHECK_RESULT(write(state->sockfd,
- state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE)));
+
+ return separate_body;
+}
+
+int amqp_send_frame(amqp_connection_state_t state,
+ amqp_frame_t const *frame)
+{
+ amqp_bytes_t encoded;
+ int payload_len;
+ int separate_body;
+
+ separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
+ switch (separate_body) {
+ case 0:
+ AMQP_CHECK_RESULT(write(state->sockfd,
+ state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE)));
+ return 0;
+
+ case 1:
+ AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
+ AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
+ {
+ assert(FOOTER_SIZE == 1);
+ char frame_end_byte = AMQP_FRAME_END;
+ AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
+ }
+ return 0;
+
+ default:
+ return separate_body;
}
+}
- return 0;
+int amqp_send_frame_to(amqp_connection_state_t state,
+ amqp_frame_t const *frame,
+ int (*fn)(void *context, void *buffer, size_t count),
+ void *context)
+{
+ amqp_bytes_t encoded;
+ int payload_len;
+ int separate_body;
+
+ separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
+ switch (separate_body) {
+ case 0:
+ AMQP_CHECK_RESULT(fn(context,
+ state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE)));
+ return 0;
+
+ case 1:
+ AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
+ AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
+ {
+ assert(FOOTER_SIZE == 1);
+ char frame_end_byte = AMQP_FRAME_END;
+ AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
+ }
+ return 0;
+
+ default:
+ return separate_body;
+ }
}