diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-10-21 22:29:57 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-10-21 22:29:57 +0000 |
commit | 7348c75f13673a6a0434bfddc8dff474ba6b69c2 (patch) | |
tree | 2534eb7ebd00b731f99668a237f7054185d0195f | |
parent | 888581cb9781259073d190edede25e6253ec7dd9 (diff) | |
download | qpid-python-7348c75f13673a6a0434bfddc8dff474ba6b69c2.tar.gz |
QPID-4984: WIP - Merge from trunk r.1534399.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1534401 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/router.h | 1 | ||||
-rw-r--r-- | qpid/extras/dispatch/src/agent.c | 227 | ||||
-rw-r--r-- | qpid/extras/dispatch/src/router_agent.c | 21 |
3 files changed, 205 insertions, 44 deletions
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/router.h b/qpid/extras/dispatch/include/qpid/dispatch/router.h index 1dadc8d119..82978dd440 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/router.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/router.h @@ -47,5 +47,6 @@ void dx_router_send2(dx_dispatch_t *dx, const char *address, dx_message_t *msg); +void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field); #endif diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index a7475888b2..784c333ec4 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -35,24 +35,29 @@ #include <string.h> #include <stdio.h> -struct dx_agent_t { - dx_dispatch_t *dx; - dx_hash_t *class_hash; - dx_message_list_t in_fifo; - dx_message_list_t out_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - dx_address_t *address; -}; - - struct dx_agent_class_t { - char *fqname; + DEQ_LINKS(dx_agent_class_t); + dx_hash_handle_t *hash_handle; void *context; dx_agent_schema_cb_t schema_handler; dx_agent_query_cb_t query_handler; // 0 iff class is an event. }; +DEQ_DECLARE(dx_agent_class_t, dx_agent_class_list_t); + + +struct dx_agent_t { + dx_dispatch_t *dx; + dx_hash_t *class_hash; + dx_agent_class_list_t class_list; + dx_message_list_t in_fifo; + dx_message_list_t out_fifo; + sys_mutex_t *lock; + dx_timer_t *timer; + dx_address_t *address; + dx_agent_class_t *container_class; +}; + typedef struct { dx_agent_t *agent; @@ -63,20 +68,8 @@ typedef struct { static char *log_module = "AGENT"; -static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) +static dx_composed_field_t *dx_agent_setup_response(dx_field_iterator_t *reply_to) { - dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type"); - if (cls == 0) - return; - - dx_field_iterator_t *cls_string = dx_parse_raw(cls); - const dx_agent_class_t *cls_record; - dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record); - if (cls_record == 0) - return; - - dx_log(log_module, LOG_TRACE, "Received GET request for type: %s", cls_record->fqname); - // // Compose the header // @@ -103,9 +96,6 @@ static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_f // field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field); dx_compose_start_map(field); - dx_compose_insert_string(field, "operation"); - dx_compose_insert_string(field, "GET"); - dx_compose_insert_string(field, "status-code"); dx_compose_insert_uint(field, 200); @@ -113,6 +103,26 @@ static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_f dx_compose_insert_string(field, "OK"); dx_compose_end_map(field); + return field; +} + + +static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) +{ + dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type"); + if (cls == 0) + return; + + dx_field_iterator_t *cls_string = dx_parse_raw(cls); + const dx_agent_class_t *cls_record; + dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record); + if (cls_record == 0) + return; + + dx_log(log_module, LOG_TRACE, "Received GET request for type: %s", dx_hash_key_by_handle(cls_record->hash_handle)); + + dx_composed_field_t *field = dx_agent_setup_response(reply_to); + // // Open the Body (AMQP Value) to be filled in by the handler. // @@ -147,6 +157,113 @@ static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_f } +static void dx_agent_process_discover_types(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) +{ + dx_log(log_module, LOG_TRACE, "Received DISCOVER-TYPES request"); + + dx_composed_field_t *field = dx_agent_setup_response(reply_to); + + // + // Open the Body (AMQP Value) to be filled in by the handler. + // + field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); + dx_compose_start_map(field); + + // + // Put entries into the map for each known entity type + // + sys_mutex_lock(agent->lock); + dx_agent_class_t *cls = DEQ_HEAD(agent->class_list); + while (cls) { + dx_compose_insert_string(field, (const char*) dx_hash_key_by_handle(cls->hash_handle)); + dx_compose_insert_null(field); // TODO - https://tools.oasis-open.org/issues/browse/AMQP-87 + cls = DEQ_NEXT(cls); + } + sys_mutex_unlock(agent->lock); + dx_compose_end_map(field); + + // + // Create a message and send it. + // + dx_message_t *msg = dx_message(); + dx_message_compose_2(msg, field); + dx_router_send(agent->dx, reply_to, msg); + + dx_message_free(msg); + dx_compose_free(field); +} + + +static void dx_agent_process_discover_operations(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) +{ + dx_log(log_module, LOG_TRACE, "Received DISCOVER-OPERATIONS request"); + + dx_composed_field_t *field = dx_agent_setup_response(reply_to); + + // + // Open the Body (AMQP Value) to be filled in by the handler. + // + field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); + dx_compose_start_map(field); + + // + // Put entries into the map for each known entity type + // + sys_mutex_lock(agent->lock); + dx_agent_class_t *cls = DEQ_HEAD(agent->class_list); + while (cls) { + dx_compose_insert_string(field, (const char*) dx_hash_key_by_handle(cls->hash_handle)); + dx_compose_start_list(field); + dx_compose_insert_string(field, "READ"); + dx_compose_end_list(field); + cls = DEQ_NEXT(cls); + } + sys_mutex_unlock(agent->lock); + dx_compose_end_map(field); + + // + // Create a message and send it. + // + dx_message_t *msg = dx_message(); + dx_message_compose_2(msg, field); + dx_router_send(agent->dx, reply_to, msg); + + dx_message_free(msg); + dx_compose_free(field); +} + + +static void dx_agent_process_discover_nodes(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) +{ + dx_log(log_module, LOG_TRACE, "Received DISCOVER-MGMT-NODES request"); + + dx_composed_field_t *field = dx_agent_setup_response(reply_to); + + // + // Open the Body (AMQP Value) to be filled in by the handler. + // + field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); + + // + // Put entries into the list for each known management node + // + dx_compose_start_list(field); + dx_compose_insert_string(field, "amqp:/_local/$management"); + dx_router_build_node_list(agent->dx, field); + dx_compose_end_list(field); + + // + // Create a message and send it. + // + dx_message_t *msg = dx_message(); + dx_message_compose_2(msg, field); + dx_router_send(agent->dx, reply_to, msg); + + dx_message_free(msg); + dx_compose_free(field); +} + + static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg) { // @@ -213,6 +330,12 @@ static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg) dx_field_iterator_t *operation_string = dx_parse_raw(operation); if (dx_field_iterator_equal(operation_string, (unsigned char*) "GET")) dx_agent_process_get(agent, map, reply_to); + if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-TYPES")) + dx_agent_process_discover_types(agent, map, reply_to); + if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-OPERATIONS")) + dx_agent_process_discover_operations(agent, map, reply_to); + if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-MGMT-NODES")) + dx_agent_process_discover_nodes(agent, map, reply_to); dx_parse_free(map); dx_field_iterator_free(ap); @@ -253,11 +376,38 @@ static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_lin } +static dx_agent_class_t *dx_agent_register_class_LH(dx_agent_t *agent, + const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler, + dx_agent_query_cb_t query_handler) +{ + dx_agent_class_t *cls = NEW(dx_agent_class_t); + assert(cls); + DEQ_ITEM_INIT(cls); + cls->context = context; + cls->schema_handler = schema_handler; + cls->query_handler = query_handler; + + dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); + int result = dx_hash_insert_const(agent->class_hash, iter, cls, &cls->hash_handle); + dx_field_iterator_free(iter); + if (result < 0) + assert(false); + + DEQ_INSERT_TAIL(agent->class_list, cls); + + dx_log(log_module, LOG_INFO, "Manageable Entity Type (%s) %s", query_handler ? "object" : "event", fqname); + return cls; +} + + dx_agent_t *dx_agent(dx_dispatch_t *dx) { dx_agent_t *agent = NEW(dx_agent_t); agent->dx = dx; agent->class_hash = dx_hash(6, 10, 1); + DEQ_INIT(agent->class_list); DEQ_INIT(agent->in_fifo); DEQ_INIT(agent->out_fifo); agent->lock = sys_mutex(); @@ -284,23 +434,12 @@ dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx, dx_agent_schema_cb_t schema_handler, dx_agent_query_cb_t query_handler) { - dx_agent_t *agent = dx->agent; - - dx_agent_class_t *cls = NEW(dx_agent_class_t); - assert(cls); - cls->fqname = (char*) malloc(strlen(fqname) + 1); - strcpy(cls->fqname, fqname); - cls->context = context; - cls->schema_handler = schema_handler; - cls->query_handler = query_handler; + dx_agent_t *agent = dx->agent; + dx_agent_class_t *cls; - dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); - int result = dx_hash_insert_const(agent->class_hash, iter, cls, 0); - dx_field_iterator_free(iter); - if (result < 0) - assert(false); - - dx_log(log_module, LOG_TRACE, "%s class registered: %s", query_handler ? "Object" : "Event", fqname); + sys_mutex_lock(agent->lock); + cls = dx_agent_register_class_LH(agent, fqname, context, schema_handler, query_handler); + sys_mutex_unlock(agent->lock); return cls; } diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c index d4b719732c..4a70dc4ede 100644 --- a/qpid/extras/dispatch/src/router_agent.c +++ b/qpid/extras/dispatch/src/router_agent.c @@ -185,3 +185,24 @@ void dx_router_agent_setup(dx_router_t *router) dx_router_setup_class(router, "org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS); } + +void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field) +{ + dx_router_t *router = dx->router; + char temp[1000]; // FIXME + + sys_mutex_lock(router->lock); + dx_router_node_t *rnode = DEQ_HEAD(router->routers); + while (rnode) { + strcpy(temp, "amqp:/_topo/"); + strcat(temp, router->router_area); + strcat(temp, "/"); + const unsigned char* addr = dx_hash_key_by_handle(rnode->owning_addr->hash_handle); + strcat(temp, &((char*) addr)[1]); + strcat(temp, "/$management"); + dx_compose_insert_string(field, temp); + rnode = DEQ_NEXT(rnode); + } + sys_mutex_unlock(router->lock); +} + |