summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonygarnockjones@gmail.com>2010-02-18 17:20:39 +1300
committerTony Garnock-Jones <tonygarnockjones@gmail.com>2010-02-18 17:20:39 +1300
commitc877f5da14b591ece42d93651ed0a7d9c38b3677 (patch)
tree111b9ed9135d3defc2d4893cfa17de20364a78a1
parent896df535ac9d877ca239c95003d55c8fee5ef047 (diff)
parent5568ccff3edef70041e45910077ed0ee17a435d7 (diff)
downloadrabbitmq-c-github-ask-c877f5da14b591ece42d93651ed0a7d9c38b3677.tar.gz
Merge default into amqp_0_9_1
-rw-r--r--README82
-rw-r--r--examples/amqp_bind.c4
-rw-r--r--examples/amqp_consumer.c8
-rw-r--r--examples/amqp_exchange_declare.c4
-rw-r--r--examples/amqp_listen.c8
-rw-r--r--examples/amqp_listenq.c4
-rw-r--r--examples/amqp_producer.c2
-rw-r--r--examples/amqp_sendstring.c2
-rw-r--r--examples/amqp_unbind.c4
-rw-r--r--librabbitmq/amqp.h16
-rw-r--r--librabbitmq/amqp_api.c60
-rw-r--r--librabbitmq/amqp_private.h2
-rw-r--r--librabbitmq/amqp_socket.c10
13 files changed, 149 insertions, 57 deletions
diff --git a/README b/README
index eea22e6..3308d9d 100644
--- a/README
+++ b/README
@@ -1,8 +1,86 @@
-If you're working from a mercurial checkout,
+# RabbitMQ C AMQP client library
+
+## Introduction
+
+This is a C-language AMQP client library for use with AMQP servers
+speaking protocol versions 0-8 and 0-9-1.
+
+ - <http://www.rabbitmq.com/>
+ - <http://www.amqp.org/>
+ - <http://hg.rabbitmq.com/rabbitmq-c>
+
+*NB*: This library's source code supports *either* 0-8 *or* 0-9-1, not
+both simultaneously. Please check carefully that you have the variant
+you require.
+
+Announcements regarding the library are periodically made on the
+RabbitMQ mailing list and on LShift's blog.
+
+ - <http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss>
+ - <http://www.lshift.net/blog/>
+ - <http://www.lshift.net/blog/category/lshift-sw/rabbitmq/>
+
+## Retrieving the code
+
+In addition to the source code for this library, you will require a
+copy of `rabbitmq-codegen`. Here is a short `sh` script for retrieving
+the necessary pieces:
+
+ hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
+ hg clone http://hg.rabbitmq.com/rabbitmq-c/
+
+You will also need a recent python with the simplejson module
+installed, and the GNU autotools (autoconf, automake, libtool etc).
+
+In current releases, the default (trunk) branch of the mercurial
+repository hosting the `rabbitmq-c` code is set up for AMQP 0-8
+support, with AMQP 0-9-1 support living on a separate mercurial
+branch. To switch your checked-out copy of the source code to 0-9-1
+support,
+
+ (cd rabbitmq-codegen; hg up amqp_0_9_1)
+ (cd rabbitmq-c; hg up amqp_0_9_1)
+
+before building the code. If you switch branches after having compiled
+the code, make sure to rerun `autoreconf`, `configure`, `make clean`
+and `make` after switching branches.
+
+## Building the code
+
+Once you have all the prerequisites, change to the `rabbitmq-c`
+directory and run
autoreconf -i
-before
+to run the GNU autotools and generate the configure script, followed
+by
./configure
make
+
+to build the `librabbitmq` library and the example programs.
+
+## Running the examples
+
+Arrange for a RabbitMQ or other AMQP server to be running on
+`localhost` at TCP port number 5672.
+
+In one terminal, run
+
+ ./examples/amqp_listen localhost 5672 amq.direct test
+
+In another terminal,
+
+ ./examples/amqp_sendstring localhost 5672 amq.direct test "hello world"
+
+You should see output similar to the following in the listener's
+terminal window:
+
+ Result 1
+ Frame type 1, channel 1
+ Method AMQP_BASIC_DELIVER_METHOD
+ Delivery 1, exchange amq.direct routingkey test
+ Content-type: text/plain
+ ----
+ 00000000: 68 65 6C 6C 6F 20 77 6F : 72 6C 64 hello world
+ 0000000B:
diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c
index afbfed9..fa3f907 100644
--- a/examples/amqp_bind.c
+++ b/examples/amqp_bind.c
@@ -38,14 +38,14 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_bind(conn, 1,
amqp_cstring_bytes(queue),
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey),
AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Unbinding");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index b91ec16..3ab296d 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -107,12 +107,12 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
die_on_error(-ENOMEM, "Copying queue name");
@@ -121,10 +121,10 @@ int main(int argc, char const * const *argv) {
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Binding queue");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0, AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Consuming");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
run(conn);
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index 02884e8..d0d456c 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -36,11 +36,11 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype),
0, 0, 0, AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Declaring exchange");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index 3f52da4..7be83be 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -43,12 +43,12 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
die_on_error(-ENOMEM, "Copying queue name");
@@ -57,10 +57,10 @@ int main(int argc, char const * const *argv) {
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Binding queue");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0, AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Consuming");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
amqp_frame_t frame;
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index 958a063..5dea81e 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -39,10 +39,10 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), AMQP_EMPTY_BYTES, 0, 0, 0, AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Consuming");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
amqp_frame_t frame;
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index a60cb3e..e0b8079 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -95,7 +95,7 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
send_batch(conn, "test queue", rate_limit, message_count);
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index a321f52..e512ef0 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -38,7 +38,7 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_basic_properties_t props;
diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c
index 9a9a35c..e033702 100644
--- a/examples/amqp_unbind.c
+++ b/examples/amqp_unbind.c
@@ -38,14 +38,14 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
- die_on_amqp_error(amqp_rpc_reply, "Opening channel");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_unbind(conn, 1,
amqp_cstring_bytes(queue),
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey),
AMQP_EMPTY_TABLE);
- die_on_amqp_error(amqp_rpc_reply, "Unbinding");
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 749671d..adfaaf6 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -337,8 +337,6 @@ extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
int heartbeat,
amqp_sasl_method_enum sasl_method, ...);
-extern amqp_rpc_reply_t amqp_rpc_reply;
-
extern struct amqp_channel_open_ok_t_ *amqp_channel_open(amqp_connection_state_t state,
amqp_channel_t channel);
@@ -424,9 +422,19 @@ extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t s
extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
/*
- * Expose amqp_rpc_reply to libraries.
+ * For those API operations (such as amqp_basic_ack,
+ * amqp_queue_declare, and so on) that do not themselves return
+ * amqp_rpc_reply_t instances, we need some way of discovering what,
+ * if anything, went wrong. amqp_get_rpc_reply() returns the most
+ * recent amqp_rpc_reply_t instance corresponding to such an API
+ * operation for the given connection.
+ *
+ * Only use it for operations that do not themselves return
+ * amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t
+ * generally do NOT update this per-connection-global amqp_rpc_reply_t
+ * instance.
*/
-extern amqp_rpc_reply_t amqp_get_rpc_reply(void);
+extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state);
#ifdef __cplusplus
}
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 8adf3f0..fdf64a0 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -10,17 +10,15 @@
#include <assert.h>
-amqp_rpc_reply_t amqp_rpc_reply;
-
-#define RPC_REPLY(replytype) \
- (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \
- ? (replytype *) amqp_rpc_reply.reply.decoded \
+#define RPC_REPLY(replytype) \
+ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \
+ ? (replytype *) state->most_recent_api_result.reply.decoded \
: NULL)
amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
amqp_channel_t channel)
{
- amqp_rpc_reply =
+ state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK,
amqp_channel_open_t,
AMQP_EMPTY_BYTES);
@@ -118,7 +116,7 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
amqp_boolean_t auto_delete,
amqp_table_t arguments)
{
- amqp_rpc_reply =
+ state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK,
amqp_exchange_declare_t,
0, exchange, type, passive, durable, auto_delete, 0, 0, arguments);
@@ -134,7 +132,7 @@ amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
amqp_boolean_t auto_delete,
amqp_table_t arguments)
{
- amqp_rpc_reply =
+ state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK,
amqp_queue_declare_t,
0, queue, passive, durable, exclusive, auto_delete, 0, arguments);
@@ -148,7 +146,7 @@ amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
amqp_bytes_t routing_key,
amqp_table_t arguments)
{
- amqp_rpc_reply =
+ state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK,
amqp_queue_bind_t,
0, queue, exchange, routing_key, 0, arguments);
@@ -162,7 +160,7 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
amqp_bytes_t binding_key,
amqp_table_t arguments)
{
- amqp_rpc_reply =
+ state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK,
amqp_queue_unbind_t,
0, queue, exchange, binding_key, arguments);
@@ -178,7 +176,7 @@ amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
amqp_boolean_t exclusive,
amqp_table_t filter)
{
- amqp_rpc_reply =
+ state->most_recent_api_result =
AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK,
amqp_basic_consume_t,
0, queue, consumer_tag, no_local, no_ack, exclusive, 0, filter);
@@ -200,34 +198,32 @@ int amqp_basic_ack(amqp_connection_state_t state,
}
amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_wait)
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_wait)
{
- amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
- amqp_queue_purge_t, channel, queue, no_wait);
+ state->most_recent_api_result =
+ AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
+ amqp_queue_purge_t, channel, queue, no_wait);
return RPC_REPLY(amqp_queue_purge_ok_t);
}
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_ack)
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack)
{
- amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
- AMQP_BASIC_GET_EMPTY_METHOD,
- 0 };
- amqp_rpc_reply =
- AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
- amqp_basic_get_t,
- channel, queue, no_ack);
- return amqp_rpc_reply;
+ amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
+ AMQP_BASIC_GET_EMPTY_METHOD,
+ 0 };
+ state->most_recent_api_result =
+ AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
+ amqp_basic_get_t,
+ channel, queue, no_ack);
+ return state->most_recent_api_result;
}
-/*
- * Expose amqp_rpc_reply to dynamically linked libraries
- */
-amqp_rpc_reply_t amqp_get_rpc_reply(void)
+amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
{
- return amqp_rpc_reply;
+ return state->most_recent_api_result;
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 03a46fe..aba63b5 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -71,6 +71,8 @@ struct amqp_connection_state_t_ {
amqp_link_t *first_queued_frame;
amqp_link_t *last_queued_frame;
+
+ amqp_rpc_reply_t most_recent_api_result;
};
#define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); })
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index c6412a9..0cf9c18 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -386,10 +386,18 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
{
va_list vl;
amqp_rpc_reply_t result;
+ int status;
va_start(vl, sasl_method);
- amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
+ status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
+ if (status <= 0) {
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.reply.id = 0;
+ result.reply.decoded = NULL;
+ result.library_errno = -status;
+ return result;
+ }
{
amqp_connection_open_t s =