diff options
Diffstat (limited to 'librabbitmq/amqp_api.c')
-rw-r--r-- | librabbitmq/amqp_api.c | 303 |
1 files changed, 220 insertions, 83 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index b2793ff..bf19761 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -52,6 +52,7 @@ #include <stdio.h> #include <string.h> #include <stdint.h> +#include <stdarg.h> #include "amqp.h" #include "amqp_framing.h" @@ -69,6 +70,12 @@ static const char *client_error_strings[ERROR_MAX] = { "connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */ }; +/* strdup is not in ISO C90! */ +static inline char *strdup(const char *str) +{ + return strcpy(malloc(strlen(str) + 1),str); +} + char *amqp_error_string(int err) { const char *str; @@ -93,6 +100,20 @@ char *amqp_error_string(int err) return strdup(str); } +void amqp_abort(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fputc('\n', stderr); + abort(); +} + +const amqp_bytes_t amqp_empty_bytes = { 0, NULL }; +const amqp_table_t amqp_empty_table = { 0, NULL }; +const amqp_array_t amqp_empty_array = { 0, NULL }; + #define RPC_REPLY(replytype) \ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ ? (replytype *) state->most_recent_api_result.reply.decoded \ @@ -101,11 +122,18 @@ char *amqp_error_string(int err) amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK, - amqp_channel_open_t, - AMQP_EMPTY_BYTES); - return RPC_REPLY(amqp_channel_open_ok_t); + amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0}; + amqp_channel_open_t req; + req.out_of_band.bytes = NULL; + req.out_of_band.len = 0; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_CHANNEL_OPEN_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } int amqp_basic_publish(amqp_connection_state_t state, @@ -120,18 +148,19 @@ int amqp_basic_publish(amqp_connection_state_t state, amqp_frame_t f; size_t body_offset; size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); + int res; - amqp_basic_publish_t m = - (amqp_basic_publish_t) { - .exchange = exchange, - .routing_key = routing_key, - .mandatory = mandatory, - .immediate = immediate - }; - + amqp_basic_publish_t m; amqp_basic_properties_t default_properties; - AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m)); + m.exchange = exchange; + m.routing_key = routing_key; + m.mandatory = mandatory; + m.immediate = immediate; + + res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); + if (res < 0) + return res; if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); @@ -143,7 +172,10 @@ int amqp_basic_publish(amqp_connection_state_t state, f.payload.properties.class_id = AMQP_BASIC_CLASS; f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *) properties; - AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); + + res = amqp_send_frame(state, &f); + if (res < 0) + return res; body_offset = 0; while (1) { @@ -155,7 +187,7 @@ int amqp_basic_publish(amqp_connection_state_t state, f.frame_type = AMQP_FRAME_BODY; f.channel = channel; - f.payload.body_fragment.bytes = BUF_AT(body, body_offset); + f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset); if (remaining >= usable_body_payload_size) { f.payload.body_fragment.len = usable_body_payload_size; } else { @@ -163,7 +195,9 @@ int amqp_basic_publish(amqp_connection_state_t state, } body_offset += f.payload.body_fragment.len; - AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); + res = amqp_send_frame(state, &f); + if (res < 0) + return res; } return 0; @@ -174,20 +208,34 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) { char codestr[13]; - snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK, - amqp_channel_close_t, - code, amqp_cstring_bytes(codestr), 0, 0); + amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; + amqp_channel_close_t req; + + req.reply_code = code; + req.reply_text.bytes = codestr; + req.reply_text.len = sprintf(codestr, "%d", code); + req.class_id = 0; + req.method_id = 0; + + return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, + replies, &req); } amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { char codestr[13]; - snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, - amqp_connection_close_t, - code, amqp_cstring_bytes(codestr), 0, 0); + amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; + amqp_channel_close_t req; + + req.reply_code = code; + req.reply_text.bytes = codestr; + req.reply_text.len = sprintf(codestr, "%d", code); + req.class_id = 0; + req.method_id = 0; + + return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, + replies, &req); } amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, @@ -198,11 +246,24 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_boolean_t durable, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK, - amqp_exchange_declare_t, - 0, exchange, type, passive, durable, 0, 0, 0, arguments); - return RPC_REPLY(amqp_exchange_declare_ok_t); + amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0}; + amqp_exchange_declare_t req; + req.exchange = exchange; + req.type = type; + req.passive = passive; + req.durable = durable; + req.auto_delete = 0; + req.internal = 0; + req.nowait = 0; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_EXCHANGE_DECLARE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, @@ -214,11 +275,23 @@ amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_boolean_t auto_delete, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK, - amqp_queue_declare_t, - 0, queue, passive, durable, exclusive, auto_delete, 0, arguments); - return RPC_REPLY(amqp_queue_declare_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0}; + amqp_queue_declare_t req; + req.queue = queue; + req.passive = passive; + req.durable = durable; + req.exclusive = exclusive; + req.auto_delete = auto_delete; + req.nowait = 0; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_DECLARE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, @@ -227,11 +300,20 @@ amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, amqp_boolean_t if_unused, amqp_boolean_t if_empty) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, DELETE, DELETE_OK, - amqp_queue_delete_t, - 0, queue, if_unused, if_empty, 0); - return RPC_REPLY(amqp_queue_delete_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0}; + amqp_queue_delete_t req; + req.queue = queue; + req.if_unused = if_unused; + req.if_empty = if_empty; + req.nowait = 0; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_DELETE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, @@ -241,11 +323,22 @@ amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_bytes_t routing_key, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK, - amqp_queue_bind_t, - 0, queue, exchange, routing_key, 0, arguments); - return RPC_REPLY(amqp_queue_bind_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0}; + amqp_queue_bind_t req; + req.ticket = 0; + req.queue = queue; + req.exchange = exchange; + req.routing_key = routing_key; + req.nowait = 0; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_BIND_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, @@ -255,11 +348,21 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, amqp_bytes_t routing_key, amqp_table_t arguments) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK, - amqp_queue_unbind_t, - 0, queue, exchange, routing_key, arguments); - return RPC_REPLY(amqp_queue_unbind_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0}; + amqp_queue_unbind_t req; + req.ticket = 0; + req.queue = queue; + req.exchange = exchange; + req.routing_key = routing_key; + req.arguments = arguments; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_UNBIND_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, @@ -271,11 +374,24 @@ amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_boolean_t exclusive, amqp_table_t filter) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK, - amqp_basic_consume_t, - 0, queue, consumer_tag, no_local, no_ack, exclusive, 0, filter); - return RPC_REPLY(amqp_basic_consume_ok_t); + amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0}; + amqp_basic_consume_t req; + req.ticket = 0; + req.queue = queue; + req.consumer_tag = consumer_tag; + req.no_local = no_local; + req.no_ack = no_ack; + req.exclusive = exclusive; + req.nowait = 0; + req.arguments = filter; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_BASIC_CONSUME_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } int amqp_basic_ack(amqp_connection_state_t state, @@ -283,13 +399,10 @@ int amqp_basic_ack(amqp_connection_state_t state, uint64_t delivery_tag, amqp_boolean_t multiple) { - amqp_basic_ack_t m = - (amqp_basic_ack_t) { - .delivery_tag = delivery_tag, - .multiple = multiple - }; - AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m)); - return 0; + amqp_basic_ack_t m; + m.delivery_tag = delivery_tag; + m.multiple = multiple; + return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); } amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, @@ -297,11 +410,19 @@ amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, amqp_bytes_t queue, amqp_boolean_t no_wait) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK, - amqp_queue_purge_t, - 0, queue, no_wait); - return RPC_REPLY(amqp_queue_purge_ok_t); + amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0}; + amqp_queue_purge_t req; + req.ticket = 0; + req.queue = queue; + req.nowait = 0; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_QUEUE_PURGE_METHOD, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, @@ -312,38 +433,54 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, AMQP_BASIC_GET_EMPTY_METHOD, 0 }; - state->most_recent_api_result = - AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies, - amqp_basic_get_t, - 0, queue, no_ack); + amqp_basic_get_t req; + req.ticket = 0; + req.queue = queue; + req.no_ack = no_ack; + + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_BASIC_GET_METHOD, + replies, &req); return state->most_recent_api_result; } amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, TX, SELECT, SELECT_OK, - amqp_tx_select_t); - return RPC_REPLY(amqp_tx_select_ok_t); + amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0}; + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_TX_SELECT_METHOD, + replies, NULL); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, TX, COMMIT, COMMIT_OK, - amqp_tx_commit_t); - return RPC_REPLY(amqp_tx_commit_ok_t); + amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0}; + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_TX_COMMIT_METHOD, + replies, NULL); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state, amqp_channel_t channel) { - state->most_recent_api_result = - AMQP_SIMPLE_RPC(state, channel, TX, ROLLBACK, ROLLBACK_OK, - amqp_tx_rollback_t); - return RPC_REPLY(amqp_tx_rollback_ok_t); + amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0}; + state->most_recent_api_result = amqp_simple_rpc(state, channel, + AMQP_TX_ROLLBACK_METHOD, + replies, NULL); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; } amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) |