summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Brooks <scott.brooks@epicadvertising.com>2009-10-14 10:42:50 -0400
committerScott Brooks <scott.brooks@epicadvertising.com>2009-10-14 10:42:50 -0400
commit3cb254d902a9b226bf95696af3a98839bb7797a4 (patch)
tree2999d9061d80af32c6fba046549b402418c15483
parent62fcf89ed4e4356033b65a9a3a99f47ff2e9581a (diff)
downloadrabbitmq-c-github-ask-3cb254d902a9b226bf95696af3a98839bb7797a4.tar.gz
Updated amqp_simple_rpc to take a 0 terminated array of amqp_method_number_t replies
Updated AMQP_SIMPLE_RPC macro for the new format Added AMQP_MULTIPLE_RESPONSE_RPC macro to take a 0 terminated array of amqp_method_number_t replies Added amqp_basic_get function Added amqp_queue_purge function Added amqp_get_rpc_reply function to expose amqp_rpc_reply when we are not statically linking Added amqp_data_in_buffer function to check to see if amqp_simple_wait_frames will hit a blocking read
-rw-r--r--librabbitmq/amqp.h35
-rw-r--r--librabbitmq/amqp_api.c29
-rw-r--r--librabbitmq/amqp_socket.c24
3 files changed, 81 insertions, 7 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 4c135b1..71d77de 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -191,18 +191,31 @@ extern int amqp_send_method(amqp_connection_state_t state,
extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
- amqp_method_number_t expected_reply_id,
+ amqp_method_number_t *expected_reply_ids,
void *decoded_request_method);
+#define AMQP_EXPAND_METHOD(classname, methodname) (AMQP_ ## classname ## _ ## methodname ## _METHOD)
+
#define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \
({ \
structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \
+ amqp_method_number_t _replies__[2] = { AMQP_EXPAND_METHOD(classname, replyname), 0}; \
+ amqp_simple_rpc(state, channel, \
+ AMQP_EXPAND_METHOD(classname, requestname), \
+ (amqp_method_number_t *)&_replies__, \
+ &_simple_rpc_request___); \
+ })
+
+#define AMQP_MULTIPLE_RESPONSE_RPC(state, channel, classname, requestname, replynames, structname, ...) \
+ ({ \
+ structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \
amqp_simple_rpc(state, channel, \
- AMQP_ ## classname ## _ ## requestname ## _METHOD, \
- AMQP_ ## classname ## _ ## replyname ## _METHOD, \
+ AMQP_EXPAND_METHOD(classname, requestname), \
+ replynames, \
&_simple_rpc_request___); \
})
+
extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
char const *vhost,
int channel_max,
@@ -276,6 +289,22 @@ extern int amqp_basic_ack(amqp_connection_state_t state,
uint64_t delivery_tag,
amqp_boolean_t multiple);
+extern amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack);
+
+extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_wait);
+
+// Can be used to see if there is data still in the buffer, if so
+// calling amqp_simple_wait_frame will not enter the blocking read
+// possibly amqp_frames_enqueued should be used for this?
+extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
+// Expose amqp_rpc_reply to libraries
+extern amqp_rpc_reply_t amqp_get_rpc_reply();
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index a52d947..2b887c7 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -197,3 +197,32 @@ int amqp_basic_ack(amqp_connection_state_t state,
AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m));
return 0;
}
+
+amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_wait)
+{
+ amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
+ amqp_queue_purge_t, channel, queue, no_wait);
+ return RPC_REPLY(amqp_queue_purge_ok_t);
+}
+
+amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack)
+{
+ amqp_method_number_t replies[] = { AMQP_EXPAND_METHOD(BASIC, GET_OK), AMQP_EXPAND_METHOD(BASIC, GET_EMPTY), 0};
+ amqp_rpc_reply =
+ AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
+ amqp_basic_get_t,
+ channel, queue, no_ack);
+ return amqp_rpc_reply;
+}
+
+// Expose amqp_rpc_reply to dynamically linked libraries
+amqp_rpc_reply_t amqp_get_rpc_reply()
+{
+ return amqp_rpc_reply;
+}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index d29dcd6..b4b9332 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -107,6 +107,12 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
return (state->first_queued_frame != NULL);
}
+// Check to see if we have data in our buffer so we will avoid
+// if this returns 1, we will avoid a blocking read in amqp_simple_wait_frame
+amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
+ return (state->sock_inbound_offset < state->sock_inbound_limit);
+}
+
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
@@ -202,10 +208,19 @@ int amqp_send_method(amqp_connection_state_t state,
return amqp_send_frame(state, &frame);
}
+static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
+{
+ while ( *list != 0 ) {
+ if ( *list == expected ) return 1;
+ list++;
+ }
+ return 0;
+}
+
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
- amqp_method_number_t expected_reply_id,
+ amqp_method_number_t *expected_reply_ids,
void *decoded_request_method)
{
int status;
@@ -241,7 +256,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
*/
if (!( (frame.frame_type == AMQP_FRAME_METHOD) &&
( ((frame.channel == channel) &&
- ((frame.payload.method.id == expected_reply_id) ||
+ ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ||
(frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
||
((frame.channel == 0) &&
@@ -265,7 +280,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
goto retry;
}
- result.reply_type = (frame.payload.method.id == expected_reply_id)
+ result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
? AMQP_RESPONSE_NORMAL
: AMQP_RESPONSE_SERVER_EXCEPTION;
@@ -374,10 +389,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
.capabilities = {.len = 0, .bytes = NULL},
.insist = 1
};
+ amqp_method_number_t replies[] = { AMQP_EXPAND_METHOD( CONNECTION, OPEN_OK ), 0 };
result = amqp_simple_rpc(state,
0,
AMQP_CONNECTION_OPEN_METHOD,
- AMQP_CONNECTION_OPEN_OK_METHOD,
+ (amqp_method_number_t *) &replies,
&s);
if (result.reply_type != AMQP_RESPONSE_NORMAL) {
return result;