From 3cb254d902a9b226bf95696af3a98839bb7797a4 Mon Sep 17 00:00:00 2001 From: Scott Brooks Date: Wed, 14 Oct 2009 10:42:50 -0400 Subject: Updated amqp_simple_rpc to take a 0 terminated array of amqp_method_number_t replies Updated AMQP_SIMPLE_RPC macro for the new format Added AMQP_MULTIPLE_RESPONSE_RPC macro to take a 0 terminated array of amqp_method_number_t replies Added amqp_basic_get function Added amqp_queue_purge function Added amqp_get_rpc_reply function to expose amqp_rpc_reply when we are not statically linking Added amqp_data_in_buffer function to check to see if amqp_simple_wait_frames will hit a blocking read --- librabbitmq/amqp.h | 35 ++++++++++++++++++++++++++++++++--- librabbitmq/amqp_api.c | 29 +++++++++++++++++++++++++++++ librabbitmq/amqp_socket.c | 24 ++++++++++++++++++++---- 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 4c135b1..71d77de 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -191,18 +191,31 @@ extern int amqp_send_method(amqp_connection_state_t state, extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, - amqp_method_number_t expected_reply_id, + amqp_method_number_t *expected_reply_ids, void *decoded_request_method); +#define AMQP_EXPAND_METHOD(classname, methodname) (AMQP_ ## classname ## _ ## methodname ## _METHOD) + #define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \ + ({ \ + structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \ + amqp_method_number_t _replies__[2] = { AMQP_EXPAND_METHOD(classname, replyname), 0}; \ + amqp_simple_rpc(state, channel, \ + AMQP_EXPAND_METHOD(classname, requestname), \ + (amqp_method_number_t *)&_replies__, \ + &_simple_rpc_request___); \ + }) + +#define AMQP_MULTIPLE_RESPONSE_RPC(state, channel, classname, requestname, replynames, structname, ...) \ ({ \ structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \ amqp_simple_rpc(state, channel, \ - AMQP_ ## classname ## _ ## requestname ## _METHOD, \ - AMQP_ ## classname ## _ ## replyname ## _METHOD, \ + AMQP_EXPAND_METHOD(classname, requestname), \ + replynames, \ &_simple_rpc_request___); \ }) + extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, int channel_max, @@ -276,6 +289,22 @@ extern int amqp_basic_ack(amqp_connection_state_t state, uint64_t delivery_tag, amqp_boolean_t multiple); +extern amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_ack); + +extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_wait); + +// Can be used to see if there is data still in the buffer, if so +// calling amqp_simple_wait_frame will not enter the blocking read +// possibly amqp_frames_enqueued should be used for this? +extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); +// Expose amqp_rpc_reply to libraries +extern amqp_rpc_reply_t amqp_get_rpc_reply(); #ifdef __cplusplus } #endif diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index a52d947..2b887c7 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -197,3 +197,32 @@ int amqp_basic_ack(amqp_connection_state_t state, AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m)); return 0; } + +amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_wait) +{ + amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK, + amqp_queue_purge_t, channel, queue, no_wait); + return RPC_REPLY(amqp_queue_purge_ok_t); +} + +amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_ack) +{ + amqp_method_number_t replies[] = { AMQP_EXPAND_METHOD(BASIC, GET_OK), AMQP_EXPAND_METHOD(BASIC, GET_EMPTY), 0}; + amqp_rpc_reply = + AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies, + amqp_basic_get_t, + channel, queue, no_ack); + return amqp_rpc_reply; +} + +// Expose amqp_rpc_reply to dynamically linked libraries +amqp_rpc_reply_t amqp_get_rpc_reply() +{ + return amqp_rpc_reply; +} diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index d29dcd6..b4b9332 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -107,6 +107,12 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { return (state->first_queued_frame != NULL); } +// Check to see if we have data in our buffer so we will avoid +// if this returns 1, we will avoid a blocking read in amqp_simple_wait_frame +amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { + return (state->sock_inbound_offset < state->sock_inbound_limit); +} + static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { @@ -202,10 +208,19 @@ int amqp_send_method(amqp_connection_state_t state, return amqp_send_frame(state, &frame); } +static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) +{ + while ( *list != 0 ) { + if ( *list == expected ) return 1; + list++; + } + return 0; +} + amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, - amqp_method_number_t expected_reply_id, + amqp_method_number_t *expected_reply_ids, void *decoded_request_method) { int status; @@ -241,7 +256,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, */ if (!( (frame.frame_type == AMQP_FRAME_METHOD) && ( ((frame.channel == channel) && - ((frame.payload.method.id == expected_reply_id) || + ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) || ((frame.channel == 0) && @@ -265,7 +280,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, goto retry; } - result.reply_type = (frame.payload.method.id == expected_reply_id) + result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ? AMQP_RESPONSE_NORMAL : AMQP_RESPONSE_SERVER_EXCEPTION; @@ -374,10 +389,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, .capabilities = {.len = 0, .bytes = NULL}, .insist = 1 }; + amqp_method_number_t replies[] = { AMQP_EXPAND_METHOD( CONNECTION, OPEN_OK ), 0 }; result = amqp_simple_rpc(state, 0, AMQP_CONNECTION_OPEN_METHOD, - AMQP_CONNECTION_OPEN_OK_METHOD, + (amqp_method_number_t *) &replies, &s); if (result.reply_type != AMQP_RESPONSE_NORMAL) { return result; -- cgit v1.2.1