From 5568ccff3edef70041e45910077ed0ee17a435d7 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 18 Feb 2010 17:17:56 +1300 Subject: Remove amqp_rpc_reply global variable, making it instead state-local and accessible only through amqp_get_rpc_reply(). Add a better comment on amqp_get_rpc_reply() to be a little clearer about when it's inappropriate to use it. --- examples/amqp_bind.c | 4 ++-- examples/amqp_consumer.c | 8 ++++---- examples/amqp_exchange_declare.c | 4 ++-- examples/amqp_listen.c | 8 ++++---- examples/amqp_listenq.c | 4 ++-- examples/amqp_producer.c | 2 +- examples/amqp_sendstring.c | 2 +- examples/amqp_unbind.c | 4 ++-- librabbitmq/amqp.h | 16 ++++++++++++---- librabbitmq/amqp_api.c | 36 ++++++++++++++++-------------------- librabbitmq/amqp_private.h | 2 ++ 11 files changed, 48 insertions(+), 42 deletions(-) diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c index afbfed9..fa3f907 100644 --- a/examples/amqp_bind.c +++ b/examples/amqp_bind.c @@ -38,14 +38,14 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), AMQP_EMPTY_TABLE); - die_on_amqp_error(amqp_rpc_reply, "Unbinding"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index 230a94e..f93ddf3 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -107,12 +107,12 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1, AMQP_EMPTY_TABLE); - die_on_amqp_error(amqp_rpc_reply, "Declaring queue"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { die_on_error(-ENOMEM, "Copying queue name"); @@ -121,10 +121,10 @@ int main(int argc, char const * const *argv) { amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), AMQP_EMPTY_TABLE); - die_on_amqp_error(amqp_rpc_reply, "Binding queue"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0); - die_on_amqp_error(amqp_rpc_reply, "Consuming"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); run(conn); diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c index 02884e8..d0d456c 100644 --- a/examples/amqp_exchange_declare.c +++ b/examples/amqp_exchange_declare.c @@ -36,11 +36,11 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype), 0, 0, 0, AMQP_EMPTY_TABLE); - die_on_amqp_error(amqp_rpc_reply, "Declaring exchange"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index d39e4b4..e487255 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -43,12 +43,12 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1, AMQP_EMPTY_TABLE); - die_on_amqp_error(amqp_rpc_reply, "Declaring queue"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { die_on_error(-ENOMEM, "Copying queue name"); @@ -57,10 +57,10 @@ int main(int argc, char const * const *argv) { amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), AMQP_EMPTY_TABLE); - die_on_amqp_error(amqp_rpc_reply, "Binding queue"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0); - die_on_amqp_error(amqp_rpc_reply, "Consuming"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { amqp_frame_t frame; diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index b6d37fc..2478127 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -39,10 +39,10 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), AMQP_EMPTY_BYTES, 0, 0, 0); - die_on_amqp_error(amqp_rpc_reply, "Consuming"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { amqp_frame_t frame; diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c index a60cb3e..e0b8079 100644 --- a/examples/amqp_producer.c +++ b/examples/amqp_producer.c @@ -95,7 +95,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); send_batch(conn, "test queue", rate_limit, message_count); diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index a321f52..e512ef0 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -38,7 +38,7 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_basic_properties_t props; diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c index 9a9a35c..e033702 100644 --- a/examples/amqp_unbind.c +++ b/examples/amqp_unbind.c @@ -38,14 +38,14 @@ int main(int argc, char const * const *argv) { die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); - die_on_amqp_error(amqp_rpc_reply, "Opening channel"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_queue_unbind(conn, 1, amqp_cstring_bytes(queue), amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), AMQP_EMPTY_TABLE); - die_on_amqp_error(amqp_rpc_reply, "Unbinding"); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index a2b062c..906c900 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -337,8 +337,6 @@ extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, int heartbeat, amqp_sasl_method_enum sasl_method, ...); -extern amqp_rpc_reply_t amqp_rpc_reply; - extern struct amqp_channel_open_ok_t_ *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel); @@ -423,9 +421,19 @@ extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t s extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); /* - * Expose amqp_rpc_reply to libraries. + * For those API operations (such as amqp_basic_ack, + * amqp_queue_declare, and so on) that do not themselves return + * amqp_rpc_reply_t instances, we need some way of discovering what, + * if anything, went wrong. amqp_get_rpc_reply() returns the most + * recent amqp_rpc_reply_t instance corresponding to such an API + * operation for the given connection. + * + * Only use it for operations that do not themselves return + * amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t + * generally do NOT update this per-connection-global amqp_rpc_reply_t + * instance. */ -extern amqp_rpc_reply_t amqp_get_rpc_reply(void); +extern amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state); #ifdef __cplusplus } diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 1d329e0..7cb865d 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -10,17 +10,15 @@ #include -amqp_rpc_reply_t amqp_rpc_reply; - -#define RPC_REPLY(replytype) \ - (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \ - ? (replytype *) amqp_rpc_reply.reply.decoded \ +#define RPC_REPLY(replytype) \ + (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ + ? (replytype *) state->most_recent_api_result.reply.decoded \ : NULL) amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) { - amqp_rpc_reply = + state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK, amqp_channel_open_t, AMQP_EMPTY_BYTES); @@ -118,7 +116,7 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_boolean_t auto_delete, amqp_table_t arguments) { - amqp_rpc_reply = + state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK, amqp_exchange_declare_t, 0, exchange, type, passive, durable, auto_delete, 0, 0, arguments); @@ -134,7 +132,7 @@ amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_boolean_t auto_delete, amqp_table_t arguments) { - amqp_rpc_reply = + state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK, amqp_queue_declare_t, 0, queue, passive, durable, exclusive, auto_delete, 0, arguments); @@ -148,7 +146,7 @@ amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_bytes_t routing_key, amqp_table_t arguments) { - amqp_rpc_reply = + state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK, amqp_queue_bind_t, 0, queue, exchange, routing_key, 0, arguments); @@ -162,7 +160,7 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, amqp_bytes_t binding_key, amqp_table_t arguments) { - amqp_rpc_reply = + state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK, amqp_queue_unbind_t, 0, queue, exchange, binding_key, arguments); @@ -177,7 +175,7 @@ amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_boolean_t no_ack, amqp_boolean_t exclusive) { - amqp_rpc_reply = + state->most_recent_api_result = AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK, amqp_basic_consume_t, 0, queue, consumer_tag, no_local, no_ack, exclusive, 0); @@ -203,8 +201,9 @@ amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, amqp_bytes_t queue, amqp_boolean_t no_wait) { - amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK, - amqp_queue_purge_t, channel, queue, no_wait); + state->most_recent_api_result = + AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK, + amqp_queue_purge_t, channel, queue, no_wait); return RPC_REPLY(amqp_queue_purge_ok_t); } @@ -216,17 +215,14 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, AMQP_BASIC_GET_EMPTY_METHOD, 0 }; - amqp_rpc_reply = + state->most_recent_api_result = AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies, amqp_basic_get_t, channel, queue, no_ack); - return amqp_rpc_reply; + return state->most_recent_api_result; } -/* - * Expose amqp_rpc_reply to dynamically linked libraries - */ -amqp_rpc_reply_t amqp_get_rpc_reply(void) +amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { - return amqp_rpc_reply; + return state->most_recent_api_result; } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 03a46fe..aba63b5 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -71,6 +71,8 @@ struct amqp_connection_state_t_ { amqp_link_t *first_queued_frame; amqp_link_t *last_queued_frame; + + amqp_rpc_reply_t most_recent_api_result; }; #define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); }) -- cgit v1.2.1