diff options
author | Scott Brooks <scott.brooks@epicadvertising.com> | 2009-10-14 10:42:50 -0400 |
---|---|---|
committer | Scott Brooks <scott.brooks@epicadvertising.com> | 2009-10-14 10:42:50 -0400 |
commit | 3cb254d902a9b226bf95696af3a98839bb7797a4 (patch) | |
tree | 2999d9061d80af32c6fba046549b402418c15483 /librabbitmq/amqp_socket.c | |
parent | 62fcf89ed4e4356033b65a9a3a99f47ff2e9581a (diff) | |
download | rabbitmq-c-github-ask-3cb254d902a9b226bf95696af3a98839bb7797a4.tar.gz |
Updated amqp_simple_rpc to take a 0 terminated array of amqp_method_number_t replies
Updated AMQP_SIMPLE_RPC macro for the new format
Added AMQP_MULTIPLE_RESPONSE_RPC macro to take a 0 terminated array of amqp_method_number_t replies
Added amqp_basic_get function
Added amqp_queue_purge function
Added amqp_get_rpc_reply function to expose amqp_rpc_reply when we are not statically linking
Added amqp_data_in_buffer function to check to see if amqp_simple_wait_frames will hit a blocking read
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index d29dcd6..b4b9332 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -107,6 +107,12 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { return (state->first_queued_frame != NULL); } +// Check to see if we have data in our buffer so we will avoid +// if this returns 1, we will avoid a blocking read in amqp_simple_wait_frame +amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { + return (state->sock_inbound_offset < state->sock_inbound_limit); +} + static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { @@ -202,10 +208,19 @@ int amqp_send_method(amqp_connection_state_t state, return amqp_send_frame(state, &frame); } +static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) +{ + while ( *list != 0 ) { + if ( *list == expected ) return 1; + list++; + } + return 0; +} + amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, - amqp_method_number_t expected_reply_id, + amqp_method_number_t *expected_reply_ids, void *decoded_request_method) { int status; @@ -241,7 +256,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, */ if (!( (frame.frame_type == AMQP_FRAME_METHOD) && ( ((frame.channel == channel) && - ((frame.payload.method.id == expected_reply_id) || + ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) || ((frame.channel == 0) && @@ -265,7 +280,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, goto retry; } - result.reply_type = (frame.payload.method.id == expected_reply_id) + result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ? AMQP_RESPONSE_NORMAL : AMQP_RESPONSE_SERVER_EXCEPTION; @@ -374,10 +389,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, .capabilities = {.len = 0, .bytes = NULL}, .insist = 1 }; + amqp_method_number_t replies[] = { AMQP_EXPAND_METHOD( CONNECTION, OPEN_OK ), 0 }; result = amqp_simple_rpc(state, 0, AMQP_CONNECTION_OPEN_METHOD, - AMQP_CONNECTION_OPEN_OK_METHOD, + (amqp_method_number_t *) &replies, &s); if (result.reply_type != AMQP_RESPONSE_NORMAL) { return result; |