diff options
author | Ted Ross <tross@apache.org> | 2008-11-20 20:28:17 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-11-20 20:28:17 +0000 |
commit | db2361819178cec396d5ea6ee40900a3fff20759 (patch) | |
tree | 2e7a54f90899bf7fce3d20486575b8b8e551d6d7 /python/qmf/console.py | |
parent | 47cbb850a3c4816042f825059127759820d2a8f4 (diff) | |
download | qpid-python-db2361819178cec396d5ea6ee40900a3fff20759.tar.gz |
QPID-1464 - Moved qmf console API out of python/qpid and into python/qmf
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf/console.py')
-rw-r--r-- | python/qmf/console.py | 1472 |
1 files changed, 1472 insertions, 0 deletions
diff --git a/python/qmf/console.py b/python/qmf/console.py new file mode 100644 index 0000000000..bdd93e6f94 --- /dev/null +++ b/python/qmf/console.py @@ -0,0 +1,1472 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" Console API for Qpid Management Framework """ + +import os +import qpid +import struct +import socket +import re +from qpid.peer import Closed +from qpid.connection import Connection, ConnectionFailed +from qpid.datatypes import UUID, uuid4, Message, RangedSet +from qpid.util import connect, ssl, URL +from qpid.codec010 import StringCodec as Codec +from threading import Lock, Condition +from time import time, strftime, gmtime +from cStringIO import StringIO + +class Console: + """ To access the asynchronous operations, a class must be derived from + Console with overrides of any combination of the available methods. """ + + def brokerConnected(self, broker): + """ Invoked when a connection is established to a broker """ + pass + + def brokerDisconnected(self, broker): + """ Invoked when the connection to a broker is lost """ + pass + + def newPackage(self, name): + """ Invoked when a QMF package is discovered. """ + pass + + def newClass(self, kind, classKey): + """ Invoked when a new class is discovered. Session.getSchema can be + used to obtain details about the class.""" + pass + + def newAgent(self, agent): + """ Invoked when a QMF agent is discovered. """ + pass + + def delAgent(self, agent): + """ Invoked when a QMF agent disconects. """ + pass + + def objectProps(self, broker, record): + """ Invoked when an object is updated. """ + pass + + def objectStats(self, broker, record): + """ Invoked when an object is updated. """ + pass + + def event(self, broker, event): + """ Invoked when an event is raised. """ + pass + + def heartbeat(self, agent, timestamp): + """ """ + pass + + def brokerInfo(self, broker): + """ """ + pass + + def methodResponse(self, broker, seq, response): + """ """ + pass + +class BrokerURL(URL): + def __init__(self, text): + URL.__init__(self, text) + socket.gethostbyname(self.host) + if self.port is None: + if self.scheme == URL.AMQPS: + self.port = 5671 + else: + self.port = 5672 + self.authName = self.user or "guest" + self.authPass = self.password or "guest" + self.authMech = "PLAIN" + + def name(self): + return self.host + ":" + str(self.port) + + def match(self, host, port): + return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port + +class Session: + """ + An instance of the Session class represents a console session running + against one or more QMF brokers. A single instance of Session is needed + to interact with the management framework as a console. + """ + _CONTEXT_SYNC = 1 + _CONTEXT_STARTUP = 2 + _CONTEXT_MULTIGET = 3 + + GET_WAIT_TIME = 60 + + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, + manageConnections=False, userBindings=False): + """ + Initialize a session. If the console argument is provided, the + more advanced asynchronous features are available. If console is + defaulted, the session will operate in a simpler, synchronous manner. + + The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' + is provided. They control whether object updates, events, and agent-heartbeats are + subscribed to. If the console is not interested in receiving one or more of the above, + setting the argument to False will reduce tha bandwidth used by the API. + + If manageConnections is set to True, the Session object will manage connections to + the brokers. This means that if a broker is unreachable, it will retry until a connection + can be established. If a connection is lost, the Session will attempt to reconnect. + + If manageConnections is set to False, the user is responsible for handing failures. In + this case, an unreachable broker will cause addBroker to raise an exception. + + If userBindings is set to False (the default) and rcvObjects is True, the console will + receive data for all object classes. If userBindings is set to True, the user must select + which classes the console shall receive by invoking the bindPackage or bindClass methods. + This allows the console to be configured to receive only information that is relavant to + a particular application. If rcvObjects id False, userBindings has no meaning. + """ + self.console = console + self.brokers = [] + self.packages = {} + self.seqMgr = SequenceManager() + self.cv = Condition() + self.syncSequenceList = [] + self.getResult = [] + self.getSelect = [] + self.error = None + self.rcvObjects = rcvObjects + self.rcvEvents = rcvEvents + self.rcvHeartbeats = rcvHeartbeats + self.userBindings = userBindings + if self.console == None: + self.rcvObjects = False + self.rcvEvents = False + self.rcvHeartbeats = False + self.bindingKeyList = self._bindingKeys() + self.manageConnections = manageConnections + + if self.userBindings and not self.rcvObjects: + raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + + if manageConnections: + raise Exception("manageConnections - not yet implemented") + + def __repr__(self): + return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers) + + def addBroker(self, target="localhost"): + """ Connect to a Qpid broker. Returns an object of type Broker. """ + url = BrokerURL(target) + broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass, + ssl = url.scheme == URL.AMQPS) + if not broker.isConnected and not self.manageConnections: + raise Exception(broker.error) + + self.brokers.append(broker) + self.getObjects(broker=broker, _class="agent") + return broker + + def delBroker(self, broker): + """ Disconnect from a broker. The 'broker' argument is the object + returned from the addBroker call """ + broker._shutdown() + self.brokers.remove(broker) + del broker + + def getPackages(self): + """ Get the list of known QMF packages """ + for broker in self.brokers: + broker._waitForStable() + list = [] + for package in self.packages: + list.append(package) + return list + + def getClasses(self, packageName): + """ Get the list of known classes within a QMF package """ + for broker in self.brokers: + broker._waitForStable() + list = [] + if packageName in self.packages: + for cname, hash in self.packages[packageName]: + list.append((packageName, cname, hash)) + return list + + def getSchema(self, classKey): + """ Get the schema for a QMF class """ + for broker in self.brokers: + broker._waitForStable() + pname, cname, hash = classKey + if pname in self.packages: + if (cname, hash) in self.packages[pname]: + return self.packages[pname][(cname, hash)] + + def bindPackage(self, packageName): + """ """ + if not self.userBindings or not self.rcvObjects: + raise Exception("userBindings option not set for Session") + for broker in self.brokers: + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key="console.obj.*.*.%s.#" % packageName) + + def bindClass(self, classKey): + """ """ + if not self.userBindings or not self.rcvObjects: + raise Exception("userBindings option not set for Session") + pname, cname, hash = classKey + for broker in self.brokers: + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key="console.obj.*.*.%s.%s.#" % (pname, cname)) + + def getAgents(self, broker=None): + """ Get a list of currently known agents """ + brokerList = [] + if broker == None: + for b in self.brokers: + brokerList.append(b) + else: + brokerList.append(broker) + + for b in brokerList: + b._waitForStable() + agentList = [] + for b in brokerList: + for a in b.getAgents(): + agentList.append(a) + return agentList + + def getObjects(self, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + The class for queried objects may be specified in one of the following ways: + + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = <id> - get the object referenced by the object-id + + If objects should be obtained from only one agent, use the following argument. + Otherwise, the query will go to all agents. + + _agent = <agent> - supply an agent from the list returned by getAgents. + + If the get query is to be restricted to one broker (as opposed to all connected brokers), + add the following argument: + + _broker = <broker> - supply a broker as returned by addBroker. + + If additional arguments are supplied, they are used as property selectors. For example, + if the argument name="test" is supplied, only objects whose "name" property is "test" + will be returned in the result. + """ + if "_broker" in kwargs: + brokerList = [] + brokerList.append(kwargs["_broker"]) + else: + brokerList = self.brokers + for broker in brokerList: + broker._waitForStable() + + agentList = [] + if "_agent" in kwargs: + agent = kwargs["_agent"] + if agent.broker not in brokerList: + raise Exception("Supplied agent is not accessible through the supplied broker") + agentList.append(agent) + else: + for broker in brokerList: + for agent in broker.getAgents(): + agentList.append(agent) + + cname = None + if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey() + elif "_key" in kwargs: pname, cname, hash = kwargs["_key"] + elif "_class" in kwargs: + pname, cname, hash = None, kwargs["_class"], None + if "_package" in kwargs: + pname = kwargs["_package"] + if cname == None and "_objectId" not in kwargs: + raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") + + map = {} + self.getSelect = [] + if "_objectId" in kwargs: + map["_objectid"] = kwargs["_objectId"].__repr__() + else: + map["_class"] = cname + if pname != None: map["_package"] = pname + if hash != None: map["_hash"] = hash + for item in kwargs: + if item[0] != '_': + self.getSelect.append((item, kwargs[item])) + + self.getResult = [] + for agent in agentList: + broker = agent.broker + sendCodec = Codec(broker.conn.spec) + try: + self.cv.acquire() + seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) + self.syncSequenceList.append(seq) + finally: + self.cv.release() + broker._setHeader(sendCodec, 'G', seq) + sendCodec.write_map(map) + smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank) + broker._send(smsg) + + starttime = time() + timeout = False + try: + self.cv.acquire() + while len(self.syncSequenceList) > 0 and self.error == None: + self.cv.wait(self.GET_WAIT_TIME) + if time() - starttime > self.GET_WAIT_TIME: + for pendingSeq in self.syncSequenceList: + self.seqMgr._release(pendingSeq) + self.syncSequenceList = [] + timeout = True + finally: + self.cv.release() + + if self.error: + errorText = self.error + self.error = None + raise Exception(errorText) + + if len(self.getResult) == 0 and timeout: + raise RuntimeError("No agent responded within timeout period") + return self.getResult + + def setEventFilter(self, **kwargs): + """ """ + pass + + def _bindingKeys(self): + keyList = [] + keyList.append("schema.#") + if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings: + keyList.append("console.#") + else: + if self.rcvObjects and not self.userBindings: + keyList.append("console.obj.#") + else: + keyList.append("console.obj.*.*.org.apache.qpid.broker.agent") + if self.rcvEvents: + keyList.append("console.event.#") + if self.rcvHeartbeats: + keyList.append("console.heartbeat.#") + return keyList + + def _handleBrokerConnect(self, broker): + pass + + def _handleBrokerDisconnect(self, broker): + pass + + def _handleBrokerResp(self, broker, codec, seq): + broker.brokerId = UUID(codec.read_uuid()) + if self.console != None: + self.console.brokerInfo(broker) + + # Send a package request + # (effectively inc and dec outstanding by not doing anything) + sendCodec = Codec(broker.conn.spec) + seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) + broker._setHeader(sendCodec, 'P', seq) + smsg = broker._message(sendCodec.encoded) + broker._send(smsg) + + def _handlePackageInd(self, broker, codec, seq): + pname = str(codec.read_str8()) + notify = False + try: + self.cv.acquire() + if pname not in self.packages: + self.packages[pname] = {} + notify = True + finally: + self.cv.release() + if notify and self.console != None: + self.console.newPackage(pname) + + # Send a class request + broker._incOutstanding() + sendCodec = Codec(broker.conn.spec) + seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) + broker._setHeader(sendCodec, 'Q', seq) + sendCodec.write_str8(pname) + smsg = broker._message(sendCodec.encoded) + broker._send(smsg) + + def _handleCommandComplete(self, broker, codec, seq): + code = codec.read_uint32() + text = codec.read_str8() + context = self.seqMgr._release(seq) + if context == self._CONTEXT_STARTUP: + broker._decOutstanding() + elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: + try: + broker.cv.acquire() + broker.syncInFlight = False + broker.cv.notify() + finally: + broker.cv.release() + elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: + try: + self.cv.acquire() + self.syncSequenceList.remove(seq) + if len(self.syncSequenceList) == 0: + self.cv.notify() + finally: + self.cv.release() + + def _handleClassInd(self, broker, codec, seq): + kind = codec.read_uint8() + pname = str(codec.read_str8()) + cname = str(codec.read_str8()) + hash = codec.read_bin128() + unknown = False + + try: + self.cv.acquire() + if pname in self.packages: + if (cname, hash) not in self.packages[pname]: + unknown = True + finally: + self.cv.release() + + if unknown: + # Send a schema request for the unknown class + broker._incOutstanding() + sendCodec = Codec(broker.conn.spec) + seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) + broker._setHeader(sendCodec, 'S', seq) + sendCodec.write_str8(pname) + sendCodec.write_str8(cname) + sendCodec.write_bin128(hash) + smsg = broker._message(sendCodec.encoded) + broker._send(smsg) + + def _handleMethodResp(self, broker, codec, seq): + code = codec.read_uint32() + text = codec.read_str16() + outArgs = {} + method, synchronous = self.seqMgr._release(seq) + if code == 0: + for arg in method.arguments: + if arg.dir.find("O") != -1: + outArgs[arg.name] = self._decodeValue(codec, arg.type) + result = MethodResult(code, text, outArgs) + if synchronous: + try: + broker.cv.acquire() + broker.syncResult = result + broker.syncInFlight = False + broker.cv.notify() + finally: + broker.cv.release() + else: + if self.console: + self.console.methodResponse(broker, seq, result) + + def _handleHeartbeatInd(self, broker, codec, seq, msg): + brokerBank = 1 + agentBank = 0 + dp = msg.get("delivery_properties") + if dp: + key = dp["routing_key"] + keyElements = key.split(".") + if len(keyElements) == 4: + brokerBank = int(keyElements[2]) + agentBank = int(keyElements[3]) + + agent = broker.getAgent(brokerBank, agentBank) + timestamp = codec.read_uint64() + if self.console != None: + self.console.heartbeat(agent, timestamp) + + def _handleEventInd(self, broker, codec, seq): + if self.console != None: + event = Event(self, broker, codec) + self.console.event(broker, event) + + def _handleSchemaResp(self, broker, codec, seq): + kind = codec.read_uint8() + pname = str(codec.read_str8()) + cname = str(codec.read_str8()) + hash = codec.read_bin128() + classKey = (pname, cname, hash) + _class = SchemaClass(kind, classKey, codec) + try: + self.cv.acquire() + self.packages[pname][(cname, hash)] = _class + finally: + self.cv.release() + + self.seqMgr._release(seq) + broker._decOutstanding() + if self.console != None: + self.console.newClass(kind, classKey) + + def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): + pname = str(codec.read_str8()) + cname = str(codec.read_str8()) + hash = codec.read_bin128() + classKey = (pname, cname, hash) + try: + self.cv.acquire() + if pname not in self.packages: + return + if (cname, hash) not in self.packages[pname]: + return + schema = self.packages[pname][(cname, hash)] + finally: + self.cv.release() + + object = Object(self, broker, schema, codec, prop, stat) + if pname == "org.apache.qpid.broker" and cname == "agent": + broker._updateAgent(object) + + try: + self.cv.acquire() + if seq in self.syncSequenceList: + if object.getTimestamps()[2] == 0 and self._selectMatch(object): + self.getResult.append(object) + return + finally: + self.cv.release() + + if self.console != None: + if prop: + self.console.objectProps(broker, object) + if stat: + self.console.objectStats(broker, object) + + def _handleError(self, error): + self.error = error + try: + self.cv.acquire() + self.syncSequenceList = [] + self.cv.notify() + finally: + self.cv.release() + + def _selectMatch(self, object): + """ Check the object against self.getSelect to check for a match """ + for key, value in self.getSelect: + for prop, propval in object.getProperties(): + if key == prop.name and value != propval: + return False + return True + + def _decodeValue(self, codec, typecode): + """ Decode, from the codec, a value based on its typecode. """ + if typecode == 1: data = codec.read_uint8() # U8 + elif typecode == 2: data = codec.read_uint16() # U16 + elif typecode == 3: data = codec.read_uint32() # U32 + elif typecode == 4: data = codec.read_uint64() # U64 + elif typecode == 6: data = codec.read_str8() # SSTR + elif typecode == 7: data = codec.read_str16() # LSTR + elif typecode == 8: data = codec.read_int64() # ABSTIME + elif typecode == 9: data = codec.read_uint64() # DELTATIME + elif typecode == 10: data = ObjectId(codec) # REF + elif typecode == 11: data = codec.read_uint8() != 0 # BOOL + elif typecode == 12: data = codec.read_float() # FLOAT + elif typecode == 13: data = codec.read_double() # DOUBLE + elif typecode == 14: data = UUID(codec.read_uuid()) # UUID + elif typecode == 15: data = codec.read_map() # FTABLE + elif typecode == 16: data = codec.read_int8() # S8 + elif typecode == 17: data = codec.read_int16() # S16 + elif typecode == 18: data = codec.read_int32() # S32 + elif typecode == 19: data = codec.read_int64() # S63 + else: + raise ValueError("Invalid type code: %d" % typecode) + return data + + def _encodeValue(self, codec, value, typecode): + """ Encode, into the codec, a value based on its typecode. """ + if typecode == 1: codec.write_uint8 (int(value)) # U8 + elif typecode == 2: codec.write_uint16 (int(value)) # U16 + elif typecode == 3: codec.write_uint32 (long(value)) # U32 + elif typecode == 4: codec.write_uint64 (long(value)) # U64 + elif typecode == 6: codec.write_str8 (value) # SSTR + elif typecode == 7: codec.write_str16 (value) # LSTR + elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME + elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME + elif typecode == 10: value.encode (codec) # REF + elif typecode == 11: codec.write_uint8 (int(value)) # BOOL + elif typecode == 12: codec.write_float (float(value)) # FLOAT + elif typecode == 13: codec.write_double (double(value)) # DOUBLE + elif typecode == 14: codec.write_uuid (value.bytes) # UUID + elif typecode == 15: codec.write_map (value) # FTABLE + elif typecode == 16: codec.write_int8 (int(value)) # S8 + elif typecode == 17: codec.write_int16 (int(value)) # S16 + elif typecode == 18: codec.write_int32 (int(value)) # S32 + elif typecode == 19: codec.write_int64 (int(value)) # S64 + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def _displayValue(self, value, typecode): + """ """ + if typecode == 1: return unicode(value) + elif typecode == 2: return unicode(value) + elif typecode == 3: return unicode(value) + elif typecode == 4: return unicode(value) + elif typecode == 6: return value + elif typecode == 7: return value + elif typecode == 8: return unicode(strftime("%c", gmtime(value / 1000000000))) + elif typecode == 9: return unicode(value) + elif typecode == 10: return unicode(value.__repr__()) + elif typecode == 11: + if value: return u"T" + else: return u"F" + elif typecode == 12: return unicode(value) + elif typecode == 13: return unicode(value) + elif typecode == 14: return unicode(value.__repr__()) + elif typecode == 15: return unicode(value.__repr__()) + elif typecode == 16: return unicode(value) + elif typecode == 17: return unicode(value) + elif typecode == 18: return unicode(value) + elif typecode == 19: return unicode(value) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): + """ This function can be used to send a method request to an object given only the + broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are + normally invoked on the object itself. + """ + schema = self.getSchema(schemaKey) + for method in schema.getMethods(): + if name == method.name: + aIdx = 0 + sendCodec = Codec(broker.conn.spec) + seq = self.seqMgr._reserve((method, False)) + broker._setHeader(sendCodec, 'M', seq) + objectId.encode(sendCodec) + pname, cname, hash = schemaKey + sendCodec.write_str8(pname) + sendCodec.write_str8(cname) + sendCodec.write_bin128(hash) + sendCodec.write_str8(name) + + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(argList): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._encodeValue(sendCodec, argList[aIdx], arg.type) + aIdx += 1 + smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % + (objectId.getBrokerBank(), objectId.getAgentBank())) + broker._send(smsg) + return seq + return None + +class Package: + """ """ + def __init__(self, name): + self.name = name + +class SchemaClass: + """ """ + CLASS_KIND_TABLE = 1 + CLASS_KIND_EVENT = 2 + + def __init__(self, kind, key, codec): + self.kind = kind + self.classKey = key + self.properties = [] + self.statistics = [] + self.methods = [] + self.arguments = [] + + if self.kind == self.CLASS_KIND_TABLE: + propCount = codec.read_uint16() + statCount = codec.read_uint16() + methodCount = codec.read_uint16() + for idx in range(propCount): + self.properties.append(SchemaProperty(codec)) + for idx in range(statCount): + self.statistics.append(SchemaStatistic(codec)) + for idx in range(methodCount): + self.methods.append(SchemaMethod(codec)) + + elif self.kind == self.CLASS_KIND_EVENT: + argCount = codec.read_uint16() + for idx in range(argCount): + self.arguments.append(SchemaArgument(codec, methodArg=False)) + + def __repr__(self): + pname, cname, hash = self.classKey + if self.kind == self.CLASS_KIND_TABLE: + kindStr = "Table" + elif self.kind == self.CLASS_KIND_EVENT: + kindStr = "Event" + else: + kindStr = "Unsupported" + result = "%s Class: %s:%s " % (kindStr, pname, cname) + result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash) + return result + + def getKey(self): + """ Return the class-key for this class. """ + return self.classKey + + def getProperties(self): + """ Return the list of properties for the class. """ + return self.properties + + def getStatistics(self): + """ Return the list of statistics for the class. """ + return self.statistics + + def getMethods(self): + """ Return the list of methods for the class. """ + return self.methods + + def getArguments(self): + """ Return the list of events for the class. """ + return self.arguments + +class SchemaProperty: + """ """ + def __init__(self, codec): + map = codec.read_map() + self.name = str(map["name"]) + self.type = map["type"] + self.access = str(map["access"]) + self.index = map["index"] != 0 + self.optional = map["optional"] != 0 + self.unit = None + self.min = None + self.max = None + self.maxlen = None + self.desc = None + + for key, value in map.items(): + if key == "unit" : self.unit = value + elif key == "min" : self.min = value + elif key == "max" : self.max = value + elif key == "maxlen" : self.maxlen = value + elif key == "desc" : self.desc = value + + def __repr__(self): + return self.name + +class SchemaStatistic: + """ """ + def __init__(self, codec): + map = codec.read_map() + self.name = str(map["name"]) + self.type = map["type"] + self.unit = None + self.desc = None + + for key, value in map.items(): + if key == "unit" : self.unit = value + elif key == "desc" : self.desc = value + + def __repr__(self): + return self.name + +class SchemaMethod: + """ """ + def __init__(self, codec): + map = codec.read_map() + self.name = str(map["name"]) + argCount = map["argCount"] + if "desc" in map: + self.desc = map["desc"] + else: + self.desc = None + self.arguments = [] + + for idx in range(argCount): + self.arguments.append(SchemaArgument(codec, methodArg=True)) + + def __repr__(self): + result = self.name + "(" + first = True + for arg in self.arguments: + if arg.dir.find("I") != -1: + if first: + first = False + else: + result += ", " + result += arg.name + result += ")" + return result + +class SchemaArgument: + """ """ + def __init__(self, codec, methodArg): + map = codec.read_map() + self.name = str(map["name"]) + self.type = map["type"] + if methodArg: + self.dir = str(map["dir"]).upper() + self.unit = None + self.min = None + self.max = None + self.maxlen = None + self.desc = None + self.default = None + + for key, value in map.items(): + if key == "unit" : self.unit = value + elif key == "min" : self.min = value + elif key == "max" : self.max = value + elif key == "maxlen" : self.maxlen = value + elif key == "desc" : self.desc = value + elif key == "default" : self.default = value + +class ObjectId: + """ Object that represents QMF object identifiers """ + def __init__(self, codec, first=0, second=0): + if codec: + self.first = codec.read_uint64() + self.second = codec.read_uint64() + else: + self.first = first + self.second = second + + def __cmp__(self, other): + if other == None or not isinstance(other, ObjectId) : + return 1 + if self.first < other.first: + return -1 + if self.first > other.first: + return 1 + if self.second < other.second: + return -1 + if self.second > other.second: + return 1 + return 0 + + def __repr__(self): + return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(), + self.getBrokerBank(), self.getAgentBank(), self.getObject()) + + def index(self): + return (self.first, self.second) + + def getFlags(self): + return (self.first & 0xF000000000000000) >> 60 + + def getSequence(self): + return (self.first & 0x0FFF000000000000) >> 48 + + def getBrokerBank(self): + return (self.first & 0x0000FFFFF0000000) >> 28 + + def getAgentBank(self): + return self.first & 0x000000000FFFFFFF + + def getObject(self): + return self.second + + def isDurable(self): + return self.getSequence() == 0 + + def encode(self, codec): + codec.write_uint64(self.first) + codec.write_uint64(self.second) + +class Object(object): + """ """ + def __init__(self, session, broker, schema, codec, prop, stat): + """ """ + self._session = session + self._broker = broker + self._schema = schema + self._currentTime = codec.read_uint64() + self._createTime = codec.read_uint64() + self._deleteTime = codec.read_uint64() + self._objectId = ObjectId(codec) + self._properties = [] + self._statistics = [] + if prop: + notPresent = self._parsePresenceMasks(codec, schema) + for property in schema.getProperties(): + if property.name in notPresent: + self._properties.append((property, None)) + else: + self._properties.append((property, self._session._decodeValue(codec, property.type))) + if stat: + for statistic in schema.getStatistics(): + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) + + def getBroker(self): + """ Return the broker from which this object was sent """ + return self._broker + + def getObjectId(self): + """ Return the object identifier for this object """ + return self._objectId + + def getClassKey(self): + """ Return the class-key that references the schema describing this object. """ + return self._schema.getKey() + + def getSchema(self): + """ Return the schema that describes this object. """ + return self._schema + + def getMethods(self): + """ Return a list of methods available for this object. """ + return self._schema.getMethods() + + def getTimestamps(self): + """ Return the current, creation, and deletion times for this object. """ + return self._currentTime, self._createTime, self._deleteTime + + def getIndex(self): + """ Return a string describing this object's primary key. """ + result = u"" + for property, value in self._properties: + if property.index: + if result != u"": + result += u":" + try: + valstr = unicode(value) + except: + valstr = u"<undecodable>" + result += valstr + return result + + def getProperties(self): + return self._properties + + def getStatistics(self): + return self._statistics + + def mergeUpdate(self, newer): + """ Replace properties and/or statistics with a newly received update """ + if self._objectId != newer._objectId: + raise Exception("Objects with different object-ids") + if len(newer.getProperties()) > 0: + self.properties = newer.getProperties() + if len(newer.getStatistics()) > 0: + self.statistics = newer.getStatistics() + + def __repr__(self): + return self.getIndex().encode("utf8") + + def __getattr__(self, name): + for method in self._schema.getMethods(): + if name == method.name: + return lambda *args, **kwargs : self._invoke(name, args, kwargs) + for property, value in self._properties: + if name == property.name: + return value + if name == "_" + property.name + "_" and property.type == 10: # Dereference references + deref = self._session.getObjects(_objectId=value, _broker=self._broker) + if len(deref) != 1: + return None + else: + return deref[0] + for statistic, value in self._statistics: + if name == statistic.name: + return value + raise Exception("Type Object has no attribute '%s'" % name) + + def _sendMethodRequest(self, name, args, kwargs, synchronous=False): + for method in self._schema.getMethods(): + if name == method.name: + aIdx = 0 + sendCodec = Codec(self._broker.conn.spec) + seq = self._session.seqMgr._reserve((method, synchronous)) + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + pname, cname, hash = self._schema.getKey() + sendCodec.write_str8(pname) + sendCodec.write_str8(cname) + sendCodec.write_bin128(hash) + sendCodec.write_str8(name) + + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(args): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._session._encodeValue(sendCodec, args[aIdx], arg.type) + aIdx += 1 + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % + (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + if synchronous: + try: + self._broker.cv.acquire() + self._broker.syncInFlight = True + finally: + self._broker.cv.release() + self._broker._send(smsg) + return seq + return None + + def _invoke(self, name, args, kwargs): + if self._sendMethodRequest(name, args, kwargs, True): + try: + self._broker.cv.acquire() + starttime = time() + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(self._broker.SYNC_TIME) + if time() - starttime > self._broker.SYNC_TIME: + self._session.seqMgr._release(seq) + raise RuntimeError("Timed out waiting for method to respond") + finally: + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None + raise Exception(errorText) + return self._broker.syncResult + raise Exception("Invalid Method (software defect) [%s]" % name) + + def _parsePresenceMasks(self, codec, schema): + excludeList = [] + bit = 0 + for property in schema.getProperties(): + if property.optional: + if bit == 0: + mask = codec.read_uint8() + bit = 1 + if (mask & bit) == 0: + excludeList.append(property.name) + bit *= 2 + if bit == 256: + bit = 0 + return excludeList + +class MethodResult(object): + """ """ + def __init__(self, status, text, outArgs): + """ """ + self.status = status + self.text = text + self.outArgs = outArgs + + def __getattr__(self, name): + if name in self.outArgs: + return self.outArgs[name] + + def __repr__(self): + return "%s (%d) - %s" % (self.text, self.status, self.outArgs) + +class Broker: + """ """ + SYNC_TIME = 60 + + def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False): + self.session = session + self.host = host + self.port = port + self.ssl = ssl + self.authUser = authUser + self.authPass = authPass + self.agents = {} + self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent") + self.topicBound = False + self.cv = Condition() + self.syncInFlight = False + self.syncRequest = 0 + self.syncResult = None + self.reqsOutstanding = 1 + self.error = None + self.brokerId = None + self.isConnected = False + self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) + self._tryToConnect() + + def isConnected(self): + return self.isConnected + + def getError(self): + return self.error + + def getBrokerId(self): + """ Get broker's unique identifier (UUID) """ + return self.brokerId + + def getBrokerBank(self): + return 1 + + def getAgent(self, brokerBank, agentBank): + bankKey = "%d.%d" % (brokerBank, agentBank) + if bankKey in self.agents: + return self.agents[bankKey] + return None + + def getSessionId(self): + """ Get the identifier of the AMQP session to the broker """ + return self.amqpSessionId + + def getAgents(self): + """ Get the list of agents reachable via this broker """ + return self.agents.values() + + def getAmqpSession(self): + """ Get the AMQP session object for this connected broker. """ + return self.amqpSession + + def getUrl(self): + """ """ + return "%s:%d" % (self.host, self.port) + + def getFullUrl(self, noAuthIfGuestDefault=True): + """ """ + ssl = "" + if self.ssl: + ssl = "s" + auth = "%s/%s@" % (self.authUser, self.authPass) + if self.authUser == "" or \ + (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"): + auth = "" + return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672) + + def __repr__(self): + if self.isConnected: + return "Broker connected at: %s" % self.getUrl() + else: + return "Disconnected Broker" + + def _tryToConnect(self): + try: + sock = connect(self.host, self.port) + if self.ssl: + sock = ssl(sock) + self.conn = Connection(sock, username=self.authUser, password=self.authPass) + self.conn.start() + self.replyName = "reply-%s" % self.amqpSessionId + self.amqpSession = self.conn.session(self.amqpSessionId) + self.amqpSession.auto_sync = True + self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) + self.amqpSession.exchange_bind(exchange="amq.direct", + queue=self.replyName, binding_key=self.replyName) + self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) + self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF) + self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFF) + + self.topicName = "topic-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) + self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("tdest").listen(self._replyCb) + self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) + self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) + self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF) + + self.isConnected = True + self.session._handleBrokerConnect(self) + + codec = Codec(self.conn.spec) + self._setHeader(codec, 'B') + msg = self._message(codec.encoded) + self._send(msg) + + except socket.error, e: + self.error = "Socket Error %s - %s" % (e[0], e[1]) + except Closed, e: + self.error = "Connect Failed %d - %s" % (e[0], e[1]) + except ConnectionFailed, e: + self.error = "Connect Failed %d - %s" % (e[0], e[1]) + + def _updateAgent(self, obj): + bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank) + if obj._deleteTime == 0: + if bankKey not in self.agents: + agent = Agent(self, bankKey, obj.label) + self.agents[bankKey] = agent + if self.session.console != None: + self.session.console.newAgent(agent) + else: + agent = self.agents.pop(bankKey, None) + if agent != None and self.session.console != None: + self.session.console.delAgent(agent) + + def _setHeader(self, codec, opcode, seq=0): + """ Compose the header of a management message. """ + codec.write_uint8(ord('A')) + codec.write_uint8(ord('M')) + codec.write_uint8(ord('2')) + codec.write_uint8(ord(opcode)) + codec.write_uint32(seq) + + def _checkHeader(self, codec): + """ Check the header of a management message and extract the opcode and class. """ + try: + octet = chr(codec.read_uint8()) + if octet != 'A': + return None, None + octet = chr(codec.read_uint8()) + if octet != 'M': + return None, None + octet = chr(codec.read_uint8()) + if octet != '2': + return None, None + opcode = chr(codec.read_uint8()) + seq = codec.read_uint32() + return opcode, seq + except: + return None, None + + def _message (self, body, routing_key="broker"): + dp = self.amqpSession.delivery_properties() + dp.routing_key = routing_key + mp = self.amqpSession.message_properties() + mp.content_type = "x-application/qmf" + mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) + return Message(dp, mp, body) + + def _send(self, msg, dest="qpid.management"): + self.amqpSession.message_transfer(destination=dest, message=msg) + + def _shutdown(self): + if self.isConnected: + self.amqpSession.incoming("rdest").stop() + if self.session.console != None: + self.amqpSession.incoming("tdest").stop() + self.amqpSession.close() + self.conn.close() + self.isConnected = False + else: + raise Exception("Broker already disconnected") + + def _waitForStable(self): + try: + self.cv.acquire() + if self.reqsOutstanding == 0: + return + self.syncInFlight = True + starttime = time() + while self.reqsOutstanding != 0: + self.cv.wait(self.SYNC_TIME) + if time() - starttime > self.SYNC_TIME: + raise RuntimeError("Timed out waiting for broker to synchronize") + finally: + self.cv.release() + + def _incOutstanding(self): + try: + self.cv.acquire() + self.reqsOutstanding += 1 + finally: + self.cv.release() + + def _decOutstanding(self): + try: + self.cv.acquire() + self.reqsOutstanding -= 1 + if self.reqsOutstanding == 0 and not self.topicBound: + self.topicBound = True + for key in self.session.bindingKeyList: + self.amqpSession.exchange_bind(exchange="qpid.management", + queue=self.topicName, binding_key=key) + if self.reqsOutstanding == 0 and self.syncInFlight: + self.syncInFlight = False + self.cv.notify() + finally: + self.cv.release() + + def _replyCb(self, msg): + codec = Codec(self.conn.spec, msg.body) + while True: + opcode, seq = self._checkHeader(codec) + if opcode == None: return + if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) + elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) + elif opcode == 'q': self.session._handleClassInd (self, codec, seq) + elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) + elif opcode == 'e': self.session._handleEventInd (self, codec, seq) + elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) + elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) + elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) + elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) + + def _exceptionCb(self, data): + self.isConnected = False + self.error = data + try: + self.cv.acquire() + if self.syncInFlight: + self.cv.notify() + finally: + self.cv.release() + self.session._handleError(self.error) + self.session._handleBrokerDisconnect(self) + +class Agent: + """ """ + def __init__(self, broker, bank, label): + self.broker = broker + self.bank = bank + self.label = label + + def __repr__(self): + return "Agent at bank %s (%s)" % (self.bank, self.label) + + def getBroker(self): + return self.broker + + def getAgentBank(self): + return self.bank + +class Event: + """ """ + def __init__(self, session, broker, codec): + self.session = session + self.broker = broker + pname = str(codec.read_str8()) + cname = str(codec.read_str8()) + hash = codec.read_bin128() + self.classKey = (pname, cname, hash) + self.timestamp = codec.read_int64() + self.severity = codec.read_uint8() + self.schema = None + if pname in session.packages: + if (cname, hash) in session.packages[pname]: + self.schema = session.packages[pname][(cname, hash)] + self.arguments = {} + for arg in self.schema.arguments: + self.arguments[arg.name] = session._decodeValue(codec, arg.type) + + def __repr__(self): + if self.schema == None: + return "<uninterpretable>" + out = strftime("%c", gmtime(self.timestamp / 1000000000)) + out += " " + self._sevName() + " " + self.classKey[0] + ":" + self.classKey[1] + out += " broker=" + self.broker.getUrl() + for arg in self.schema.arguments: + disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8") + if " " in disp: + disp = "\"" + disp + "\"" + out += " " + arg.name + "=" + disp + return out + + def _sevName(self): + if self.severity == 0 : return "EMER " + if self.severity == 1 : return "ALERT" + if self.severity == 2 : return "CRIT " + if self.severity == 3 : return "ERROR" + if self.severity == 4 : return "WARN " + if self.severity == 5 : return "NOTIC" + if self.severity == 6 : return "INFO " + if self.severity == 7 : return "DEBUG" + return "INV-%d" % self.severity + + def getClassKey(self): + return self.classKey + + def getArguments(self): + return self.arguments + + def getTimestamp(self): + return self.timestamp + + def getName(self): + return self.name + + def getSchema(self): + return self.schema + +class SequenceManager: + """ Manage sequence numbers for asynchronous method calls """ + def __init__(self): + self.lock = Lock() + self.sequence = 0 + self.pending = {} + + def _reserve(self, data): + """ Reserve a unique sequence number """ + try: + self.lock.acquire() + result = self.sequence + self.sequence = self.sequence + 1 + self.pending[result] = data + finally: + self.lock.release() + return result + + def _release(self, seq): + """ Release a reserved sequence number """ + data = None + try: + self.lock.acquire() + if seq in self.pending: + data = self.pending[seq] + del self.pending[seq] + finally: + self.lock.release() + return data + + +class DebugConsole(Console): + """ """ + def brokerConnected(self, broker): + print "brokerConnected:", broker + + def brokerDisconnected(self, broker): + print "brokerDisconnected:", broker + + def newPackage(self, name): + print "newPackage:", name + + def newClass(self, kind, classKey): + print "newClass:", kind, classKey + + def newAgent(self, agent): + print "newAgent:", agent + + def delAgent(self, agent): + print "delAgent:", agent + + def objectProps(self, broker, record): + print "objectProps:", record.getClassKey() + + def objectStats(self, broker, record): + print "objectStats:", record.getClassKey() + + def event(self, broker, event): + print "event:", event + + def heartbeat(self, agent, timestamp): + print "heartbeat:", agent + + def brokerInfo(self, broker): + print "brokerInfo:", broker + |