diff options
Diffstat (limited to 'tools/src/py/qpidtoollibs/broker.py')
-rw-r--r-- | tools/src/py/qpidtoollibs/broker.py | 168 |
1 files changed, 123 insertions, 45 deletions
diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py index 6a380caf8d..0bae786306 100644 --- a/tools/src/py/qpidtoollibs/broker.py +++ b/tools/src/py/qpidtoollibs/broker.py @@ -24,6 +24,9 @@ except ImportError: from qpid.datatypes import uuid4 class BrokerAgent(object): + """ + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection. + """ def __init__(self, conn): self.conn = conn self.sess = self.conn.session() @@ -35,6 +38,9 @@ class BrokerAgent(object): self.next_correlator = 1 def close(self): + """ + Close the proxy session. This will not affect the connection used in creating the object. + """ self.sess.close() def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): @@ -89,9 +95,8 @@ class BrokerAgent(object): self.sess.acknowledge() return items - def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): - query = {'_what' : 'OBJECT', - '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + def _doNameQuery(self, object_id): + query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}} correlator = self._sendRequest('_query_request', query) response = self.reply_rx.fetch(10) if response.properties['qmf.opcode'] != '_query_response': @@ -116,65 +121,74 @@ class BrokerAgent(object): for item in items: objs.append(cls(self, item)) return objs - - def _getBrokerObject(self, cls, name): - obj = self._doNameQuery(cls.__name__.lower(), name) + + def _getBrokerObject(self, cls, oid): + obj = self._doNameQuery(oid) if obj: return cls(self, obj) return None - def getCluster(self): - return self._getAllBrokerObjects(Cluster) - - def getBroker(self): + def _getSingleObject(self, cls): # # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because # of a bug that used to be in the broker whereby by-name queries did not return the # object timestamps. # - brokers = self._getAllBrokerObjects(Broker) - if brokers: - return brokers[0] + objects = self._getAllBrokerObjects(cls) + if objects: return objects[0] return None - def getMemory(self): - return self._getAllBrokerObjects(Memory)[0] + def getBroker(self): + """ + Get the Broker object that contains broker-scope statistics and operations. + """ + return self._getSingleObject(Broker) + + + def getCluster(self): + return self._getSingleObject(Cluster) + + def getHaBroker(self): + return self._getSingleObject(HaBroker) def getAllConnections(self): return self._getAllBrokerObjects(Connection) - def getConnection(self, name): - return self._getBrokerObject(Connection, name) + def getConnection(self, oid): + return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid) def getAllSessions(self): return self._getAllBrokerObjects(Session) - def getSession(self, name): - return self._getBrokerObject(Session, name) + def getSession(self, oid): + return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid) def getAllSubscriptions(self): return self._getAllBrokerObjects(Subscription) - def getSubscription(self, name): - return self._getBrokerObject(Subscription, name) + def getSubscription(self, oid): + return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid) def getAllExchanges(self): return self._getAllBrokerObjects(Exchange) def getExchange(self, name): - return self._getBrokerObject(Exchange, name) + return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name) def getAllQueues(self): return self._getAllBrokerObjects(Queue) def getQueue(self, name): - return self._getBrokerObject(Queue, name) + return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name) def getAllBindings(self): return self._getAllBrokerObjects(Binding) - def getBinding(self, exchange=None, queue=None): - pass + def getAllLinks(self): + return self._getAllBrokerObjects(Link) + + def getAcl(self): + return self._getSingleObject(Acl) def echo(self, sequence, body): """Request a response to test the path to the management broker""" @@ -204,23 +218,69 @@ class BrokerAgent(object): """Get the message timestamping configuration""" pass -# def addExchange(self, exchange_type, name, **kwargs): -# pass - -# def delExchange(self, name): -# pass - -# def addQueue(self, name, **kwargs): -# pass - -# def delQueue(self, name): -# pass - -# def bind(self, exchange, queue, key, **kwargs): -# pass - -# def unbind(self, exchange, queue, key, **kwargs): -# pass + def addExchange(self, exchange_type, name, options={}, **kwargs): + properties = {} + properties['exchange-type'] = exchange_type + for k,v in options.items(): + properties[k] = v + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'exchange', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delExchange(self, name): + args = {'type': 'exchange', 'name': name} + self._method('delete', args) + + def addQueue(self, name, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'queue', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delQueue(self, name): + args = {'type': 'queue', 'name': name} + self._method('delete', args) + + def bind(self, exchange, queue, key, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'properties': properties, + 'strict': True} + self._method('create', args) + + def unbind(self, exchange, queue, key, **kwargs): + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'strict': True} + self._method('delete', args) + + def reloadAclFile(self): + self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def acl_lookup(self, userName, action, aclObj, aclObjName, propMap): + args = {'userId': userName, + 'action': action, + 'object': aclObj, + 'objectName': aclObjName, + 'propertyMap': propMap} + return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def acl_lookupPublish(self, userName, exchange, key): + args = {'userId': userName, + 'exchangeName': exchange, + 'routingKey': key} + return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") def create(self, _type, name, properties, strict): """Create an object of the specified type""" @@ -230,9 +290,9 @@ class BrokerAgent(object): """Delete an object of the specified type""" pass - def query(self, _type, name): + def query(self, _type, oid): """Query the current state of an object""" - return self._getBrokerObject(self, _type, name) + return self._getBrokerObject(self, _type, oid) class BrokerObject(object): @@ -255,6 +315,9 @@ class BrokerObject(object): return full_name[colon+1:] return value + def getObjectId(self): + return self.content['_object_id']['_object_name'] + def getAttributes(self): return self.values @@ -271,7 +334,7 @@ class BrokerObject(object): """ Reload the property values from the agent. """ - refreshed = self.broker._getBrokerObject(self.__class__, self.name) + refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId()) if refreshed: self.content = refreshed.content self.values = self.content['_values'] @@ -282,6 +345,14 @@ class Broker(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) +class Cluster(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class HaBroker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + class Memory(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) @@ -328,3 +399,10 @@ class Queue(BrokerObject): self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, "org.apache.qpid.broker:queue:%s" % self.name) +class Link(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Acl(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) |