summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_api.c')
-rw-r--r--librabbitmq/amqp_api.c303
1 files changed, 220 insertions, 83 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index b2793ff..bf19761 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -52,6 +52,7 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
+#include <stdarg.h>
#include "amqp.h"
#include "amqp_framing.h"
@@ -69,6 +70,12 @@ static const char *client_error_strings[ERROR_MAX] = {
"connection closed unexpectedly", /* ERROR_CONNECTION_CLOSED */
};
+/* strdup is not in ISO C90! */
+static inline char *strdup(const char *str)
+{
+ return strcpy(malloc(strlen(str) + 1),str);
+}
+
char *amqp_error_string(int err)
{
const char *str;
@@ -93,6 +100,20 @@ char *amqp_error_string(int err)
return strdup(str);
}
+void amqp_abort(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fputc('\n', stderr);
+ abort();
+}
+
+const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
+const amqp_table_t amqp_empty_table = { 0, NULL };
+const amqp_array_t amqp_empty_array = { 0, NULL };
+
#define RPC_REPLY(replytype) \
(state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \
? (replytype *) state->most_recent_api_result.reply.decoded \
@@ -101,11 +122,18 @@ char *amqp_error_string(int err)
amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK,
- amqp_channel_open_t,
- AMQP_EMPTY_BYTES);
- return RPC_REPLY(amqp_channel_open_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0};
+ amqp_channel_open_t req;
+ req.out_of_band.bytes = NULL;
+ req.out_of_band.len = 0;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_CHANNEL_OPEN_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
int amqp_basic_publish(amqp_connection_state_t state,
@@ -120,18 +148,19 @@ int amqp_basic_publish(amqp_connection_state_t state,
amqp_frame_t f;
size_t body_offset;
size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
+ int res;
- amqp_basic_publish_t m =
- (amqp_basic_publish_t) {
- .exchange = exchange,
- .routing_key = routing_key,
- .mandatory = mandatory,
- .immediate = immediate
- };
-
+ amqp_basic_publish_t m;
amqp_basic_properties_t default_properties;
- AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m));
+ m.exchange = exchange;
+ m.routing_key = routing_key;
+ m.mandatory = mandatory;
+ m.immediate = immediate;
+
+ res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m);
+ if (res < 0)
+ return res;
if (properties == NULL) {
memset(&default_properties, 0, sizeof(default_properties));
@@ -143,7 +172,10 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.properties.class_id = AMQP_BASIC_CLASS;
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *) properties;
- AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
+
+ res = amqp_send_frame(state, &f);
+ if (res < 0)
+ return res;
body_offset = 0;
while (1) {
@@ -155,7 +187,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.frame_type = AMQP_FRAME_BODY;
f.channel = channel;
- f.payload.body_fragment.bytes = BUF_AT(body, body_offset);
+ f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
if (remaining >= usable_body_payload_size) {
f.payload.body_fragment.len = usable_body_payload_size;
} else {
@@ -163,7 +195,9 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
body_offset += f.payload.body_fragment.len;
- AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
+ res = amqp_send_frame(state, &f);
+ if (res < 0)
+ return res;
}
return 0;
@@ -174,20 +208,34 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
int code)
{
char codestr[13];
- snprintf(codestr, sizeof(codestr), "%d", code);
- return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK,
- amqp_channel_close_t,
- code, amqp_cstring_bytes(codestr), 0, 0);
+ amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
+ amqp_channel_close_t req;
+
+ req.reply_code = code;
+ req.reply_text.bytes = codestr;
+ req.reply_text.len = sprintf(codestr, "%d", code);
+ req.class_id = 0;
+ req.method_id = 0;
+
+ return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD,
+ replies, &req);
}
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
int code)
{
char codestr[13];
- snprintf(codestr, sizeof(codestr), "%d", code);
- return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK,
- amqp_connection_close_t,
- code, amqp_cstring_bytes(codestr), 0, 0);
+ amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
+ amqp_channel_close_t req;
+
+ req.reply_code = code;
+ req.reply_text.bytes = codestr;
+ req.reply_text.len = sprintf(codestr, "%d", code);
+ req.class_id = 0;
+ req.method_id = 0;
+
+ return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD,
+ replies, &req);
}
amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
@@ -198,11 +246,24 @@ amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
amqp_boolean_t durable,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK,
- amqp_exchange_declare_t,
- 0, exchange, type, passive, durable, 0, 0, 0, arguments);
- return RPC_REPLY(amqp_exchange_declare_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0};
+ amqp_exchange_declare_t req;
+ req.exchange = exchange;
+ req.type = type;
+ req.passive = passive;
+ req.durable = durable;
+ req.auto_delete = 0;
+ req.internal = 0;
+ req.nowait = 0;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_EXCHANGE_DECLARE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
@@ -214,11 +275,23 @@ amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
amqp_boolean_t auto_delete,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK,
- amqp_queue_declare_t,
- 0, queue, passive, durable, exclusive, auto_delete, 0, arguments);
- return RPC_REPLY(amqp_queue_declare_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0};
+ amqp_queue_declare_t req;
+ req.queue = queue;
+ req.passive = passive;
+ req.durable = durable;
+ req.exclusive = exclusive;
+ req.auto_delete = auto_delete;
+ req.nowait = 0;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_DECLARE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state,
@@ -227,11 +300,20 @@ amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state,
amqp_boolean_t if_unused,
amqp_boolean_t if_empty)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, DELETE, DELETE_OK,
- amqp_queue_delete_t,
- 0, queue, if_unused, if_empty, 0);
- return RPC_REPLY(amqp_queue_delete_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0};
+ amqp_queue_delete_t req;
+ req.queue = queue;
+ req.if_unused = if_unused;
+ req.if_empty = if_empty;
+ req.nowait = 0;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_DELETE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
@@ -241,11 +323,22 @@ amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
amqp_bytes_t routing_key,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK,
- amqp_queue_bind_t,
- 0, queue, exchange, routing_key, 0, arguments);
- return RPC_REPLY(amqp_queue_bind_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0};
+ amqp_queue_bind_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.exchange = exchange;
+ req.routing_key = routing_key;
+ req.nowait = 0;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_BIND_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
@@ -255,11 +348,21 @@ amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
amqp_bytes_t routing_key,
amqp_table_t arguments)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK,
- amqp_queue_unbind_t,
- 0, queue, exchange, routing_key, arguments);
- return RPC_REPLY(amqp_queue_unbind_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0};
+ amqp_queue_unbind_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.exchange = exchange;
+ req.routing_key = routing_key;
+ req.arguments = arguments;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_UNBIND_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
@@ -271,11 +374,24 @@ amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
amqp_boolean_t exclusive,
amqp_table_t filter)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK,
- amqp_basic_consume_t,
- 0, queue, consumer_tag, no_local, no_ack, exclusive, 0, filter);
- return RPC_REPLY(amqp_basic_consume_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0};
+ amqp_basic_consume_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.consumer_tag = consumer_tag;
+ req.no_local = no_local;
+ req.no_ack = no_ack;
+ req.exclusive = exclusive;
+ req.nowait = 0;
+ req.arguments = filter;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_BASIC_CONSUME_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
int amqp_basic_ack(amqp_connection_state_t state,
@@ -283,13 +399,10 @@ int amqp_basic_ack(amqp_connection_state_t state,
uint64_t delivery_tag,
amqp_boolean_t multiple)
{
- amqp_basic_ack_t m =
- (amqp_basic_ack_t) {
- .delivery_tag = delivery_tag,
- .multiple = multiple
- };
- AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m));
- return 0;
+ amqp_basic_ack_t m;
+ m.delivery_tag = delivery_tag;
+ m.multiple = multiple;
+ return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m);
}
amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
@@ -297,11 +410,19 @@ amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
amqp_bytes_t queue,
amqp_boolean_t no_wait)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
- amqp_queue_purge_t,
- 0, queue, no_wait);
- return RPC_REPLY(amqp_queue_purge_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0};
+ amqp_queue_purge_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.nowait = 0;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_QUEUE_PURGE_METHOD,
+ replies, &req);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
@@ -312,38 +433,54 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
AMQP_BASIC_GET_EMPTY_METHOD,
0 };
- state->most_recent_api_result =
- AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
- amqp_basic_get_t,
- 0, queue, no_ack);
+ amqp_basic_get_t req;
+ req.ticket = 0;
+ req.queue = queue;
+ req.no_ack = no_ack;
+
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_BASIC_GET_METHOD,
+ replies, &req);
return state->most_recent_api_result;
}
amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, TX, SELECT, SELECT_OK,
- amqp_tx_select_t);
- return RPC_REPLY(amqp_tx_select_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0};
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_TX_SELECT_METHOD,
+ replies, NULL);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, TX, COMMIT, COMMIT_OK,
- amqp_tx_commit_t);
- return RPC_REPLY(amqp_tx_commit_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0};
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_TX_COMMIT_METHOD,
+ replies, NULL);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state,
amqp_channel_t channel)
{
- state->most_recent_api_result =
- AMQP_SIMPLE_RPC(state, channel, TX, ROLLBACK, ROLLBACK_OK,
- amqp_tx_rollback_t);
- return RPC_REPLY(amqp_tx_rollback_ok_t);
+ amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0};
+ state->most_recent_api_result = amqp_simple_rpc(state, channel,
+ AMQP_TX_ROLLBACK_METHOD,
+ replies, NULL);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ return state->most_recent_api_result.reply.decoded;
+ else
+ return NULL;
}
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)