summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-10-19 15:18:29 +0100
committerTony Garnock-Jones <tonyg@lshift.net>2009-10-19 15:18:29 +0100
commitcc25f06e3f18831f3d3b8e8e01bde7970ccd6969 (patch)
treedbfb143414faf6293a640fd664e07cba4ee23b99
parent37c4ae97ae24892ce8f9fef815bb029721ae262f (diff)
parent4f7fdf5dd5f2749809f71fc3c556fc23f8832f10 (diff)
downloadrabbitmq-c-github-ask-cc25f06e3f18831f3d3b8e8e01bde7970ccd6969.tar.gz
Merge default into amqp_0_9_1
-rw-r--r--THANKS4
-rw-r--r--librabbitmq/amqp.h43
-rw-r--r--librabbitmq/amqp_api.c33
-rw-r--r--librabbitmq/amqp_socket.c30
4 files changed, 101 insertions, 9 deletions
diff --git a/THANKS b/THANKS
new file mode 100644
index 0000000..996c140
--- /dev/null
+++ b/THANKS
@@ -0,0 +1,4 @@
+Thank-you to the following people for their contributions to the
+codebase:
+
+ - Scott Brooks / Epic Advertising <scott.brooks@epicadvertising.com>
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 51bf0ae..4c27f4a 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -191,18 +191,31 @@ extern int amqp_send_method(amqp_connection_state_t state,
extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
- amqp_method_number_t expected_reply_id,
+ amqp_method_number_t *expected_reply_ids,
void *decoded_request_method);
+#define AMQP_EXPAND_METHOD(classname, methodname) (AMQP_ ## classname ## _ ## methodname ## _METHOD)
+
#define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \
({ \
structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \
+ amqp_method_number_t _replies__[2] = { AMQP_EXPAND_METHOD(classname, replyname), 0}; \
amqp_simple_rpc(state, channel, \
- AMQP_ ## classname ## _ ## requestname ## _METHOD, \
- AMQP_ ## classname ## _ ## replyname ## _METHOD, \
+ AMQP_EXPAND_METHOD(classname, requestname), \
+ (amqp_method_number_t *)&_replies__, \
&_simple_rpc_request___); \
})
+#define AMQP_MULTIPLE_RESPONSE_RPC(state, channel, classname, requestname, replynames, structname, ...) \
+ ({ \
+ structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \
+ amqp_simple_rpc(state, channel, \
+ AMQP_EXPAND_METHOD(classname, requestname), \
+ replynames, \
+ &_simple_rpc_request___); \
+ })
+
+
extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
char const *vhost,
int channel_max,
@@ -277,6 +290,30 @@ extern int amqp_basic_ack(amqp_connection_state_t state,
uint64_t delivery_tag,
amqp_boolean_t multiple);
+extern amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack);
+
+extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_wait);
+
+/*
+ * Can be used to see if there is data still in the buffer, if so
+ * calling amqp_simple_wait_frame will not immediately enter a
+ * blocking read.
+ *
+ * Possibly amqp_frames_enqueued should be used for this?
+ */
+extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state);
+
+/*
+ * Expose amqp_rpc_reply to libraries.
+ */
+extern amqp_rpc_reply_t amqp_get_rpc_reply(void);
+
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index a3c8839..8adf3f0 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -198,3 +198,36 @@ int amqp_basic_ack(amqp_connection_state_t state,
AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m));
return 0;
}
+
+amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_wait)
+{
+ amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
+ amqp_queue_purge_t, channel, queue, no_wait);
+ return RPC_REPLY(amqp_queue_purge_ok_t);
+}
+
+amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack)
+{
+ amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
+ AMQP_BASIC_GET_EMPTY_METHOD,
+ 0 };
+ amqp_rpc_reply =
+ AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
+ amqp_basic_get_t,
+ channel, queue, no_ack);
+ return amqp_rpc_reply;
+}
+
+/*
+ * Expose amqp_rpc_reply to dynamically linked libraries
+ */
+amqp_rpc_reply_t amqp_get_rpc_reply(void)
+{
+ return amqp_rpc_reply;
+}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 78cb8ba..c6412a9 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -81,7 +81,7 @@ static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
default:
amqp_assert(0, "Invalid SASL method: %d", (int) method);
}
- abort(); // unreachable
+ abort(); /* unreachable */
}
static amqp_bytes_t sasl_response(amqp_pool_t *pool,
@@ -114,13 +114,21 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
return (state->first_queued_frame != NULL);
}
+/*
+ * Check to see if we have data in our buffer. If this returns 1, we
+ * will avoid an immediate blocking read in amqp_simple_wait_frame.
+ */
+amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
+ return (state->sock_inbound_offset < state->sock_inbound_limit);
+}
+
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
while (1) {
int result;
- while (state->sock_inbound_offset < state->sock_inbound_limit) {
+ while (amqp_data_in_buffer(state)) {
amqp_bytes_t buffer;
buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
@@ -209,10 +217,19 @@ int amqp_send_method(amqp_connection_state_t state,
return amqp_send_frame(state, &frame);
}
+static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
+{
+ while ( *list != 0 ) {
+ if ( *list == expected ) return 1;
+ list++;
+ }
+ return 0;
+}
+
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
- amqp_method_number_t expected_reply_id,
+ amqp_method_number_t *expected_reply_ids,
void *decoded_request_method)
{
int status;
@@ -248,7 +265,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
*/
if (!( (frame.frame_type == AMQP_FRAME_METHOD) &&
( ((frame.channel == channel) &&
- ((frame.payload.method.id == expected_reply_id) ||
+ ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ||
(frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
||
((frame.channel == 0) &&
@@ -272,7 +289,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
goto retry;
}
- result.reply_type = (frame.payload.method.id == expected_reply_id)
+ result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
? AMQP_RESPONSE_NORMAL
: AMQP_RESPONSE_SERVER_EXCEPTION;
@@ -381,10 +398,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
.deprecated_capabilities = {.len = 0, .bytes = NULL},
.deprecated_insist = 1
};
+ amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 };
result = amqp_simple_rpc(state,
0,
AMQP_CONNECTION_OPEN_METHOD,
- AMQP_CONNECTION_OPEN_OK_METHOD,
+ (amqp_method_number_t *) &replies,
&s);
if (result.reply_type != AMQP_RESPONSE_NORMAL) {
return result;