diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2009-10-19 15:18:29 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2009-10-19 15:18:29 +0100 |
commit | cc25f06e3f18831f3d3b8e8e01bde7970ccd6969 (patch) | |
tree | dbfb143414faf6293a640fd664e07cba4ee23b99 /librabbitmq | |
parent | 37c4ae97ae24892ce8f9fef815bb029721ae262f (diff) | |
parent | 4f7fdf5dd5f2749809f71fc3c556fc23f8832f10 (diff) | |
download | rabbitmq-c-github-ask-cc25f06e3f18831f3d3b8e8e01bde7970ccd6969.tar.gz |
Merge default into amqp_0_9_1
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 43 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 33 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 30 |
3 files changed, 97 insertions, 9 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 51bf0ae..4c27f4a 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -191,18 +191,31 @@ extern int amqp_send_method(amqp_connection_state_t state, extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, - amqp_method_number_t expected_reply_id, + amqp_method_number_t *expected_reply_ids, void *decoded_request_method); +#define AMQP_EXPAND_METHOD(classname, methodname) (AMQP_ ## classname ## _ ## methodname ## _METHOD) + #define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \ ({ \ structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \ + amqp_method_number_t _replies__[2] = { AMQP_EXPAND_METHOD(classname, replyname), 0}; \ amqp_simple_rpc(state, channel, \ - AMQP_ ## classname ## _ ## requestname ## _METHOD, \ - AMQP_ ## classname ## _ ## replyname ## _METHOD, \ + AMQP_EXPAND_METHOD(classname, requestname), \ + (amqp_method_number_t *)&_replies__, \ &_simple_rpc_request___); \ }) +#define AMQP_MULTIPLE_RESPONSE_RPC(state, channel, classname, requestname, replynames, structname, ...) \ + ({ \ + structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \ + amqp_simple_rpc(state, channel, \ + AMQP_EXPAND_METHOD(classname, requestname), \ + replynames, \ + &_simple_rpc_request___); \ + }) + + extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, int channel_max, @@ -277,6 +290,30 @@ extern int amqp_basic_ack(amqp_connection_state_t state, uint64_t delivery_tag, amqp_boolean_t multiple); +extern amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_ack); + +extern struct amqp_queue_purge_ok_t_ *amqp_queue_purge(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_wait); + +/* + * Can be used to see if there is data still in the buffer, if so + * calling amqp_simple_wait_frame will not immediately enter a + * blocking read. + * + * Possibly amqp_frames_enqueued should be used for this? + */ +extern amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state); + +/* + * Expose amqp_rpc_reply to libraries. + */ +extern amqp_rpc_reply_t amqp_get_rpc_reply(void); + #ifdef __cplusplus } #endif diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index a3c8839..8adf3f0 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -198,3 +198,36 @@ int amqp_basic_ack(amqp_connection_state_t state, AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m)); return 0; } + +amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_wait) +{ + amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK, + amqp_queue_purge_t, channel, queue, no_wait); + return RPC_REPLY(amqp_queue_purge_ok_t); +} + +amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_ack) +{ + amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, + AMQP_BASIC_GET_EMPTY_METHOD, + 0 }; + amqp_rpc_reply = + AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies, + amqp_basic_get_t, + channel, queue, no_ack); + return amqp_rpc_reply; +} + +/* + * Expose amqp_rpc_reply to dynamically linked libraries + */ +amqp_rpc_reply_t amqp_get_rpc_reply(void) +{ + return amqp_rpc_reply; +} diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 78cb8ba..c6412a9 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -81,7 +81,7 @@ static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { default: amqp_assert(0, "Invalid SASL method: %d", (int) method); } - abort(); // unreachable + abort(); /* unreachable */ } static amqp_bytes_t sasl_response(amqp_pool_t *pool, @@ -114,13 +114,21 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { return (state->first_queued_frame != NULL); } +/* + * Check to see if we have data in our buffer. If this returns 1, we + * will avoid an immediate blocking read in amqp_simple_wait_frame. + */ +amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { + return (state->sock_inbound_offset < state->sock_inbound_limit); +} + static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { while (1) { int result; - while (state->sock_inbound_offset < state->sock_inbound_limit) { + while (amqp_data_in_buffer(state)) { amqp_bytes_t buffer; buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; @@ -209,10 +217,19 @@ int amqp_send_method(amqp_connection_state_t state, return amqp_send_frame(state, &frame); } +static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) +{ + while ( *list != 0 ) { + if ( *list == expected ) return 1; + list++; + } + return 0; +} + amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, - amqp_method_number_t expected_reply_id, + amqp_method_number_t *expected_reply_ids, void *decoded_request_method) { int status; @@ -248,7 +265,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, */ if (!( (frame.frame_type == AMQP_FRAME_METHOD) && ( ((frame.channel == channel) && - ((frame.payload.method.id == expected_reply_id) || + ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) || ((frame.channel == 0) && @@ -272,7 +289,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, goto retry; } - result.reply_type = (frame.payload.method.id == expected_reply_id) + result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ? AMQP_RESPONSE_NORMAL : AMQP_RESPONSE_SERVER_EXCEPTION; @@ -381,10 +398,11 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, .deprecated_capabilities = {.len = 0, .bytes = NULL}, .deprecated_insist = 1 }; + amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 }; result = amqp_simple_rpc(state, 0, AMQP_CONNECTION_OPEN_METHOD, - AMQP_CONNECTION_OPEN_OK_METHOD, + (amqp_method_number_t *) &replies, &s); if (result.reply_type != AMQP_RESPONSE_NORMAL) { return result; |