summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-12 06:39:26 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-12 06:39:26 +0100
commitaabfa202da960cb782cf7c0b90a54cee5969e1fb (patch)
treeac96edee91c0f9e3b01956b76f1cbcfbd9a632dc
parentefc0f1ddc3a677434869258e62da90d00fa177cd (diff)
downloadrabbitmq-c-github-ask-aabfa202da960cb782cf7c0b90a54cee5969e1fb.tar.gz
Support for multiple channels.
- amqp_login() no longer does amqp_channel_open() for you - amqp_login() takes a channel_max argument - amqp_login() actually calls amqp_tune_connection() now - amqp_channel_close() and amqp_basic_publish() now take a channel number
-rw-r--r--examples/amqp_consumer.c6
-rw-r--r--examples/amqp_exchange_declare.c6
-rw-r--r--examples/amqp_listen.c6
-rw-r--r--examples/amqp_producer.c7
-rw-r--r--examples/amqp_sendstring.c7
-rw-r--r--librabbitmq/amqp.h22
-rw-r--r--librabbitmq/amqp_api.c42
-rw-r--r--librabbitmq/amqp_connection.c12
-rw-r--r--librabbitmq/amqp_private.h1
-rw-r--r--librabbitmq/amqp_socket.c54
10 files changed, 108 insertions, 55 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 2ce6707..3dd3262 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -104,8 +104,10 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
- die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_rpc_reply, "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
@@ -126,7 +128,7 @@ int main(int argc, char const * const *argv) {
run(conn);
- die_on_amqp_error(amqp_channel_close(conn, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
amqp_destroy_connection(conn);
die_on_error(close(sockfd), "Closing socket");
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index f163db1..0082d62 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -33,14 +33,16 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
- die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_rpc_reply, "Opening channel");
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype),
0, 0, 0, AMQP_EMPTY_TABLE);
die_on_amqp_error(amqp_rpc_reply, "Declaring exchange");
- die_on_amqp_error(amqp_channel_close(conn, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
amqp_destroy_connection(conn);
die_on_error(close(sockfd), "Closing socket");
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index 975d49f..dc0f00c 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -40,8 +40,10 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
- die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_rpc_reply, "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
@@ -133,7 +135,7 @@ int main(int argc, char const * const *argv) {
}
}
- die_on_amqp_error(amqp_channel_close(conn, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
amqp_destroy_connection(conn);
die_on_error(close(sockfd), "Closing socket");
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index 89e56bf..106997d 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -33,6 +33,7 @@ static void send_batch(amqp_connection_state_t conn,
for (i = 0; i < message_count; i++) {
long long now = now_microseconds();
die_on_error(amqp_basic_publish(conn,
+ 1,
amqp_cstring_bytes("amq.direct"),
amqp_cstring_bytes(queue_name),
0,
@@ -91,12 +92,14 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
- die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_rpc_reply, "Opening channel");
send_batch(conn, "test queue", rate_limit, message_count);
- die_on_amqp_error(amqp_channel_close(conn, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
amqp_destroy_connection(conn);
die_on_error(close(sockfd), "Closing socket");
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index c914f86..a26cd3a 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -35,14 +35,17 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
- die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_rpc_reply, "Opening channel");
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
die_on_error(amqp_basic_publish(conn,
+ 1,
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey),
0,
@@ -52,7 +55,7 @@ int main(int argc, char const * const *argv) {
"Publishing");
}
- die_on_amqp_error(amqp_channel_close(conn, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
amqp_destroy_connection(conn);
die_on_error(close(sockfd), "Closing socket");
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 6c9773f..d485093 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -137,10 +137,13 @@ extern amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src);
})
extern amqp_connection_state_t amqp_new_connection(void);
+extern int amqp_get_sockfd(amqp_connection_state_t state);
extern void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd);
extern int amqp_tune_connection(amqp_connection_state_t state,
+ int channel_max,
int frame_max);
+int amqp_get_channel_max(amqp_connection_state_t state);
extern void amqp_destroy_connection(amqp_connection_state_t state);
extern int amqp_handle_input(amqp_connection_state_t state,
@@ -175,7 +178,8 @@ extern int amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame);
extern int amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_method_number_t expected_or_zero,
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
amqp_method_t *output);
extern int amqp_send_method(amqp_connection_state_t state,
@@ -200,11 +204,18 @@ extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
char const *vhost,
+ int channel_max,
int frame_max,
amqp_sasl_method_enum sasl_method, ...);
+extern amqp_rpc_reply_t amqp_rpc_reply;
+
+extern struct amqp_channel_open_ok_t_ *amqp_channel_open(amqp_connection_state_t state,
+ amqp_channel_t channel);
+
struct amqp_basic_properties_t_;
extern int amqp_basic_publish(amqp_connection_state_t state,
+ amqp_channel_t channel,
amqp_bytes_t exchange,
amqp_bytes_t routing_key,
amqp_boolean_t mandatory,
@@ -212,10 +223,11 @@ extern int amqp_basic_publish(amqp_connection_state_t state,
struct amqp_basic_properties_t_ const *properties,
amqp_bytes_t body);
-extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code);
-extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code);
-
-extern amqp_rpc_reply_t amqp_rpc_reply;
+extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ int code);
+extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
+ int code);
extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection_state_t state,
amqp_channel_t channel,
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index b6dc647..476cb2d 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -10,7 +10,25 @@
#include <assert.h>
+amqp_rpc_reply_t amqp_rpc_reply;
+
+#define RPC_REPLY(replytype) \
+ (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \
+ ? (replytype *) amqp_rpc_reply.reply.decoded \
+ : NULL)
+
+amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
+ amqp_channel_t channel)
+{
+ amqp_rpc_reply =
+ AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK,
+ amqp_channel_open_t,
+ AMQP_EMPTY_BYTES);
+ return RPC_REPLY(amqp_channel_open_ok_t);
+}
+
int amqp_basic_publish(amqp_connection_state_t state,
+ amqp_channel_t channel,
amqp_bytes_t exchange,
amqp_bytes_t routing_key,
amqp_boolean_t mandatory,
@@ -32,7 +50,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
amqp_basic_properties_t default_properties;
- AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m));
+ AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m));
if (properties == NULL) {
memset(&default_properties, 0, sizeof(default_properties));
@@ -40,7 +58,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
f.frame_type = AMQP_FRAME_HEADER;
- f.channel = 1;
+ f.channel = channel;
f.payload.properties.class_id = AMQP_BASIC_CLASS;
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *) properties;
@@ -55,7 +73,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
break;
f.frame_type = AMQP_FRAME_BODY;
- f.channel = 1;
+ f.channel = channel;
f.payload.body_fragment.bytes = BUF_AT(body, body_offset);
if (remaining >= usable_body_payload_size) {
f.payload.body_fragment.len = usable_body_payload_size;
@@ -70,15 +88,20 @@ int amqp_basic_publish(amqp_connection_state_t state,
return 0;
}
-amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) {
+amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ int code)
+{
char codestr[13];
snprintf(codestr, sizeof(codestr), "%d", code);
- return AMQP_SIMPLE_RPC(state, 1, CHANNEL, CLOSE, CLOSE_OK,
+ return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK,
amqp_channel_close_t,
code, amqp_cstring_bytes(codestr), 0, 0);
}
-amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) {
+amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
+ int code)
+{
char codestr[13];
snprintf(codestr, sizeof(codestr), "%d", code);
return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK,
@@ -86,13 +109,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code)
code, amqp_cstring_bytes(codestr), 0, 0);
}
-amqp_rpc_reply_t amqp_rpc_reply;
-
-#define RPC_REPLY(replytype) \
- (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \
- ? (replytype *) amqp_rpc_reply.reply.decoded \
- : NULL)
-
amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_bytes_t exchange,
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index d39a829..9653694 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -42,7 +42,7 @@ amqp_connection_state_t amqp_new_connection(void) {
state->inbound_buffer.bytes = NULL;
state->outbound_buffer.bytes = NULL;
- if (amqp_tune_connection(state, INITIAL_FRAME_POOL_PAGE_SIZE) != 0) {
+ if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE) != 0) {
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
free(state);
@@ -69,6 +69,10 @@ amqp_connection_state_t amqp_new_connection(void) {
return state;
}
+int amqp_get_sockfd(amqp_connection_state_t state) {
+ return state->sockfd;
+}
+
void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
@@ -76,12 +80,14 @@ void amqp_set_sockfd(amqp_connection_state_t state,
}
int amqp_tune_connection(amqp_connection_state_t state,
+ int channel_max,
int frame_max)
{
void *newbuf;
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
+ state->channel_max = channel_max;
state->frame_max = frame_max;
empty_amqp_pool(&state->frame_pool);
@@ -99,6 +105,10 @@ int amqp_tune_connection(amqp_connection_state_t state,
return 0;
}
+int amqp_get_channel_max(amqp_connection_state_t state) {
+ return state->channel_max;
+}
+
void amqp_destroy_connection(amqp_connection_state_t state) {
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index d4b1469..0f9987a 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -54,6 +54,7 @@ struct amqp_connection_state_t_ {
amqp_connection_state_enum state;
+ int channel_max;
int frame_max;
amqp_bytes_t inbound_buffer;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index b42ba47..04e8c32 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -162,19 +162,27 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
}
int amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_method_number_t expected_or_zero,
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
amqp_method_t *output)
{
amqp_frame_t frame;
AMQP_CHECK_EOF_RESULT(amqp_simple_wait_frame(state, &frame));
+ amqp_assert(frame.channel == expected_channel,
+ "Expected 0x%08X method frame on channel %d, got frame on channel %d",
+ expected_method,
+ expected_channel,
+ frame.channel);
amqp_assert(frame.frame_type == AMQP_FRAME_METHOD,
- "Expected 0x%08X method frame, got frame type %d",
- expected_or_zero,
+ "Expected 0x%08X method frame on channel %d, got frame type %d",
+ expected_method,
+ expected_channel,
frame.frame_type);
- amqp_assert((expected_or_zero == 0) || (frame.payload.method.id == expected_or_zero),
- "Expected method ID 0x%08X, got ID 0x%08X",
- expected_or_zero,
+ amqp_assert(frame.payload.method.id == expected_method,
+ "Expected method ID 0x%08X on channel %d, got ID 0x%08X",
+ expected_method,
+ expected_channel,
frame.payload.method.id);
*output = frame.payload.method;
return 1;
@@ -267,16 +275,18 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
}
static int amqp_login_inner(amqp_connection_state_t state,
+ int channel_max,
int frame_max,
amqp_sasl_method_enum sasl_method,
va_list vl)
{
amqp_method_t method;
uint32_t server_frame_max;
+ uint16_t server_channel_max;
amqp_send_header(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_START_METHOD, &method));
+ AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method));
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
@@ -303,20 +313,27 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_release_buffers(state);
- AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, AMQP_CONNECTION_TUNE_METHOD, &method));
+ AMQP_CHECK_EOF_RESULT(amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method));
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
+ server_channel_max = s->channel_max;
server_frame_max = s->frame_max;
}
+ if (server_channel_max != 0 && server_channel_max < channel_max) {
+ channel_max = server_channel_max;
+ }
+
if (server_frame_max != 0 && server_frame_max < frame_max) {
frame_max = server_frame_max;
}
+ AMQP_CHECK_RESULT(amqp_tune_connection(state, channel_max, frame_max));
+
{
amqp_connection_tune_ok_t s =
(amqp_connection_tune_ok_t) {
- .channel_max = 1,
+ .channel_max = channel_max,
.frame_max = frame_max,
.heartbeat = 0
};
@@ -330,6 +347,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
char const *vhost,
+ int channel_max,
int frame_max,
amqp_sasl_method_enum sasl_method,
...)
@@ -339,7 +357,7 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
va_start(vl, sasl_method);
- amqp_login_inner(state, frame_max, sasl_method, vl);
+ amqp_login_inner(state, channel_max, frame_max, sasl_method, vl);
{
amqp_connection_open_t s =
@@ -359,22 +377,6 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
}
amqp_maybe_release_buffers(state);
- {
- amqp_channel_open_t s =
- (amqp_channel_open_t) {
- .out_of_band = {.len = 0, .bytes = NULL}
- };
- result = amqp_simple_rpc(state,
- 1,
- AMQP_CHANNEL_OPEN_METHOD,
- AMQP_CHANNEL_OPEN_OK_METHOD,
- &s);
- if (result.reply_type != AMQP_RESPONSE_NORMAL) {
- return result;
- }
- }
- amqp_maybe_release_buffers(state);
-
va_end(vl);
result.reply_type = AMQP_RESPONSE_NORMAL;