summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-26 23:21:23 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-26 23:21:23 +0100
commite1dec49d13caa05af05984af12f3e8354387f33f (patch)
tree8e83237174c0db0c7fd4e1b42ce7c8b3afb8b3ee /librabbitmq
parentaf9315c575b8788b28bb3c5d959a9462b12b9a15 (diff)
downloadrabbitmq-c-github-ask-e1dec49d13caa05af05984af12f3e8354387f33f.tar.gz
Publication works!
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/Makefile.am2
-rw-r--r--librabbitmq/amqp.h80
-rw-r--r--librabbitmq/amqp_connection.c299
-rw-r--r--librabbitmq/amqp_debug.c83
-rw-r--r--librabbitmq/amqp_mem.c13
-rw-r--r--librabbitmq/amqp_private.h94
-rw-r--r--librabbitmq/amqp_socket.c376
-rw-r--r--librabbitmq/amqp_table.c15
-rw-r--r--librabbitmq/codegen.py36
9 files changed, 824 insertions, 174 deletions
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am
index 6fae6d3..5ecd370 100644
--- a/librabbitmq/Makefile.am
+++ b/librabbitmq/Makefile.am
@@ -1,6 +1,6 @@
lib_LTLIBRARIES = librabbitmq.la
-librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c
+librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c
nodist_librabbitmq_la_SOURCES = amqp_framing.c
librabbitmq_la_INCLUDES = amqp_framing.h amqp.h
noinst_librabbitmq_la_INCLUDES = amqp_private.h
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 314e2c9..18a7c76 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -5,8 +5,6 @@
extern "C" {
#endif
-typedef int (*amqp_writer_fun_t)(int context, void const *buf, size_t len);
-
typedef int amqp_boolean_t;
typedef uint32_t amqp_method_number_t;
typedef uint32_t amqp_flags_t;
@@ -55,28 +53,41 @@ typedef struct amqp_pool_t_ {
size_t alloc_used;
} amqp_pool_t;
+typedef struct amqp_method_t_ {
+ amqp_method_number_t id;
+ void *decoded;
+} amqp_method_t;
+
typedef struct amqp_frame_t_ {
uint8_t frame_type; /* 0 means no event */
amqp_channel_t channel;
union {
- struct {
- amqp_method_number_t id;
- void *decoded;
- } method;
+ amqp_method_t method;
struct {
uint16_t class_id;
+ uint64_t body_size;
void *decoded;
} properties;
amqp_bytes_t body_fragment;
} payload;
} amqp_frame_t;
-#define AMQP_EXCEPTION_CATEGORY_UNKNOWN 0
-#define AMQP_EXCEPTION_CATEGORY_CONNECTION 1
-#define AMQP_EXCEPTION_CATEGORY_CHANNEL 2
+typedef enum amqp_response_type_enum_ {
+ AMQP_RESPONSE_NONE = 0,
+ AMQP_RESPONSE_NORMAL,
+ AMQP_RESPONSE_LIBRARY_EXCEPTION,
+ AMQP_RESPONSE_SERVER_EXCEPTION
+} amqp_response_type_enum;
+
+typedef struct amqp_rpc_reply_t_ {
+ amqp_response_type_enum reply_type;
+ amqp_method_t reply;
+ int library_errno; /* if AMQP_RESPONSE_LIBRARY_EXCEPTION, then 0 here means socket EOF */
+} amqp_rpc_reply_t;
-#define AMQP_FRAME_COMPLETE 0
-#define AMQP_FRAME_INCOMPLETE 1
+typedef enum amqp_sasl_method_enum_ {
+ AMQP_SASL_METHOD_PLAIN = 0
+} amqp_sasl_method_enum;
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
@@ -88,8 +99,13 @@ extern void recycle_amqp_pool(amqp_pool_t *pool);
extern void empty_amqp_pool(amqp_pool_t *pool);
extern void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount);
+extern void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output);
+
+extern amqp_bytes_t amqp_cstring_bytes(char const *cstr);
extern amqp_connection_state_t amqp_new_connection(void);
+extern void amqp_set_sockfd(amqp_connection_state_t state,
+ int sockfd);
extern void amqp_tune_connection(amqp_connection_state_t state,
int frame_max);
extern void amqp_destroy_connection(amqp_connection_state_t state);
@@ -98,21 +114,55 @@ extern int amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
amqp_frame_t *decoded_frame);
+extern amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state);
+
+extern void amqp_release_buffers(amqp_connection_state_t state);
+
+extern void amqp_maybe_release_buffers(amqp_connection_state_t state);
+
extern int amqp_send_frame(amqp_connection_state_t state,
- amqp_writer_fun_t writer,
- int context,
amqp_frame_t const *frame);
extern int amqp_table_entry_cmp(void const *entry1, void const *entry2);
extern int amqp_open_socket(char const *hostname, int portnumber);
-extern int amqp_send_header(amqp_writer_fun_t writer, int context);
+extern int amqp_send_header(amqp_connection_state_t state);
+
+extern amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state);
extern int amqp_simple_wait_frame(amqp_connection_state_t state,
- int sockfd,
amqp_frame_t *decoded_frame);
+extern int amqp_simple_wait_method(amqp_connection_state_t state,
+ amqp_method_number_t expected_or_zero,
+ amqp_method_t *output);
+
+extern int amqp_send_method(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded);
+
+extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t expected_reply_id,
+ void *decoded_request_method);
+
+extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
+ char const *vhost,
+ int frame_max,
+ amqp_sasl_method_enum sasl_method, ...);
+
+struct amqp_basic_properties_t_;
+extern int amqp_basic_publish(amqp_connection_state_t state,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory,
+ amqp_boolean_t immediate,
+ struct amqp_basic_properties_t_ const *properties,
+ amqp_bytes_t body);
+
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 3100f7e..57db2c3 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -4,40 +4,83 @@
#include <stdint.h>
#include <errno.h>
+#include <unistd.h>
+#include <sys/uio.h>
+
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#define INITIAL_FRAME_MAX 65536
+#include <assert.h>
+
+#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
+#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
+#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
+
+#define ENFORCE_STATE(statevec, statenum) \
+ { \
+ amqp_connection_state_t _check_state = (statevec); \
+ int _wanted_state = (statenum); \
+ amqp_assert(_check_state->state == _wanted_state, \
+ "Programming error: invalid AMQP connection state: expected %d, got %d", \
+ _wanted_state, \
+ _check_state->state); \
+ }
amqp_connection_state_t amqp_new_connection(void) {
amqp_connection_state_t state =
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
- init_amqp_pool(&state->pool, INITIAL_FRAME_MAX);
- state->inbound_buffer.len = INITIAL_FRAME_MAX;
- state->inbound_buffer.bytes = malloc(INITIAL_FRAME_MAX);
- state->outbound_buffer.len = INITIAL_FRAME_MAX;
- state->outbound_buffer.bytes = malloc(INITIAL_FRAME_MAX);
- state->frame_size_target = 0;
- state->reset_required = 0;
+
+ init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
+ init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
+
+ state->state = CONNECTION_STATE_IDLE;
+
+ state->inbound_buffer.bytes = NULL;
+ state->outbound_buffer.bytes = NULL;
+ amqp_tune_connection(state, INITIAL_FRAME_POOL_PAGE_SIZE);
+
+ state->inbound_offset = 0;
+ state->target_size = HEADER_SIZE;
+
+ state->sockfd = -1;
+ state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
+ state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
+ state->sock_inbound_offset = 0;
+ state->sock_inbound_limit = 0;
+
+ state->first_queued_frame = NULL;
+ state->last_queued_frame = NULL;
+
return state;
}
+void amqp_set_sockfd(amqp_connection_state_t state,
+ int sockfd)
+{
+ state->sockfd = sockfd;
+}
+
void amqp_tune_connection(amqp_connection_state_t state,
int frame_max)
{
- empty_amqp_pool(&state->pool);
- init_amqp_pool(&state->pool, frame_max);
+ ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
+
+ state->frame_max = frame_max;
+
+ empty_amqp_pool(&state->frame_pool);
+ init_amqp_pool(&state->frame_pool, frame_max);
+
state->inbound_buffer.len = frame_max;
- realloc(state->inbound_buffer.bytes, frame_max);
state->outbound_buffer.len = frame_max;
- realloc(state->outbound_buffer.bytes, frame_max);
+ state->outbound_buffer.bytes = realloc(state->outbound_buffer.bytes, frame_max);
}
void amqp_destroy_connection(amqp_connection_state_t state) {
- empty_amqp_pool(&state->pool);
- free(state->inbound_buffer.bytes);
+ empty_amqp_pool(&state->frame_pool);
+ empty_amqp_pool(&state->decoding_pool);
free(state->outbound_buffer.bytes);
+ free(state->sock_inbound_buffer.bytes);
free(state);
}
@@ -45,90 +88,140 @@ int amqp_handle_input(amqp_connection_state_t state,
amqp_bytes_t received_data,
amqp_frame_t *decoded_frame)
{
- if (state->reset_required) {
- size_t unconsumed_byte_count = state->frame_offset - state->frame_size_target;
- recycle_amqp_pool(&state->pool);
- memmove(state->inbound_buffer.bytes,
- D_BYTES(state->inbound_buffer,
- state->frame_size_target,
- unconsumed_byte_count),
- unconsumed_byte_count);
- state->frame_offset = unconsumed_byte_count;
- state->frame_size_target = 0;
- state->reset_required = 0;
- }
-
- E_BYTES(state->inbound_buffer, state->frame_offset, received_data.len, received_data.bytes);
- state->frame_offset += received_data.len;
+ int total_bytes_consumed = 0;
+ int bytes_consumed;
- if (state->frame_size_target == 0) {
- if (state->frame_offset < 7) {
- return AMQP_FRAME_INCOMPLETE;
- }
+ read_more:
+ if (received_data.len == 0) {
+ return total_bytes_consumed;
+ }
- state->frame_size_target = D_32(state->inbound_buffer, 3) + 8; /* 7 for header, 1 for footer */
+ if (state->state == CONNECTION_STATE_IDLE) {
+ state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
+ state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
}
- if (state->frame_offset < state->frame_size_target) {
- return AMQP_FRAME_INCOMPLETE;
+ bytes_consumed = state->target_size - state->inbound_offset;
+ if (received_data.len < bytes_consumed) {
+ bytes_consumed = received_data.len;
}
- if (D_8(state->inbound_buffer, state->frame_size_target - 1) != AMQP_FRAME_END) {
- return -EINVAL;
+ E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
+ state->inbound_offset += bytes_consumed;
+ total_bytes_consumed += bytes_consumed;
+
+ assert(state->inbound_offset <= state->target_size);
+
+ if (state->inbound_offset < state->target_size) {
+ return total_bytes_consumed;
}
- state->reset_required = 1;
+ switch (state->state) {
+ case CONNECTION_STATE_WAITING_FOR_HEADER:
+ state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
+ state->state = CONNECTION_STATE_WAITING_FOR_BODY;
- decoded_frame->frame_type = D_8(state->inbound_buffer, 0);
- decoded_frame->channel = D_16(state->inbound_buffer, 1);
- switch (decoded_frame->frame_type) {
- case AMQP_FRAME_METHOD: {
- amqp_bytes_t encoded;
- int decode_result;
+ /* Wind buffer forward, and try to read some body out of it. */
+ received_data.len -= bytes_consumed;
+ received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
+ goto read_more;
- encoded.len = state->frame_size_target - 12; /* 7 for header, 4 for method id, 1 for footer */
- encoded.bytes = D_BYTES(state->inbound_buffer, 11, encoded.len);
+ case CONNECTION_STATE_WAITING_FOR_BODY: {
+ int frame_type = D_8(state->inbound_buffer, 0);
- decoded_frame->payload.method.id = D_32(state->inbound_buffer, 7);
- decode_result = amqp_decode_method(decoded_frame->payload.method.id,
- &state->pool,
- encoded,
- &decoded_frame->payload.method.decoded);
- if (decode_result != 0) return decode_result;
+ printf("recving:\n");
+ amqp_dump(state->inbound_buffer.bytes, state->target_size);
- return AMQP_FRAME_COMPLETE;
- }
- case AMQP_FRAME_HEADER: {
- amqp_bytes_t encoded;
- int decode_result;
+ /* Check frame end marker (footer) */
+ if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
+ return -EINVAL;
+ }
- encoded.len = state->frame_size_target - 20; /* 7 for header, 12 for prop hdr, 1 for footer */
- encoded.bytes = D_BYTES(state->inbound_buffer, 19, encoded.len);
+ decoded_frame->channel = D_16(state->inbound_buffer, 1);
- decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, 7);
- decode_result = amqp_decode_properties(decoded_frame->payload.properties.class_id,
- &state->pool,
- encoded,
- &decoded_frame->payload.properties.decoded);
- if (decode_result != 0) return decode_result;
+ switch (frame_type) {
+ case AMQP_FRAME_METHOD: {
+ amqp_bytes_t encoded;
- return AMQP_FRAME_COMPLETE;
- }
- case AMQP_FRAME_BODY: {
- size_t fragment_len = state->frame_size_target - 8; /* 7 for header, 1 for footer */
- decoded_frame->payload.body_fragment.len = fragment_len;
- decoded_frame->payload.body_fragment.bytes = D_BYTES(state->inbound_buffer, 7, fragment_len);
- return AMQP_FRAME_COMPLETE;
+ /* Four bytes of method ID before the method args. */
+ encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
+ encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);
+
+ decoded_frame->frame_type = AMQP_FRAME_METHOD;
+ decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
+ AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
+ &state->decoding_pool,
+ encoded,
+ &decoded_frame->payload.method.decoded));
+ break;
+ }
+
+ case AMQP_FRAME_HEADER: {
+ amqp_bytes_t encoded;
+
+ /* 12 bytes for properties header. */
+ encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
+ encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);
+
+ decoded_frame->frame_type = AMQP_FRAME_HEADER;
+ decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
+ decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
+ AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
+ &state->decoding_pool,
+ encoded,
+ &decoded_frame->payload.properties.decoded));
+ break;
+ }
+
+ case AMQP_FRAME_BODY: {
+ size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);
+
+ decoded_frame->frame_type = AMQP_FRAME_BODY;
+ decoded_frame->payload.body_fragment.len = fragment_len;
+ decoded_frame->payload.body_fragment.bytes =
+ D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
+ break;
+ }
+
+ default:
+ /* Ignore the frame */
+ decoded_frame->frame_type = 0;
+ break;
+ }
+
+ state->inbound_buffer.bytes = NULL;
+ state->inbound_offset = 0;
+ state->target_size = HEADER_SIZE;
+ state->state = CONNECTION_STATE_IDLE;
+ return total_bytes_consumed;
}
+
default:
- /* Ignore the frame */
- return AMQP_FRAME_INCOMPLETE;
+ amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
+ }
+}
+
+amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
+ return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
+}
+
+void amqp_release_buffers(amqp_connection_state_t state) {
+ ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
+
+ amqp_assert(state->first_queued_frame == NULL,
+ "Programming error: attempt to amqp_release_buffers while waiting events enqueued");
+
+ recycle_amqp_pool(&state->frame_pool);
+ recycle_amqp_pool(&state->decoding_pool);
+}
+
+void amqp_maybe_release_buffers(amqp_connection_state_t state) {
+ if (amqp_release_buffers_ok(state)) {
+ amqp_release_buffers(state);
}
}
int amqp_send_frame(amqp_connection_state_t state,
- amqp_writer_fun_t writer,
- int context,
amqp_frame_t const *frame)
{
amqp_bytes_t encoded;
@@ -139,20 +232,24 @@ int amqp_send_frame(amqp_connection_state_t state,
E_16(state->outbound_buffer, 1, frame->channel);
switch (frame->frame_type) {
case AMQP_FRAME_METHOD: {
- encoded.len = state->outbound_buffer.len - 8;
- encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len);
- payload_len = amqp_encode_method(frame->payload.method.id,
- frame->payload.method.decoded,
- encoded);
+ E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
+ encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
+ encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded.len);
+ payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
+ frame->payload.method.decoded,
+ encoded)) + 4;
separate_body = 0;
break;
}
case AMQP_FRAME_HEADER: {
- encoded.len = state->outbound_buffer.len - 8;
- encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len);
- payload_len = amqp_encode_properties(frame->payload.properties.class_id,
- frame->payload.properties.decoded,
- encoded);
+ E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
+ E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
+ E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
+ encoded.len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
+ encoded.bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded.len);
+ payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
+ frame->payload.properties.decoded,
+ encoded)) + 12;
separate_body = 0;
break;
}
@@ -166,21 +263,27 @@ int amqp_send_frame(amqp_connection_state_t state,
return -EINVAL;
}
- if (payload_len < 0) {
- return payload_len;
- }
-
E_32(state->outbound_buffer, 3, payload_len);
if (separate_body) {
- int result = writer(context, state->outbound_buffer.bytes, 7);
char frame_end_byte = AMQP_FRAME_END;
- if (result < 0) return result;
- result = writer(context, encoded.bytes, payload_len);
- if (result < 0) return result;
- return writer(context, &frame_end_byte, 1);
+ printf("sending body frame (header):\n");
+ amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
+ AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
+ printf("sending body frame (payload):\n");
+ amqp_dump(encoded.bytes, payload_len);
+ AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
+ printf("sending body frame (footer).\n");
+ assert(FOOTER_SIZE == 1);
+ AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
} else {
- E_8(state->outbound_buffer, payload_len + 7, AMQP_FRAME_END);
- return writer(context, state->outbound_buffer.bytes, payload_len + 8);
+ E_8(state->outbound_buffer, payload_len + HEADER_SIZE, AMQP_FRAME_END);
+ printf("sending:\n");
+ amqp_dump(state->outbound_buffer.bytes, payload_len + HEADER_SIZE + FOOTER_SIZE);
+ AMQP_CHECK_RESULT(write(state->sockfd,
+ state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE)));
}
+
+ return 0;
}
diff --git a/librabbitmq/amqp_debug.c b/librabbitmq/amqp_debug.c
new file mode 100644
index 0000000..64db9ec
--- /dev/null
+++ b/librabbitmq/amqp_debug.c
@@ -0,0 +1,83 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <ctype.h>
+
+static void dump_row(long count, int numinrow, int *chs) {
+ int i;
+
+ printf("%08lX:", count - numinrow);
+
+ if (numinrow > 0) {
+ for (i = 0; i < numinrow; i++) {
+ if (i == 8)
+ printf(" :");
+ printf(" %02X", chs[i]);
+ }
+ for (i = numinrow; i < 16; i++) {
+ if (i == 8)
+ printf(" :");
+ printf(" ");
+ }
+ printf(" ");
+ for (i = 0; i < numinrow; i++) {
+ if (isprint(chs[i]))
+ printf("%c", chs[i]);
+ else
+ printf(".");
+ }
+ }
+ printf("\n");
+}
+
+static int rows_eq(int *a, int *b) {
+ int i;
+
+ for (i=0; i<16; i++)
+ if (a[i] != b[i])
+ return 0;
+
+ return 1;
+}
+
+void amqp_dump(void const *buffer, size_t len) {
+ unsigned char *buf = (unsigned char *) buffer;
+ long count = 0;
+ int numinrow = 0;
+ int chs[16];
+ int oldchs[16];
+ int showed_dots = 0;
+ int i;
+
+ for (i = 0; i < len; i++) {
+ int ch = buf[i];
+
+ if (numinrow == 16) {
+ int i;
+
+ if (rows_eq(oldchs, chs)) {
+ if (!showed_dots) {
+ showed_dots = 1;
+ printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
+ }
+ } else {
+ showed_dots = 0;
+ dump_row(count, numinrow, chs);
+ }
+
+ for (i=0; i<16; i++)
+ oldchs[i] = chs[i];
+
+ numinrow = 0;
+ }
+
+ count++;
+ chs[numinrow++] = ch;
+ }
+
+ dump_row(count, numinrow, chs);
+
+ if (numinrow != 0)
+ printf("%08lX:\n", count);
+}
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 64d06de..e0795d2 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -100,3 +100,16 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
return pool->alloc_block;
}
+
+void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) {
+ output->len = amount;
+ output->bytes = amqp_pool_alloc(pool, amount);
+}
+
+amqp_bytes_t amqp_cstring_bytes(char const *cstr) {
+ amqp_bytes_t result;
+ result.len = strlen(cstr);
+ result.bytes = (void *) cstr;
+ return result;
+}
+
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 7a874e2..6f9d508 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -5,13 +5,62 @@
extern "C" {
#endif
+/*
+ * Connection states:
+ *
+ * - CONNECTION_STATE_IDLE: initial state, and entered again after
+ * each frame is completed. Means that no bytes of the next frame
+ * have been seen yet. Connections may only be reconfigured, and the
+ * connection's pools recycled, when in this state. Whenever we're
+ * in this state, the inbound_buffer's bytes pointer must be NULL;
+ * any other state, and it must point to a block of memory allocated
+ * from the frame_pool.
+ *
+ * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming
+ * frame have been seen, but not a complete frame header's worth.
+ *
+ * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has
+ * been seen, but the frame is not yet complete. When it is
+ * completed, it will be returned, and the connection will return to
+ * IDLE state.
+ *
+ */
+typedef enum amqp_connection_state_enum_ {
+ CONNECTION_STATE_IDLE = 0,
+ CONNECTION_STATE_WAITING_FOR_HEADER,
+ CONNECTION_STATE_WAITING_FOR_BODY
+} amqp_connection_state_enum;
+
+/* 7 bytes up front, then payload, then 1 byte footer */
+#define HEADER_SIZE 7
+#define FOOTER_SIZE 1
+
+typedef struct amqp_link_t_ {
+ struct amqp_link_t_ *next;
+ void *data;
+} amqp_link_t;
+
struct amqp_connection_state_t_ {
- amqp_pool_t pool;
+ amqp_pool_t frame_pool;
+ amqp_pool_t decoding_pool;
+
+ amqp_connection_state_enum state;
+
+ int frame_max;
amqp_bytes_t inbound_buffer;
+
+ size_t inbound_offset;
+ size_t target_size;
+
amqp_bytes_t outbound_buffer;
- size_t frame_offset;
- size_t frame_size_target; /* 0 for unknown, still waiting for header */
- amqp_boolean_t reset_required;
+
+ int sockfd;
+ amqp_bytes_t sock_inbound_buffer;
+ size_t sock_inbound_offset;
+ size_t sock_inbound_limit;
+
+ amqp_link_t *first_queued_frame;
+ amqp_link_t *last_queued_frame;
};
#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); })
@@ -31,10 +80,10 @@ struct amqp_connection_state_t_ {
#define E_8(b, o, v) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o) = (v))
#define E_16(b, o, v) CHECK_LIMIT(b, o, 2, ({uint16_t vv = htons(v); memcpy(BUF_AT(b, o), &vv, 2);}))
#define E_32(b, o, v) CHECK_LIMIT(b, o, 4, ({uint32_t vv = htonl(v); memcpy(BUF_AT(b, o), &vv, 4);}))
-#define E_64(b, o, v) ({ \
- E_32(b, o, (uint32_t) (v >> 32)); \
- E_32(b, o, (uint32_t) (v & 0xFFFFFFFF)); \
-})
+#define E_64(b, o, v) ({ \
+ E_32(b, o, (uint32_t) (((uint64_t) v) >> 32)); \
+ E_32(b, o + 4, (uint32_t) (((uint64_t) v) & 0xFFFFFFFF)); \
+ })
#define E_BYTES(b, o, l, v) CHECK_LIMIT(b, o, l, memcpy(BUF_AT(b, o), (v), (l)))
@@ -47,6 +96,35 @@ extern int amqp_encode_table(amqp_bytes_t encoded,
amqp_table_t *input,
int *offsetptr);
+#define amqp_assert(condition, ...) \
+ ({ \
+ if (!(condition)) { \
+ fprintf(stderr, __VA_ARGS__); \
+ fputc('\n', stderr); \
+ abort(); \
+ } \
+ })
+
+#define AMQP_CHECK_RESULT(expr) \
+ ({ \
+ int _result = (expr); \
+ if (_result < 0) return _result; \
+ _result; \
+ })
+
+#define AMQP_CHECK_EOF_RESULT(expr) \
+ ({ \
+ int _result = (expr); \
+ if (_result <= 0) return _result; \
+ _result; \
+ })
+
+#ifndef NDEBUG
+extern void amqp_dump(void const *buffer, size_t len);
+#else
+#define amqp_dump(buffer, len) ((void) 0)
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 32af3e9..20686c4 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -3,9 +3,11 @@
#include <string.h>
#include <stdint.h>
#include <errno.h>
+#include <stdarg.h>
#include "amqp.h"
#include "amqp_framing.h"
+#include "amqp_private.h"
#include <sys/types.h>
#include <sys/uio.h>
@@ -14,6 +16,8 @@
#include <netdb.h>
#include <netinet/in.h>
+#include <assert.h>
+
int amqp_open_socket(char const *hostname,
int portnumber)
{
@@ -40,7 +44,7 @@ int amqp_open_socket(char const *hostname,
return sockfd;
}
-int amqp_send_header(amqp_writer_fun_t writer, int context) {
+int amqp_send_header(amqp_connection_state_t state) {
char header[8];
header[0] = 'A';
header[1] = 'M';
@@ -50,29 +54,367 @@ int amqp_send_header(amqp_writer_fun_t writer, int context) {
header[5] = 1;
header[6] = AMQP_PROTOCOL_VERSION_MAJOR;
header[7] = AMQP_PROTOCOL_VERSION_MINOR;
- return writer(context, &header[0], 8);
+ return write(state->sockfd, &header[0], 8);
}
-int amqp_simple_wait_frame(amqp_connection_state_t state,
- int sockfd,
- amqp_frame_t *decoded_frame)
+static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
+ switch (method) {
+ case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"};
+ default:
+ amqp_assert(0, "Invalid SASL method: %d", (int) method);
+ }
+ abort(); // unreachable
+}
+
+static amqp_bytes_t sasl_response(amqp_pool_t *pool,
+ amqp_sasl_method_enum method,
+ va_list args)
+{
+ amqp_bytes_t response;
+
+ switch (method) {
+ case AMQP_SASL_METHOD_PLAIN: {
+ char *username = va_arg(args, char *);
+ size_t username_len = strlen(username);
+ char *password = va_arg(args, char *);
+ size_t password_len = strlen(password);
+ amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
+ *BUF_AT(response, 0) = 0;
+ memcpy(((char *) response.bytes) + 1, username, username_len);
+ *BUF_AT(response, username_len + 1) = 0;
+ memcpy(((char *) response.bytes) + username_len + 2, password, password_len);
+ break;
+ }
+ default:
+ amqp_assert(0, "Invalid SASL method: %d", (int) method);
+ }
+
+ return response;
+}
+
+amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
+ return (state->first_queued_frame != NULL);
+}
+
+static int wait_frame_inner(amqp_connection_state_t state,
+ amqp_frame_t *decoded_frame)
{
- amqp_bytes_t buffer;
- char buffer_bytes[4096];
- buffer.bytes = buffer_bytes;
while (1) {
- int result = read(sockfd, buffer_bytes, sizeof(buffer_bytes));
+ int result;
+
+ while (state->sock_inbound_offset < state->sock_inbound_limit) {
+ amqp_bytes_t buffer;
+ buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
+ buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
+ AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame)));
+ state->sock_inbound_offset += result;
+
+ if (decoded_frame->frame_type != 0) {
+ /* Complete frame was read. Return it. */
+ return 1;
+ }
+
+ /* Incomplete or ignored frame. Keep processing input. */
+ assert(result != 0);
+ }
+
+ result = read(state->sockfd,
+ state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len);
if (result < 0) {
return -errno;
}
- buffer.len = result;
- switch ((result = amqp_handle_input(state, buffer, decoded_frame))) {
- case AMQP_FRAME_COMPLETE:
- return AMQP_FRAME_COMPLETE;
- case AMQP_FRAME_INCOMPLETE:
- break;
- default:
- return result;
+ if (result == 0) {
+ /* EOF. */
+ return 0;
+ }
+
+ state->sock_inbound_limit = result;
+ state->sock_inbound_offset = 0;
+ }
+}
+
+int amqp_simple_wait_frame(amqp_connection_state_t state,
+ amqp_frame_t *decoded_frame)
+{
+ if (state->first_queued_frame != NULL) {
+ amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
+ state->first_queued_frame = state->first_queued_frame->next;
+ if (state->first_queued_frame == NULL) {
+ state->last_queued_frame = NULL;
+ }
+ *decoded_frame = *f;
+ return 1;
+ } else {
+ return wait_frame_inner(state, decoded_frame);
+ }
+}
+
+int amqp_simple_wait_method(amqp_connection_state_t state,
+ amqp_method_number_t expected_or_zero,
+ amqp_method_t *output)
+{
+ amqp_frame_t frame;
+
+ AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame));
+ amqp_assert(frame.frame_type == AMQP_FRAME_METHOD,
+ "Expected 0x%08X method frame", expected_or_zero);
+ amqp_assert((expected_or_zero == 0) || (frame.payload.method.id == expected_or_zero),
+ "Expected method ID 0x%08X", expected_or_zero);
+ *output = frame.payload.method;
+ return 1;
+}
+
+int amqp_send_method(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded)
+{
+ amqp_frame_t frame;
+
+ frame.frame_type = AMQP_FRAME_METHOD;
+ frame.channel = channel;
+ frame.payload.method.id = id;
+ frame.payload.method.decoded = decoded;
+ return amqp_send_frame(state, &frame);
+}
+
+amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t expected_reply_id,
+ void *decoded_request_method)
+{
+ int status;
+ amqp_rpc_reply_t result;
+
+ memset(&result, 0, sizeof(result));
+
+ status = amqp_send_method(state, channel, request_id, decoded_request_method);
+ if (status < 0) {
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.library_errno = -status;
+ return result;
+ }
+
+ {
+ amqp_frame_t frame;
+
+ retry:
+ status = wait_frame_inner(state, &frame);
+ if (status <= 0) {
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.library_errno = -status;
+ return result;
+ }
+
+ if (!((frame.frame_type == AMQP_FRAME_METHOD) &&
+ (frame.channel == channel) &&
+ ((frame.payload.method.id == expected_reply_id) ||
+ (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD) ||
+ (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))))
+ {
+ amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
+ amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
+
+ *frame_copy = frame;
+
+ link->next = NULL;
+ link->data = frame_copy;
+
+ if (state->last_queued_frame == NULL) {
+ state->first_queued_frame = link;
+ } else {
+ state->last_queued_frame->next = link;
+ }
+ state->last_queued_frame = link;
+
+ goto retry;
+ }
+
+ result.reply_type = (frame.payload.method.id == expected_reply_id)
+ ? AMQP_RESPONSE_NORMAL
+ : AMQP_RESPONSE_SERVER_EXCEPTION;
+
+ result.reply = frame.payload.method;
+ return result;
+ }
+}
+
+static int amqp_login_inner(amqp_connection_state_t state,
+ int frame_max,
+ amqp_sasl_method_enum sasl_method,
+ va_list vl)
+{
+ amqp_method_t method;
+ uint32_t server_frame_max;
+
+ amqp_send_header(state);
+
+ AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_START_METHOD, &method));
+ {
+ amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
+ if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
+ (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
+ return -EPROTOTYPE;
+ }
+
+ /* TODO: check that our chosen SASL mechanism is in the list of
+ acceptable mechanisms. Or even let the application choose from
+ the list! */
+ }
+
+ {
+ amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl);
+ amqp_connection_start_ok_t s =
+ (amqp_connection_start_ok_t) {
+ .client_properties = {.num_entries = 0, .entries = NULL},
+ .mechanism = sasl_method_name(sasl_method),
+ .response = response_bytes,
+ .locale = {.len = 5, .bytes = "en_US"}
+ };
+ AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s));
+ }
+
+ amqp_release_buffers(state);
+
+ AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_TUNE_METHOD, &method));
+ {
+ amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
+ server_frame_max = s->frame_max;
+ }
+
+ if (server_frame_max != 0 && server_frame_max < frame_max) {
+ frame_max = server_frame_max;
+ }
+
+ {
+ amqp_connection_tune_ok_t s =
+ (amqp_connection_tune_ok_t) {
+ .channel_max = 1,
+ .frame_max = frame_max,
+ .heartbeat = 0
+ };
+ AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s));
+ }
+
+ amqp_release_buffers(state);
+
+ return 1;
+}
+
+amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
+ char const *vhost,
+ int frame_max,
+ amqp_sasl_method_enum sasl_method,
+ ...)
+{
+ va_list vl;
+ amqp_rpc_reply_t result;
+
+ va_start(vl, sasl_method);
+
+ amqp_login_inner(state, frame_max, sasl_method, vl);
+
+ {
+ amqp_connection_open_t s =
+ (amqp_connection_open_t) {
+ .virtual_host = amqp_cstring_bytes(vhost),
+ .capabilities = {.len = 0, .bytes = NULL},
+ .insist = 1
+ };
+ result = amqp_simple_rpc(state,
+ 0,
+ AMQP_CONNECTION_OPEN_METHOD,
+ AMQP_CONNECTION_OPEN_OK_METHOD,
+ &s);
+ if (result.reply_type != AMQP_RESPONSE_NORMAL) {
+ return result;
+ }
+ }
+ amqp_maybe_release_buffers(state);
+
+ {
+ amqp_channel_open_t s =
+ (amqp_channel_open_t) {
+ .out_of_band = {.len = 0, .bytes = NULL}
+ };
+ result = amqp_simple_rpc(state,
+ 1,
+ AMQP_CHANNEL_OPEN_METHOD,
+ AMQP_CHANNEL_OPEN_OK_METHOD,
+ &s);
+ if (result.reply_type != AMQP_RESPONSE_NORMAL) {
+ return result;
+ }
+ }
+ amqp_maybe_release_buffers(state);
+
+ va_end(vl);
+
+ result.reply_type = AMQP_RESPONSE_NORMAL;
+ result.reply.id = 0;
+ result.reply.decoded = NULL;
+ result.library_errno = 0;
+ return result;
+}
+
+int amqp_basic_publish(amqp_connection_state_t state,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory,
+ amqp_boolean_t immediate,
+ amqp_basic_properties_t const *properties,
+ amqp_bytes_t body)
+{
+ amqp_frame_t f;
+ size_t body_offset;
+ size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
+
+ amqp_basic_publish_t m =
+ (amqp_basic_publish_t) {
+ .exchange = exchange,
+ .routing_key = routing_key,
+ .mandatory = mandatory,
+ .immediate = immediate
+ };
+
+ amqp_basic_properties_t default_properties;
+
+ AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m));
+
+ if (properties == NULL) {
+ memset(&default_properties, 0, sizeof(default_properties));
+ properties = &default_properties;
+ }
+
+ f.frame_type = AMQP_FRAME_HEADER;
+ f.channel = 1;
+ f.payload.properties.class_id = AMQP_BASIC_CLASS;
+ f.payload.properties.body_size = body.len;
+ f.payload.properties.decoded = (void *) properties;
+ AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
+
+ body_offset = 0;
+ while (1) {
+ int remaining = body.len - body_offset;
+ assert(remaining >= 0);
+
+ if (remaining == 0)
+ break;
+
+ f.frame_type = AMQP_FRAME_BODY;
+ f.channel = 1;
+ f.payload.body_fragment.bytes = BUF_AT(body, body_offset);
+ if (remaining >= usable_body_payload_size) {
+ f.payload.body_fragment.len = usable_body_payload_size;
+ } else {
+ f.payload.body_fragment.len = remaining;
}
+
+ body_offset += f.payload.body_fragment.len;
+ AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
}
+
+ return 0;
}
diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c
index fac42d9..d8200e4 100644
--- a/librabbitmq/amqp_table.c
+++ b/librabbitmq/amqp_table.c
@@ -5,7 +5,6 @@
#include <errno.h>
#include "amqp.h"
-#include "../config.h"
#include "amqp_private.h"
#define INITIAL_TABLE_SIZE 16
@@ -66,10 +65,9 @@ int amqp_decode_table(amqp_bytes_t encoded,
entry->value.u64 = D_64(encoded, offset);
offset += 8;
break;
- case 'F': {
- int table_result = amqp_decode_table(encoded, pool, &(entry->value.table), &offset);
- if (table_result != 0) return table_result;
- }
+ case 'F':
+ AMQP_CHECK_RESULT(amqp_decode_table(encoded, pool, &(entry->value.table), &offset));
+ break;
default:
return -EINVAL;
}
@@ -128,10 +126,9 @@ int amqp_encode_table(amqp_bytes_t encoded,
E_64(encoded, offset, entry->value.u64);
offset += 8;
break;
- case 'F': {
- int table_result = amqp_encode_table(encoded, &(entry->value.table), &offset);
- if (table_result != 0) return table_result;
- }
+ case 'F':
+ AMQP_CHECK_RESULT(amqp_encode_table(encoded, &(entry->value.table), &offset));
+ break;
default:
return -EINVAL;
}
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index 474be78..9f6cde0 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -108,7 +108,7 @@ def genErl(spec):
elif type == 'table':
print prefix + "table_result = amqp_decode_table(encoded, pool, &(%s), &offset);" % \
(cLvalue,)
- print prefix + "if (table_result != 0) return table_result;"
+ print prefix + "AMQP_CHECK_RESULT(table_result);"
else:
raise "Illegal domain in genSingleDecode", type
@@ -144,7 +144,7 @@ def genErl(spec):
elif type == 'table':
print prefix + "table_result = amqp_encode_table(encoded, &(%s), &offset);" % \
(cValue,)
- print prefix + "if (table_result != 0) return table_result;"
+ print prefix + "if (table_result < 0) return table_result;"
else:
raise "Illegal domain in genSingleEncode", type
@@ -231,17 +231,6 @@ def genErl(spec):
print " return offset;"
print " }"
- def genLookupException(c,v,cls):
- # We do this because 0.8 uses "soft error" and 8.1 uses "soft-error".
- mCls = c_ize(cls).upper()
- if mCls == 'SOFT_ERROR': genLookupException1(c,'AMQP_EXCEPTION_CATEGORY_CHANNEL')
- elif mCls == 'HARD_ERROR': genLookupException1(c, 'AMQP_EXCEPTION_CATEGORY_CONNECTION')
- elif mCls == '': pass
- else: raise 'Unknown constant class', cls
-
- def genLookupException1(c, cCategory):
- print ' case %s: return %s;' % (cConstantName(c), cCategory)
-
methods = spec.allMethods()
print '#include <stdlib.h>'
@@ -340,14 +329,14 @@ int amqp_encode_properties(uint16_t class_id,
similarity of structure between classes */
amqp_flags_t flags = * (amqp_flags_t *) decoded; /* cheating! */
- while (flags != 0) {
+ do {
amqp_flags_t remainder = flags >> 16;
uint16_t partial_flags = flags & 0xFFFE;
if (remainder != 0) { partial_flags |= 1; }
E_16(encoded, offset, partial_flags);
offset += 2;
flags = remainder;
- }
+ } while (flags != 0);
switch (class_id) {"""
for c in spec.allClasses(): genEncodeProperties(c)
@@ -355,14 +344,6 @@ int amqp_encode_properties(uint16_t class_id,
}
}"""
- print """
-int amqp_exception_category(uint16_t code) {
- switch (code) {"""
- for (c,v,cls) in spec.constants: genLookupException(c,v,cls)
- print """ default: return 0;
- }
-}"""
-
def genHrl(spec):
def cType(domain):
return cTypeMap[spec.resolveDomain(domain)]
@@ -409,7 +390,6 @@ extern int amqp_encode_method(amqp_method_number_t methodNumber,
extern int amqp_encode_properties(uint16_t class_id,
void *decoded,
amqp_bytes_t encoded);
-extern int amqp_exception_category(uint16_t code);
"""
print "/* Method field records. */"
@@ -425,6 +405,8 @@ extern int amqp_exception_category(uint16_t code);
print "/* Class property records. */"
for c in spec.allClasses():
+ print "#define %s (0x%.04X) /* %d */" % \
+ (cConstantName(c.name + "_class"), c.index, c.index)
index = 0
for f in c.fields:
if index % 16 == 15:
@@ -434,8 +416,10 @@ extern int amqp_exception_category(uint16_t code);
bitindex = shortnum * 16 + partialindex
print '#define %s (1 << %d)' % (cFlagName(c, f), bitindex)
index = index + 1
- print "typedef struct {\n amqp_flags_t _flags;\n%s} %s;\n" % \
- (fieldDeclList(c.fields), c.structName())
+ print "typedef struct %s_ {\n amqp_flags_t _flags;\n%s} %s;\n" % \
+ (c.structName(),
+ fieldDeclList(c.fields),
+ c.structName())
print """#ifdef __cplusplus
}