summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-25 22:11:44 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-25 22:11:44 +0100
commitaf9315c575b8788b28bb3c5d959a9462b12b9a15 (patch)
tree615ddf61ed58796bcbeb32dc9165f9c2fd925582
parent67970c9c56ebd49b57e61d50255b04fa1ac7d27d (diff)
downloadrabbitmq-c-github-ask-af9315c575b8788b28bb3c5d959a9462b12b9a15.tar.gz
More work
-rw-r--r--examples/amqp_sendstring.c46
-rw-r--r--librabbitmq/Makefile.am2
-rw-r--r--librabbitmq/amqp.h23
-rw-r--r--librabbitmq/amqp_connection.c186
-rw-r--r--librabbitmq/amqp_private.h6
-rw-r--r--librabbitmq/amqp_socket.c78
-rw-r--r--librabbitmq/codegen.py20
7 files changed, 352 insertions, 9 deletions
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index 9258f7f..bdf4285 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -3,10 +3,52 @@
#include <string.h>
#include <stdint.h>
-#include "amqp.h"
+#include <amqp.h>
+
+#include <unistd.h>
+
+static void die_on_error(int x, char const *context) {
+ if (x < 0) {
+ fprintf(stderr, "%s: %s\n", context, strerror(-x));
+ exit(1);
+ }
+}
int main(int argc, char const * const *argv) {
- amqp_connection_state_t conn = amqp_new_connection();
+ char const *hostname;
+ int port;
+ char const *exchange;
+ char const *routingkey;
+ char const *messagebody;
+
+ int sockfd;
+ amqp_connection_state_t conn;
+
+ if (argc < 6) {
+ fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ exchange = argv[3];
+ routingkey = argv[4];
+ messagebody = argv[5];
+
+ die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
+ amqp_send_header((amqp_writer_fun_t) write, sockfd);
+
+ conn = amqp_new_connection();
+
+ while (1) {
+ amqp_frame_t frame;
+ printf("Result %d\n", amqp_simple_wait_frame(conn, sockfd, &frame));
+ printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
+ }
+
amqp_destroy_connection(conn);
+
+ die_on_error(close(sockfd), "Closing socket");
+
return 0;
}
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am
index 5fec206..6fae6d3 100644
--- a/librabbitmq/Makefile.am
+++ b/librabbitmq/Makefile.am
@@ -1,6 +1,6 @@
lib_LTLIBRARIES = librabbitmq.la
-librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c
+librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c
nodist_librabbitmq_la_SOURCES = amqp_framing.c
librabbitmq_la_INCLUDES = amqp_framing.h amqp.h
noinst_librabbitmq_la_INCLUDES = amqp_private.h
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 2554cf8..314e2c9 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -5,6 +5,8 @@
extern "C" {
#endif
+typedef int (*amqp_writer_fun_t)(int context, void const *buf, size_t len);
+
typedef int amqp_boolean_t;
typedef uint32_t amqp_method_number_t;
typedef uint32_t amqp_flags_t;
@@ -73,6 +75,9 @@ typedef struct amqp_frame_t_ {
#define AMQP_EXCEPTION_CATEGORY_CONNECTION 1
#define AMQP_EXCEPTION_CATEGORY_CHANNEL 2
+#define AMQP_FRAME_COMPLETE 0
+#define AMQP_FRAME_INCOMPLETE 1
+
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
@@ -85,23 +90,29 @@ extern void empty_amqp_pool(amqp_pool_t *pool);
extern void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount);
extern amqp_connection_state_t amqp_new_connection(void);
-extern int amqp_tune_connection(amqp_connection_state_t state,
- int frame_max);
-extern int amqp_destroy_connection(amqp_connection_state_t state);
+extern void amqp_tune_connection(amqp_connection_state_t state,
+ int frame_max);
+extern void amqp_destroy_connection(amqp_connection_state_t state);
extern int amqp_handle_input(amqp_connection_state_t state,
- amqp_pool_t *pool,
amqp_bytes_t received_data,
amqp_frame_t *decoded_frame);
extern int amqp_send_frame(amqp_connection_state_t state,
- amqp_pool_t *pool,
- int (*writer)(int context, void const *buf, size_t len),
+ amqp_writer_fun_t writer,
int context,
amqp_frame_t const *frame);
extern int amqp_table_entry_cmp(void const *entry1, void const *entry2);
+extern int amqp_open_socket(char const *hostname, int portnumber);
+
+extern int amqp_send_header(amqp_writer_fun_t writer, int context);
+
+extern int amqp_simple_wait_frame(amqp_connection_state_t state,
+ int sockfd,
+ amqp_frame_t *decoded_frame);
+
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
new file mode 100644
index 0000000..3100f7e
--- /dev/null
+++ b/librabbitmq/amqp_connection.c
@@ -0,0 +1,186 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+
+#include "amqp.h"
+#include "amqp_framing.h"
+#include "amqp_private.h"
+
+#define INITIAL_FRAME_MAX 65536
+
+amqp_connection_state_t amqp_new_connection(void) {
+ amqp_connection_state_t state =
+ (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
+ init_amqp_pool(&state->pool, INITIAL_FRAME_MAX);
+ state->inbound_buffer.len = INITIAL_FRAME_MAX;
+ state->inbound_buffer.bytes = malloc(INITIAL_FRAME_MAX);
+ state->outbound_buffer.len = INITIAL_FRAME_MAX;
+ state->outbound_buffer.bytes = malloc(INITIAL_FRAME_MAX);
+ state->frame_size_target = 0;
+ state->reset_required = 0;
+ return state;
+}
+
+void amqp_tune_connection(amqp_connection_state_t state,
+ int frame_max)
+{
+ empty_amqp_pool(&state->pool);
+ init_amqp_pool(&state->pool, frame_max);
+ state->inbound_buffer.len = frame_max;
+ realloc(state->inbound_buffer.bytes, frame_max);
+ state->outbound_buffer.len = frame_max;
+ realloc(state->outbound_buffer.bytes, frame_max);
+}
+
+void amqp_destroy_connection(amqp_connection_state_t state) {
+ empty_amqp_pool(&state->pool);
+ free(state->inbound_buffer.bytes);
+ free(state->outbound_buffer.bytes);
+ free(state);
+}
+
+int amqp_handle_input(amqp_connection_state_t state,
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame)
+{
+ if (state->reset_required) {
+ size_t unconsumed_byte_count = state->frame_offset - state->frame_size_target;
+ recycle_amqp_pool(&state->pool);
+ memmove(state->inbound_buffer.bytes,
+ D_BYTES(state->inbound_buffer,
+ state->frame_size_target,
+ unconsumed_byte_count),
+ unconsumed_byte_count);
+ state->frame_offset = unconsumed_byte_count;
+ state->frame_size_target = 0;
+ state->reset_required = 0;
+ }
+
+ E_BYTES(state->inbound_buffer, state->frame_offset, received_data.len, received_data.bytes);
+ state->frame_offset += received_data.len;
+
+ if (state->frame_size_target == 0) {
+ if (state->frame_offset < 7) {
+ return AMQP_FRAME_INCOMPLETE;
+ }
+
+ state->frame_size_target = D_32(state->inbound_buffer, 3) + 8; /* 7 for header, 1 for footer */
+ }
+
+ if (state->frame_offset < state->frame_size_target) {
+ return AMQP_FRAME_INCOMPLETE;
+ }
+
+ if (D_8(state->inbound_buffer, state->frame_size_target - 1) != AMQP_FRAME_END) {
+ return -EINVAL;
+ }
+
+ state->reset_required = 1;
+
+ decoded_frame->frame_type = D_8(state->inbound_buffer, 0);
+ decoded_frame->channel = D_16(state->inbound_buffer, 1);
+ switch (decoded_frame->frame_type) {
+ case AMQP_FRAME_METHOD: {
+ amqp_bytes_t encoded;
+ int decode_result;
+
+ encoded.len = state->frame_size_target - 12; /* 7 for header, 4 for method id, 1 for footer */
+ encoded.bytes = D_BYTES(state->inbound_buffer, 11, encoded.len);
+
+ decoded_frame->payload.method.id = D_32(state->inbound_buffer, 7);
+ decode_result = amqp_decode_method(decoded_frame->payload.method.id,
+ &state->pool,
+ encoded,
+ &decoded_frame->payload.method.decoded);
+ if (decode_result != 0) return decode_result;
+
+ return AMQP_FRAME_COMPLETE;
+ }
+ case AMQP_FRAME_HEADER: {
+ amqp_bytes_t encoded;
+ int decode_result;
+
+ encoded.len = state->frame_size_target - 20; /* 7 for header, 12 for prop hdr, 1 for footer */
+ encoded.bytes = D_BYTES(state->inbound_buffer, 19, encoded.len);
+
+ decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, 7);
+ decode_result = amqp_decode_properties(decoded_frame->payload.properties.class_id,
+ &state->pool,
+ encoded,
+ &decoded_frame->payload.properties.decoded);
+ if (decode_result != 0) return decode_result;
+
+ return AMQP_FRAME_COMPLETE;
+ }
+ case AMQP_FRAME_BODY: {
+ size_t fragment_len = state->frame_size_target - 8; /* 7 for header, 1 for footer */
+ decoded_frame->payload.body_fragment.len = fragment_len;
+ decoded_frame->payload.body_fragment.bytes = D_BYTES(state->inbound_buffer, 7, fragment_len);
+ return AMQP_FRAME_COMPLETE;
+ }
+ default:
+ /* Ignore the frame */
+ return AMQP_FRAME_INCOMPLETE;
+ }
+}
+
+int amqp_send_frame(amqp_connection_state_t state,
+ amqp_writer_fun_t writer,
+ int context,
+ amqp_frame_t const *frame)
+{
+ amqp_bytes_t encoded;
+ int payload_len;
+ int separate_body;
+
+ E_8(state->outbound_buffer, 0, frame->frame_type);
+ E_16(state->outbound_buffer, 1, frame->channel);
+ switch (frame->frame_type) {
+ case AMQP_FRAME_METHOD: {
+ encoded.len = state->outbound_buffer.len - 8;
+ encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len);
+ payload_len = amqp_encode_method(frame->payload.method.id,
+ frame->payload.method.decoded,
+ encoded);
+ separate_body = 0;
+ break;
+ }
+ case AMQP_FRAME_HEADER: {
+ encoded.len = state->outbound_buffer.len - 8;
+ encoded.bytes = D_BYTES(state->outbound_buffer, 7, encoded.len);
+ payload_len = amqp_encode_properties(frame->payload.properties.class_id,
+ frame->payload.properties.decoded,
+ encoded);
+ separate_body = 0;
+ break;
+ }
+ case AMQP_FRAME_BODY: {
+ encoded = frame->payload.body_fragment;
+ payload_len = encoded.len;
+ separate_body = 1;
+ break;
+ }
+ default:
+ return -EINVAL;
+ }
+
+ if (payload_len < 0) {
+ return payload_len;
+ }
+
+ E_32(state->outbound_buffer, 3, payload_len);
+
+ if (separate_body) {
+ int result = writer(context, state->outbound_buffer.bytes, 7);
+ char frame_end_byte = AMQP_FRAME_END;
+ if (result < 0) return result;
+ result = writer(context, encoded.bytes, payload_len);
+ if (result < 0) return result;
+ return writer(context, &frame_end_byte, 1);
+ } else {
+ E_8(state->outbound_buffer, payload_len + 7, AMQP_FRAME_END);
+ return writer(context, state->outbound_buffer.bytes, payload_len + 8);
+ }
+}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 4f6628d..7a874e2 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -6,6 +6,12 @@ extern "C" {
#endif
struct amqp_connection_state_t_ {
+ amqp_pool_t pool;
+ amqp_bytes_t inbound_buffer;
+ amqp_bytes_t outbound_buffer;
+ size_t frame_offset;
+ size_t frame_size_target; /* 0 for unknown, still waiting for header */
+ amqp_boolean_t reset_required;
};
#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); })
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
new file mode 100644
index 0000000..32af3e9
--- /dev/null
+++ b/librabbitmq/amqp_socket.c
@@ -0,0 +1,78 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+
+#include "amqp.h"
+#include "amqp_framing.h"
+
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+int amqp_open_socket(char const *hostname,
+ int portnumber)
+{
+ int sockfd;
+ struct sockaddr_in addr;
+ struct hostent *he;
+
+ he = gethostbyname(hostname);
+ if (he == NULL) {
+ return -ENOENT;
+ }
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(portnumber);
+ addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
+
+ sockfd = socket(PF_INET, SOCK_STREAM, 0);
+ if (connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
+ int result = -errno;
+ close(sockfd);
+ return result;
+ }
+
+ return sockfd;
+}
+
+int amqp_send_header(amqp_writer_fun_t writer, int context) {
+ char header[8];
+ header[0] = 'A';
+ header[1] = 'M';
+ header[2] = 'Q';
+ header[3] = 'P';
+ header[4] = 1;
+ header[5] = 1;
+ header[6] = AMQP_PROTOCOL_VERSION_MAJOR;
+ header[7] = AMQP_PROTOCOL_VERSION_MINOR;
+ return writer(context, &header[0], 8);
+}
+
+int amqp_simple_wait_frame(amqp_connection_state_t state,
+ int sockfd,
+ amqp_frame_t *decoded_frame)
+{
+ amqp_bytes_t buffer;
+ char buffer_bytes[4096];
+ buffer.bytes = buffer_bytes;
+ while (1) {
+ int result = read(sockfd, buffer_bytes, sizeof(buffer_bytes));
+ if (result < 0) {
+ return -errno;
+ }
+ buffer.len = result;
+ switch ((result = amqp_handle_input(state, buffer, decoded_frame))) {
+ case AMQP_FRAME_COMPLETE:
+ return AMQP_FRAME_COMPLETE;
+ case AMQP_FRAME_INCOMPLETE:
+ break;
+ default:
+ return result;
+ }
+ }
+}
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index 00159ae..474be78 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -392,6 +392,26 @@ extern "C" {
print "#define %s %s" % (cConstantName(c), v)
print
+ print """/* Function prototypes. */
+extern char const *amqp_method_name(amqp_method_number_t methodNumber);
+extern amqp_boolean_t amqp_method_has_content(amqp_method_number_t methodNumber);
+extern int amqp_decode_method(amqp_method_number_t methodNumber,
+ amqp_pool_t *pool,
+ amqp_bytes_t encoded,
+ void **decoded);
+extern int amqp_decode_properties(uint16_t class_id,
+ amqp_pool_t *pool,
+ amqp_bytes_t encoded,
+ void **decoded);
+extern int amqp_encode_method(amqp_method_number_t methodNumber,
+ void *decoded,
+ amqp_bytes_t encoded);
+extern int amqp_encode_properties(uint16_t class_id,
+ void *decoded,
+ amqp_bytes_t encoded);
+extern int amqp_exception_category(uint16_t code);
+"""
+
print "/* Method field records. */"
for m in methods:
methodid = m.klass.index << 16 | m.index