summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-02-12 22:00:58 -0800
committerAlan Antonuk <alan.antonuk@gmail.com>2013-04-09 00:14:57 -0700
commit21b124e2fd2f1c343fb37b708f393d1b9580cfad (patch)
tree50be437d1ff415380d22e9a070bfc785ce1d9833 /librabbitmq/amqp_socket.c
parent6217c6ace0db83451c48c578093b9d545f335b1b (diff)
downloadrabbitmq-c-github-ask-21b124e2fd2f1c343fb37b708f393d1b9580cfad.tar.gz
Add amqp_login_with_properties function
Allows clients to specify arbitrary properties when logging on to the broker
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c197
1 files changed, 140 insertions, 57 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 5e7fa9c..22cc2c5 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -410,32 +410,56 @@ amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
}
-static int amqp_login_inner(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
+static int amqp_table_contains_entry(const amqp_table_t *table,
+ const amqp_table_entry_t *entry)
+{
+ int i;
+ amqp_table_entry_t *current_entry;
+
+ assert(table != NULL);
+ assert(entry != NULL);
+
+ current_entry = table->entries;
+
+ for (i = 0; i < table->num_entries; ++i, ++current_entry) {
+ if (0 == amqp_table_entry_cmp(current_entry, entry)) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ const amqp_table_t *client_properties,
+ amqp_sasl_method_enum sasl_method,
+ va_list vl)
{
int res;
amqp_method_t method;
int server_frame_max;
uint16_t server_channel_max;
uint16_t server_heartbeat;
+ amqp_rpc_reply_t result;
amqp_send_header(state);
res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
&method);
if (res < 0) {
- return res;
+ goto error_res;
}
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
- if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
- (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
- return -ERROR_INCOMPATIBLE_AMQP_VERSION;
+ if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR)
+ || (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
+ res = -ERROR_INCOMPATIBLE_AMQP_VERSION;
+ goto error_res;
}
/* TODO: check that our chosen SASL mechanism is in the list of
@@ -444,27 +468,71 @@ static int amqp_login_inner(amqp_connection_state_t state,
}
{
- amqp_table_entry_t properties[2];
+ amqp_table_entry_t default_properties[2];
+ amqp_table_t default_table;
amqp_connection_start_ok_t s;
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
+ sasl_method, vl);
if (response_bytes.bytes == NULL) {
- return -ERROR_NO_MEMORY;
+ res = -ERROR_NO_MEMORY;
+ goto error_res;
}
- properties[0].key = amqp_cstring_bytes("product");
- properties[0].value.kind = AMQP_FIELD_KIND_UTF8;
- properties[0].value.value.bytes
- = amqp_cstring_bytes("rabbitmq-c");
+ default_properties[0].key = amqp_cstring_bytes("product");
+ default_properties[0].value.kind = AMQP_FIELD_KIND_UTF8;
+ default_properties[0].value.value.bytes =
+ amqp_cstring_bytes("rabbitmq-c");
+
+ default_properties[1].key = amqp_cstring_bytes("information");
+ default_properties[1].value.kind = AMQP_FIELD_KIND_UTF8;
+ default_properties[1].value.value.bytes =
+ amqp_cstring_bytes("See https://github.com/alanxz/rabbitmq-c");
+
+ default_table.entries = default_properties;
+ default_table.num_entries = sizeof(default_properties) / sizeof(amqp_table_entry_t);
- properties[1].key = amqp_cstring_bytes("information");
- properties[1].value.kind = AMQP_FIELD_KIND_UTF8;
- properties[1].value.value.bytes
- = amqp_cstring_bytes("See https://github.com/alanxz/rabbitmq-c");
+ if (0 == client_properties->num_entries) {
+ s.client_properties = default_table;
+ } else {
+ /* Merge provided properties with our default properties:
+ * - Copy default properties.
+ * - Any provided property that doesn't have the same key as a default
+ * property is also copied.
+ *
+ * TODO: if one of the default properties is a capabilities table, we will
+ * need to figure out how to merge this if the user provides a capabilites
+ * table
+ */
+ int i;
+ amqp_table_entry_t *current_entry;
+
+ s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool,
+ sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
+ if (NULL == s.client_properties.entries) {
+ res = -ERROR_NO_MEMORY;
+ goto error_res;
+ }
+ s.client_properties.num_entries = 0;
+
+ current_entry = s.client_properties.entries;
+
+ for (i = 0; i < default_table.num_entries; ++i) {
+ memcpy(current_entry, &default_table.entries[i], sizeof(amqp_table_entry_t));
+ s.client_properties.num_entries += 1;
+ ++current_entry;
+ }
+
+ for (i = 0; i < client_properties->num_entries; ++i) {
+ if (amqp_table_contains_entry(&default_table, &client_properties->entries[i])) {
+ continue;
+ }
+ memcpy(current_entry, &client_properties->entries[i], sizeof(amqp_table_entry_t));
+ s.client_properties.num_entries += 1;
+ ++current_entry;
+ }
+ }
- s.client_properties.num_entries = 2;
- s.client_properties.entries = properties;
s.mechanism = sasl_method_name(sasl_method);
s.response = response_bytes;
s.locale.bytes = "en_US";
@@ -472,7 +540,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
if (res < 0) {
- return res;
+ goto error_res;
}
}
@@ -481,7 +549,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
&method);
if (res < 0) {
- return res;
+ goto error_res;
}
{
@@ -505,7 +573,7 @@ static int amqp_login_inner(amqp_connection_state_t state,
res = amqp_tune_connection(state, channel_max, frame_max, heartbeat);
if (res < 0) {
- return res;
+ goto error_res;
}
{
@@ -516,38 +584,12 @@ static int amqp_login_inner(amqp_connection_state_t state,
res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
if (res < 0) {
- return res;
+ goto error_res;
}
}
amqp_release_buffers(state);
- return 0;
-}
-
-amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- ...)
-{
- va_list vl;
- amqp_rpc_reply_t result;
- int status;
-
- va_start(vl, sasl_method);
-
- status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl);
- if (status < 0) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.reply.id = 0;
- result.reply.decoded = NULL;
- result.library_error = -status;
- return result;
- }
-
{
amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 };
amqp_connection_open_t s;
@@ -562,16 +604,57 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
(amqp_method_number_t *) &replies,
&s);
if (result.reply_type != AMQP_RESPONSE_NORMAL) {
- return result;
+ goto out;
}
}
- amqp_maybe_release_buffers(state);
-
- va_end(vl);
result.reply_type = AMQP_RESPONSE_NORMAL;
result.reply.id = 0;
result.reply.decoded = NULL;
result.library_error = 0;
+
+out:
+ amqp_maybe_release_buffers(state);
return result;
+
+error_res:
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.reply.id = 0;
+ result.reply.decoded = NULL;
+ result.library_error = -res;
+
+ goto out;
+}
+
+amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ amqp_sasl_method_enum sasl_method,
+ ...)
+{
+ va_list vl;
+
+ va_start(vl, sasl_method);
+
+ return amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
+ &amqp_empty_table, sasl_method, vl);
+}
+
+amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state,
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ const amqp_table_t *client_properties,
+ amqp_sasl_method_enum sasl_method,
+ ...)
+{
+ va_list vl;
+
+ va_start(vl, sasl_method);
+
+ return amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
+ client_properties, sasl_method, vl);
}