summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_api.c
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-12 06:39:26 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-12 06:39:26 +0100
commitaabfa202da960cb782cf7c0b90a54cee5969e1fb (patch)
treeac96edee91c0f9e3b01956b76f1cbcfbd9a632dc /librabbitmq/amqp_api.c
parentefc0f1ddc3a677434869258e62da90d00fa177cd (diff)
downloadrabbitmq-c-aabfa202da960cb782cf7c0b90a54cee5969e1fb.tar.gz
Support for multiple channels.
- amqp_login() no longer does amqp_channel_open() for you - amqp_login() takes a channel_max argument - amqp_login() actually calls amqp_tune_connection() now - amqp_channel_close() and amqp_basic_publish() now take a channel number
Diffstat (limited to 'librabbitmq/amqp_api.c')
-rw-r--r--librabbitmq/amqp_api.c42
1 files changed, 29 insertions, 13 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index b6dc647..476cb2d 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -10,7 +10,25 @@
#include <assert.h>
+amqp_rpc_reply_t amqp_rpc_reply;
+
+#define RPC_REPLY(replytype) \
+ (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \
+ ? (replytype *) amqp_rpc_reply.reply.decoded \
+ : NULL)
+
+amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
+ amqp_channel_t channel)
+{
+ amqp_rpc_reply =
+ AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK,
+ amqp_channel_open_t,
+ AMQP_EMPTY_BYTES);
+ return RPC_REPLY(amqp_channel_open_ok_t);
+}
+
int amqp_basic_publish(amqp_connection_state_t state,
+ amqp_channel_t channel,
amqp_bytes_t exchange,
amqp_bytes_t routing_key,
amqp_boolean_t mandatory,
@@ -32,7 +50,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
amqp_basic_properties_t default_properties;
- AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m));
+ AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m));
if (properties == NULL) {
memset(&default_properties, 0, sizeof(default_properties));
@@ -40,7 +58,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
f.frame_type = AMQP_FRAME_HEADER;
- f.channel = 1;
+ f.channel = channel;
f.payload.properties.class_id = AMQP_BASIC_CLASS;
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *) properties;
@@ -55,7 +73,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
break;
f.frame_type = AMQP_FRAME_BODY;
- f.channel = 1;
+ f.channel = channel;
f.payload.body_fragment.bytes = BUF_AT(body, body_offset);
if (remaining >= usable_body_payload_size) {
f.payload.body_fragment.len = usable_body_payload_size;
@@ -70,15 +88,20 @@ int amqp_basic_publish(amqp_connection_state_t state,
return 0;
}
-amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) {
+amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ int code)
+{
char codestr[13];
snprintf(codestr, sizeof(codestr), "%d", code);
- return AMQP_SIMPLE_RPC(state, 1, CHANNEL, CLOSE, CLOSE_OK,
+ return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK,
amqp_channel_close_t,
code, amqp_cstring_bytes(codestr), 0, 0);
}
-amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) {
+amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
+ int code)
+{
char codestr[13];
snprintf(codestr, sizeof(codestr), "%d", code);
return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK,
@@ -86,13 +109,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code)
code, amqp_cstring_bytes(codestr), 0, 0);
}
-amqp_rpc_reply_t amqp_rpc_reply;
-
-#define RPC_REPLY(replytype) \
- (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \
- ? (replytype *) amqp_rpc_reply.reply.decoded \
- : NULL)
-
amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_bytes_t exchange,