diff options
Diffstat (limited to 'M4-RCs/qpid/python/qmf/console.py')
-rw-r--r-- | M4-RCs/qpid/python/qmf/console.py | 1625 |
1 files changed, 0 insertions, 1625 deletions
diff --git a/M4-RCs/qpid/python/qmf/console.py b/M4-RCs/qpid/python/qmf/console.py deleted file mode 100644 index 0009726fe7..0000000000 --- a/M4-RCs/qpid/python/qmf/console.py +++ /dev/null @@ -1,1625 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" Console API for Qpid Management Framework """ - -import os -import qpid -import struct -import socket -import re -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import UUID, uuid4, Message, RangedSet -from qpid.util import connect, ssl, URL -from qpid.codec010 import StringCodec as Codec -from threading import Lock, Condition, Thread -from time import time, strftime, gmtime -from cStringIO import StringIO - -class Console: - """ To access the asynchronous operations, a class must be derived from - Console with overrides of any combination of the available methods. """ - - def brokerConnected(self, broker): - """ Invoked when a connection is established to a broker """ - pass - - def brokerDisconnected(self, broker): - """ Invoked when the connection to a broker is lost """ - pass - - def newPackage(self, name): - """ Invoked when a QMF package is discovered. """ - pass - - def newClass(self, kind, classKey): - """ Invoked when a new class is discovered. Session.getSchema can be - used to obtain details about the class.""" - pass - - def newAgent(self, agent): - """ Invoked when a QMF agent is discovered. """ - pass - - def delAgent(self, agent): - """ Invoked when a QMF agent disconects. """ - pass - - def objectProps(self, broker, record): - """ Invoked when an object is updated. """ - pass - - def objectStats(self, broker, record): - """ Invoked when an object is updated. """ - pass - - def event(self, broker, event): - """ Invoked when an event is raised. """ - pass - - def heartbeat(self, agent, timestamp): - """ """ - pass - - def brokerInfo(self, broker): - """ """ - pass - - def methodResponse(self, broker, seq, response): - """ """ - pass - -class BrokerURL(URL): - def __init__(self, text): - URL.__init__(self, text) - socket.gethostbyname(self.host) - if self.port is None: - if self.scheme == URL.AMQPS: - self.port = 5671 - else: - self.port = 5672 - self.authName = self.user or "guest" - self.authPass = self.password or "guest" - self.authMech = "PLAIN" - - def name(self): - return self.host + ":" + str(self.port) - - def match(self, host, port): - return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port - -class Session: - """ - An instance of the Session class represents a console session running - against one or more QMF brokers. A single instance of Session is needed - to interact with the management framework as a console. - """ - _CONTEXT_SYNC = 1 - _CONTEXT_STARTUP = 2 - _CONTEXT_MULTIGET = 3 - - GET_WAIT_TIME = 60 - - def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, - manageConnections=False, userBindings=False): - """ - Initialize a session. If the console argument is provided, the - more advanced asynchronous features are available. If console is - defaulted, the session will operate in a simpler, synchronous manner. - - The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' - is provided. They control whether object updates, events, and agent-heartbeats are - subscribed to. If the console is not interested in receiving one or more of the above, - setting the argument to False will reduce tha bandwidth used by the API. - - If manageConnections is set to True, the Session object will manage connections to - the brokers. This means that if a broker is unreachable, it will retry until a connection - can be established. If a connection is lost, the Session will attempt to reconnect. - - If manageConnections is set to False, the user is responsible for handing failures. In - this case, an unreachable broker will cause addBroker to raise an exception. - - If userBindings is set to False (the default) and rcvObjects is True, the console will - receive data for all object classes. If userBindings is set to True, the user must select - which classes the console shall receive by invoking the bindPackage or bindClass methods. - This allows the console to be configured to receive only information that is relavant to - a particular application. If rcvObjects id False, userBindings has no meaning. - """ - self.console = console - self.brokers = [] - self.packages = {} - self.seqMgr = SequenceManager() - self.cv = Condition() - self.syncSequenceList = [] - self.getResult = [] - self.getSelect = [] - self.error = None - self.rcvObjects = rcvObjects - self.rcvEvents = rcvEvents - self.rcvHeartbeats = rcvHeartbeats - self.userBindings = userBindings - if self.console == None: - self.rcvObjects = False - self.rcvEvents = False - self.rcvHeartbeats = False - self.bindingKeyList = self._bindingKeys() - self.manageConnections = manageConnections - - if self.userBindings and not self.rcvObjects: - raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") - - def __repr__(self): - return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) - - def addBroker(self, target="localhost"): - """ Connect to a Qpid broker. Returns an object of type Broker. """ - url = BrokerURL(target) - broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass, - ssl = url.scheme == URL.AMQPS) - - self.brokers.append(broker) - if not self.manageConnections: - self.getObjects(broker=broker, _class="agent") - return broker - - def delBroker(self, broker): - """ Disconnect from a broker. The 'broker' argument is the object - returned from the addBroker call """ - broker._shutdown() - self.brokers.remove(broker) - del broker - - def getPackages(self): - """ Get the list of known QMF packages """ - for broker in self.brokers: - broker._waitForStable() - list = [] - for package in self.packages: - list.append(package) - return list - - def getClasses(self, packageName): - """ Get the list of known classes within a QMF package """ - for broker in self.brokers: - broker._waitForStable() - list = [] - if packageName in self.packages: - for pkey in self.packages[packageName]: - list.append(self.packages[packageName][pkey].getKey()) - return list - - def getSchema(self, classKey): - """ Get the schema for a QMF class """ - for broker in self.brokers: - broker._waitForStable() - pname = classKey.getPackageName() - pkey = classKey.getPackageKey() - if pname in self.packages: - if pkey in self.packages[pname]: - return self.packages[pname][pkey] - - def bindPackage(self, packageName): - """ Request object updates for all table classes within a package. """ - if not self.userBindings or not self.rcvObjects: - raise Exception("userBindings option not set for Session") - key = "console.obj.*.*.%s.#" % packageName - self.bindingKeyList.append(key) - for broker in self.brokers: - if broker.isConnected(): - broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, - binding_key=key) - - def bindClass(self, pname, cname): - """ Request object updates for a particular table class by package and class name. """ - if not self.userBindings or not self.rcvObjects: - raise Exception("userBindings option not set for Session") - key = "console.obj.*.*.%s.%s.#" % (pname, cname) - self.bindingKeyList.append(key) - for broker in self.brokers: - if broker.isConnected(): - broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, - binding_key=key) - - def bindClassKey(self, classKey): - """ Request object updates for a particular table class by class key. """ - pname = classKey.getPackageName() - cname = classKey.getClassName() - self.bindClass(pname, cname) - - def getAgents(self, broker=None): - """ Get a list of currently known agents """ - brokerList = [] - if broker == None: - for b in self.brokers: - brokerList.append(b) - else: - brokerList.append(broker) - - for b in brokerList: - b._waitForStable() - agentList = [] - for b in brokerList: - for a in b.getAgents(): - agentList.append(a) - return agentList - - def getObjects(self, **kwargs): - """ Get a list of objects from QMF agents. - All arguments are passed by name(keyword). - - The class for queried objects may be specified in one of the following ways: - - _schema = <schema> - supply a schema object returned from getSchema. - _key = <key> - supply a classKey from the list returned by getClasses. - _class = <name> - supply a class name as a string. If the class name exists - in multiple packages, a _package argument may also be supplied. - _objectId = <id> - get the object referenced by the object-id - - If objects should be obtained from only one agent, use the following argument. - Otherwise, the query will go to all agents. - - _agent = <agent> - supply an agent from the list returned by getAgents. - - If the get query is to be restricted to one broker (as opposed to all connected brokers), - add the following argument: - - _broker = <broker> - supply a broker as returned by addBroker. - - If additional arguments are supplied, they are used as property selectors. For example, - if the argument name="test" is supplied, only objects whose "name" property is "test" - will be returned in the result. - """ - if "_broker" in kwargs: - brokerList = [] - brokerList.append(kwargs["_broker"]) - else: - brokerList = self.brokers - for broker in brokerList: - broker._waitForStable() - - agentList = [] - if "_agent" in kwargs: - agent = kwargs["_agent"] - if agent.broker not in brokerList: - raise Exception("Supplied agent is not accessible through the supplied broker") - if agent.broker.isConnected(): - agentList.append(agent) - else: - if "_objectId" in kwargs: - oid = kwargs["_objectId"] - for broker in brokerList: - for agent in broker.getAgents(): - if agent.getBrokerBank() == oid.getBrokerBank() and agent.getAgentBank() == oid.getAgentBank(): - agentList.append(agent) - else: - for broker in brokerList: - for agent in broker.getAgents(): - if agent.broker.isConnected(): - agentList.append(agent) - - if len(agentList) == 0: - return [] - - pname = None - cname = None - hash = None - classKey = None - if "_schema" in kwargs: classKey = kwargs["_schema"].getKey() - elif "_key" in kwargs: classKey = kwargs["_key"] - elif "_class" in kwargs: - cname = kwargs["_class"] - if "_package" in kwargs: - pname = kwargs["_package"] - if cname == None and classKey == None and "_objectId" not in kwargs: - raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") - - map = {} - self.getSelect = [] - if "_objectId" in kwargs: - map["_objectid"] = kwargs["_objectId"].__repr__() - else: - if cname == None: - cname = classKey.getClassName() - pname = classKey.getPackageName() - hash = classKey.getHash() - map["_class"] = cname - if pname != None: map["_package"] = pname - if hash != None: map["_hash"] = hash - for item in kwargs: - if item[0] != '_': - self.getSelect.append((item, kwargs[item])) - - self.getResult = [] - for agent in agentList: - broker = agent.broker - sendCodec = Codec(broker.conn.spec) - try: - self.cv.acquire() - seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) - self.syncSequenceList.append(seq) - finally: - self.cv.release() - broker._setHeader(sendCodec, 'G', seq) - sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank)) - broker._send(smsg) - - starttime = time() - timeout = False - try: - self.cv.acquire() - while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(self.GET_WAIT_TIME) - if time() - starttime > self.GET_WAIT_TIME: - for pendingSeq in self.syncSequenceList: - self.seqMgr._release(pendingSeq) - self.syncSequenceList = [] - timeout = True - finally: - self.cv.release() - - if self.error: - errorText = self.error - self.error = None - raise Exception(errorText) - - if len(self.getResult) == 0 and timeout: - raise RuntimeError("No agent responded within timeout period") - return self.getResult - - def setEventFilter(self, **kwargs): - """ """ - pass - - def _bindingKeys(self): - keyList = [] - keyList.append("schema.#") - if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings: - keyList.append("console.#") - else: - if self.rcvObjects and not self.userBindings: - keyList.append("console.obj.#") - else: - keyList.append("console.obj.*.*.org.apache.qpid.broker.agent") - if self.rcvEvents: - keyList.append("console.event.#") - if self.rcvHeartbeats: - keyList.append("console.heartbeat.#") - return keyList - - def _handleBrokerConnect(self, broker): - if self.console: - self.console.brokerConnected(broker) - - def _handleBrokerDisconnect(self, broker): - if self.console: - self.console.brokerDisconnected(broker) - - def _handleBrokerResp(self, broker, codec, seq): - broker.brokerId = UUID(codec.read_uuid()) - if self.console != None: - self.console.brokerInfo(broker) - - # Send a package request - # (effectively inc and dec outstanding by not doing anything) - sendCodec = Codec(broker.conn.spec) - seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) - broker._setHeader(sendCodec, 'P', seq) - smsg = broker._message(sendCodec.encoded) - broker._send(smsg) - - def _handlePackageInd(self, broker, codec, seq): - pname = str(codec.read_str8()) - notify = False - try: - self.cv.acquire() - if pname not in self.packages: - self.packages[pname] = {} - notify = True - finally: - self.cv.release() - if notify and self.console != None: - self.console.newPackage(pname) - - # Send a class request - broker._incOutstanding() - sendCodec = Codec(broker.conn.spec) - seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) - broker._setHeader(sendCodec, 'Q', seq) - sendCodec.write_str8(pname) - smsg = broker._message(sendCodec.encoded) - broker._send(smsg) - - def _handleCommandComplete(self, broker, codec, seq): - code = codec.read_uint32() - text = codec.read_str8() - context = self.seqMgr._release(seq) - if context == self._CONTEXT_STARTUP: - broker._decOutstanding() - elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: - try: - broker.cv.acquire() - broker.syncInFlight = False - broker.cv.notify() - finally: - broker.cv.release() - elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: - try: - self.cv.acquire() - self.syncSequenceList.remove(seq) - if len(self.syncSequenceList) == 0: - self.cv.notify() - finally: - self.cv.release() - - def _handleClassInd(self, broker, codec, seq): - kind = codec.read_uint8() - classKey = ClassKey(codec) - unknown = False - - try: - self.cv.acquire() - if classKey.getPackageName() in self.packages: - if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]: - unknown = True - finally: - self.cv.release() - - if unknown: - # Send a schema request for the unknown class - broker._incOutstanding() - sendCodec = Codec(broker.conn.spec) - seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) - broker._setHeader(sendCodec, 'S', seq) - classKey.encode(sendCodec) - smsg = broker._message(sendCodec.encoded) - broker._send(smsg) - - def _handleMethodResp(self, broker, codec, seq): - code = codec.read_uint32() - text = codec.read_str16() - outArgs = {} - method, synchronous = self.seqMgr._release(seq) - if code == 0: - for arg in method.arguments: - if arg.dir.find("O") != -1: - outArgs[arg.name] = self._decodeValue(codec, arg.type) - result = MethodResult(code, text, outArgs) - if synchronous: - try: - broker.cv.acquire() - broker.syncResult = result - broker.syncInFlight = False - broker.cv.notify() - finally: - broker.cv.release() - else: - if self.console: - self.console.methodResponse(broker, seq, result) - - def _handleHeartbeatInd(self, broker, codec, seq, msg): - brokerBank = 1 - agentBank = 0 - dp = msg.get("delivery_properties") - if dp: - key = dp["routing_key"] - keyElements = key.split(".") - if len(keyElements) == 4: - brokerBank = int(keyElements[2]) - agentBank = int(keyElements[3]) - - agent = broker.getAgent(brokerBank, agentBank) - timestamp = codec.read_uint64() - if self.console != None and agent != None: - self.console.heartbeat(agent, timestamp) - - def _handleEventInd(self, broker, codec, seq): - if self.console != None: - event = Event(self, broker, codec) - self.console.event(broker, event) - - def _handleSchemaResp(self, broker, codec, seq): - kind = codec.read_uint8() - classKey = ClassKey(codec) - _class = SchemaClass(kind, classKey, codec) - try: - self.cv.acquire() - self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class - finally: - self.cv.release() - - self.seqMgr._release(seq) - broker._decOutstanding() - if self.console != None: - self.console.newClass(kind, classKey) - - def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): - classKey = ClassKey(codec) - try: - self.cv.acquire() - pname = classKey.getPackageName() - if pname not in self.packages: - return - pkey = classKey.getPackageKey() - if pkey not in self.packages[pname]: - return - schema = self.packages[pname][pkey] - finally: - self.cv.release() - - object = Object(self, broker, schema, codec, prop, stat) - if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: - broker._updateAgent(object) - - try: - self.cv.acquire() - if seq in self.syncSequenceList: - if object.getTimestamps()[2] == 0 and self._selectMatch(object): - self.getResult.append(object) - return - finally: - self.cv.release() - - if self.console and self.rcvObjects: - if prop: - self.console.objectProps(broker, object) - if stat: - self.console.objectStats(broker, object) - - def _handleError(self, error): - self.error = error - try: - self.cv.acquire() - self.syncSequenceList = [] - self.cv.notify() - finally: - self.cv.release() - - def _selectMatch(self, object): - """ Check the object against self.getSelect to check for a match """ - for key, value in self.getSelect: - for prop, propval in object.getProperties(): - if key == prop.name and value != propval: - return False - return True - - def _decodeValue(self, codec, typecode): - """ Decode, from the codec, a value based on its typecode. """ - if typecode == 1: data = codec.read_uint8() # U8 - elif typecode == 2: data = codec.read_uint16() # U16 - elif typecode == 3: data = codec.read_uint32() # U32 - elif typecode == 4: data = codec.read_uint64() # U64 - elif typecode == 6: data = codec.read_str8() # SSTR - elif typecode == 7: data = codec.read_str16() # LSTR - elif typecode == 8: data = codec.read_int64() # ABSTIME - elif typecode == 9: data = codec.read_uint64() # DELTATIME - elif typecode == 10: data = ObjectId(codec) # REF - elif typecode == 11: data = codec.read_uint8() != 0 # BOOL - elif typecode == 12: data = codec.read_float() # FLOAT - elif typecode == 13: data = codec.read_double() # DOUBLE - elif typecode == 14: data = UUID(codec.read_uuid()) # UUID - elif typecode == 15: data = codec.read_map() # FTABLE - elif typecode == 16: data = codec.read_int8() # S8 - elif typecode == 17: data = codec.read_int16() # S16 - elif typecode == 18: data = codec.read_int32() # S32 - elif typecode == 19: data = codec.read_int64() # S63 - else: - raise ValueError("Invalid type code: %d" % typecode) - return data - - def _encodeValue(self, codec, value, typecode): - """ Encode, into the codec, a value based on its typecode. """ - if typecode == 1: codec.write_uint8 (int(value)) # U8 - elif typecode == 2: codec.write_uint16 (int(value)) # U16 - elif typecode == 3: codec.write_uint32 (long(value)) # U32 - elif typecode == 4: codec.write_uint64 (long(value)) # U64 - elif typecode == 6: codec.write_str8 (value) # SSTR - elif typecode == 7: codec.write_str16 (value) # LSTR - elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME - elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME - elif typecode == 10: value.encode (codec) # REF - elif typecode == 11: codec.write_uint8 (int(value)) # BOOL - elif typecode == 12: codec.write_float (float(value)) # FLOAT - elif typecode == 13: codec.write_double (float(value)) # DOUBLE - elif typecode == 14: codec.write_uuid (value.bytes) # UUID - elif typecode == 15: codec.write_map (value) # FTABLE - elif typecode == 16: codec.write_int8 (int(value)) # S8 - elif typecode == 17: codec.write_int16 (int(value)) # S16 - elif typecode == 18: codec.write_int32 (int(value)) # S32 - elif typecode == 19: codec.write_int64 (int(value)) # S64 - else: - raise ValueError ("Invalid type code: %d" % typecode) - - def _displayValue(self, value, typecode): - """ """ - if typecode == 1: return unicode(value) - elif typecode == 2: return unicode(value) - elif typecode == 3: return unicode(value) - elif typecode == 4: return unicode(value) - elif typecode == 6: return value - elif typecode == 7: return value - elif typecode == 8: return unicode(strftime("%c", gmtime(value / 1000000000))) - elif typecode == 9: return unicode(value) - elif typecode == 10: return unicode(value.__repr__()) - elif typecode == 11: - if value: return u"T" - else: return u"F" - elif typecode == 12: return unicode(value) - elif typecode == 13: return unicode(value) - elif typecode == 14: return unicode(value.__repr__()) - elif typecode == 15: return unicode(value.__repr__()) - elif typecode == 16: return unicode(value) - elif typecode == 17: return unicode(value) - elif typecode == 18: return unicode(value) - elif typecode == 19: return unicode(value) - else: - raise ValueError ("Invalid type code: %d" % typecode) - - def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): - """ This function can be used to send a method request to an object given only the - broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are - normally invoked on the object itself. - """ - schema = self.getSchema(schemaKey) - for method in schema.getMethods(): - if name == method.name: - aIdx = 0 - sendCodec = Codec(broker.conn.spec) - seq = self.seqMgr._reserve((method, False)) - broker._setHeader(sendCodec, 'M', seq) - objectId.encode(sendCodec) - schemaKey.encode(sendCodec) - sendCodec.write_str8(name) - - count = 0 - for arg in method.arguments: - if arg.dir.find("I") != -1: - count += 1 - if count != len(argList): - raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) - - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._encodeValue(sendCodec, argList[aIdx], arg.type) - aIdx += 1 - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % - (objectId.getBrokerBank(), objectId.getAgentBank())) - broker._send(smsg) - return seq - return None - -class Package: - """ """ - def __init__(self, name): - self.name = name - -class ClassKey: - """ A ClassKey uniquely identifies a class from the schema. """ - def __init__(self, constructor): - if type(constructor) == str: - # construct from __repr__ string - try: - self.pname, cls = constructor.split(":") - self.cname, hsh = cls.split("(") - hsh = hsh.strip(")") - hexValues = hsh.split("-") - h0 = int(hexValues[0], 16) - h1 = int(hexValues[1], 16) - h2 = int(hexValues[2], 16) - h3 = int(hexValues[3], 16) - self.hash = struct.pack("!LLLL", h0, h1, h2, h3) - except: - raise Exception("Invalid ClassKey format") - else: - # construct from codec - codec = constructor - self.pname = str(codec.read_str8()) - self.cname = str(codec.read_str8()) - self.hash = codec.read_bin128() - - def encode(self, codec): - codec.write_str8(self.pname) - codec.write_str8(self.cname) - codec.write_bin128(self.hash) - - def getPackageName(self): - return self.pname - - def getClassName(self): - return self.cname - - def getHash(self): - return self.hash - - def getHashString(self): - return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash) - - def getPackageKey(self): - return (self.cname, self.hash) - - def __repr__(self): - return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" - -class SchemaClass: - """ """ - CLASS_KIND_TABLE = 1 - CLASS_KIND_EVENT = 2 - - def __init__(self, kind, key, codec): - self.kind = kind - self.classKey = key - self.properties = [] - self.statistics = [] - self.methods = [] - self.arguments = [] - - if self.kind == self.CLASS_KIND_TABLE: - propCount = codec.read_uint16() - statCount = codec.read_uint16() - methodCount = codec.read_uint16() - for idx in range(propCount): - self.properties.append(SchemaProperty(codec)) - for idx in range(statCount): - self.statistics.append(SchemaStatistic(codec)) - for idx in range(methodCount): - self.methods.append(SchemaMethod(codec)) - - elif self.kind == self.CLASS_KIND_EVENT: - argCount = codec.read_uint16() - for idx in range(argCount): - self.arguments.append(SchemaArgument(codec, methodArg=False)) - - def __repr__(self): - if self.kind == self.CLASS_KIND_TABLE: - kindStr = "Table" - elif self.kind == self.CLASS_KIND_EVENT: - kindStr = "Event" - else: - kindStr = "Unsupported" - result = "%s Class: %s " % (kindStr, self.classKey.__repr__()) - return result - - def getKey(self): - """ Return the class-key for this class. """ - return self.classKey - - def getProperties(self): - """ Return the list of properties for the class. """ - return self.properties - - def getStatistics(self): - """ Return the list of statistics for the class. """ - return self.statistics - - def getMethods(self): - """ Return the list of methods for the class. """ - return self.methods - - def getArguments(self): - """ Return the list of events for the class. """ - return self.arguments - -class SchemaProperty: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - self.type = map["type"] - self.access = str(map["access"]) - self.index = map["index"] != 0 - self.optional = map["optional"] != 0 - self.unit = None - self.min = None - self.max = None - self.maxlen = None - self.desc = None - - for key, value in map.items(): - if key == "unit" : self.unit = value - elif key == "min" : self.min = value - elif key == "max" : self.max = value - elif key == "maxlen" : self.maxlen = value - elif key == "desc" : self.desc = value - - def __repr__(self): - return self.name - -class SchemaStatistic: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - self.type = map["type"] - self.unit = None - self.desc = None - - for key, value in map.items(): - if key == "unit" : self.unit = value - elif key == "desc" : self.desc = value - - def __repr__(self): - return self.name - -class SchemaMethod: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - argCount = map["argCount"] - if "desc" in map: - self.desc = map["desc"] - else: - self.desc = None - self.arguments = [] - - for idx in range(argCount): - self.arguments.append(SchemaArgument(codec, methodArg=True)) - - def __repr__(self): - result = self.name + "(" - first = True - for arg in self.arguments: - if arg.dir.find("I") != -1: - if first: - first = False - else: - result += ", " - result += arg.name - result += ")" - return result - -class SchemaArgument: - """ """ - def __init__(self, codec, methodArg): - map = codec.read_map() - self.name = str(map["name"]) - self.type = map["type"] - if methodArg: - self.dir = str(map["dir"]).upper() - self.unit = None - self.min = None - self.max = None - self.maxlen = None - self.desc = None - self.default = None - - for key, value in map.items(): - if key == "unit" : self.unit = value - elif key == "min" : self.min = value - elif key == "max" : self.max = value - elif key == "maxlen" : self.maxlen = value - elif key == "desc" : self.desc = value - elif key == "default" : self.default = value - -class ObjectId: - """ Object that represents QMF object identifiers """ - def __init__(self, codec, first=0, second=0): - if codec: - self.first = codec.read_uint64() - self.second = codec.read_uint64() - else: - self.first = first - self.second = second - - def __cmp__(self, other): - if other == None or not isinstance(other, ObjectId) : - return 1 - if self.first < other.first: - return -1 - if self.first > other.first: - return 1 - if self.second < other.second: - return -1 - if self.second > other.second: - return 1 - return 0 - - def __repr__(self): - return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(), - self.getBrokerBank(), self.getAgentBank(), self.getObject()) - - def index(self): - return (self.first, self.second) - - def getFlags(self): - return (self.first & 0xF000000000000000) >> 60 - - def getSequence(self): - return (self.first & 0x0FFF000000000000) >> 48 - - def getBrokerBank(self): - return (self.first & 0x0000FFFFF0000000) >> 28 - - def getAgentBank(self): - return self.first & 0x000000000FFFFFFF - - def getObject(self): - return self.second - - def isDurable(self): - return self.getSequence() == 0 - - def encode(self, codec): - codec.write_uint64(self.first) - codec.write_uint64(self.second) - - def __hash__(self): - return (self.first, self.second).__hash__() - - def __eq__(self, other): - return (self.first, self.second).__eq__(other) - -class Object(object): - """ """ - def __init__(self, session, broker, schema, codec, prop, stat): - """ """ - self._session = session - self._broker = broker - self._schema = schema - self._currentTime = codec.read_uint64() - self._createTime = codec.read_uint64() - self._deleteTime = codec.read_uint64() - self._objectId = ObjectId(codec) - self._properties = [] - self._statistics = [] - if prop: - notPresent = self._parsePresenceMasks(codec, schema) - for property in schema.getProperties(): - if property.name in notPresent: - self._properties.append((property, None)) - else: - self._properties.append((property, self._session._decodeValue(codec, property.type))) - if stat: - for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) - - def getBroker(self): - """ Return the broker from which this object was sent """ - return self._broker - - def getObjectId(self): - """ Return the object identifier for this object """ - return self._objectId - - def getClassKey(self): - """ Return the class-key that references the schema describing this object. """ - return self._schema.getKey() - - def getSchema(self): - """ Return the schema that describes this object. """ - return self._schema - - def getMethods(self): - """ Return a list of methods available for this object. """ - return self._schema.getMethods() - - def getTimestamps(self): - """ Return the current, creation, and deletion times for this object. """ - return self._currentTime, self._createTime, self._deleteTime - - def getIndex(self): - """ Return a string describing this object's primary key. """ - result = u"" - for property, value in self._properties: - if property.index: - if result != u"": - result += u":" - try: - valstr = unicode(self._session._displayValue(value, property.type)) - except: - valstr = u"<undecodable>" - result += valstr - return result - - def getProperties(self): - return self._properties - - def getStatistics(self): - return self._statistics - - def mergeUpdate(self, newer): - """ Replace properties and/or statistics with a newly received update """ - if self._objectId != newer._objectId: - raise Exception("Objects with different object-ids") - if len(newer.getProperties()) > 0: - self.properties = newer.getProperties() - if len(newer.getStatistics()) > 0: - self.statistics = newer.getStatistics() - - def __repr__(self): - key = self.getClassKey() - return key.getPackageName() + ":" + key.getClassName() +\ - "[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8") - - def __getattr__(self, name): - for method in self._schema.getMethods(): - if name == method.name: - return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self._properties: - if name == property.name: - return value - if name == "_" + property.name + "_" and property.type == 10: # Dereference references - deref = self._session.getObjects(_objectId=value, _broker=self._broker) - if len(deref) != 1: - return None - else: - return deref[0] - for statistic, value in self._statistics: - if name == statistic.name: - return value - raise Exception("Type Object has no attribute '%s'" % name) - - def _sendMethodRequest(self, name, args, kwargs, synchronous=False): - for method in self._schema.getMethods(): - if name == method.name: - aIdx = 0 - sendCodec = Codec(self._broker.conn.spec) - seq = self._session.seqMgr._reserve((method, synchronous)) - self._broker._setHeader(sendCodec, 'M', seq) - self._objectId.encode(sendCodec) - self._schema.getKey().encode(sendCodec) - sendCodec.write_str8(name) - - count = 0 - for arg in method.arguments: - if arg.dir.find("I") != -1: - count += 1 - if count != len(args): - raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) - - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._session._encodeValue(sendCodec, args[aIdx], arg.type) - aIdx += 1 - smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) - if synchronous: - try: - self._broker.cv.acquire() - self._broker.syncInFlight = True - finally: - self._broker.cv.release() - self._broker._send(smsg) - return seq - return None - - def _invoke(self, name, args, kwargs): - if self._sendMethodRequest(name, args, kwargs, True): - try: - self._broker.cv.acquire() - starttime = time() - while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(self._broker.SYNC_TIME) - if time() - starttime > self._broker.SYNC_TIME: - self._session.seqMgr._release(seq) - raise RuntimeError("Timed out waiting for method to respond") - finally: - self._broker.cv.release() - if self._broker.error != None: - errorText = self._broker.error - self._broker.error = None - raise Exception(errorText) - return self._broker.syncResult - raise Exception("Invalid Method (software defect) [%s]" % name) - - def _parsePresenceMasks(self, codec, schema): - excludeList = [] - bit = 0 - for property in schema.getProperties(): - if property.optional: - if bit == 0: - mask = codec.read_uint8() - bit = 1 - if (mask & bit) == 0: - excludeList.append(property.name) - bit *= 2 - if bit == 256: - bit = 0 - return excludeList - -class MethodResult(object): - """ """ - def __init__(self, status, text, outArgs): - """ """ - self.status = status - self.text = text - self.outArgs = outArgs - - def __getattr__(self, name): - if name in self.outArgs: - return self.outArgs[name] - - def __repr__(self): - return "%s (%d) - %s" % (self.text, self.status, self.outArgs) - -class ManagedConnection(Thread): - """ Thread class for managing a connection. """ - DELAY_MIN = 1 - DELAY_MAX = 128 - DELAY_FACTOR = 2 - - def __init__(self, broker): - Thread.__init__(self) - self.broker = broker - self.cv = Condition() - self.canceled = False - - def stop(self): - """ Tell this thread to stop running and return. """ - try: - self.cv.acquire() - self.canceled = True - self.cv.notify() - finally: - self.cv.release() - - def disconnected(self): - """ Notify the thread that the connection was lost. """ - try: - self.cv.acquire() - self.cv.notify() - finally: - self.cv.release() - - def run(self): - """ Main body of the running thread. """ - delay = self.DELAY_MIN - while True: - try: - self.broker._tryToConnect() - try: - self.cv.acquire() - while (not self.canceled) and self.broker.connected: - self.cv.wait() - if self.canceled: - return - delay = self.DELAY_MIN - finally: - self.cv.release() - except socket.error: - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - except SessionDetached: - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - except Closed: - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - - try: - self.cv.acquire() - self.cv.wait(delay) - if self.canceled: - return - finally: - self.cv.release() - -class Broker: - """ This object represents a connection (or potential connection) to a QMF broker. """ - SYNC_TIME = 60 - - def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False): - self.session = session - self.host = host - self.port = port - self.ssl = ssl - self.authUser = authUser - self.authPass = authPass - self.cv = Condition() - self.error = None - self.brokerId = None - self.connected = False - self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) - if self.session.manageConnections: - self.thread = ManagedConnection(self) - self.thread.start() - else: - self.thread = None - self._tryToConnect() - - def isConnected(self): - """ Return True if there is an active connection to the broker. """ - return self.connected - - def getError(self): - """ Return the last error message seen while trying to connect to the broker. """ - return self.error - - def getBrokerId(self): - """ Get broker's unique identifier (UUID) """ - return self.brokerId - - def getBrokerBank(self): - """ Return the broker-bank value. This is the value that the broker assigns to - objects within its control. This value appears as a field in the ObjectId - of objects created by agents controlled by this broker. """ - return 1 - - def getAgent(self, brokerBank, agentBank): - """ Return the agent object associated with a particular broker and agent bank value.""" - bankKey = (brokerBank, agentBank) - if bankKey in self.agents: - return self.agents[bankKey] - return None - - def getSessionId(self): - """ Get the identifier of the AMQP session to the broker """ - return self.amqpSessionId - - def getAgents(self): - """ Get the list of agents reachable via this broker """ - return self.agents.values() - - def getAmqpSession(self): - """ Get the AMQP session object for this connected broker. """ - return self.amqpSession - - def getUrl(self): - """ """ - return "%s:%d" % (self.host, self.port) - - def getFullUrl(self, noAuthIfGuestDefault=True): - """ """ - ssl = "" - if self.ssl: - ssl = "s" - auth = "%s/%s@" % (self.authUser, self.authPass) - if self.authUser == "" or \ - (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"): - auth = "" - return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672) - - def __repr__(self): - if self.connected: - return "Broker connected at: %s" % self.getUrl() - else: - return "Disconnected Broker" - - def _tryToConnect(self): - try: - self.agents = {} - self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") - self.topicBound = False - self.syncInFlight = False - self.syncRequest = 0 - self.syncResult = None - self.reqsOutstanding = 1 - - sock = connect(self.host, self.port) - if self.ssl: - sock = ssl(sock) - self.conn = Connection(sock, username=self.authUser, password=self.authPass) - self.conn.start() - self.replyName = "reply-%s" % self.amqpSessionId - self.amqpSession = self.conn.session(self.amqpSessionId) - self.amqpSession.auto_sync = True - self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) - self.amqpSession.exchange_bind(exchange="amq.direct", - queue=self.replyName, binding_key=self.replyName) - self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", - accept_mode=self.amqpSession.accept_mode.none, - acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) - self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF) - self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFF) - - self.topicName = "topic-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) - self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", - accept_mode=self.amqpSession.accept_mode.none, - acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("tdest").listen(self._replyCb) - self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) - self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) - self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF) - - self.connected = True - self.session._handleBrokerConnect(self) - - codec = Codec(self.conn.spec) - self._setHeader(codec, 'B') - msg = self._message(codec.encoded) - self._send(msg) - - except socket.error, e: - self.error = "Socket Error %s - %s" % (e[0], e[1]) - raise - except Closed, e: - self.error = "Connect Failed %d - %s" % (e[0], e[1]) - raise - except ConnectionFailed, e: - self.error = "Connect Failed %d - %s" % (e[0], e[1]) - raise - - def _updateAgent(self, obj): - bankKey = (obj.brokerBank, obj.agentBank) - if obj._deleteTime == 0: - if bankKey not in self.agents: - agent = Agent(self, obj.agentBank, obj.label) - self.agents[bankKey] = agent - if self.session.console != None: - self.session.console.newAgent(agent) - else: - agent = self.agents.pop(bankKey, None) - if agent != None and self.session.console != None: - self.session.console.delAgent(agent) - - def _setHeader(self, codec, opcode, seq=0): - """ Compose the header of a management message. """ - codec.write_uint8(ord('A')) - codec.write_uint8(ord('M')) - codec.write_uint8(ord('2')) - codec.write_uint8(ord(opcode)) - codec.write_uint32(seq) - - def _checkHeader(self, codec): - """ Check the header of a management message and extract the opcode and class. """ - try: - octet = chr(codec.read_uint8()) - if octet != 'A': - return None, None - octet = chr(codec.read_uint8()) - if octet != 'M': - return None, None - octet = chr(codec.read_uint8()) - if octet != '2': - return None, None - opcode = chr(codec.read_uint8()) - seq = codec.read_uint32() - return opcode, seq - except: - return None, None - - def _message (self, body, routing_key="broker"): - dp = self.amqpSession.delivery_properties() - dp.routing_key = routing_key - mp = self.amqpSession.message_properties() - mp.content_type = "x-application/qmf" - mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) - return Message(dp, mp, body) - - def _send(self, msg, dest="qpid.management"): - self.amqpSession.message_transfer(destination=dest, message=msg) - - def _shutdown(self): - if self.thread: - self.thread.stop() - self.thread.join() - if self.connected: - self.amqpSession.incoming("rdest").stop() - if self.session.console != None: - self.amqpSession.incoming("tdest").stop() - self.amqpSession.close() - self.conn.close() - self.connected = False - - def _waitForStable(self): - try: - self.cv.acquire() - if not self.connected: - return - if self.reqsOutstanding == 0: - return - self.syncInFlight = True - starttime = time() - while self.reqsOutstanding != 0: - self.cv.wait(self.SYNC_TIME) - if time() - starttime > self.SYNC_TIME: - raise RuntimeError("Timed out waiting for broker to synchronize") - finally: - self.cv.release() - - def _incOutstanding(self): - try: - self.cv.acquire() - self.reqsOutstanding += 1 - finally: - self.cv.release() - - def _decOutstanding(self): - try: - self.cv.acquire() - self.reqsOutstanding -= 1 - if self.reqsOutstanding == 0 and not self.topicBound: - self.topicBound = True - for key in self.session.bindingKeyList: - self.amqpSession.exchange_bind(exchange="qpid.management", - queue=self.topicName, binding_key=key) - if self.reqsOutstanding == 0 and self.syncInFlight: - self.syncInFlight = False - self.cv.notify() - finally: - self.cv.release() - - def _replyCb(self, msg): - codec = Codec(self.conn.spec, msg.body) - while True: - opcode, seq = self._checkHeader(codec) - if opcode == None: return - if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) - elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) - elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'e': self.session._handleEventInd (self, codec, seq) - elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) - elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) - elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) - elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) - - def _exceptionCb(self, data): - self.connected = False - self.error = data - try: - self.cv.acquire() - if self.syncInFlight: - self.cv.notify() - finally: - self.cv.release() - self.session._handleError(self.error) - self.session._handleBrokerDisconnect(self) - if self.thread: - self.thread.disconnected() - -class Agent: - """ """ - def __init__(self, broker, agentBank, label): - self.broker = broker - self.brokerBank = broker.getBrokerBank() - self.agentBank = agentBank - self.label = label - - def __repr__(self): - return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label) - - def getBroker(self): - return self.broker - - def getBrokerBank(self): - return self.brokerBank - - def getAgentBank(self): - return self.agentBank - -class Event: - """ """ - def __init__(self, session, broker, codec): - self.session = session - self.broker = broker - self.classKey = ClassKey(codec) - self.timestamp = codec.read_int64() - self.severity = codec.read_uint8() - self.schema = None - pname = self.classKey.getPackageName() - pkey = self.classKey.getPackageKey() - if pname in session.packages: - if pkey in session.packages[pname]: - self.schema = session.packages[pname][pkey] - self.arguments = {} - for arg in self.schema.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type) - - def __repr__(self): - if self.schema == None: - return "<uninterpretable>" - out = strftime("%c", gmtime(self.timestamp / 1000000000)) - out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName() - out += " broker=" + self.broker.getUrl() - for arg in self.schema.arguments: - disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8") - if " " in disp: - disp = "\"" + disp + "\"" - out += " " + arg.name + "=" + disp - return out - - def _sevName(self): - if self.severity == 0 : return "EMER " - if self.severity == 1 : return "ALERT" - if self.severity == 2 : return "CRIT " - if self.severity == 3 : return "ERROR" - if self.severity == 4 : return "WARN " - if self.severity == 5 : return "NOTIC" - if self.severity == 6 : return "INFO " - if self.severity == 7 : return "DEBUG" - return "INV-%d" % self.severity - - def getClassKey(self): - return self.classKey - - def getArguments(self): - return self.arguments - - def getTimestamp(self): - return self.timestamp - - def getName(self): - return self.name - - def getSchema(self): - return self.schema - -class SequenceManager: - """ Manage sequence numbers for asynchronous method calls """ - def __init__(self): - self.lock = Lock() - self.sequence = 0 - self.pending = {} - - def _reserve(self, data): - """ Reserve a unique sequence number """ - try: - self.lock.acquire() - result = self.sequence - self.sequence = self.sequence + 1 - self.pending[result] = data - finally: - self.lock.release() - return result - - def _release(self, seq): - """ Release a reserved sequence number """ - data = None - try: - self.lock.acquire() - if seq in self.pending: - data = self.pending[seq] - del self.pending[seq] - finally: - self.lock.release() - return data - - -class DebugConsole(Console): - """ """ - def brokerConnected(self, broker): - print "brokerConnected:", broker - - def brokerDisconnected(self, broker): - print "brokerDisconnected:", broker - - def newPackage(self, name): - print "newPackage:", name - - def newClass(self, kind, classKey): - print "newClass:", kind, classKey - - def newAgent(self, agent): - print "newAgent:", agent - - def delAgent(self, agent): - print "delAgent:", agent - - def objectProps(self, broker, record): - print "objectProps:", record - - def objectStats(self, broker, record): - print "objectStats:", record - - def event(self, broker, event): - print "event:", event - - def heartbeat(self, agent, timestamp): - print "heartbeat:", agent - - def brokerInfo(self, broker): - print "brokerInfo:", broker - |