diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-12 06:39:26 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-12 06:39:26 +0100 |
commit | aabfa202da960cb782cf7c0b90a54cee5969e1fb (patch) | |
tree | ac96edee91c0f9e3b01956b76f1cbcfbd9a632dc /librabbitmq | |
parent | efc0f1ddc3a677434869258e62da90d00fa177cd (diff) | |
download | rabbitmq-c-github-ask-aabfa202da960cb782cf7c0b90a54cee5969e1fb.tar.gz |
Support for multiple channels.
- amqp_login() no longer does amqp_channel_open() for you
- amqp_login() takes a channel_max argument
- amqp_login() actually calls amqp_tune_connection() now
- amqp_channel_close() and amqp_basic_publish() now take a channel number
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 22 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 42 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 12 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 1 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 54 |
5 files changed, 86 insertions, 45 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 6c9773f..d485093 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -137,10 +137,13 @@ extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src); }) extern amqp_connection_state_t amqp_new_connection(void); +extern int amqp_get_sockfd(amqp_connection_state_t state); extern void amqp_set_sockfd(amqp_connection_state_t state, int sockfd); extern int amqp_tune_connection(amqp_connection_state_t state, + int channel_max, int frame_max); +int amqp_get_channel_max(amqp_connection_state_t state); extern void amqp_destroy_connection(amqp_connection_state_t state); extern int amqp_handle_input(amqp_connection_state_t state, @@ -175,7 +178,8 @@ extern int amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame); extern int amqp_simple_wait_method(amqp_connection_state_t state, - amqp_method_number_t expected_or_zero, + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, amqp_method_t *output); extern int amqp_send_method(amqp_connection_state_t state, @@ -200,11 +204,18 @@ extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, + int channel_max, int frame_max, amqp_sasl_method_enum sasl_method, ...); +extern amqp_rpc_reply_t amqp_rpc_reply; + +extern struct amqp_channel_open_ok_t_ *amqp_channel_open(amqp_connection_state_t state, + amqp_channel_t channel); + struct amqp_basic_properties_t_; extern int amqp_basic_publish(amqp_connection_state_t state, + amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, @@ -212,10 +223,11 @@ extern int amqp_basic_publish(amqp_connection_state_t state, struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body); -extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code); -extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code); - -extern amqp_rpc_reply_t amqp_rpc_reply; +extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, + amqp_channel_t channel, + int code); +extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, + int code); extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index b6dc647..476cb2d 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -10,7 +10,25 @@ #include <assert.h> +amqp_rpc_reply_t amqp_rpc_reply; + +#define RPC_REPLY(replytype) \ + (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \ + ? (replytype *) amqp_rpc_reply.reply.decoded \ + : NULL) + +amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, + amqp_channel_t channel) +{ + amqp_rpc_reply = + AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK, + amqp_channel_open_t, + AMQP_EMPTY_BYTES); + return RPC_REPLY(amqp_channel_open_ok_t); +} + int amqp_basic_publish(amqp_connection_state_t state, + amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, @@ -32,7 +50,7 @@ int amqp_basic_publish(amqp_connection_state_t state, amqp_basic_properties_t default_properties; - AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m)); + AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m)); if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); @@ -40,7 +58,7 @@ int amqp_basic_publish(amqp_connection_state_t state, } f.frame_type = AMQP_FRAME_HEADER; - f.channel = 1; + f.channel = channel; f.payload.properties.class_id = AMQP_BASIC_CLASS; f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *) properties; @@ -55,7 +73,7 @@ int amqp_basic_publish(amqp_connection_state_t state, break; f.frame_type = AMQP_FRAME_BODY; - f.channel = 1; + f.channel = channel; f.payload.body_fragment.bytes = BUF_AT(body, body_offset); if (remaining >= usable_body_payload_size) { f.payload.body_fragment.len = usable_body_payload_size; @@ -70,15 +88,20 @@ int amqp_basic_publish(amqp_connection_state_t state, return 0; } -amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) { +amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, + amqp_channel_t channel, + int code) +{ char codestr[13]; snprintf(codestr, sizeof(codestr), "%d", code); - return AMQP_SIMPLE_RPC(state, 1, CHANNEL, CLOSE, CLOSE_OK, + return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK, amqp_channel_close_t, code, amqp_cstring_bytes(codestr), 0, 0); } -amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { +amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, + int code) +{ char codestr[13]; snprintf(codestr, sizeof(codestr), "%d", code); return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK, @@ -86,13 +109,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) code, amqp_cstring_bytes(codestr), 0, 0); } -amqp_rpc_reply_t amqp_rpc_reply; - -#define RPC_REPLY(replytype) \ - (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \ - ? (replytype *) amqp_rpc_reply.reply.decoded \ - : NULL) - amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index d39a829..9653694 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -42,7 +42,7 @@ amqp_connection_state_t amqp_new_connection(void) { state->inbound_buffer.bytes = NULL; state->outbound_buffer.bytes = NULL; - if (amqp_tune_connection(state, INITIAL_FRAME_POOL_PAGE_SIZE) != 0) { + if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE) != 0) { empty_amqp_pool(&state->frame_pool); empty_amqp_pool(&state->decoding_pool); free(state); @@ -69,6 +69,10 @@ amqp_connection_state_t amqp_new_connection(void) { return state; } +int amqp_get_sockfd(amqp_connection_state_t state) { + return state->sockfd; +} + void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { @@ -76,12 +80,14 @@ void amqp_set_sockfd(amqp_connection_state_t state, } int amqp_tune_connection(amqp_connection_state_t state, + int channel_max, int frame_max) { void *newbuf; ENFORCE_STATE(state, CONNECTION_STATE_IDLE); + state->channel_max = channel_max; state->frame_max = frame_max; empty_amqp_pool(&state->frame_pool); @@ -99,6 +105,10 @@ int amqp_tune_connection(amqp_connection_state_t state, return 0; } +int amqp_get_channel_max(amqp_connection_state_t state) { + return state->channel_max; +} + void amqp_destroy_connection(amqp_connection_state_t state) { empty_amqp_pool(&state->frame_pool); empty_amqp_pool(&state->decoding_pool); diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index d4b1469..0f9987a 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -54,6 +54,7 @@ struct amqp_connection_state_t_ { amqp_connection_state_enum state; + int channel_max; int frame_max; amqp_bytes_t inbound_buffer; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index b42ba47..04e8c32 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -162,19 +162,27 @@ int amqp_simple_wait_frame(amqp_connection_state_t state, } int amqp_simple_wait_method(amqp_connection_state_t state, - amqp_method_number_t expected_or_zero, + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, amqp_method_t *output) { amqp_frame_t frame; AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame)); + amqp_assert(frame.channel == expected_channel, + "Expected 0x%08X method frame on channel %d, got frame on channel %d", + expected_method, + expected_channel, + frame.channel); amqp_assert(frame.frame_type == AMQP_FRAME_METHOD, - "Expected 0x%08X method frame, got frame type %d", - expected_or_zero, + "Expected 0x%08X method frame on channel %d, got frame type %d", + expected_method, + expected_channel, frame.frame_type); - amqp_assert((expected_or_zero == 0) || (frame.payload.method.id == expected_or_zero), - "Expected method ID 0x%08X, got ID 0x%08X", - expected_or_zero, + amqp_assert(frame.payload.method.id == expected_method, + "Expected method ID 0x%08X on channel %d, got ID 0x%08X", + expected_method, + expected_channel, frame.payload.method.id); *output = frame.payload.method; return 1; @@ -267,16 +275,18 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, } static int amqp_login_inner(amqp_connection_state_t state, + int channel_max, int frame_max, amqp_sasl_method_enum sasl_method, va_list vl) { amqp_method_t method; uint32_t server_frame_max; + uint16_t server_channel_max; amqp_send_header(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_START_METHOD, &method)); + AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method)); { amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || @@ -303,20 +313,27 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_release_buffers(state); - AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_TUNE_METHOD, &method)); + AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method)); { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; + server_channel_max = s->channel_max; server_frame_max = s->frame_max; } + if (server_channel_max != 0 && server_channel_max < channel_max) { + channel_max = server_channel_max; + } + if (server_frame_max != 0 && server_frame_max < frame_max) { frame_max = server_frame_max; } + AMQP_CHECK_RESULT(amqp_tune_connection(state, channel_max, frame_max)); + { amqp_connection_tune_ok_t s = (amqp_connection_tune_ok_t) { - .channel_max = 1, + .channel_max = channel_max, .frame_max = frame_max, .heartbeat = 0 }; @@ -330,6 +347,7 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, + int channel_max, int frame_max, amqp_sasl_method_enum sasl_method, ...) @@ -339,7 +357,7 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, va_start(vl, sasl_method); - amqp_login_inner(state, frame_max, sasl_method, vl); + amqp_login_inner(state, channel_max, frame_max, sasl_method, vl); { amqp_connection_open_t s = @@ -359,22 +377,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, } amqp_maybe_release_buffers(state); - { - amqp_channel_open_t s = - (amqp_channel_open_t) { - .out_of_band = {.len = 0, .bytes = NULL} - }; - result = amqp_simple_rpc(state, - 1, - AMQP_CHANNEL_OPEN_METHOD, - AMQP_CHANNEL_OPEN_OK_METHOD, - &s); - if (result.reply_type != AMQP_RESPONSE_NORMAL) { - return result; - } - } - amqp_maybe_release_buffers(state); - va_end(vl); result.reply_type = AMQP_RESPONSE_NORMAL; |