summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2014-12-25 21:37:44 -0800
committerAlan Antonuk <alan.antonuk@gmail.com>2015-05-06 23:52:26 -0700
commitfb8e31833ffc92ba3ce8d5b7e5913ba07925b887 (patch)
tree6a465c1487f6aa203f4f43ac5f9879467f3417ce
parent858576bba0e60a4e4f11feaf26e6376f424352f3 (diff)
downloadrabbitmq-c-fb8e31833ffc92ba3ce8d5b7e5913ba07925b887.tar.gz
Add cababilities merge function.
Add amqp_merge_capabilities function that merges client-properties tables.
-rw-r--r--librabbitmq/amqp_socket.c116
-rw-r--r--librabbitmq/amqp_socket.h2
2 files changed, 64 insertions, 54 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 6425508..d234c0d 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -43,6 +43,8 @@
#endif
#include "amqp_private.h"
+#include "amqp_socket.h"
+#include "amqp_table.h"
#include "amqp_time.h"
#include <assert.h>
@@ -1095,25 +1097,66 @@ amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
return state->most_recent_api_result;
}
-
-static int amqp_table_contains_entry(const amqp_table_t *table,
- const amqp_table_entry_t *entry)
-{
+/*
+ * Merge base and add tables. If the two tables contain an entry with the same
+ * key, the entry from the add table takes precedence. For entries that are both
+ * tables with the same key, the table is recursively merged.
+ */
+int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add,
+ amqp_table_t *result, amqp_pool_t *pool) {
int i;
- amqp_table_entry_t *current_entry;
-
- assert(table != NULL);
- assert(entry != NULL);
-
- current_entry = table->entries;
+ int res;
+ amqp_pool_t temp_pool;
+ amqp_table_t temp_result;
+ assert(base != NULL);
+ assert(result != NULL);
+ assert(pool != NULL);
+
+ if (NULL == add) {
+ return amqp_table_clone(base, result, pool);
+ }
- for (i = 0; i < table->num_entries; ++i, ++current_entry) {
- if (0 == amqp_table_entry_cmp(current_entry, entry)) {
- return 1;
+ init_amqp_pool(&temp_pool, 4096);
+ temp_result.num_entries = 0;
+ temp_result.entries =
+ amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) *
+ (base->num_entries + add->num_entries));
+ if (NULL == temp_result.entries) {
+ res = AMQP_STATUS_NO_MEMORY;
+ goto error_out;
+ }
+ for (i = 0; i < base->num_entries; ++i) {
+ temp_result.entries[temp_result.num_entries] = base->entries[i];
+ temp_result.num_entries++;
+ }
+ for (i = 0; i < add->num_entries; ++i) {
+ amqp_table_entry_t *e =
+ amqp_table_get_entry_by_key(&temp_result, add->entries[i].key);
+ if (NULL != e) {
+ if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind &&
+ AMQP_FIELD_KIND_TABLE == e->value.kind) {
+ int res;
+ amqp_table_entry_t *be =
+ amqp_table_get_entry_by_key(base, add->entries[i].key);
+
+ res = amqp_merge_capabilities(&be->value.value.table,
+ &add->entries[i].value.value.table,
+ &e->value.value.table, &temp_pool);
+ if (AMQP_STATUS_OK != res) {
+ goto error_out;
+ }
+ } else {
+ e->value = add->entries[i].value;
+ }
+ } else {
+ temp_result.entries[temp_result.num_entries] = add->entries[i];
+ temp_result.num_entries++;
}
}
-
- return 0;
+ res = amqp_table_clone(&temp_result, result, pool);
+error_out:
+ empty_amqp_pool(&temp_pool);
+ return res;
}
static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
@@ -1204,45 +1247,10 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
default_table.num_entries =
sizeof(default_properties) / sizeof(amqp_table_entry_t);
- if (0 == client_properties->num_entries) {
- s.client_properties = default_table;
- } else {
- /* Merge provided properties with our default properties:
- * - Copy default properties.
- * - Any provided property that doesn't have the same key as a default
- * property is also copied.
- *
- * TODO: if one of the default properties is a capabilities table, we will
- * need to figure out how to merge this if the user provides a capabilites
- * table
- */
- int i;
- amqp_table_entry_t *current_entry;
-
- s.client_properties.entries = amqp_pool_alloc(channel_pool,
- sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
- if (NULL == s.client_properties.entries) {
- res = AMQP_STATUS_NO_MEMORY;
- goto error_res;
- }
- s.client_properties.num_entries = 0;
-
- current_entry = s.client_properties.entries;
-
- for (i = 0; i < default_table.num_entries; ++i) {
- memcpy(current_entry, &default_table.entries[i], sizeof(amqp_table_entry_t));
- s.client_properties.num_entries += 1;
- ++current_entry;
- }
-
- for (i = 0; i < client_properties->num_entries; ++i) {
- if (amqp_table_contains_entry(&default_table, &client_properties->entries[i])) {
- continue;
- }
- memcpy(current_entry, &client_properties->entries[i], sizeof(amqp_table_entry_t));
- s.client_properties.num_entries += 1;
- ++current_entry;
- }
+ res = amqp_merge_capabilities(&default_table, client_properties,
+ &s.client_properties, channel_pool);
+ if (AMQP_STATUS_OK == res) {
+ goto error_res;
}
s.mechanism = sasl_method_name(sasl_method);
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index fdf2b0f..fee7d7b 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -177,6 +177,8 @@ amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
int
sasl_mechanism_in_list(amqp_bytes_t mechanisms, amqp_sasl_method_enum method);
+int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add,
+ amqp_table_t *result, amqp_pool_t *pool);
AMQP_END_DECLS
#endif /* AMQP_SOCKET_H */