diff options
author | Ted Ross <tross@apache.org> | 2013-10-18 18:20:41 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2013-10-18 18:20:41 +0000 |
commit | 101d4332d72385929f1d896a986fc99b8c74dabe (patch) | |
tree | aa4532528435138e11b3cdcc5445fed8991977ae /extras | |
parent | fef9bb8b051a2944c32e19a37a60387a3671f64c (diff) | |
download | qpid-python-101d4332d72385929f1d896a986fc99b8c74dabe.tar.gz |
QPID-5216
- Fixed propagation of the deletion of locally-attached mobile addresses
- Changed 'global' address class to 'mobile' to be consistent
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1533581 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/hash.h | 1 | ||||
-rw-r--r-- | extras/dispatch/python/qpid/dispatch/router/mobile.py | 16 | ||||
-rw-r--r-- | extras/dispatch/python/qpid/dispatch/router/router_engine.py | 7 | ||||
-rw-r--r-- | extras/dispatch/src/hash.c | 12 | ||||
-rw-r--r-- | extras/dispatch/src/router_node.c | 67 | ||||
-rw-r--r-- | extras/dispatch/src/router_private.h | 6 | ||||
-rw-r--r-- | extras/dispatch/src/router_pynode.c | 26 | ||||
-rwxr-xr-x | extras/dispatch/tools/src/py/qdstat | 2 |
8 files changed, 112 insertions, 25 deletions
diff --git a/extras/dispatch/include/qpid/dispatch/hash.h b/extras/dispatch/include/qpid/dispatch/hash.h index 3026c55dcf..4f079f6993 100644 --- a/extras/dispatch/include/qpid/dispatch/hash.h +++ b/extras/dispatch/include/qpid/dispatch/hash.h @@ -39,6 +39,7 @@ dx_error_t dx_hash_remove(dx_hash_t *h, dx_field_iterator_t *key); void dx_hash_handle_free(dx_hash_handle_t *handle); const unsigned char *dx_hash_key_by_handle(const dx_hash_handle_t *handle); dx_error_t dx_hash_remove_by_handle(dx_hash_t *h, dx_hash_handle_t *handle); +dx_error_t dx_hash_remove_by_handle2(dx_hash_t *h, dx_hash_handle_t *handle, unsigned char **key); #endif diff --git a/extras/dispatch/python/qpid/dispatch/router/mobile.py b/extras/dispatch/python/qpid/dispatch/router/mobile.py index 0e27910e42..117cf98c22 100644 --- a/extras/dispatch/python/qpid/dispatch/router/mobile.py +++ b/extras/dispatch/python/qpid/dispatch/router/mobile.py @@ -145,8 +145,10 @@ class MobileAddressEngine(object): for addr in msg.del_list: _list.remove(addr) self.remote_lists[msg.id] = (msg.mobile_seq, _list) - self.node_tracker.add_addresses(msg.id, msg.add_list) - self.node_tracker.del_addresses(msg.id, msg.del_list) + if msg.add_list: + self.node_tracker.add_addresses(msg.id, msg.add_list) + if msg.del_list: + self.node_tracker.del_addresses(msg.id, msg.del_list) self._activate_remotes(msg.id, msg.add_list, msg.del_list) else: self.needed_mars[(msg.id, msg.area, _seq)] = None @@ -178,8 +180,10 @@ class MobileAddressEngine(object): def _activate_remotes(self, _id, added, deleted): bit = self.node_tracker.maskbit_for_node(_id) - for a in added: - self.container.router_adapter.map_destination(a, bit) - for d in deleted: - self.container.router_adapter.unmap_destination(d, bit) + if added: + for a in added: + self.container.router_adapter.map_destination(a, bit) + if deleted: + for d in deleted: + self.container.router_adapter.unmap_destination(d, bit) diff --git a/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/extras/dispatch/python/qpid/dispatch/router/router_engine.py index 5128d175a6..f8b2dd9c94 100644 --- a/extras/dispatch/python/qpid/dispatch/router/router_engine.py +++ b/extras/dispatch/python/qpid/dispatch/router/router_engine.py @@ -29,6 +29,9 @@ from mobile import MobileAddressEngine from routing import RoutingTableEngine from node import NodeTracker +import sys +import traceback + ## ## Import the Dispatch adapters from the environment. If they are not found ## (i.e. we are in a test bench, etc.), load the stub versions. @@ -106,7 +109,7 @@ class RouterEngine: try: if addr.find('Mtemp.') == 0: return - if key.find('M') == 0: + if addr.find('M') == 0: self.mobile_address_engine.del_local_address(addr[1:]) except Exception, e: self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) @@ -165,6 +168,8 @@ class RouterEngine: except Exception, e: self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback) def receive(self, message_properties, body, link_id): diff --git a/extras/dispatch/src/hash.c b/extras/dispatch/src/hash.c index 24f0999fdc..0cb32acd05 100644 --- a/extras/dispatch/src/hash.c +++ b/extras/dispatch/src/hash.c @@ -264,9 +264,19 @@ const unsigned char *dx_hash_key_by_handle(const dx_hash_handle_t *handle) dx_error_t dx_hash_remove_by_handle(dx_hash_t *h, dx_hash_handle_t *handle) { + unsigned char *key = 0; + dx_error_t error = dx_hash_remove_by_handle2(h, handle, &key); + if (key) + free(key); + return error; +} + + +dx_error_t dx_hash_remove_by_handle2(dx_hash_t *h, dx_hash_handle_t *handle, unsigned char **key) +{ if (!handle) return DX_ERROR_NOT_FOUND; - free(handle->item->key); + *key = handle->item->key; DEQ_REMOVE(handle->bucket->items, handle->item); free_dx_hash_item_t(handle->item); h->size--; diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c index eefd37cb77..28f22317f3 100644 --- a/extras/dispatch/src/router_node.c +++ b/extras/dispatch/src/router_node.c @@ -107,18 +107,51 @@ void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t * Depending on its policy, the address may be eligible for being closed out * (i.e. Logging its terminal statistics and freeing its resources). */ -void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr) +void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local) { if (addr == 0) return; - if (addr->handler || DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->rnodes) > 0) - return; + unsigned char *key = 0; + const unsigned char *key_const = 0; + int to_delete = 0; + int no_more_locals = 0; + + sys_mutex_lock(router->lock); + if (addr->handler == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0) + to_delete = 1; + + if (was_local && DEQ_SIZE(addr->rlinks) == 0) + no_more_locals = 1; + + if (to_delete) { + dx_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key); + DEQ_REMOVE(router->addrs, addr); + dx_hash_handle_free(addr->hash_handle); + free_dx_address_t(addr); + } - dx_hash_remove_by_handle(router->addr_hash, addr->hash_handle); - DEQ_REMOVE(router->addrs, addr); - dx_hash_handle_free(addr->hash_handle); - free_dx_address_t(addr); + if (!to_delete && no_more_locals) + key_const = dx_hash_key_by_handle(addr->hash_handle); + + sys_mutex_unlock(router->lock); + + // + // If the address is mobile-class and it was just removed from a local link, + // tell the router module that it is no longer attached locally. + // + if (no_more_locals) { + if (key && key[0] == 'M') + dx_router_mobile_removed(router, (const char*) key); + if (key_const && key_const[0] == 'M') + dx_router_mobile_removed(router, (const char*) key_const); + } + + // + // Free the key that was not freed by the hash table. + // + if (key) + free(key); } @@ -452,6 +485,16 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + + // + // Note: This function is going to need to be refactored so we can put an + // asynchronous address lookup here. In the event there is a translation + // of the address (via namespace), it will have to be done here after + // obtaining the iterator and before doing the hash lookup. + // + // Note that this lookup is only done for global/mobile class addresses. + // + dx_hash_retrieve(router->addr_hash, iter, (void*) &addr); dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); int is_local = dx_field_iterator_prefix(iter, local_prefix); @@ -836,7 +879,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) sys_mutex_unlock(router->lock); if (propagate) - dx_router_global_added(router, iter); + dx_router_mobile_added(router, iter); if (iter) dx_field_iterator_free(iter); @@ -853,6 +896,7 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed dx_router_t *router = (dx_router_t*) context; dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link); + dx_address_t *oaddr = 0; if (shared) { dx_link_set_conn_context(link, 0); @@ -869,7 +913,7 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed // if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) { dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink); - dx_router_check_addr_LH(router, rlink->owning_addr); + oaddr = rlink->owning_addr; } // @@ -895,6 +939,11 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed DEQ_REMOVE(router->links, rlink); sys_mutex_unlock(router->lock); + // + // Check to see if the owning address should be deleted + // + dx_router_check_addr(router, oaddr, 1); + // TODO - wrap the free to handle the recursive items free_dx_router_link_t(rlink); diff --git a/extras/dispatch/src/router_private.h b/extras/dispatch/src/router_private.h index 9800189548..066188b3f7 100644 --- a/extras/dispatch/src/router_private.h +++ b/extras/dispatch/src/router_private.h @@ -163,15 +163,15 @@ struct dx_router_t { -void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr); +void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local); void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); -void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter); -void dx_router_global_removed(dx_router_t *router, const char *addr); +void dx_router_mobile_added(dx_router_t *router, dx_field_iterator_t *iter); +void dx_router_mobile_removed(dx_router_t *router, const char *addr); #endif diff --git a/extras/dispatch/src/router_pynode.c b/extras/dispatch/src/router_pynode.c index 358fa50447..82b08c0785 100644 --- a/extras/dispatch/src/router_pynode.c +++ b/extras/dispatch/src/router_pynode.c @@ -430,9 +430,10 @@ static PyObject* dx_unmap_destination(PyObject *self, PyObject *args) } dx_router_del_node_ref_LH(&addr->rnodes, rnode); - dx_router_check_addr_LH(router, addr); sys_mutex_unlock(router->lock); + dx_router_check_addr(router, addr, 0); + dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit); Py_INCREF(Py_None); @@ -631,7 +632,7 @@ void dx_pyrouter_tick(dx_router_t *router) } -void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter) +void dx_router_mobile_added(dx_router_t *router, dx_field_iterator_t *iter) { PyObject *pArgs; PyObject *pValue; @@ -641,7 +642,7 @@ void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter) char *address = (char*) dx_field_iterator_copy(iter); dx_python_lock(); - pArgs = PyTuple_New(1); + pArgs = PyTuple_New(1); PyTuple_SetItem(pArgs, 0, PyString_FromString(address)); pValue = PyObject_CallObject(router->pyAdded, pArgs); if (PyErr_Occurred()) { @@ -658,7 +659,24 @@ void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter) } -void dx_router_global_removed(dx_router_t *router, const char *addr) +void dx_router_mobile_removed(dx_router_t *router, const char *address) { + PyObject *pArgs; + PyObject *pValue; + + if (router->pyRemoved && router->router_mode == DX_ROUTER_MODE_INTERIOR) { + dx_python_lock(); + pArgs = PyTuple_New(1); + PyTuple_SetItem(pArgs, 0, PyString_FromString(address)); + pValue = PyObject_CallObject(router->pyRemoved, pArgs); + if (PyErr_Occurred()) { + PyErr_Print(); + } + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } + dx_python_unlock(); + } } diff --git a/extras/dispatch/tools/src/py/qdstat b/extras/dispatch/tools/src/py/qdstat index e08505501c..32594b4d4e 100755 --- a/extras/dispatch/tools/src/py/qdstat +++ b/extras/dispatch/tools/src/py/qdstat @@ -114,7 +114,7 @@ class BusManager: def _addr_class(self, addr): if not addr: return "-" - if addr[0] == 'M' : return "global" + if addr[0] == 'M' : return "mobile" if addr[0] == 'R' : return "router" if addr[0] == 'A' : return "area" if addr[0] == 'L' : return "local" |