diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-02-12 22:00:58 -0800 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-04-09 00:14:57 -0700 |
commit | 21b124e2fd2f1c343fb37b708f393d1b9580cfad (patch) | |
tree | 50be437d1ff415380d22e9a070bfc785ce1d9833 /librabbitmq/amqp_socket.c | |
parent | 6217c6ace0db83451c48c578093b9d545f335b1b (diff) | |
download | rabbitmq-c-github-ask-21b124e2fd2f1c343fb37b708f393d1b9580cfad.tar.gz |
Add amqp_login_with_properties function
Allows clients to specify arbitrary properties when logging on to the
broker
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 197 |
1 files changed, 140 insertions, 57 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 5e7fa9c..22cc2c5 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -410,32 +410,56 @@ amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) } -static int amqp_login_inner(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - va_list vl) +static int amqp_table_contains_entry(const amqp_table_t *table, + const amqp_table_entry_t *entry) +{ + int i; + amqp_table_entry_t *current_entry; + + assert(table != NULL); + assert(entry != NULL); + + current_entry = table->entries; + + for (i = 0; i < table->num_entries; ++i, ++current_entry) { + if (0 == amqp_table_entry_cmp(current_entry, entry)) { + return 1; + } + } + + return 0; +} + +static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + const amqp_table_t *client_properties, + amqp_sasl_method_enum sasl_method, + va_list vl) { int res; amqp_method_t method; int server_frame_max; uint16_t server_channel_max; uint16_t server_heartbeat; + amqp_rpc_reply_t result; amqp_send_header(state); res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method); if (res < 0) { - return res; + goto error_res; } { amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; - if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || - (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { - return -ERROR_INCOMPATIBLE_AMQP_VERSION; + if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) + || (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { + res = -ERROR_INCOMPATIBLE_AMQP_VERSION; + goto error_res; } /* TODO: check that our chosen SASL mechanism is in the list of @@ -444,27 +468,71 @@ static int amqp_login_inner(amqp_connection_state_t state, } { - amqp_table_entry_t properties[2]; + amqp_table_entry_t default_properties[2]; + amqp_table_t default_table; amqp_connection_start_ok_t s; amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); + sasl_method, vl); if (response_bytes.bytes == NULL) { - return -ERROR_NO_MEMORY; + res = -ERROR_NO_MEMORY; + goto error_res; } - properties[0].key = amqp_cstring_bytes("product"); - properties[0].value.kind = AMQP_FIELD_KIND_UTF8; - properties[0].value.value.bytes - = amqp_cstring_bytes("rabbitmq-c"); + default_properties[0].key = amqp_cstring_bytes("product"); + default_properties[0].value.kind = AMQP_FIELD_KIND_UTF8; + default_properties[0].value.value.bytes = + amqp_cstring_bytes("rabbitmq-c"); + + default_properties[1].key = amqp_cstring_bytes("information"); + default_properties[1].value.kind = AMQP_FIELD_KIND_UTF8; + default_properties[1].value.value.bytes = + amqp_cstring_bytes("See https://github.com/alanxz/rabbitmq-c"); + + default_table.entries = default_properties; + default_table.num_entries = sizeof(default_properties) / sizeof(amqp_table_entry_t); - properties[1].key = amqp_cstring_bytes("information"); - properties[1].value.kind = AMQP_FIELD_KIND_UTF8; - properties[1].value.value.bytes - = amqp_cstring_bytes("See https://github.com/alanxz/rabbitmq-c"); + if (0 == client_properties->num_entries) { + s.client_properties = default_table; + } else { + /* Merge provided properties with our default properties: + * - Copy default properties. + * - Any provided property that doesn't have the same key as a default + * property is also copied. + * + * TODO: if one of the default properties is a capabilities table, we will + * need to figure out how to merge this if the user provides a capabilites + * table + */ + int i; + amqp_table_entry_t *current_entry; + + s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool, + sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries)); + if (NULL == s.client_properties.entries) { + res = -ERROR_NO_MEMORY; + goto error_res; + } + s.client_properties.num_entries = 0; + + current_entry = s.client_properties.entries; + + for (i = 0; i < default_table.num_entries; ++i) { + memcpy(current_entry, &default_table.entries[i], sizeof(amqp_table_entry_t)); + s.client_properties.num_entries += 1; + ++current_entry; + } + + for (i = 0; i < client_properties->num_entries; ++i) { + if (amqp_table_contains_entry(&default_table, &client_properties->entries[i])) { + continue; + } + memcpy(current_entry, &client_properties->entries[i], sizeof(amqp_table_entry_t)); + s.client_properties.num_entries += 1; + ++current_entry; + } + } - s.client_properties.num_entries = 2; - s.client_properties.entries = properties; s.mechanism = sasl_method_name(sasl_method); s.response = response_bytes; s.locale.bytes = "en_US"; @@ -472,7 +540,7 @@ static int amqp_login_inner(amqp_connection_state_t state, res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); if (res < 0) { - return res; + goto error_res; } } @@ -481,7 +549,7 @@ static int amqp_login_inner(amqp_connection_state_t state, res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, &method); if (res < 0) { - return res; + goto error_res; } { @@ -505,7 +573,7 @@ static int amqp_login_inner(amqp_connection_state_t state, res = amqp_tune_connection(state, channel_max, frame_max, heartbeat); if (res < 0) { - return res; + goto error_res; } { @@ -516,38 +584,12 @@ static int amqp_login_inner(amqp_connection_state_t state, res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); if (res < 0) { - return res; + goto error_res; } } amqp_release_buffers(state); - return 0; -} - -amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - ...) -{ - va_list vl; - amqp_rpc_reply_t result; - int status; - - va_start(vl, sasl_method); - - status = amqp_login_inner(state, channel_max, frame_max, heartbeat, sasl_method, vl); - if (status < 0) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.reply.id = 0; - result.reply.decoded = NULL; - result.library_error = -status; - return result; - } - { amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 }; amqp_connection_open_t s; @@ -562,16 +604,57 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, (amqp_method_number_t *) &replies, &s); if (result.reply_type != AMQP_RESPONSE_NORMAL) { - return result; + goto out; } } - amqp_maybe_release_buffers(state); - - va_end(vl); result.reply_type = AMQP_RESPONSE_NORMAL; result.reply.id = 0; result.reply.decoded = NULL; result.library_error = 0; + +out: + amqp_maybe_release_buffers(state); return result; + +error_res: + result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + result.reply.id = 0; + result.reply.decoded = NULL; + result.library_error = -res; + + goto out; +} + +amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + amqp_sasl_method_enum sasl_method, + ...) +{ + va_list vl; + + va_start(vl, sasl_method); + + return amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, + &amqp_empty_table, sasl_method, vl); +} + +amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state, + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + const amqp_table_t *client_properties, + amqp_sasl_method_enum sasl_method, + ...) +{ + va_list vl; + + va_start(vl, sasl_method); + + return amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, + client_properties, sasl_method, vl); } |