diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-12 06:39:26 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-12 06:39:26 +0100 |
commit | aabfa202da960cb782cf7c0b90a54cee5969e1fb (patch) | |
tree | ac96edee91c0f9e3b01956b76f1cbcfbd9a632dc /librabbitmq/amqp_api.c | |
parent | efc0f1ddc3a677434869258e62da90d00fa177cd (diff) | |
download | rabbitmq-c-aabfa202da960cb782cf7c0b90a54cee5969e1fb.tar.gz |
Support for multiple channels.
- amqp_login() no longer does amqp_channel_open() for you
- amqp_login() takes a channel_max argument
- amqp_login() actually calls amqp_tune_connection() now
- amqp_channel_close() and amqp_basic_publish() now take a channel number
Diffstat (limited to 'librabbitmq/amqp_api.c')
-rw-r--r-- | librabbitmq/amqp_api.c | 42 |
1 files changed, 29 insertions, 13 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index b6dc647..476cb2d 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -10,7 +10,25 @@ #include <assert.h> +amqp_rpc_reply_t amqp_rpc_reply; + +#define RPC_REPLY(replytype) \ + (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \ + ? (replytype *) amqp_rpc_reply.reply.decoded \ + : NULL) + +amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, + amqp_channel_t channel) +{ + amqp_rpc_reply = + AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK, + amqp_channel_open_t, + AMQP_EMPTY_BYTES); + return RPC_REPLY(amqp_channel_open_ok_t); +} + int amqp_basic_publish(amqp_connection_state_t state, + amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, @@ -32,7 +50,7 @@ int amqp_basic_publish(amqp_connection_state_t state, amqp_basic_properties_t default_properties; - AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m)); + AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m)); if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); @@ -40,7 +58,7 @@ int amqp_basic_publish(amqp_connection_state_t state, } f.frame_type = AMQP_FRAME_HEADER; - f.channel = 1; + f.channel = channel; f.payload.properties.class_id = AMQP_BASIC_CLASS; f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *) properties; @@ -55,7 +73,7 @@ int amqp_basic_publish(amqp_connection_state_t state, break; f.frame_type = AMQP_FRAME_BODY; - f.channel = 1; + f.channel = channel; f.payload.body_fragment.bytes = BUF_AT(body, body_offset); if (remaining >= usable_body_payload_size) { f.payload.body_fragment.len = usable_body_payload_size; @@ -70,15 +88,20 @@ int amqp_basic_publish(amqp_connection_state_t state, return 0; } -amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) { +amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, + amqp_channel_t channel, + int code) +{ char codestr[13]; snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, 1, CHANNEL, CLOSE, CLOSE_OK, + return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK, amqp_channel_close_t, code, amqp_cstring_bytes(codestr), 0, 0); } -amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { +amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, + int code) +{ char codestr[13]; snprintf(codestr, sizeof(codestr), "%d", code); return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, @@ -86,13 +109,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) code, amqp_cstring_bytes(codestr), 0, 0); } -amqp_rpc_reply_t amqp_rpc_reply; - -#define RPC_REPLY(replytype) \ - (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \ - ? (replytype *) amqp_rpc_reply.reply.decoded \ - : NULL) - amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, |