summaryrefslogtreecommitdiff
path: root/tools/src/py/qpidtoollibs/broker.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/src/py/qpidtoollibs/broker.py')
-rw-r--r--tools/src/py/qpidtoollibs/broker.py168
1 files changed, 123 insertions, 45 deletions
diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py
index 6a380caf8d..0bae786306 100644
--- a/tools/src/py/qpidtoollibs/broker.py
+++ b/tools/src/py/qpidtoollibs/broker.py
@@ -24,6 +24,9 @@ except ImportError:
from qpid.datatypes import uuid4
class BrokerAgent(object):
+ """
+ Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection.
+ """
def __init__(self, conn):
self.conn = conn
self.sess = self.conn.session()
@@ -35,6 +38,9 @@ class BrokerAgent(object):
self.next_correlator = 1
def close(self):
+ """
+ Close the proxy session. This will not affect the connection used in creating the object.
+ """
self.sess.close()
def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
@@ -89,9 +95,8 @@ class BrokerAgent(object):
self.sess.acknowledge()
return items
- def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
- query = {'_what' : 'OBJECT',
- '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
+ def _doNameQuery(self, object_id):
+ query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}}
correlator = self._sendRequest('_query_request', query)
response = self.reply_rx.fetch(10)
if response.properties['qmf.opcode'] != '_query_response':
@@ -116,65 +121,74 @@ class BrokerAgent(object):
for item in items:
objs.append(cls(self, item))
return objs
-
- def _getBrokerObject(self, cls, name):
- obj = self._doNameQuery(cls.__name__.lower(), name)
+
+ def _getBrokerObject(self, cls, oid):
+ obj = self._doNameQuery(oid)
if obj:
return cls(self, obj)
return None
- def getCluster(self):
- return self._getAllBrokerObjects(Cluster)
-
- def getBroker(self):
+ def _getSingleObject(self, cls):
#
# getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because
# of a bug that used to be in the broker whereby by-name queries did not return the
# object timestamps.
#
- brokers = self._getAllBrokerObjects(Broker)
- if brokers:
- return brokers[0]
+ objects = self._getAllBrokerObjects(cls)
+ if objects: return objects[0]
return None
- def getMemory(self):
- return self._getAllBrokerObjects(Memory)[0]
+ def getBroker(self):
+ """
+ Get the Broker object that contains broker-scope statistics and operations.
+ """
+ return self._getSingleObject(Broker)
+
+
+ def getCluster(self):
+ return self._getSingleObject(Cluster)
+
+ def getHaBroker(self):
+ return self._getSingleObject(HaBroker)
def getAllConnections(self):
return self._getAllBrokerObjects(Connection)
- def getConnection(self, name):
- return self._getBrokerObject(Connection, name)
+ def getConnection(self, oid):
+ return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid)
def getAllSessions(self):
return self._getAllBrokerObjects(Session)
- def getSession(self, name):
- return self._getBrokerObject(Session, name)
+ def getSession(self, oid):
+ return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid)
def getAllSubscriptions(self):
return self._getAllBrokerObjects(Subscription)
- def getSubscription(self, name):
- return self._getBrokerObject(Subscription, name)
+ def getSubscription(self, oid):
+ return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid)
def getAllExchanges(self):
return self._getAllBrokerObjects(Exchange)
def getExchange(self, name):
- return self._getBrokerObject(Exchange, name)
+ return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name)
def getAllQueues(self):
return self._getAllBrokerObjects(Queue)
def getQueue(self, name):
- return self._getBrokerObject(Queue, name)
+ return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name)
def getAllBindings(self):
return self._getAllBrokerObjects(Binding)
- def getBinding(self, exchange=None, queue=None):
- pass
+ def getAllLinks(self):
+ return self._getAllBrokerObjects(Link)
+
+ def getAcl(self):
+ return self._getSingleObject(Acl)
def echo(self, sequence, body):
"""Request a response to test the path to the management broker"""
@@ -204,23 +218,69 @@ class BrokerAgent(object):
"""Get the message timestamping configuration"""
pass
-# def addExchange(self, exchange_type, name, **kwargs):
-# pass
-
-# def delExchange(self, name):
-# pass
-
-# def addQueue(self, name, **kwargs):
-# pass
-
-# def delQueue(self, name):
-# pass
-
-# def bind(self, exchange, queue, key, **kwargs):
-# pass
-
-# def unbind(self, exchange, queue, key, **kwargs):
-# pass
+ def addExchange(self, exchange_type, name, options={}, **kwargs):
+ properties = {}
+ properties['exchange-type'] = exchange_type
+ for k,v in options.items():
+ properties[k] = v
+ for k,v in kwargs.items():
+ properties[k] = v
+ args = {'type': 'exchange',
+ 'name': name,
+ 'properties': properties,
+ 'strict': True}
+ self._method('create', args)
+
+ def delExchange(self, name):
+ args = {'type': 'exchange', 'name': name}
+ self._method('delete', args)
+
+ def addQueue(self, name, options={}, **kwargs):
+ properties = options
+ for k,v in kwargs.items():
+ properties[k] = v
+ args = {'type': 'queue',
+ 'name': name,
+ 'properties': properties,
+ 'strict': True}
+ self._method('create', args)
+
+ def delQueue(self, name):
+ args = {'type': 'queue', 'name': name}
+ self._method('delete', args)
+
+ def bind(self, exchange, queue, key, options={}, **kwargs):
+ properties = options
+ for k,v in kwargs.items():
+ properties[k] = v
+ args = {'type': 'binding',
+ 'name': "%s/%s/%s" % (exchange, queue, key),
+ 'properties': properties,
+ 'strict': True}
+ self._method('create', args)
+
+ def unbind(self, exchange, queue, key, **kwargs):
+ args = {'type': 'binding',
+ 'name': "%s/%s/%s" % (exchange, queue, key),
+ 'strict': True}
+ self._method('delete', args)
+
+ def reloadAclFile(self):
+ self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
+
+ def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
+ args = {'userId': userName,
+ 'action': action,
+ 'object': aclObj,
+ 'objectName': aclObjName,
+ 'propertyMap': propMap}
+ return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
+
+ def acl_lookupPublish(self, userName, exchange, key):
+ args = {'userId': userName,
+ 'exchangeName': exchange,
+ 'routingKey': key}
+ return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
def create(self, _type, name, properties, strict):
"""Create an object of the specified type"""
@@ -230,9 +290,9 @@ class BrokerAgent(object):
"""Delete an object of the specified type"""
pass
- def query(self, _type, name):
+ def query(self, _type, oid):
"""Query the current state of an object"""
- return self._getBrokerObject(self, _type, name)
+ return self._getBrokerObject(self, _type, oid)
class BrokerObject(object):
@@ -255,6 +315,9 @@ class BrokerObject(object):
return full_name[colon+1:]
return value
+ def getObjectId(self):
+ return self.content['_object_id']['_object_name']
+
def getAttributes(self):
return self.values
@@ -271,7 +334,7 @@ class BrokerObject(object):
"""
Reload the property values from the agent.
"""
- refreshed = self.broker._getBrokerObject(self.__class__, self.name)
+ refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId())
if refreshed:
self.content = refreshed.content
self.values = self.content['_values']
@@ -282,6 +345,14 @@ class Broker(BrokerObject):
def __init__(self, broker, values):
BrokerObject.__init__(self, broker, values)
+class Cluster(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class HaBroker(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
class Memory(BrokerObject):
def __init__(self, broker, values):
BrokerObject.__init__(self, broker, values)
@@ -328,3 +399,10 @@ class Queue(BrokerObject):
self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter},
"org.apache.qpid.broker:queue:%s" % self.name)
+class Link(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Acl(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)