summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-10-21 22:29:57 +0000
committerKim van der Riet <kpvdr@apache.org>2013-10-21 22:29:57 +0000
commit7348c75f13673a6a0434bfddc8dff474ba6b69c2 (patch)
tree2534eb7ebd00b731f99668a237f7054185d0195f
parent888581cb9781259073d190edede25e6253ec7dd9 (diff)
downloadqpid-python-7348c75f13673a6a0434bfddc8dff474ba6b69c2.tar.gz
QPID-4984: WIP - Merge from trunk r.1534399.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1534401 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/router.h1
-rw-r--r--qpid/extras/dispatch/src/agent.c227
-rw-r--r--qpid/extras/dispatch/src/router_agent.c21
3 files changed, 205 insertions, 44 deletions
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/router.h b/qpid/extras/dispatch/include/qpid/dispatch/router.h
index 1dadc8d119..82978dd440 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/router.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/router.h
@@ -47,5 +47,6 @@ void dx_router_send2(dx_dispatch_t *dx,
const char *address,
dx_message_t *msg);
+void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field);
#endif
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index a7475888b2..784c333ec4 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -35,24 +35,29 @@
#include <string.h>
#include <stdio.h>
-struct dx_agent_t {
- dx_dispatch_t *dx;
- dx_hash_t *class_hash;
- dx_message_list_t in_fifo;
- dx_message_list_t out_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- dx_address_t *address;
-};
-
-
struct dx_agent_class_t {
- char *fqname;
+ DEQ_LINKS(dx_agent_class_t);
+ dx_hash_handle_t *hash_handle;
void *context;
dx_agent_schema_cb_t schema_handler;
dx_agent_query_cb_t query_handler; // 0 iff class is an event.
};
+DEQ_DECLARE(dx_agent_class_t, dx_agent_class_list_t);
+
+
+struct dx_agent_t {
+ dx_dispatch_t *dx;
+ dx_hash_t *class_hash;
+ dx_agent_class_list_t class_list;
+ dx_message_list_t in_fifo;
+ dx_message_list_t out_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ dx_address_t *address;
+ dx_agent_class_t *container_class;
+};
+
typedef struct {
dx_agent_t *agent;
@@ -63,20 +68,8 @@ typedef struct {
static char *log_module = "AGENT";
-static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+static dx_composed_field_t *dx_agent_setup_response(dx_field_iterator_t *reply_to)
{
- dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type");
- if (cls == 0)
- return;
-
- dx_field_iterator_t *cls_string = dx_parse_raw(cls);
- const dx_agent_class_t *cls_record;
- dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record);
- if (cls_record == 0)
- return;
-
- dx_log(log_module, LOG_TRACE, "Received GET request for type: %s", cls_record->fqname);
-
//
// Compose the header
//
@@ -103,9 +96,6 @@ static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_f
//
field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field);
dx_compose_start_map(field);
- dx_compose_insert_string(field, "operation");
- dx_compose_insert_string(field, "GET");
-
dx_compose_insert_string(field, "status-code");
dx_compose_insert_uint(field, 200);
@@ -113,6 +103,26 @@ static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_f
dx_compose_insert_string(field, "OK");
dx_compose_end_map(field);
+ return field;
+}
+
+
+static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+ dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type");
+ if (cls == 0)
+ return;
+
+ dx_field_iterator_t *cls_string = dx_parse_raw(cls);
+ const dx_agent_class_t *cls_record;
+ dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record);
+ if (cls_record == 0)
+ return;
+
+ dx_log(log_module, LOG_TRACE, "Received GET request for type: %s", dx_hash_key_by_handle(cls_record->hash_handle));
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
//
// Open the Body (AMQP Value) to be filled in by the handler.
//
@@ -147,6 +157,113 @@ static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_f
}
+static void dx_agent_process_discover_types(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+ dx_log(log_module, LOG_TRACE, "Received DISCOVER-TYPES request");
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+ //
+ // Open the Body (AMQP Value) to be filled in by the handler.
+ //
+ field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+ dx_compose_start_map(field);
+
+ //
+ // Put entries into the map for each known entity type
+ //
+ sys_mutex_lock(agent->lock);
+ dx_agent_class_t *cls = DEQ_HEAD(agent->class_list);
+ while (cls) {
+ dx_compose_insert_string(field, (const char*) dx_hash_key_by_handle(cls->hash_handle));
+ dx_compose_insert_null(field); // TODO - https://tools.oasis-open.org/issues/browse/AMQP-87
+ cls = DEQ_NEXT(cls);
+ }
+ sys_mutex_unlock(agent->lock);
+ dx_compose_end_map(field);
+
+ //
+ // Create a message and send it.
+ //
+ dx_message_t *msg = dx_message();
+ dx_message_compose_2(msg, field);
+ dx_router_send(agent->dx, reply_to, msg);
+
+ dx_message_free(msg);
+ dx_compose_free(field);
+}
+
+
+static void dx_agent_process_discover_operations(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+ dx_log(log_module, LOG_TRACE, "Received DISCOVER-OPERATIONS request");
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+ //
+ // Open the Body (AMQP Value) to be filled in by the handler.
+ //
+ field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+ dx_compose_start_map(field);
+
+ //
+ // Put entries into the map for each known entity type
+ //
+ sys_mutex_lock(agent->lock);
+ dx_agent_class_t *cls = DEQ_HEAD(agent->class_list);
+ while (cls) {
+ dx_compose_insert_string(field, (const char*) dx_hash_key_by_handle(cls->hash_handle));
+ dx_compose_start_list(field);
+ dx_compose_insert_string(field, "READ");
+ dx_compose_end_list(field);
+ cls = DEQ_NEXT(cls);
+ }
+ sys_mutex_unlock(agent->lock);
+ dx_compose_end_map(field);
+
+ //
+ // Create a message and send it.
+ //
+ dx_message_t *msg = dx_message();
+ dx_message_compose_2(msg, field);
+ dx_router_send(agent->dx, reply_to, msg);
+
+ dx_message_free(msg);
+ dx_compose_free(field);
+}
+
+
+static void dx_agent_process_discover_nodes(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+ dx_log(log_module, LOG_TRACE, "Received DISCOVER-MGMT-NODES request");
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+ //
+ // Open the Body (AMQP Value) to be filled in by the handler.
+ //
+ field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+
+ //
+ // Put entries into the list for each known management node
+ //
+ dx_compose_start_list(field);
+ dx_compose_insert_string(field, "amqp:/_local/$management");
+ dx_router_build_node_list(agent->dx, field);
+ dx_compose_end_list(field);
+
+ //
+ // Create a message and send it.
+ //
+ dx_message_t *msg = dx_message();
+ dx_message_compose_2(msg, field);
+ dx_router_send(agent->dx, reply_to, msg);
+
+ dx_message_free(msg);
+ dx_compose_free(field);
+}
+
+
static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg)
{
//
@@ -213,6 +330,12 @@ static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg)
dx_field_iterator_t *operation_string = dx_parse_raw(operation);
if (dx_field_iterator_equal(operation_string, (unsigned char*) "GET"))
dx_agent_process_get(agent, map, reply_to);
+ if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-TYPES"))
+ dx_agent_process_discover_types(agent, map, reply_to);
+ if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-OPERATIONS"))
+ dx_agent_process_discover_operations(agent, map, reply_to);
+ if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-MGMT-NODES"))
+ dx_agent_process_discover_nodes(agent, map, reply_to);
dx_parse_free(map);
dx_field_iterator_free(ap);
@@ -253,11 +376,38 @@ static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_lin
}
+static dx_agent_class_t *dx_agent_register_class_LH(dx_agent_t *agent,
+ const char *fqname,
+ void *context,
+ dx_agent_schema_cb_t schema_handler,
+ dx_agent_query_cb_t query_handler)
+{
+ dx_agent_class_t *cls = NEW(dx_agent_class_t);
+ assert(cls);
+ DEQ_ITEM_INIT(cls);
+ cls->context = context;
+ cls->schema_handler = schema_handler;
+ cls->query_handler = query_handler;
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
+ int result = dx_hash_insert_const(agent->class_hash, iter, cls, &cls->hash_handle);
+ dx_field_iterator_free(iter);
+ if (result < 0)
+ assert(false);
+
+ DEQ_INSERT_TAIL(agent->class_list, cls);
+
+ dx_log(log_module, LOG_INFO, "Manageable Entity Type (%s) %s", query_handler ? "object" : "event", fqname);
+ return cls;
+}
+
+
dx_agent_t *dx_agent(dx_dispatch_t *dx)
{
dx_agent_t *agent = NEW(dx_agent_t);
agent->dx = dx;
agent->class_hash = dx_hash(6, 10, 1);
+ DEQ_INIT(agent->class_list);
DEQ_INIT(agent->in_fifo);
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
@@ -284,23 +434,12 @@ dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx,
dx_agent_schema_cb_t schema_handler,
dx_agent_query_cb_t query_handler)
{
- dx_agent_t *agent = dx->agent;
-
- dx_agent_class_t *cls = NEW(dx_agent_class_t);
- assert(cls);
- cls->fqname = (char*) malloc(strlen(fqname) + 1);
- strcpy(cls->fqname, fqname);
- cls->context = context;
- cls->schema_handler = schema_handler;
- cls->query_handler = query_handler;
+ dx_agent_t *agent = dx->agent;
+ dx_agent_class_t *cls;
- dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
- int result = dx_hash_insert_const(agent->class_hash, iter, cls, 0);
- dx_field_iterator_free(iter);
- if (result < 0)
- assert(false);
-
- dx_log(log_module, LOG_TRACE, "%s class registered: %s", query_handler ? "Object" : "Event", fqname);
+ sys_mutex_lock(agent->lock);
+ cls = dx_agent_register_class_LH(agent, fqname, context, schema_handler, query_handler);
+ sys_mutex_unlock(agent->lock);
return cls;
}
diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c
index d4b719732c..4a70dc4ede 100644
--- a/qpid/extras/dispatch/src/router_agent.c
+++ b/qpid/extras/dispatch/src/router_agent.c
@@ -185,3 +185,24 @@ void dx_router_agent_setup(dx_router_t *router)
dx_router_setup_class(router, "org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS);
}
+
+void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field)
+{
+ dx_router_t *router = dx->router;
+ char temp[1000]; // FIXME
+
+ sys_mutex_lock(router->lock);
+ dx_router_node_t *rnode = DEQ_HEAD(router->routers);
+ while (rnode) {
+ strcpy(temp, "amqp:/_topo/");
+ strcat(temp, router->router_area);
+ strcat(temp, "/");
+ const unsigned char* addr = dx_hash_key_by_handle(rnode->owning_addr->hash_handle);
+ strcat(temp, &((char*) addr)[1]);
+ strcat(temp, "/$management");
+ dx_compose_insert_string(field, temp);
+ rnode = DEQ_NEXT(rnode);
+ }
+ sys_mutex_unlock(router->lock);
+}
+