summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
authorScott Brooks <scott.brooks@epicadvertising.com>2009-10-14 10:42:50 -0400
committerScott Brooks <scott.brooks@epicadvertising.com>2009-10-14 10:42:50 -0400
commit3cb254d902a9b226bf95696af3a98839bb7797a4 (patch)
tree2999d9061d80af32c6fba046549b402418c15483 /librabbitmq/amqp_socket.c
parent62fcf89ed4e4356033b65a9a3a99f47ff2e9581a (diff)
downloadrabbitmq-c-github-ask-3cb254d902a9b226bf95696af3a98839bb7797a4.tar.gz
Updated amqp_simple_rpc to take a 0 terminated array of amqp_method_number_t replies
Updated AMQP_SIMPLE_RPC macro for the new format Added AMQP_MULTIPLE_RESPONSE_RPC macro to take a 0 terminated array of amqp_method_number_t replies Added amqp_basic_get function Added amqp_queue_purge function Added amqp_get_rpc_reply function to expose amqp_rpc_reply when we are not statically linking Added amqp_data_in_buffer function to check to see if amqp_simple_wait_frames will hit a blocking read
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c24
1 files changed, 20 insertions, 4 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index d29dcd6..b4b9332 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -107,6 +107,12 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
return (state->first_queued_frame != NULL);
}
+// Check to see if we have data in our buffer so we will avoid
+// if this returns 1, we will avoid a blocking read in amqp_simple_wait_frame
+amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
+ return (state->sock_inbound_offset < state->sock_inbound_limit);
+}
+
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
@@ -202,10 +208,19 @@ int amqp_send_method(amqp_connection_state_t state,
return amqp_send_frame(state, &frame);
}
+static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
+{
+ while ( *list != 0 ) {
+ if ( *list == expected ) return 1;
+ list++;
+ }
+ return 0;
+}
+
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
- amqp_method_number_t expected_reply_id,
+ amqp_method_number_t *expected_reply_ids,
void *decoded_request_method)
{
int status;
@@ -241,7 +256,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
*/
if (!( (frame.frame_type == AMQP_FRAME_METHOD) &&
( ((frame.channel == channel) &&
- ((frame.payload.method.id == expected_reply_id) ||
+ ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ||
(frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
||
((frame.channel == 0) &&
@@ -265,7 +280,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
goto retry;
}
- result.reply_type = (frame.payload.method.id == expected_reply_id)
+ result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
? AMQP_RESPONSE_NORMAL
: AMQP_RESPONSE_SERVER_EXCEPTION;
@@ -374,10 +389,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
.capabilities = {.len = 0, .bytes = NULL},
.insist = 1
};
+ amqp_method_number_t replies[] = { AMQP_EXPAND_METHOD( CONNECTION, OPEN_OK ), 0 };
result = amqp_simple_rpc(state,
0,
AMQP_CONNECTION_OPEN_METHOD,
- AMQP_CONNECTION_OPEN_OK_METHOD,
+ (amqp_method_number_t *) &replies,
&s);
if (result.reply_type != AMQP_RESPONSE_NORMAL) {
return result;