summaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-10-18 18:20:41 +0000
committerTed Ross <tross@apache.org>2013-10-18 18:20:41 +0000
commit101d4332d72385929f1d896a986fc99b8c74dabe (patch)
treeaa4532528435138e11b3cdcc5445fed8991977ae /extras
parentfef9bb8b051a2944c32e19a37a60387a3671f64c (diff)
downloadqpid-python-101d4332d72385929f1d896a986fc99b8c74dabe.tar.gz
QPID-5216
- Fixed propagation of the deletion of locally-attached mobile addresses - Changed 'global' address class to 'mobile' to be consistent git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1533581 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
-rw-r--r--extras/dispatch/include/qpid/dispatch/hash.h1
-rw-r--r--extras/dispatch/python/qpid/dispatch/router/mobile.py16
-rw-r--r--extras/dispatch/python/qpid/dispatch/router/router_engine.py7
-rw-r--r--extras/dispatch/src/hash.c12
-rw-r--r--extras/dispatch/src/router_node.c67
-rw-r--r--extras/dispatch/src/router_private.h6
-rw-r--r--extras/dispatch/src/router_pynode.c26
-rwxr-xr-xextras/dispatch/tools/src/py/qdstat2
8 files changed, 112 insertions, 25 deletions
diff --git a/extras/dispatch/include/qpid/dispatch/hash.h b/extras/dispatch/include/qpid/dispatch/hash.h
index 3026c55dcf..4f079f6993 100644
--- a/extras/dispatch/include/qpid/dispatch/hash.h
+++ b/extras/dispatch/include/qpid/dispatch/hash.h
@@ -39,6 +39,7 @@ dx_error_t dx_hash_remove(dx_hash_t *h, dx_field_iterator_t *key);
void dx_hash_handle_free(dx_hash_handle_t *handle);
const unsigned char *dx_hash_key_by_handle(const dx_hash_handle_t *handle);
dx_error_t dx_hash_remove_by_handle(dx_hash_t *h, dx_hash_handle_t *handle);
+dx_error_t dx_hash_remove_by_handle2(dx_hash_t *h, dx_hash_handle_t *handle, unsigned char **key);
#endif
diff --git a/extras/dispatch/python/qpid/dispatch/router/mobile.py b/extras/dispatch/python/qpid/dispatch/router/mobile.py
index 0e27910e42..117cf98c22 100644
--- a/extras/dispatch/python/qpid/dispatch/router/mobile.py
+++ b/extras/dispatch/python/qpid/dispatch/router/mobile.py
@@ -145,8 +145,10 @@ class MobileAddressEngine(object):
for addr in msg.del_list:
_list.remove(addr)
self.remote_lists[msg.id] = (msg.mobile_seq, _list)
- self.node_tracker.add_addresses(msg.id, msg.add_list)
- self.node_tracker.del_addresses(msg.id, msg.del_list)
+ if msg.add_list:
+ self.node_tracker.add_addresses(msg.id, msg.add_list)
+ if msg.del_list:
+ self.node_tracker.del_addresses(msg.id, msg.del_list)
self._activate_remotes(msg.id, msg.add_list, msg.del_list)
else:
self.needed_mars[(msg.id, msg.area, _seq)] = None
@@ -178,8 +180,10 @@ class MobileAddressEngine(object):
def _activate_remotes(self, _id, added, deleted):
bit = self.node_tracker.maskbit_for_node(_id)
- for a in added:
- self.container.router_adapter.map_destination(a, bit)
- for d in deleted:
- self.container.router_adapter.unmap_destination(d, bit)
+ if added:
+ for a in added:
+ self.container.router_adapter.map_destination(a, bit)
+ if deleted:
+ for d in deleted:
+ self.container.router_adapter.unmap_destination(d, bit)
diff --git a/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/extras/dispatch/python/qpid/dispatch/router/router_engine.py
index 5128d175a6..f8b2dd9c94 100644
--- a/extras/dispatch/python/qpid/dispatch/router/router_engine.py
+++ b/extras/dispatch/python/qpid/dispatch/router/router_engine.py
@@ -29,6 +29,9 @@ from mobile import MobileAddressEngine
from routing import RoutingTableEngine
from node import NodeTracker
+import sys
+import traceback
+
##
## Import the Dispatch adapters from the environment. If they are not found
## (i.e. we are in a test bench, etc.), load the stub versions.
@@ -106,7 +109,7 @@ class RouterEngine:
try:
if addr.find('Mtemp.') == 0:
return
- if key.find('M') == 0:
+ if addr.find('M') == 0:
self.mobile_address_engine.del_local_address(addr[1:])
except Exception, e:
self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
@@ -165,6 +168,8 @@ class RouterEngine:
except Exception, e:
self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ traceback.print_tb(exc_traceback)
def receive(self, message_properties, body, link_id):
diff --git a/extras/dispatch/src/hash.c b/extras/dispatch/src/hash.c
index 24f0999fdc..0cb32acd05 100644
--- a/extras/dispatch/src/hash.c
+++ b/extras/dispatch/src/hash.c
@@ -264,9 +264,19 @@ const unsigned char *dx_hash_key_by_handle(const dx_hash_handle_t *handle)
dx_error_t dx_hash_remove_by_handle(dx_hash_t *h, dx_hash_handle_t *handle)
{
+ unsigned char *key = 0;
+ dx_error_t error = dx_hash_remove_by_handle2(h, handle, &key);
+ if (key)
+ free(key);
+ return error;
+}
+
+
+dx_error_t dx_hash_remove_by_handle2(dx_hash_t *h, dx_hash_handle_t *handle, unsigned char **key)
+{
if (!handle)
return DX_ERROR_NOT_FOUND;
- free(handle->item->key);
+ *key = handle->item->key;
DEQ_REMOVE(handle->bucket->items, handle->item);
free_dx_hash_item_t(handle->item);
h->size--;
diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c
index eefd37cb77..28f22317f3 100644
--- a/extras/dispatch/src/router_node.c
+++ b/extras/dispatch/src/router_node.c
@@ -107,18 +107,51 @@ void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t
* Depending on its policy, the address may be eligible for being closed out
* (i.e. Logging its terminal statistics and freeing its resources).
*/
-void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
+void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local)
{
if (addr == 0)
return;
- if (addr->handler || DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->rnodes) > 0)
- return;
+ unsigned char *key = 0;
+ const unsigned char *key_const = 0;
+ int to_delete = 0;
+ int no_more_locals = 0;
+
+ sys_mutex_lock(router->lock);
+ if (addr->handler == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0)
+ to_delete = 1;
+
+ if (was_local && DEQ_SIZE(addr->rlinks) == 0)
+ no_more_locals = 1;
+
+ if (to_delete) {
+ dx_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key);
+ DEQ_REMOVE(router->addrs, addr);
+ dx_hash_handle_free(addr->hash_handle);
+ free_dx_address_t(addr);
+ }
- dx_hash_remove_by_handle(router->addr_hash, addr->hash_handle);
- DEQ_REMOVE(router->addrs, addr);
- dx_hash_handle_free(addr->hash_handle);
- free_dx_address_t(addr);
+ if (!to_delete && no_more_locals)
+ key_const = dx_hash_key_by_handle(addr->hash_handle);
+
+ sys_mutex_unlock(router->lock);
+
+ //
+ // If the address is mobile-class and it was just removed from a local link,
+ // tell the router module that it is no longer attached locally.
+ //
+ if (no_more_locals) {
+ if (key && key[0] == 'M')
+ dx_router_mobile_removed(router, (const char*) key);
+ if (key_const && key_const[0] == 'M')
+ dx_router_mobile_removed(router, (const char*) key_const);
+ }
+
+ //
+ // Free the key that was not freed by the hash table.
+ //
+ if (key)
+ free(key);
}
@@ -452,6 +485,16 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+
+ //
+ // Note: This function is going to need to be refactored so we can put an
+ // asynchronous address lookup here. In the event there is a translation
+ // of the address (via namespace), it will have to be done here after
+ // obtaining the iterator and before doing the hash lookup.
+ //
+ // Note that this lookup is only done for global/mobile class addresses.
+ //
+
dx_hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
int is_local = dx_field_iterator_prefix(iter, local_prefix);
@@ -836,7 +879,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
sys_mutex_unlock(router->lock);
if (propagate)
- dx_router_global_added(router, iter);
+ dx_router_mobile_added(router, iter);
if (iter)
dx_field_iterator_free(iter);
@@ -853,6 +896,7 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed
dx_router_t *router = (dx_router_t*) context;
dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+ dx_address_t *oaddr = 0;
if (shared) {
dx_link_set_conn_context(link, 0);
@@ -869,7 +913,7 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed
//
if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) {
dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
- dx_router_check_addr_LH(router, rlink->owning_addr);
+ oaddr = rlink->owning_addr;
}
//
@@ -895,6 +939,11 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed
DEQ_REMOVE(router->links, rlink);
sys_mutex_unlock(router->lock);
+ //
+ // Check to see if the owning address should be deleted
+ //
+ dx_router_check_addr(router, oaddr, 1);
+
// TODO - wrap the free to handle the recursive items
free_dx_router_link_t(rlink);
diff --git a/extras/dispatch/src/router_private.h b/extras/dispatch/src/router_private.h
index 9800189548..066188b3f7 100644
--- a/extras/dispatch/src/router_private.h
+++ b/extras/dispatch/src/router_private.h
@@ -163,15 +163,15 @@ struct dx_router_t {
-void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr);
+void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local);
void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
-void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter);
-void dx_router_global_removed(dx_router_t *router, const char *addr);
+void dx_router_mobile_added(dx_router_t *router, dx_field_iterator_t *iter);
+void dx_router_mobile_removed(dx_router_t *router, const char *addr);
#endif
diff --git a/extras/dispatch/src/router_pynode.c b/extras/dispatch/src/router_pynode.c
index 358fa50447..82b08c0785 100644
--- a/extras/dispatch/src/router_pynode.c
+++ b/extras/dispatch/src/router_pynode.c
@@ -430,9 +430,10 @@ static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
}
dx_router_del_node_ref_LH(&addr->rnodes, rnode);
- dx_router_check_addr_LH(router, addr);
sys_mutex_unlock(router->lock);
+ dx_router_check_addr(router, addr, 0);
+
dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit);
Py_INCREF(Py_None);
@@ -631,7 +632,7 @@ void dx_pyrouter_tick(dx_router_t *router)
}
-void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter)
+void dx_router_mobile_added(dx_router_t *router, dx_field_iterator_t *iter)
{
PyObject *pArgs;
PyObject *pValue;
@@ -641,7 +642,7 @@ void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter)
char *address = (char*) dx_field_iterator_copy(iter);
dx_python_lock();
- pArgs = PyTuple_New(1);
+ pArgs = PyTuple_New(1);
PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
pValue = PyObject_CallObject(router->pyAdded, pArgs);
if (PyErr_Occurred()) {
@@ -658,7 +659,24 @@ void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter)
}
-void dx_router_global_removed(dx_router_t *router, const char *addr)
+void dx_router_mobile_removed(dx_router_t *router, const char *address)
{
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (router->pyRemoved && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
+ dx_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
+ pValue = PyObject_CallObject(router->pyRemoved, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
+ dx_python_unlock();
+ }
}
diff --git a/extras/dispatch/tools/src/py/qdstat b/extras/dispatch/tools/src/py/qdstat
index e08505501c..32594b4d4e 100755
--- a/extras/dispatch/tools/src/py/qdstat
+++ b/extras/dispatch/tools/src/py/qdstat
@@ -114,7 +114,7 @@ class BusManager:
def _addr_class(self, addr):
if not addr:
return "-"
- if addr[0] == 'M' : return "global"
+ if addr[0] == 'M' : return "mobile"
if addr[0] == 'R' : return "router"
if addr[0] == 'A' : return "area"
if addr[0] == 'L' : return "local"