diff options
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf/console.py')
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 4043 |
1 files changed, 4043 insertions, 0 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py new file mode 100644 index 0000000000..ecb0e1d9d0 --- /dev/null +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -0,0 +1,4043 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" Console API for Qpid Management Framework """ + +import os +import platform +import qpid +import struct +import socket +import re +from qpid.datatypes import UUID +from qpid.datatypes import timestamp +from qpid.datatypes import datetime +from qpid.exceptions import Closed +from qpid.session import SessionDetached +from qpid.connection import Connection, ConnectionFailed, Timeout +from qpid.datatypes import Message, RangedSet, UUID +from qpid.util import connect, ssl, URL +from qpid.codec010 import StringCodec as Codec +from threading import Lock, Condition, Thread, Semaphore +from Queue import Queue, Empty +from time import time, strftime, gmtime, sleep +from cStringIO import StringIO + +#import qpid.log +#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) + +#=================================================================================================== +# CONSOLE +#=================================================================================================== +class Console: + """ To access the asynchronous operations, a class must be derived from + Console with overrides of any combination of the available methods. """ + + def brokerConnected(self, broker): + """ Invoked when a connection is established to a broker """ + pass + + def brokerConnectionFailed(self, broker): + """ Invoked when a connection to a broker fails """ + pass + + def brokerDisconnected(self, broker): + """ Invoked when the connection to a broker is lost """ + pass + + def newPackage(self, name): + """ Invoked when a QMF package is discovered. """ + pass + + def newClass(self, kind, classKey): + """ Invoked when a new class is discovered. Session.getSchema can be + used to obtain details about the class.""" + pass + + def newAgent(self, agent): + """ Invoked when a QMF agent is discovered. """ + pass + + def delAgent(self, agent): + """ Invoked when a QMF agent disconects. """ + pass + + def objectProps(self, broker, record): + """ Invoked when an object is updated. """ + pass + + def objectStats(self, broker, record): + """ Invoked when an object is updated. """ + pass + + def event(self, broker, event): + """ Invoked when an event is raised. """ + pass + + def heartbeat(self, agent, timestamp): + """ Invoked when an agent heartbeat is received. """ + pass + + def brokerInfo(self, broker): + """ Invoked when the connection sequence reaches the point where broker information is available. """ + pass + + def methodResponse(self, broker, seq, response): + """ Invoked when a method response from an asynchronous method call is received. """ + pass + + +#=================================================================================================== +# BrokerURL +#=================================================================================================== +class BrokerURL(URL): + def __init__(self, text): + URL.__init__(self, text) + if self.port is None: + if self.scheme == URL.AMQPS: + self.port = 5671 + else: + self.port = 5672 + self.authName = None + self.authPass = None + if self.user: + self.authName = str(self.user) + if self.password: + self.authPass = str(self.password) + + def name(self): + return self.host + ":" + str(self.port) + + def match(self, host, port): + return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4] + + +#=================================================================================================== +# Object +#=================================================================================================== +class Object(object): + """ + This class defines a 'proxy' object representing a real managed object on an agent. + Actions taken on this proxy are remotely affected on the real managed object. + """ + def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}): + self._agent = agent + self._session = None + self._broker = None + if agent: + self._session = agent.session + self._broker = agent.broker + self._schema = schema + self._properties = [] + self._statistics = [] + self._currentTime = None + self._createTime = None + self._deleteTime = 0 + self._objectId = None + if v2Map: + self.v2Init(v2Map, agentName) + return + + if self._agent: + self._currentTime = codec.read_uint64() + self._createTime = codec.read_uint64() + self._deleteTime = codec.read_uint64() + self._objectId = ObjectId(codec) + if codec: + if prop: + notPresent = self._parsePresenceMasks(codec, schema) + for property in schema.getProperties(): + if property.name in notPresent: + self._properties.append((property, None)) + else: + self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker))) + if stat: + for statistic in schema.getStatistics(): + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker))) + else: + for property in schema.getProperties(): + if property.optional: + self._properties.append((property, None)) + else: + self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs))) + for statistic in schema.getStatistics(): + self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs))) + + def v2Init(self, omap, agentName): + if omap.__class__ != dict: + raise Exception("QMFv2 object data must be a map/dict") + if '_values' not in omap: + raise Exception("QMFv2 object must have '_values' element") + + values = omap['_values'] + for prop in self._schema.getProperties(): + if prop.name in values: + if prop.type == 10: # Reference + self._properties.append((prop, ObjectId(values[prop.name], agentName=agentName))) + else: + self._properties.append((prop, values[prop.name])) + for stat in self._schema.getStatistics(): + if stat.name in values: + self._statistics.append((stat, values[stat.name])) + if '_subtypes' in omap: + self._subtypes = omap['_subtypes'] + if '_object_id' in omap: + self._objectId = ObjectId(omap['_object_id'], agentName=agentName) + else: + self._objectId = None + + self._currentTime = omap.get("_update_ts", 0) + self._createTime = omap.get("_create_ts", 0) + self._deleteTime = omap.get("_delete_ts", 0) + + def getAgent(self): + """ Return the agent from which this object was sent """ + return self._agent + + def getBroker(self): + """ Return the broker from which this object was sent """ + return self._broker + + def getV2RoutingKey(self): + """ Get the QMFv2 routing key to address this object """ + return self._agent.getV2RoutingKey() + + def getObjectId(self): + """ Return the object identifier for this object """ + return self._objectId + + def getClassKey(self): + """ Return the class-key that references the schema describing this object. """ + return self._schema.getKey() + + def getSchema(self): + """ Return the schema that describes this object. """ + return self._schema + + def getMethods(self): + """ Return a list of methods available for this object. """ + return self._schema.getMethods() + + def getTimestamps(self): + """ Return the current, creation, and deletion times for this object. """ + return self._currentTime, self._createTime, self._deleteTime + + def isDeleted(self): + """ Return True iff this object has been deleted. """ + return self._deleteTime != 0 + + def isManaged(self): + """ Return True iff this object is a proxy for a managed object on an agent. """ + return self._objectId and self._agent + + def getIndex(self): + """ Return a string describing this object's primary key. """ + if self._objectId.isV2: + return self._objectId.getObject() + result = u"" + for prop, value in self._properties: + if prop.index: + if result != u"": + result += u":" + try: + valstr = unicode(self._session._displayValue(value, prop.type)) + except Exception, e: + valstr = u"<undecodable>" + result += valstr + return result + + def getProperties(self): + """ Return a list of object properties """ + return self._properties + + def getStatistics(self): + """ Return a list of object statistics """ + return self._statistics + + def mergeUpdate(self, newer): + """ Replace properties and/or statistics with a newly received update """ + if not self.isManaged(): + raise Exception("Object is not managed") + if self._objectId != newer._objectId: + raise Exception("Objects with different object-ids") + if len(newer.getProperties()) > 0: + self._properties = newer.getProperties() + if len(newer.getStatistics()) > 0: + self._statistics = newer.getStatistics() + self._currentTime = newer._currentTime + self._deleteTime = newer._deleteTime + + def update(self): + """ Contact the agent and retrieve the lastest property and statistic values for this object. """ + if not self.isManaged(): + raise Exception("Object is not managed") + obj = self._agent.getObjects(_objectId=self._objectId) + if obj: + self.mergeUpdate(obj[0]) + else: + raise Exception("Underlying object no longer exists") + + def __repr__(self): + if self.isManaged(): + id = self.getObjectId().__repr__() + else: + id = "unmanaged" + key = self.getClassKey() + return key.getPackageName() + ":" + key.getClassName() +\ + "[" + id + "] " + self.getIndex().encode("utf8") + + def __getattr__(self, name): + for method in self._schema.getMethods(): + if name == method.name: + return lambda *args, **kwargs : self._invoke(name, args, kwargs) + for prop, value in self._properties: + if name == prop.name: + return value + if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references + deref = self._agent.getObjects(_objectId=value) + if len(deref) != 1: + return None + else: + return deref[0] + for stat, value in self._statistics: + if name == stat.name: + return value + + # + # Check to see if the name is in the schema. If so, return None (i.e. this is a not-present attribute) + # + for prop in self._schema.getProperties(): + if name == prop.name: + return None + for stat in self._schema.getStatistics(): + if name == stat.name: + return None + raise Exception("Type Object has no attribute '%s'" % name) + + def __setattr__(self, name, value): + if name[0] == '_': + super.__setattr__(self, name, value) + return + + for prop, unusedValue in self._properties: + if name == prop.name: + newprop = (prop, value) + newlist = [] + for old, val in self._properties: + if name == old.name: + newlist.append(newprop) + else: + newlist.append((old, val)) + self._properties = newlist + return + super.__setattr__(self, name, value) + + def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): + for method in self._schema.getMethods(): + if name == method.name: + aIdx = 0 + sendCodec = Codec() + seq = self._session.seqMgr._reserve((method, synchronous)) + + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(args): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) + + if self._agent.isV2: + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = self._objectId.asMap() + call['_method_name'] = name + argMap = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + argMap[arg.name] = args[aIdx] + aIdx += 1 + call['_arguments'] = argMap + + dp = self._broker.amqpSession.delivery_properties() + dp.routing_key = self.getV2RoutingKey() + mp = self._broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + if self._broker.saslUser: + mp.user_id = self._broker.saslUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_direct_queue) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + smsg = Message(dp, mp, sendCodec.encoded) + exchange = "qmf.default.direct" + + else: + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank()) + self._broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + self._schema.getKey().encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._session._encodeValue(sendCodec, args[aIdx], arg.type) + aIdx += 1 + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" % + (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + exchange = "qpid.management" + + if synchronous: + try: + self._broker.cv.acquire() + self._broker.syncInFlight = True + finally: + self._broker.cv.release() + self._broker._send(smsg, exchange) + return seq + return None + + def _invoke(self, name, args, kwargs): + if not self.isManaged(): + raise Exception("Object is not managed") + if "_timeout" in kwargs: + timeout = kwargs["_timeout"] + else: + timeout = self._broker.SYNC_TIME + + if "_async" in kwargs and kwargs["_async"]: + sync = False + if "_timeout" not in kwargs: + timeout = None + else: + sync = True + + seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) + if seq: + if not sync: + return seq + self._broker.cv.acquire() + try: + starttime = time() + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(timeout) + if time() - starttime > timeout: + raise RuntimeError("Timed out waiting for method to respond") + finally: + self._session.seqMgr._release(seq) + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None + raise Exception(errorText) + return self._broker.syncResult + raise Exception("Invalid Method (software defect) [%s]" % name) + + def _encodeUnmanaged(self, codec): + codec.write_uint8(20) + codec.write_str8(self._schema.getKey().getPackageName()) + codec.write_str8(self._schema.getKey().getClassName()) + codec.write_bin128(self._schema.getKey().getHash()) + + # emit presence masks for optional properties + mask = 0 + bit = 0 + for prop, value in self._properties: + if prop.optional: + if bit == 0: + bit = 1 + if value: + mask |= bit + bit = bit << 1 + if bit == 256: + bit = 0 + codec.write_uint8(mask) + mask = 0 + if bit != 0: + codec.write_uint8(mask) + + # encode properties + for prop, value in self._properties: + if value != None: + self._session._encodeValue(codec, value, prop.type) + + # encode statistics + for stat, value in self._statistics: + self._session._encodeValue(codec, value, stat.type) + + def _parsePresenceMasks(self, codec, schema): + excludeList = [] + bit = 0 + for property in schema.getProperties(): + if property.optional: + if bit == 0: + mask = codec.read_uint8() + bit = 1 + if (mask & bit) == 0: + excludeList.append(property.name) + bit *= 2 + if bit == 256: + bit = 0 + return excludeList + + +#=================================================================================================== +# Session +#=================================================================================================== +class Session: + """ + An instance of the Session class represents a console session running + against one or more QMF brokers. A single instance of Session is needed + to interact with the management framework as a console. + """ + _CONTEXT_SYNC = 1 + _CONTEXT_STARTUP = 2 + _CONTEXT_MULTIGET = 3 + + DEFAULT_GET_WAIT_TIME = 60 + + ENCODINGS = { + str: 7, + timestamp: 8, + datetime: 8, + int: 9, + long: 9, + float: 13, + UUID: 14, + Object: 20, + list: 21 + } + + + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, + manageConnections=False, userBindings=False): + """ + Initialize a session. If the console argument is provided, the + more advanced asynchronous features are available. If console is + defaulted, the session will operate in a simpler, synchronous manner. + + The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' + is provided. They control whether object updates, events, and agent-heartbeats are + subscribed to. If the console is not interested in receiving one or more of the above, + setting the argument to False will reduce tha bandwidth used by the API. + + If manageConnections is set to True, the Session object will manage connections to + the brokers. This means that if a broker is unreachable, it will retry until a connection + can be established. If a connection is lost, the Session will attempt to reconnect. + + If manageConnections is set to False, the user is responsible for handing failures. In + this case, an unreachable broker will cause addBroker to raise an exception. + + If userBindings is set to False (the default) and rcvObjects is True, the console will + receive data for all object classes. If userBindings is set to True, the user must select + which classes the console shall receive by invoking the bindPackage or bindClass methods. + This allows the console to be configured to receive only information that is relavant to + a particular application. If rcvObjects id False, userBindings has no meaning. + """ + self.console = console + self.brokers = [] + self.schemaCache = SchemaCache() + self.seqMgr = SequenceManager() + self.cv = Condition() + self.syncSequenceList = [] + self.getResult = [] + self.getSelect = [] + self.error = None + self.rcvObjects = rcvObjects + self.rcvEvents = rcvEvents + self.rcvHeartbeats = rcvHeartbeats + self.userBindings = userBindings + if self.console == None: + self.rcvObjects = False + self.rcvEvents = False + self.rcvHeartbeats = False + self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys() + self.manageConnections = manageConnections + # callback filters: + self.agent_filter = [] # (vendor, product, instance) || v1-agent-label-str + self.class_filter = [] # (pkg, class) + self.event_filter = [] # (pkg, event) + self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval + self.agent_heartbeat_miss = 3 # # of heartbeats to miss before deleting agent + + if self.userBindings and not self.console: + raise Exception("userBindings can't be set unless a console is provided.") + + def close(self): + """ Releases all resources held by the session. Must be called by the + application when it is done with the Session object. + """ + self.cv.acquire() + try: + while len(self.brokers): + b = self.brokers.pop() + try: + b._shutdown() + except: + pass + finally: + self.cv.release() + + def _getBrokerForAgentAddr(self, agent_addr): + try: + self.cv.acquire() + key = (1, agent_addr) + for b in self.brokers: + if key in b.agents: + return b + finally: + self.cv.release() + return None + + + def _getAgentForAgentAddr(self, agent_addr): + try: + self.cv.acquire() + key = agent_addr + for b in self.brokers: + if key in b.agents: + return b.agents[key] + finally: + self.cv.release() + return None + + + def __repr__(self): + return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) + + + def addBroker(self, target="localhost", timeout=None, mechanisms=None): + """ Connect to a Qpid broker. Returns an object of type Broker. + Will raise an exception if the session is not managing the connection and + the connection setup to the broker fails. + """ + url = BrokerURL(target) + broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass, + ssl = url.scheme == URL.AMQPS, connTimeout=timeout) + + self.brokers.append(broker) + return broker + + + def delBroker(self, broker): + """ Disconnect from a broker, and deallocate the broker proxy object. The + 'broker' argument is the object returned from the addBroker call. Errors + are ignored. + """ + broker._shutdown() + self.brokers.remove(broker) + del broker + + + def getPackages(self): + """ Get the list of known QMF packages """ + for broker in self.brokers: + broker._waitForStable() + return self.schemaCache.getPackages() + + + def getClasses(self, packageName): + """ Get the list of known classes within a QMF package """ + for broker in self.brokers: + broker._waitForStable() + return self.schemaCache.getClasses(packageName) + + + def getSchema(self, classKey): + """ Get the schema for a QMF class """ + for broker in self.brokers: + broker._waitForStable() + return self.schemaCache.getSchema(classKey) + + + def bindPackage(self, packageName): + """ Filter object and event callbacks to only those elements of the + specified package. Also filters newPackage and newClass callbacks to the + given package. Only valid if userBindings is True. + """ + if not self.userBindings: + raise Exception("userBindings option must be set for this Session.") + if not self.rcvObjects and not self.rcvEvents: + raise Exception("Session needs to be configured to receive events or objects.") + v1keys = ["console.obj.*.*.%s.#" % packageName, "console.event.*.*.%s.#" % packageName] + v2keys = ["agent.ind.data.%s.#" % packageName.replace(".", "_"), + "agent.ind.event.%s.#" % packageName.replace(".", "_"),] + if (packageName, None) not in self.class_filter: + self.class_filter.append((packageName, None)) + if (packageName, None) not in self.event_filter: + self.event_filter.append((packageName, None)) + self.v1BindingKeyList.extend(v1keys) + self.v2BindingKeyList.extend(v2keys) + for broker in self.brokers: + if broker.isConnected(): + for v1key in v1keys: + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) + if broker.brokerSupportsV2: + for v2key in v2keys: + # data indications should arrive on the unsolicited indication queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) + + + def bindClass(self, pname, cname=None): + """ Filter object callbacks to only those objects of the specified package + and optional class. Will also filter newPackage/newClass callbacks to the + specified package and class. Only valid if userBindings is True and + rcvObjects is True. + """ + if not self.userBindings: + raise Exception("userBindings option must be set for this Session.") + if not self.rcvObjects: + raise Exception("Session needs to be configured with rcvObjects=True.") + if cname is not None: + v1key = "console.obj.*.*.%s.%s.#" % (pname, cname) + v2key = "agent.ind.data.%s.%s.#" % (pname.replace(".", "_"), cname.replace(".", "_")) + else: + v1key = "console.obj.*.*.%s.#" % pname + v2key = "agent.ind.data.%s.#" % pname.replace(".", "_") + self.v1BindingKeyList.append(v1key) + self.v2BindingKeyList.append(v2key) + if (pname, cname) not in self.class_filter: + self.class_filter.append((pname, cname)) + for broker in self.brokers: + if broker.isConnected(): + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) + if broker.brokerSupportsV2: + # data indications should arrive on the unsolicited indication queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) + + + def bindClassKey(self, classKey): + """ Filter object callbacks to only those objects of the specified + class. Will also filter newPackage/newClass callbacks to the specified + package and class. Only valid if userBindings is True and rcvObjects is + True. + """ + pname = classKey.getPackageName() + cname = classKey.getClassName() + self.bindClass(pname, cname) + + def bindEvent(self, pname, ename=None): + """ Filter event callbacks only from a particular class by package and + event name, or all events in a package if ename=None. Will also filter + newPackage/newClass callbacks to the specified package and class. Only + valid if userBindings is True and rcvEvents is True. + """ + if not self.userBindings: + raise Exception("userBindings option must be set for this Session.") + if not self.rcvEvents: + raise Exception("Session needs to be configured with rcvEvents=True.") + if ename is not None: + v1key = "console.event.*.*.%s.%s.#" % (pname, ename) + v2key = "agent.ind.event.%s.%s.#" % (pname.replace(".", "_"), ename.replace(".", "_")) + else: + v1key = "console.event.*.*.%s.#" % pname + v2key = "agent.ind.event.%s.#" % pname.replace(".", "_") + self.v1BindingKeyList.append(v1key) + self.v2BindingKeyList.append(v2key) + if (pname, ename) not in self.event_filter: + self.event_filter.append((pname, ename)) + for broker in self.brokers: + if broker.isConnected(): + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) + if broker.brokerSupportsV2: + # event indications should arrive on the unsolicited indication queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) + + def bindEventKey(self, eventKey): + """ Filter event callbacks only from a particular class key. Will also + filter newPackage/newClass callbacks to the specified package and + class. Only valid if userBindings is True and rcvEvents is True. + """ + pname = eventKey.getPackageName() + ename = eventKey.getClassName() + self.bindEvent(pname, ename) + + def bindAgent(self, vendor=None, product=None, instance=None, label=None): + """ Receive heartbeats, newAgent and delAgent callbacks only for those + agent(s) that match the passed identification criteria: + V2 agents: vendor, optionally product and instance strings + V1 agents: the label string. + Only valid if userBindings is True. + """ + if not self.userBindings: + raise Exception("Session not configured for binding specific agents.") + if vendor is None and label is None: + raise Exception("Must specify at least a vendor (V2 agents)" + " or label (V1 agents).") + + if vendor: # V2 agent identification + if product is not None: + v2key = "agent.ind.heartbeat.%s.%s.#" % (vendor.replace(".", "_"), product.replace(".", "_")) + else: + v2key = "agent.ind.heartbeat.%s.#" % vendor.replace(".", "_") + self.v2BindingKeyList.append(v2key) + + # allow wildcards - only add filter if a non-wildcarded component is given + if vendor == "*": + vendor = None + if product == "*": + product = None + if instance == "*": + instance = None + if vendor or product or instance: + if (vendor, product, instance) not in self.agent_filter: + self.agent_filter.append((vendor, product, instance)) + + for broker in self.brokers: + if broker.isConnected(): + if broker.brokerSupportsV2: + # heartbeats should arrive on the heartbeat queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=broker.v2_topic_queue_hb, + binding_key=v2key) + elif label != "*": # non-wildcard V1 agent label + # V1 format heartbeats do not have any agent identifier in the routing + # key, so we cannot filter them by bindings. + if label not in self.agent_filter: + self.agent_filter.append(label) + + + def getAgents(self, broker=None): + """ Get a list of currently known agents """ + brokerList = [] + if broker == None: + for b in self.brokers: + brokerList.append(b) + else: + brokerList.append(broker) + + for b in brokerList: + b._waitForStable() + agentList = [] + for b in brokerList: + for a in b.getAgents(): + agentList.append(a) + return agentList + + + def makeObject(self, classKey, **kwargs): + """ Create a new, unmanaged object of the schema indicated by classKey """ + schema = self.getSchema(classKey) + if schema == None: + raise Exception("Schema not found for classKey") + return Object(None, schema, None, True, True, kwargs) + + + def getObjects(self, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + The class for queried objects may be specified in one of the following ways: + + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = <id> - get the object referenced by the object-id + + If objects should be obtained from only one agent, use the following argument. + Otherwise, the query will go to all agents. + + _agent = <agent> - supply an agent from the list returned by getAgents. + + If the get query is to be restricted to one broker (as opposed to all connected brokers), + add the following argument: + + _broker = <broker> - supply a broker as returned by addBroker. + + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + + If additional arguments are supplied, they are used as property selectors. For example, + if the argument name="test" is supplied, only objects whose "name" property is "test" + will be returned in the result. + """ + if "_broker" in kwargs: + brokerList = [] + brokerList.append(kwargs["_broker"]) + else: + brokerList = self.brokers + for broker in brokerList: + broker._waitForStable() + if broker.isConnected(): + if "_package" not in kwargs or "_class" not in kwargs or \ + kwargs["_package"] != "org.apache.qpid.broker" or \ + kwargs["_class"] != "agent": + self.getObjects(_package = "org.apache.qpid.broker", _class = "agent", + _agent = broker.getAgent(1,0)) + + agentList = [] + if "_agent" in kwargs: + agent = kwargs["_agent"] + if agent.broker not in brokerList: + raise Exception("Supplied agent is not accessible through the supplied broker") + if agent.broker.isConnected(): + agentList.append(agent) + else: + if "_objectId" in kwargs: + oid = kwargs["_objectId"] + for broker in brokerList: + for agent in broker.getAgents(): + if agent.getBrokerBank() == oid.getBrokerBank() and agent.getAgentBank() == oid.getAgentBank(): + agentList.append(agent) + else: + for broker in brokerList: + for agent in broker.getAgents(): + if agent.broker.isConnected(): + agentList.append(agent) + + if len(agentList) == 0: + return [] + + # + # We now have a list of agents to query, start the queries and gather the results. + # + request = SessionGetRequest(len(agentList)) + for agent in agentList: + agent.getObjects(request, **kwargs) + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + request.wait(timeout) + return request.result + + + def addEventFilter(self, **kwargs): + """Filter unsolicited events based on package and event name. + QMF v2 also can filter on vendor, product, and severity values. + + By default, a console receives unsolicted events by binding to: + + qpid.management/console.event.# (v1) + + qmf.default.topic/agent.ind.event.# (v2) + + A V1 event filter binding uses the pattern: + + qpid.management/console.event.*.*[.<package>[.<event>]].# + + A V2 event filter binding uses the pattern: + + qmf.default.topic/agent.ind.event.<Vendor|*>.<Product|*>.<severity|*>.<package|*>.<event|*>.# + """ + package = kwargs.get("package", "*") + event = kwargs.get("event", "*") + vendor = kwargs.get("vendor", "*") + product = kwargs.get("product", "*") + severity = kwargs.get("severity", "*") + + if package == "*" and event != "*": + raise Exception("'package' parameter required if 'event' parameter" + " supplied") + + # V1 key - can only filter on package (and event) + if package == "*": + key = "console.event.*.*." + str(package) + if event != "*": + key += "." + str(event) + key += ".#" + + if key not in self.v1BindingKeyList: + self.v1BindingKeyList.append(key) + try: + # remove default wildcard binding + self.v1BindingKeyList.remove("console.event.#") + except: + pass + + # V2 key - escape any "." in the filter strings + + key = "agent.ind.event." + str(package).replace(".", "_") \ + + "." + str(event).replace(".", "_") \ + + "." + str(severity).replace(".", "_") \ + + "." + str(vendor).replace(".", "_") \ + + "." + str(product).replace(".", "_") \ + + ".#" + + if key not in self.v2BindingKeyList: + self.v2BindingKeyList.append(key) + try: + # remove default wildcard binding + self.v2BindingKeyList.remove("agent.ind.event.#") + except: + pass + + if package != "*": + if event != "*": + f = (package, event) + else: + f = (package, None) + if f not in self.event_filter: + self.event_filter.append(f) + + + def addAgentFilter(self, vendor, product=None): + """ Deprecate - use bindAgent() instead + """ + self.addHeartbeatFilter(vendor=vendor, product=product) + + def addHeartbeatFilter(self, **kwargs): + """ Deprecate - use bindAgent() instead. + """ + vendor = kwargs.get("vendor") + product = kwargs.get("product") + if vendor is None: + raise Exception("vendor parameter required!") + + # V1 heartbeats do not have any agent identifier - we cannot + # filter them by agent. + + # build the binding key - escape "."s... + key = "agent.ind.heartbeat." + str(vendor).replace(".", "_") + if product is not None: + key += "." + str(product).replace(".", "_") + key += ".#" + + if key not in self.v2BindingKeyList: + self.v2BindingKeyList.append(key) + self.agent_filter.append((vendor, product, None)) + + # be sure we don't ever filter the local broker + local_broker_key = "agent.ind.heartbeat." + "org.apache".replace(".", "_") \ + + "." + "qpidd".replace(".", "_") + ".#" + if local_broker_key not in self.v2BindingKeyList: + self.v2BindingKeyList.append(local_broker_key) + + # remove the wildcard key if present + try: + self.v2BindingKeyList.remove("agent.ind.heartbeat.#") + except: + pass + + def _bindingKeys(self): + v1KeyList = [] + v2KeyList = [] + v1KeyList.append("schema.#") + # note well: any binding that starts with 'agent.ind.heartbeat' will be + # bound to the heartbeat queue, otherwise it will be bound to the + # unsolicited indication queue. See _decOutstanding() for the binding. + if not self.userBindings: + if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats: + v1KeyList.append("console.#") + v2KeyList.append("agent.ind.data.#") + v2KeyList.append("agent.ind.event.#") + v2KeyList.append("agent.ind.heartbeat.#") + else: + # need heartbeats for V2 newAgent()/delAgent() + v2KeyList.append("agent.ind.heartbeat.#") + if self.rcvObjects: + v1KeyList.append("console.obj.#") + v2KeyList.append("agent.ind.data.#") + else: + v1KeyList.append("console.obj.*.*.org.apache.qpid.broker.agent") + if self.rcvEvents: + v1KeyList.append("console.event.#") + v2KeyList.append("agent.ind.event.#") + else: + v1KeyList.append("console.event.*.*.org.apache.qpid.broker.agent") + if self.rcvHeartbeats: + v1KeyList.append("console.heartbeat.#") + else: + # mandatory bindings + v1KeyList.append("console.obj.*.*.org.apache.qpid.broker.agent") + v1KeyList.append("console.event.*.*.org.apache.qpid.broker.agent") + v1KeyList.append("console.heartbeat.#") # no way to turn this on later + v2KeyList.append("agent.ind.heartbeat.org_apache.qpidd.#") + + return (v1KeyList, v2KeyList) + + + def _handleBrokerConnect(self, broker): + if self.console: + for agent in broker.getAgents(): + self._newAgentCallback(agent) + self.console.brokerConnected(broker) + + + def _handleBrokerDisconnect(self, broker): + if self.console: + for agent in broker.getAgents(): + self._delAgentCallback(agent) + self.console.brokerDisconnected(broker) + + + def _handleBrokerResp(self, broker, codec, seq): + broker.brokerId = codec.read_uuid() + if self.console != None: + self.console.brokerInfo(broker) + + # Send a package request + # (effectively inc and dec outstanding by not doing anything) + sendCodec = Codec() + seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) + broker._setHeader(sendCodec, 'P', seq) + smsg = broker._message(sendCodec.encoded) + broker._send(smsg) + + + def _handlePackageInd(self, broker, codec, seq): + pname = str(codec.read_str8()) + notify = self.schemaCache.declarePackage(pname) + if notify and self.console != None: + self._newPackageCallback(pname) + + # Send a class request + broker._incOutstanding() + sendCodec = Codec() + seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) + broker._setHeader(sendCodec, 'Q', seq) + sendCodec.write_str8(pname) + smsg = broker._message(sendCodec.encoded) + broker._send(smsg) + + + def _handleCommandComplete(self, broker, codec, seq, agent): + code = codec.read_uint32() + text = codec.read_str8() + context = self.seqMgr._release(seq) + if context == self._CONTEXT_STARTUP: + broker._decOutstanding() + elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: + try: + broker.cv.acquire() + broker.syncInFlight = False + broker.cv.notify() + finally: + broker.cv.release() + elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: + try: + self.cv.acquire() + self.syncSequenceList.remove(seq) + if len(self.syncSequenceList) == 0: + self.cv.notify() + finally: + self.cv.release() + + if agent: + agent._handleV1Completion(seq, code, text) + + + def _handleClassInd(self, broker, codec, seq): + kind = codec.read_uint8() + classKey = ClassKey(codec) + classKey._setType(kind) + schema = self.schemaCache.getSchema(classKey) + + if not schema: + # Send a schema request for the unknown class + broker._incOutstanding() + sendCodec = Codec() + seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) + broker._setHeader(sendCodec, 'S', seq) + classKey.encode(sendCodec) + smsg = broker._message(sendCodec.encoded) + broker._send(smsg) + + + def _handleHeartbeatInd(self, broker, codec, seq, msg): + brokerBank = 1 + agentBank = 0 + dp = msg.get("delivery_properties") + if dp: + key = dp["routing_key"] + if key: + keyElements = key.split(".") + if len(keyElements) == 4: + brokerBank = int(keyElements[2]) + agentBank = int(keyElements[3]) + else: + # If there's no routing key in the delivery properties, + # assume the message is from the broker. + brokerBank = 1 + agentBank = 0 + + agent = broker.getAgent(brokerBank, agentBank) + if self.rcvHeartbeats and self.console != None and agent != None: + timestamp = codec.read_uint64() + self._heartbeatCallback(agent, timestamp) + + + def _handleSchemaResp(self, broker, codec, seq, agent_addr): + kind = codec.read_uint8() + classKey = ClassKey(codec) + classKey._setType(kind) + _class = SchemaClass(kind, classKey, codec, self) + new_pkg, new_cls = self.schemaCache.declareClass(classKey, _class) + ctx = self.seqMgr._release(seq) + if ctx: + broker._decOutstanding() + if self.console != None: + if new_pkg: + self._newPackageCallback(classKey.getPackageName()) + if new_cls: + self._newClassCallback(kind, classKey) + + if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): + agent = self._getAgentForAgentAddr(agent_addr) + if agent: + agent._schemaInfoFromV2Agent() + + + def _v2HandleHeartbeatInd(self, broker, mp, ah, content): + try: + agentName = ah["qmf.agent"] + values = content["_values"] + + if '_timestamp' in values: + timestamp = values["_timestamp"] + else: + timestamp = values['timestamp'] + + if '_heartbeat_interval' in values: + interval = values['_heartbeat_interval'] + else: + interval = values['heartbeat_interval'] + + epoch = 0 + if '_epoch' in values: + epoch = values['_epoch'] + elif 'epoch' in values: + epoch = values['epoch'] + except Exception,e: + return + + ## + ## For now, ignore heartbeats from messaging brokers. We already have the "local-broker" + ## agent in our list. + ## + if '_vendor' in values and values['_vendor'] == 'apache.org' and \ + '_product' in values and values['_product'] == 'qpidd': + return + + if self.agent_filter: + # only allow V2 agents that satisfy the filter + v = agentName.split(":", 2) + if len(v) != 3 or ((v[0], None, None) not in self.agent_filter + and (v[0], v[1], None) not in self.agent_filter + and (v[0], v[1], v[2]) not in self.agent_filter): + return + + agent = broker.getAgent(1, agentName) + if agent == None: + agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) + agent.setEpoch(epoch) + broker._addAgent(agentName, agent) + else: + agent.touch() + if self.rcvHeartbeats and self.console and agent: + self._heartbeatCallback(agent, timestamp) + agent.update_schema_timestamp(values.get("_schema_updated", 0)) + + + def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): + self._v2HandleHeartbeatInd(broker, mp, ah, content) + + + def _handleError(self, error): + try: + self.cv.acquire() + if len(self.syncSequenceList) > 0: + self.error = error + self.syncSequenceList = [] + self.cv.notify() + finally: + self.cv.release() + + + def _selectMatch(self, object): + """ Check the object against self.getSelect to check for a match """ + for key, value in self.getSelect: + for prop, propval in object.getProperties(): + if key == prop.name and value != propval: + return False + return True + + + def _decodeValue(self, codec, typecode, broker=None): + """ Decode, from the codec, a value based on its typecode. """ + if typecode == 1: data = codec.read_uint8() # U8 + elif typecode == 2: data = codec.read_uint16() # U16 + elif typecode == 3: data = codec.read_uint32() # U32 + elif typecode == 4: data = codec.read_uint64() # U64 + elif typecode == 6: data = codec.read_str8() # SSTR + elif typecode == 7: data = codec.read_str16() # LSTR + elif typecode == 8: data = codec.read_int64() # ABSTIME + elif typecode == 9: data = codec.read_uint64() # DELTATIME + elif typecode == 10: data = ObjectId(codec) # REF + elif typecode == 11: data = codec.read_uint8() != 0 # BOOL + elif typecode == 12: data = codec.read_float() # FLOAT + elif typecode == 13: data = codec.read_double() # DOUBLE + elif typecode == 14: data = codec.read_uuid() # UUID + elif typecode == 16: data = codec.read_int8() # S8 + elif typecode == 17: data = codec.read_int16() # S16 + elif typecode == 18: data = codec.read_int32() # S32 + elif typecode == 19: data = codec.read_int64() # S63 + elif typecode == 15: data = codec.read_map() # FTABLE + elif typecode == 20: # OBJECT + # Peek at the type, and if it is still 20 pull it decode. If + # Not, call back into self. + inner_type_code = codec.read_uint8() + if inner_type_code == 20: + classKey = ClassKey(codec) + schema = self.schemaCache.getSchema(classKey) + if not schema: + return None + data = Object(self, broker, schema, codec, True, True, False) + else: + data = self._decodeValue(codec, inner_type_code, broker) + elif typecode == 21: data = codec.read_list() # List + elif typecode == 22: #Array + #taken from codec10.read_array + sc = Codec(codec.read_vbin32()) + count = sc.read_uint32() + type = sc.read_uint8() + data = [] + while count > 0: + data.append(self._decodeValue(sc,type,broker)) + count -= 1 + else: + raise ValueError("Invalid type code: %d" % typecode) + return data + + + def _encodeValue(self, codec, value, typecode): + """ Encode, into the codec, a value based on its typecode. """ + if typecode == 1: codec.write_uint8 (int(value)) # U8 + elif typecode == 2: codec.write_uint16 (int(value)) # U16 + elif typecode == 3: codec.write_uint32 (long(value)) # U32 + elif typecode == 4: codec.write_uint64 (long(value)) # U64 + elif typecode == 6: codec.write_str8 (value) # SSTR + elif typecode == 7: codec.write_str16 (value) # LSTR + elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME + elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME + elif typecode == 10: value.encode (codec) # REF + elif typecode == 11: codec.write_uint8 (int(value)) # BOOL + elif typecode == 12: codec.write_float (float(value)) # FLOAT + elif typecode == 13: codec.write_double (float(value)) # DOUBLE + elif typecode == 14: codec.write_uuid (value.bytes) # UUID + elif typecode == 16: codec.write_int8 (int(value)) # S8 + elif typecode == 17: codec.write_int16 (int(value)) # S16 + elif typecode == 18: codec.write_int32 (int(value)) # S32 + elif typecode == 19: codec.write_int64 (int(value)) # S64 + elif typecode == 20: value._encodeUnmanaged(codec) # OBJECT + elif typecode == 15: codec.write_map (value) # FTABLE + elif typecode == 21: codec.write_list (value) # List + elif typecode == 22: # Array + sc = Codec() + self._encodeValue(sc, len(value), 3) + if len(value) > 0: + ltype = self.encoding(value[0]) + self._encodeValue(sc,ltype,1) + for o in value: + self._encodeValue(sc, o, ltype) + codec.write_vbin32(sc.encoded) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + + def encoding(self, value): + return self._encoding(value.__class__) + + + def _encoding(self, klass): + if Session.ENCODINGS.has_key(klass): + return self.ENCODINGS[klass] + for base in klass.__bases__: + result = self._encoding(base) + if result != None: + return result + + + def _displayValue(self, value, typecode): + """ """ + if typecode == 1: return unicode(value) + elif typecode == 2: return unicode(value) + elif typecode == 3: return unicode(value) + elif typecode == 4: return unicode(value) + elif typecode == 6: return value + elif typecode == 7: return value + elif typecode == 8: return unicode(strftime("%c", gmtime(value / 1000000000))) + elif typecode == 9: return unicode(value) + elif typecode == 10: return unicode(value.__repr__()) + elif typecode == 11: + if value: return u"T" + else: return u"F" + elif typecode == 12: return unicode(value) + elif typecode == 13: return unicode(value) + elif typecode == 14: return unicode(value.__repr__()) + elif typecode == 15: return unicode(value.__repr__()) + elif typecode == 16: return unicode(value) + elif typecode == 17: return unicode(value) + elif typecode == 18: return unicode(value) + elif typecode == 19: return unicode(value) + elif typecode == 20: return unicode(value.__repr__()) + elif typecode == 21: return unicode(value.__repr__()) + elif typecode == 22: return unicode(value.__repr__()) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + + def _defaultValue(self, stype, broker=None, kwargs={}): + """ """ + typecode = stype.type + if typecode == 1: return 0 + elif typecode == 2: return 0 + elif typecode == 3: return 0 + elif typecode == 4: return 0 + elif typecode == 6: return "" + elif typecode == 7: return "" + elif typecode == 8: return 0 + elif typecode == 9: return 0 + elif typecode == 10: return ObjectId(None) + elif typecode == 11: return False + elif typecode == 12: return 0.0 + elif typecode == 13: return 0.0 + elif typecode == 14: return UUID(bytes=[0 for i in range(16)]) + elif typecode == 15: return {} + elif typecode == 16: return 0 + elif typecode == 17: return 0 + elif typecode == 18: return 0 + elif typecode == 19: return 0 + elif typecode == 21: return [] + elif typecode == 22: return [] + elif typecode == 20: + try: + if "classKeys" in kwargs: + keyList = kwargs["classKeys"] + else: + keyList = None + classKey = self._bestClassKey(stype.refPackage, stype.refClass, keyList) + if classKey: + return self.makeObject(classKey, broker, kwargs) + except: + pass + return None + else: + raise ValueError ("Invalid type code: %d" % typecode) + + + def _bestClassKey(self, pname, cname, preferredList): + """ """ + if pname == None or cname == None: + if len(preferredList) == 0: + return None + return preferredList[0] + for p in preferredList: + if p.getPackageName() == pname and p.getClassName() == cname: + return p + clist = self.getClasses(pname) + for c in clist: + if c.getClassName() == cname: + return c + return None + + + def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): + """ This function can be used to send a method request to an object given only the + broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are + normally invoked on the object itself. + """ + schema = self.getSchema(schemaKey) + if not schema: + raise Exception("Schema not present (Key=%s)" % str(schemaKey)) + for method in schema.getMethods(): + if name == method.name: + # + # Count the arguments supplied and validate that the number is what is expected + # based on the schema. + # + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(argList): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) + + aIdx = 0 + sendCodec = Codec() + seq = self.seqMgr._reserve((method, False)) + + if objectId.isV2: + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = objectId.asMap() + call['_method_name'] = name + args = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + args[arg.name] = argList[aIdx] + aIdx += 1 + call['_arguments'] = args + + dp = broker.amqpSession.delivery_properties() + dp.routing_key = objectId.getV2RoutingKey() + mp = broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + if broker.saslUser: + mp.user_id = broker.saslUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_direct_queue) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + msg = Message(dp, mp, sendCodec.encoded) + broker._send(msg, "qmf.default.direct") + + else: + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank()) + broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + broker._setHeader(sendCodec, 'M', seq) + objectId.encode(sendCodec) + schemaKey.encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._encodeValue(sendCodec, argList[aIdx], arg.type) + aIdx += 1 + smsg = broker._message(sendCodec.encoded, "agent.%d.%s" % + (objectId.getBrokerBank(), objectId.getAgentBank())) + broker._send(smsg) + return seq + return None + + def _newPackageCallback(self, pname): + """ + Invokes the console.newPackage() callback if the callback is present and + the package is not filtered. + """ + if self.console: + if len(self.class_filter) == 0 and len(self.event_filter) == 0: + self.console.newPackage(pname) + else: + for x in self.class_filter: + if x[0] == pname: + self.console.newPackage(pname) + return + + for x in self.event_filter: + if x[0] == pname: + self.console.newPackage(pname) + return + + + def _newClassCallback(self, ctype, ckey): + """ + Invokes the console.newClass() callback if the callback is present and the + class is not filtered. + """ + if self.console: + if ctype == ClassKey.TYPE_DATA: + if (len(self.class_filter) == 0 + or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter): + self.console.newClass(ctype, ckey) + elif ctype == ClassKey.TYPE_EVENT: + if (len(self.event_filter) == 0 + or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter): + self.console.newClass(ctype, ckey) + else: # old class keys did not contain type info, check both filters + if ((len(self.class_filter) == 0 and len(self.event_filter) == 0) + or (ckey.getPackageName(), ckey.getClassName()) in self.class_filter + or (ckey.getPackageName(), ckey.getClassName()) in self.event_filter): + self.console.newClass(ctype, ckey) + + def _agentAllowed(self, agentName, isV2): + """ True if the agent is NOT filtered. + """ + if self.agent_filter: + if isV2: + v = agentName.split(":", 2) + return ((len(v) > 2 and (v[0], v[1], v[2]) in self.agent_filter) + or (len(v) > 1 and (v[0], v[1], None) in self.agent_filter) + or (v and (v[0], None, None) in self.agent_filter)); + else: + return agentName in self.agent_filter + return True + + def _heartbeatCallback(self, agent, timestamp): + """ + Invokes the console.heartbeat() callback if the callback is present and the + agent is not filtered. + """ + if self.console and self.rcvHeartbeats: + if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) + or ((not agent.isV2) and self._agentAllowed(agent.label, False))): + self.console.heartbeat(agent, timestamp) + + def _newAgentCallback(self, agent): + """ + Invokes the console.newAgent() callback if the callback is present and the + agent is not filtered. + """ + if self.console: + if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) + or ((not agent.isV2) and self._agentAllowed(agent.label, False))): + self.console.newAgent(agent) + + def _delAgentCallback(self, agent): + """ + Invokes the console.delAgent() callback if the callback is present and the + agent is not filtered. + """ + if self.console: + if ((agent.isV2 and self._agentAllowed(agent.agentBank, True)) + or ((not agent.isV2) and self._agentAllowed(agent.label, False))): + self.console.delAgent(agent) + +#=================================================================================================== +# SessionGetRequest +#=================================================================================================== +class SessionGetRequest(object): + """ + This class is used to track get-object queries at the Session level. + """ + def __init__(self, agentCount): + self.agentCount = agentCount + self.result = [] + self.cv = Condition() + self.waiting = True + + def __call__(self, **kwargs): + """ + Callable entry point for gathering collected objects. + """ + try: + self.cv.acquire() + if 'qmf_object' in kwargs: + self.result.append(kwargs['qmf_object']) + elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs: + self.agentCount -= 1 + if self.agentCount == 0: + self.waiting = None + self.cv.notify() + finally: + self.cv.release() + + def wait(self, timeout): + starttime = time() + try: + self.cv.acquire() + while self.waiting: + if (time() - starttime) > timeout: + raise Exception("Timed out after %d seconds" % timeout) + self.cv.wait(1) + finally: + self.cv.release() + + +#=================================================================================================== +# SchemaCache +#=================================================================================================== +class SchemaCache(object): + """ + The SchemaCache is a data structure that stores learned schema information. + """ + def __init__(self): + """ + Create a map of schema packages and a lock to protect this data structure. + Note that this lock is at the bottom of any lock hierarchy. If it is held, no other + lock in the system should attempt to be acquired. + """ + self.packages = {} + self.lock = Lock() + + def getPackages(self): + """ Get the list of known QMF packages """ + list = [] + try: + self.lock.acquire() + for package in self.packages: + list.append(package) + finally: + self.lock.release() + return list + + def getClasses(self, packageName): + """ Get the list of known classes within a QMF package """ + list = [] + try: + self.lock.acquire() + if packageName in self.packages: + for pkey in self.packages[packageName]: + if isinstance(self.packages[packageName][pkey], SchemaClass): + list.append(self.packages[packageName][pkey].getKey()) + elif self.packages[packageName][pkey] is not None: + # schema not present yet, but we have schema type + list.append(ClassKey({"_package_name": packageName, + "_class_name": pkey[0], + "_hash": pkey[1], + "_type": self.packages[packageName][pkey]})) + finally: + self.lock.release() + return list + + def getSchema(self, classKey): + """ Get the schema for a QMF class, return None if schema not available """ + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname in self.packages: + if (pkey in self.packages[pname] and + isinstance(self.packages[pname][pkey], SchemaClass)): + # hack: value may be schema type info if schema not available + return self.packages[pname][pkey] + finally: + self.lock.release() + return None + + def declarePackage(self, pname): + """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """ + try: + self.lock.acquire() + if pname in self.packages: + return None + self.packages[pname] = {} + finally: + self.lock.release() + return True + + def declareClass(self, classKey, classDef=None): + """ Add a class definition to the cache, if supplied. Return a pair + indicating if the package or class is new. + """ + new_package = False + new_class = False + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname not in self.packages: + self.packages[pname] = {} + new_package = True + packageMap = self.packages[pname] + if pkey not in packageMap or not isinstance(packageMap[pkey], SchemaClass): + if classDef is not None: + new_class = True + packageMap[pkey] = classDef + elif classKey.getType() is not None: + # hack: don't indicate "new_class" to caller unless the classKey type + # information is present. "new_class" causes the console.newClass() + # callback to be invoked, which -requires- a valid classKey type! + new_class = True + # store the type for the getClasses() method: + packageMap[pkey] = classKey.getType() + + finally: + self.lock.release() + return (new_package, new_class) + + +#=================================================================================================== +# ClassKey +#=================================================================================================== +class ClassKey: + """ A ClassKey uniquely identifies a class from the schema. """ + + TYPE_DATA = "_data" + TYPE_EVENT = "_event" + + def __init__(self, constructor): + if constructor.__class__ == str: + # construct from __repr__ string + try: + # supports two formats: + # type present = P:C:T(H) + # no type present = P:C(H) + tmp = constructor.split(":") + if len(tmp) == 3: + self.pname, self.cname, rem = tmp + self.type, hsh = rem.split("(") + else: + self.pname, rem = tmp + self.cname, hsh = rem.split("(") + self.type = None + hsh = hsh.strip(")") + hexValues = hsh.split("-") + h0 = int(hexValues[0], 16) + h1 = int(hexValues[1], 16) + h2 = int(hexValues[2], 16) + h3 = int(hexValues[3], 16) + h4 = int(hexValues[4][0:4], 16) + h5 = int(hexValues[4][4:12], 16) + self.hash = UUID(bytes=struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5)) + except: + raise Exception("Invalid ClassKey format") + elif constructor.__class__ == dict: + # construct from QMFv2 map + try: + self.pname = constructor['_package_name'] + self.cname = constructor['_class_name'] + self.hash = constructor['_hash'] + self.type = constructor.get('_type') + except: + raise Exception("Invalid ClassKey map format %s" % str(constructor)) + else: + # construct from codec + codec = constructor + self.pname = str(codec.read_str8()) + self.cname = str(codec.read_str8()) + self.hash = UUID(bytes=codec.read_bin128()) + # old V1 codec did not include "type" + self.type = None + + def encode(self, codec): + # old V1 codec did not include "type" + codec.write_str8(self.pname) + codec.write_str8(self.cname) + codec.write_bin128(self.hash.bytes) + + def asMap(self): + m = {'_package_name': self.pname, + '_class_name': self.cname, + '_hash': self.hash} + if self.type is not None: + m['_type'] = self.type + return m + + def getPackageName(self): + return self.pname + + def getClassName(self): + return self.cname + + def getHash(self): + return self.hash + + def getType(self): + return self.type + + def getHashString(self): + return str(self.hash) + + def getPackageKey(self): + return (self.cname, self.hash) + + def __repr__(self): + if self.type is None: + return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + return self.pname + ":" + self.cname + ":" + self.type + "(" + self.getHashString() + ")" + + def _setType(self, _type): + if _type == 2 or _type == ClassKey.TYPE_EVENT: + self.type = ClassKey.TYPE_EVENT + else: + self.type = ClassKey.TYPE_DATA + + def __hash__(self): + ss = self.pname + self.cname + self.getHashString() + return ss.__hash__() + + def __eq__(self, other): + return self.__repr__() == other.__repr__() + +#=================================================================================================== +# SchemaClass +#=================================================================================================== +class SchemaClass: + """ """ + CLASS_KIND_TABLE = 1 + CLASS_KIND_EVENT = 2 + + def __init__(self, kind, key, codec, session): + self.kind = kind + self.classKey = key + self.properties = [] + self.statistics = [] + self.methods = [] + self.arguments = [] + self.session = session + + hasSupertype = 0 #codec.read_uint8() + if self.kind == self.CLASS_KIND_TABLE: + propCount = codec.read_uint16() + statCount = codec.read_uint16() + methodCount = codec.read_uint16() + if hasSupertype == 1: + self.superTypeKey = ClassKey(codec) + else: + self.superTypeKey = None ; + for idx in range(propCount): + self.properties.append(SchemaProperty(codec)) + for idx in range(statCount): + self.statistics.append(SchemaStatistic(codec)) + for idx in range(methodCount): + self.methods.append(SchemaMethod(codec)) + + elif self.kind == self.CLASS_KIND_EVENT: + argCount = codec.read_uint16() + if (hasSupertype): + self.superTypeKey = ClassKey(codec) + else: + self.superTypeKey = None ; + for idx in range(argCount): + self.arguments.append(SchemaArgument(codec, methodArg=False)) + + def __repr__(self): + if self.kind == self.CLASS_KIND_TABLE: + kindStr = "Table" + elif self.kind == self.CLASS_KIND_EVENT: + kindStr = "Event" + else: + kindStr = "Unsupported" + result = "%s Class: %s " % (kindStr, self.classKey.__repr__()) + return result + + def getKey(self): + """ Return the class-key for this class. """ + return self.classKey + + def getProperties(self): + """ Return the list of properties for the class. """ + if (self.superTypeKey == None): + return self.properties + else: + return self.properties + self.session.getSchema(self.superTypeKey).getProperties() + + def getStatistics(self): + """ Return the list of statistics for the class. """ + if (self.superTypeKey == None): + return self.statistics + else: + return self.statistics + self.session.getSchema(self.superTypeKey).getStatistics() + + def getMethods(self): + """ Return the list of methods for the class. """ + if (self.superTypeKey == None): + return self.methods + else: + return self.methods + self.session.getSchema(self.superTypeKey).getMethods() + + def getArguments(self): + """ Return the list of events for the class. """ + """ Return the list of methods for the class. """ + if (self.superTypeKey == None): + return self.arguments + else: + return self.arguments + self.session.getSchema(self.superTypeKey).getArguments() + + +#=================================================================================================== +# SchemaProperty +#=================================================================================================== +class SchemaProperty: + """ """ + def __init__(self, codec): + map = codec.read_map() + self.name = str(map["name"]) + self.type = map["type"] + self.access = str(map["access"]) + self.index = map["index"] != 0 + self.optional = map["optional"] != 0 + self.refPackage = None + self.refClass = None + self.unit = None + self.min = None + self.max = None + self.maxlen = None + self.desc = None + + for key, value in map.items(): + if key == "unit" : self.unit = value + elif key == "min" : self.min = value + elif key == "max" : self.max = value + elif key == "maxlen" : self.maxlen = value + elif key == "desc" : self.desc = value + elif key == "refPackage" : self.refPackage = value + elif key == "refClass" : self.refClass = value + + def __repr__(self): + return self.name + + +#=================================================================================================== +# SchemaStatistic +#=================================================================================================== +class SchemaStatistic: + """ """ + def __init__(self, codec): + map = codec.read_map() + self.name = str(map["name"]) + self.type = map["type"] + self.unit = None + self.desc = None + + for key, value in map.items(): + if key == "unit" : self.unit = value + elif key == "desc" : self.desc = value + + def __repr__(self): + return self.name + + +#=================================================================================================== +# SchemaMethod +#=================================================================================================== +class SchemaMethod: + """ """ + def __init__(self, codec): + map = codec.read_map() + self.name = str(map["name"]) + argCount = map["argCount"] + if "desc" in map: + self.desc = map["desc"] + else: + self.desc = None + self.arguments = [] + + for idx in range(argCount): + self.arguments.append(SchemaArgument(codec, methodArg=True)) + + def __repr__(self): + result = self.name + "(" + first = True + for arg in self.arguments: + if arg.dir.find("I") != -1: + if first: + first = False + else: + result += ", " + result += arg.name + result += ")" + return result + + +#=================================================================================================== +# SchemaArgument +#=================================================================================================== +class SchemaArgument: + """ """ + def __init__(self, codec, methodArg): + map = codec.read_map() + self.name = str(map["name"]) + self.type = map["type"] + if methodArg: + self.dir = str(map["dir"]).upper() + self.unit = None + self.min = None + self.max = None + self.maxlen = None + self.desc = None + self.default = None + self.refPackage = None + self.refClass = None + + for key, value in map.items(): + if key == "unit" : self.unit = value + elif key == "min" : self.min = value + elif key == "max" : self.max = value + elif key == "maxlen" : self.maxlen = value + elif key == "desc" : self.desc = value + elif key == "default" : self.default = value + elif key == "refPackage" : self.refPackage = value + elif key == "refClass" : self.refClass = value + + +#=================================================================================================== +# ObjectId +#=================================================================================================== +class ObjectId: + """ Object that represents QMF object identifiers """ + def __init__(self, constructor, first=0, second=0, agentName=None): + if constructor.__class__ == dict: + self.isV2 = True + self.agentName = agentName + self.agentEpoch = 0 + if '_agent_name' in constructor: self.agentName = constructor['_agent_name'] + if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch'] + if '_object_name' not in constructor: + raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.") + self.objectName = constructor['_object_name'] + else: + self.isV2 = None + if not constructor: + first = first + second = second + else: + first = constructor.read_uint64() + second = constructor.read_uint64() + self.agentName = str(first & 0x000000000FFFFFFF) + self.agentEpoch = (first & 0x0FFF000000000000) >> 48 + self.objectName = str(second) + + def _create(cls, agent_name, object_name, epoch=0): + oid = {"_agent_name": agent_name, + "_object_name": object_name, + "_agent_epoch": epoch} + return cls(oid) + create = classmethod(_create) + + def __cmp__(self, other): + if other == None or not isinstance(other, ObjectId) : + return 1 + + if self.objectName < other.objectName: + return -1 + if self.objectName > other.objectName: + return 1 + + if self.agentName < other.agentName: + return -1 + if self.agentName > other.agentName: + return 1 + + if self.agentEpoch < other.agentEpoch: + return -1 + if self.agentEpoch > other.agentEpoch: + return 1 + return 0 + + def __repr__(self): + return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(), + self.getBrokerBank(), self.getAgentBank(), self.getObject()) + + def index(self): + return self.__repr__() + + def getFlags(self): + return 0 + + def getSequence(self): + return self.agentEpoch + + def getBrokerBank(self): + return 1 + + def getAgentBank(self): + return self.agentName + + def getV2RoutingKey(self): + if self.agentName == '0': + return "broker" + return self.agentName + + def getObject(self): + return self.objectName + + def isDurable(self): + return self.getSequence() == 0 + + def encode(self, codec): + first = (self.agentEpoch << 48) + (1 << 28) + second = 0 + + try: + first += int(self.agentName) + except: + pass + + try: + second = int(self.objectName) + except: + pass + + codec.write_uint64(first) + codec.write_uint64(second) + + def asMap(self): + omap = {'_agent_name': self.agentName, '_object_name': self.objectName} + if self.agentEpoch != 0: + omap['_agent_epoch'] = self.agentEpoch + return omap + + def __hash__(self): + return self.__repr__().__hash__() + + def __eq__(self, other): + return self.__repr__().__eq__(other) + + +#=================================================================================================== +# MethodResult +#=================================================================================================== +class MethodResult(object): + """ """ + def __init__(self, status, text, outArgs): + """ """ + self.status = status + self.text = text + self.outArgs = outArgs + + def __getattr__(self, name): + if name in self.outArgs: + return self.outArgs[name] + + def __repr__(self): + return "%s (%d) - %s" % (self.text, self.status, self.outArgs) + + +#=================================================================================================== +# Broker +#=================================================================================================== +class Broker(Thread): + """ This object represents a connection (or potential connection) to a QMF broker. """ + SYNC_TIME = 60 + nextSeq = 1 + + # for connection recovery + DELAY_MIN = 1 + DELAY_MAX = 128 + DELAY_FACTOR = 2 + + class _q_item: + """ Broker-private class to encapsulate data sent to the broker thread + queue. + """ + type_wakeup = 0 + type_v1msg = 1 + type_v2msg = 2 + + def __init__(self, typecode, data): + self.typecode = typecode + self.data = data + + def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None): + """ Create a broker proxy and setup a connection to the broker. Will raise + an exception if the connection fails and the session is not configured to + retry connection setup (manageConnections = False). + + Spawns a thread to manage the broker connection. Call _shutdown() to + shutdown the thread when releasing the broker. + """ + Thread.__init__(self) + self.session = session + self.host = host + self.port = port + self.mechanisms = authMechs + self.ssl = ssl + if connTimeout is not None: + connTimeout = float(connTimeout) + self.connTimeout = connTimeout + self.authUser = authUser + self.authPass = authPass + self.saslUser = None + self.cv = Condition() + self.seqToAgentMap = {} + self.error = None + self.conn_exc = None # exception hit by _tryToConnect() + self.brokerId = None + self.connected = False + self.brokerAgent = None + self.brokerSupportsV2 = None + self.rcv_queue = Queue() # for msg received on session + self.conn = None + self.amqpSession = None + self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq) + Broker.nextSeq += 1 + self.last_age_check = time() + + # thread control + self.setDaemon(True) + self.setName("Thread for broker: %s:%d" % (host, port)) + self.canceled = False + self.ready = Semaphore(0) + self.start() + if not self.session.manageConnections: + # wait for connection setup to complete in subthread. + # On failure, propagate exception to caller + self.ready.acquire() + if self.conn_exc: + self._shutdown() # wait for the subthread to clean up... + raise self.conn_exc + # connection up - wait for stable... + try: + self._waitForStable() + agent = self.getBrokerAgent() + if agent: + agent.getObjects(_class="agent") + except: + self._shutdown() # wait for the subthread to clean up... + raise + + + def isConnected(self): + """ Return True if there is an active connection to the broker. """ + return self.connected + + def getError(self): + """ Return the last error message seen while trying to connect to the broker. """ + return self.error + + def getBrokerId(self): + """ Get broker's unique identifier (UUID) """ + return self.brokerId + + def getBrokerBank(self): + """ Return the broker-bank value. This is the value that the broker assigns to + objects within its control. This value appears as a field in the ObjectId + of objects created by agents controlled by this broker. """ + return 1 + + def getAgent(self, brokerBank, agentBank): + """ Return the agent object associated with a particular broker and agent bank value.""" + bankKey = str(agentBank) + try: + self.cv.acquire() + if bankKey in self.agents: + return self.agents[bankKey] + finally: + self.cv.release() + return None + + def getBrokerAgent(self): + return self.brokerAgent + + def getSessionId(self): + """ Get the identifier of the AMQP session to the broker """ + return self.amqpSessionId + + def getAgents(self): + """ Get the list of agents reachable via this broker """ + try: + self.cv.acquire() + return self.agents.values() + finally: + self.cv.release() + + def getAmqpSession(self): + """ Get the AMQP session object for this connected broker. """ + return self.amqpSession + + def getUrl(self): + """ """ + return "%s:%d" % (self.host, self.port) + + def getFullUrl(self, noAuthIfGuestDefault=True): + """ """ + ssl = "" + if self.ssl: + ssl = "s" + auth = "%s/%s@" % (self.authUser, self.authPass) + if self.authUser == "" or \ + (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"): + auth = "" + return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672) + + def __repr__(self): + if self.connected: + return "Broker connected at: %s" % self.getUrl() + else: + return "Disconnected Broker" + + def _setSequence(self, sequence, agent): + try: + self.cv.acquire() + self.seqToAgentMap[sequence] = agent + finally: + self.cv.release() + + def _clearSequence(self, sequence): + try: + self.cv.acquire() + self.seqToAgentMap.pop(sequence) + finally: + self.cv.release() + + def _tryToConnect(self): + """ Connect to the broker. Returns True if connection setup completes + successfully, otherwise returns False and sets self.error/self.conn_exc + with error info. Does not raise exceptions. + """ + self.error = None + self.conn_exc = None + try: + try: + self.cv.acquire() + self.agents = {} + finally: + self.cv.release() + + self.topicBound = False + self.syncInFlight = False + self.syncRequest = 0 + self.syncResult = None + self.reqsOutstanding = 1 + + try: + if self.amqpSession: + self.amqpSession.close() + except: + pass + self.amqpSession = None + + try: + if self.conn: + self.conn.close() + except: + pass + self.conn = None + + sock = connect(self.host, self.port) + sock.settimeout(5) + oldTimeout = sock.gettimeout() + sock.settimeout(self.connTimeout) + if self.ssl: + connSock = ssl(sock) + else: + connSock = sock + self.conn = Connection(connSock, username=self.authUser, password=self.authPass, + mechanism = self.mechanisms, host=self.host, service="qpidd") + def aborted(): + raise Timeout("Waiting for connection to be established with broker") + oldAborted = self.conn.aborted + self.conn.aborted = aborted + self.conn.start() + sock.settimeout(oldTimeout) + self.conn.aborted = oldAborted + uid = self.conn.user_id + if uid.__class__ == tuple and len(uid) == 2: + self.saslUser = uid[1] + else: + self.saslUser = self.authUser + + # prevent topic queues from filling up (and causing the agents to + # disconnect) by discarding the oldest queued messages when full. + topic_queue_options = {"qpid.policy_type":"ring"} + + self.replyName = "reply-%s" % self.amqpSessionId + self.amqpSession = self.conn.session(self.amqpSessionId) + self.amqpSession.timeout = self.SYNC_TIME + self.amqpSession.auto_sync = True + self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) + self.amqpSession.exchange_bind(exchange="amq.direct", + queue=self.replyName, binding_key=self.replyName) + self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200) + + self.topicName = "topic-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, + auto_delete=True, + arguments=topic_queue_options) + self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("tdest").listen(self._v1Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.message, value=200) + + ## + ## Check to see if the broker has QMFv2 exchanges configured + ## + direct_result = self.amqpSession.exchange_query("qmf.default.direct") + topic_result = self.amqpSession.exchange_query("qmf.default.topic") + self.brokerSupportsV2 = not (direct_result.not_found or topic_result.not_found) + + try: + self.cv.acquire() + self.agents = {} + self.brokerAgent = Agent(self, 0, "BrokerAgent", isV2=self.brokerSupportsV2) + self.agents['0'] = self.brokerAgent + finally: + self.cv.release() + + ## + ## Set up connectivity for QMFv2 + ## + if self.brokerSupportsV2: + # set up 3 queues: + # 1 direct queue - for responses destined to this console. + # 2 topic queues - one for heartbeats (hb), one for unsolicited data + # and event indications (ui). + self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True) + self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_topic_queue_ui, + exclusive=True, auto_delete=True, + arguments=topic_queue_options) + self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_topic_queue_hb, + exclusive=True, auto_delete=True, + arguments=topic_queue_options) + + self.amqpSession.exchange_bind(exchange="qmf.default.direct", + queue=self.v2_direct_queue, binding_key=self.v2_direct_queue) + ## Other bindings here... + + self.amqpSession.message_subscribe(queue=self.v2_direct_queue, destination="v2dest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50) + + self.amqpSession.message_subscribe(queue=self.v2_topic_queue_ui, destination="v2TopicUI", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2TopicUI").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2TopicUI", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.message, value=25) + + + self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hb, destination="v2TopicHB", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2TopicHB").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2TopicHB", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.message, value=100) + + codec = Codec() + self._setHeader(codec, 'B') + msg = self._message(codec.encoded) + self._send(msg) + + return True # connection complete + + except Exception, e: + self.error = "Exception during connection setup: %s - %s" % (e.__class__.__name__, e) + self.conn_exc = e + if self.session.console: + self.session.console.brokerConnectionFailed(self) + return False # connection failed + + def _updateAgent(self, obj): + """ + Just received an object of class "org.apache.qpid.broker:agent", which + represents a V1 agent. Add or update the list of agent proxies. + """ + bankKey = str(obj.agentBank) + agent = None + if obj._deleteTime == 0: + try: + self.cv.acquire() + if bankKey not in self.agents: + # add new agent only if label is not filtered + if len(self.session.agent_filter) == 0 or obj.label in self.session.agent_filter: + agent = Agent(self, obj.agentBank, obj.label) + self.agents[bankKey] = agent + finally: + self.cv.release() + if agent and self.session.console: + self.session._newAgentCallback(agent) + else: + try: + self.cv.acquire() + agent = self.agents.pop(bankKey, None) + if agent: + agent.close() + finally: + self.cv.release() + if agent and self.session.console: + self.session._delAgentCallback(agent) + + def _addAgent(self, name, agent): + try: + self.cv.acquire() + self.agents[name] = agent + finally: + self.cv.release() + if self.session.console: + self.session._newAgentCallback(agent) + + def _ageAgents(self): + if (time() - self.last_age_check) < self.session.agent_heartbeat_min: + # don't age if it's too soon + return + self.cv.acquire() + try: + to_delete = [] + to_notify = [] + for key in self.agents: + if self.agents[key].isOld(): + to_delete.append(key) + for key in to_delete: + agent = self.agents.pop(key) + agent.close() + to_notify.append(agent) + self.last_age_check = time() + finally: + self.cv.release() + if self.session.console: + for agent in to_notify: + self.session._delAgentCallback(agent) + + def _v2SendAgentLocate(self, predicate=[]): + """ + Broadcast an agent-locate request to cause all agents in the domain to tell us who they are. + """ + # @todo: send locate only to those agents in agent_filter? + dp = self.amqpSession.delivery_properties() + dp.routing_key = "console.request.agent_locate" + mp = self.amqpSession.message_properties() + mp.content_type = "amqp/list" + if self.saslUser: + mp.user_id = self.saslUser + mp.app_id = "qmf2" + mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue) + mp.application_headers = {'qmf.opcode':'_agent_locate_request'} + sendCodec = Codec() + sendCodec.write_list(predicate) + msg = Message(dp, mp, sendCodec.encoded) + self._send(msg, "qmf.default.topic") + + def _setHeader(self, codec, opcode, seq=0): + """ Compose the header of a management message. """ + codec.write_uint8(ord('A')) + codec.write_uint8(ord('M')) + codec.write_uint8(ord('2')) + codec.write_uint8(ord(opcode)) + codec.write_uint32(seq) + + def _checkHeader(self, codec): + """ Check the header of a management message and extract the opcode and class. """ + try: + octet = chr(codec.read_uint8()) + if octet != 'A': + return None, None + octet = chr(codec.read_uint8()) + if octet != 'M': + return None, None + octet = chr(codec.read_uint8()) + if octet != '2': + return None, None + opcode = chr(codec.read_uint8()) + seq = codec.read_uint32() + return opcode, seq + except: + return None, None + + def _message (self, body, routing_key="broker", ttl=None): + dp = self.amqpSession.delivery_properties() + dp.routing_key = routing_key + if ttl: + dp.ttl = ttl + mp = self.amqpSession.message_properties() + mp.content_type = "x-application/qmf" + if self.saslUser: + mp.user_id = self.saslUser + mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) + return Message(dp, mp, body) + + def _send(self, msg, dest="qpid.management"): + self.amqpSession.message_transfer(destination=dest, message=msg) + + def _disconnect(self, err_info=None): + """ Called when the remote broker has disconnected. Re-initializes all + state associated with the broker. + """ + # notify any waiters, and callback + self.cv.acquire() + try: + if err_info is not None: + self.error = err_info + _agents = self.agents + self.agents = {} + for agent in _agents.itervalues(): + agent.close() + self.syncInFlight = False + self.reqsOutstanding = 0 + self.cv.notifyAll() + finally: + self.cv.release() + + if self.session.console: + for agent in _agents.itervalues(): + self.session._delAgentCallback(agent) + + def _shutdown(self, _timeout=10): + """ Disconnect from a broker, and release its resources. Errors are + ignored. + """ + if self.isAlive(): + # kick the thread + self.canceled = True + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) + self.join(_timeout) + + # abort any pending transactions and delete agents + self._disconnect("broker shutdown") + + try: + if self.amqpSession: + self.amqpSession.close(); + except: + pass + self.amqpSession = None + try: + if self.conn: + self.conn.close() + except: + pass + self.conn = None + self.connected = False + + def _waitForStable(self): + try: + self.cv.acquire() + if not self.connected: + return + if self.reqsOutstanding == 0: + return + self.syncInFlight = True + starttime = time() + while self.reqsOutstanding != 0: + self.cv.wait(self.SYNC_TIME) + if time() - starttime > self.SYNC_TIME: + raise RuntimeError("Timed out waiting for broker to synchronize") + finally: + self.cv.release() + + def _incOutstanding(self): + try: + self.cv.acquire() + self.reqsOutstanding += 1 + finally: + self.cv.release() + + def _decOutstanding(self): + try: + self.cv.acquire() + self.reqsOutstanding -= 1 + if self.reqsOutstanding == 0 and not self.topicBound: + self.topicBound = True + for key in self.session.v1BindingKeyList: + self.amqpSession.exchange_bind(exchange="qpid.management", + queue=self.topicName, binding_key=key) + if self.brokerSupportsV2: + # do not drop heartbeat indications when under load from data + # or event indications. Put heartbeats on their own dedicated + # queue. + # + for key in self.session.v2BindingKeyList: + if key.startswith("agent.ind.heartbeat"): + self.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=self.v2_topic_queue_hb, + binding_key=key) + else: + self.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=self.v2_topic_queue_ui, + binding_key=key) + # solicit an agent locate now, after we bind to agent.ind.data, + # because the agent locate will cause the agent to publish a + # data indication - and now we're able to receive it! + self._v2SendAgentLocate() + + + if self.reqsOutstanding == 0 and self.syncInFlight: + self.syncInFlight = False + self.cv.notify() + finally: + self.cv.release() + + def _v1Cb(self, msg): + """ Callback from session receive thread for V1 messages + """ + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v1msg, msg)) + + def _v1Dispatch(self, msg): + try: + self._v1DispatchProtected(msg) + except Exception, e: + print "EXCEPTION in Broker._v1Cb:", e + import traceback + traceback.print_exc() + + def _v1DispatchProtected(self, msg): + """ + This is the general message handler for messages received via the QMFv1 exchanges. + """ + try: + agent = None + agent_addr = None + mp = msg.get("message_properties") + ah = mp.application_headers + if ah and 'qmf.agent' in ah: + agent_addr = ah['qmf.agent'] + + if not agent_addr: + # + # See if we can determine the agent identity from the routing key + # + dp = msg.get("delivery_properties") + rkey = None + if dp and dp.routing_key: + rkey = dp.routing_key + items = rkey.split('.') + if len(items) >= 4: + if items[0] == 'console' and items[3].isdigit(): + agent_addr = str(items[3]) # The QMFv1 Agent Bank + if agent_addr != None and agent_addr in self.agents: + agent = self.agents[agent_addr] + + codec = Codec(msg.body) + alreadyTried = None + while True: + opcode, seq = self._checkHeader(codec) + + if not agent and not alreadyTried: + alreadyTried = True + try: + self.cv.acquire() + if seq in self.seqToAgentMap: + agent = self.seqToAgentMap[seq] + finally: + self.cv.release() + + if opcode == None: break + if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) + elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) + elif opcode == 'q': self.session._handleClassInd (self, codec, seq) + elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent) + elif agent: + agent._handleQmfV1Message(opcode, seq, mp, ah, codec) + agent.touch() # mark agent as being alive + + finally: # always ack the message! + try: + # ignore failures as the session may be shutting down... + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) + except: + pass + + + def _v2Cb(self, msg): + """ Callback from session receive thread for V2 messages + """ + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v2msg, msg)) + + def _v2Dispatch(self, msg): + try: + self._v2DispatchProtected(msg) + except Exception, e: + print "EXCEPTION in Broker._v2Cb:", e + import traceback + traceback.print_exc() + + def _v2DispatchProtected(self, msg): + """ + This is the general message handler for messages received via QMFv2 exchanges. + """ + try: + mp = msg.get("message_properties") + ah = mp["application_headers"] + codec = Codec(msg.body) + + if 'qmf.opcode' in ah: + opcode = ah['qmf.opcode'] + if mp.content_type == "amqp/list": + try: + content = codec.read_list() + if not content: + content = [] + except: + # malformed list - ignore + content = None + elif mp.content_type == "amqp/map": + try: + content = codec.read_map() + if not content: + content = {} + except: + # malformed map - ignore + content = None + else: + content = None + + if content != None: + ## + ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are + ## used to maintain the broker's list of agent proxies. + ## + if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) + elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + else: + ## + ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender + ## of the message. + ## + agent_addr = ah['qmf.agent'] + if agent_addr == 'broker': + agent_addr = '0' + if agent_addr in self.agents: + agent = self.agents[agent_addr] + agent._handleQmfV2Message(opcode, mp, ah, content) + agent.touch() + + finally: # always ack the message! + try: + # ignore failures as the session may be shutting down... + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) + except: + pass + + def _exceptionCb(self, data): + """ Exception notification callback from session receive thread. + """ + self.cv.acquire() + try: + self.connected = False + self.error = "exception received from messaging layer: %s" % str(data) + finally: + self.cv.release() + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) + + def run(self): + """ Main body of the running thread. """ + + # First, attempt a connection. In the unmanaged case, + # failure to connect needs to cause the Broker() + # constructor to raise an exception. + delay = self.DELAY_MIN + while not self.canceled: + if self._tryToConnect(): # connection up + break + # unmanaged connection - fail & wake up constructor + if not self.session.manageConnections: + self.ready.release() + return + # managed connection - try again + count = 0 + while not self.canceled and count < delay: + sleep(1) + count += 1 + if delay < self.DELAY_MAX: + delay *= self.DELAY_FACTOR + + if self.canceled: + self.ready.release() + return + + # connection successful! + self.cv.acquire() + try: + self.connected = True + finally: + self.cv.release() + + self.session._handleBrokerConnect(self) + self.ready.release() + + while not self.canceled: + + try: + item = self.rcv_queue.get(timeout=self.session.agent_heartbeat_min) + except Empty: + item = None + + while not self.canceled and item is not None: + + if not self.connected: + # connection failure + while item: + # drain the queue + try: + item = self.rcv_queue.get(block=False) + except Empty: + item = None + break + + self._disconnect() # clean up any pending agents + self.session._handleError(self.error) + self.session._handleBrokerDisconnect(self) + + if not self.session.manageConnections: + return # do not attempt recovery + + # retry connection setup + delay = self.DELAY_MIN + while not self.canceled: + if self._tryToConnect(): + break + # managed connection - try again + count = 0 + while not self.canceled and count < delay: + sleep(1) + count += 1 + if delay < self.DELAY_MAX: + delay *= self.DELAY_FACTOR + + if self.canceled: + return + + # connection successful! + self.cv.acquire() + try: + self.connected = True + finally: + self.cv.release() + + self.session._handleBrokerConnect(self) + + elif item.typecode == Broker._q_item.type_v1msg: + self._v1Dispatch(item.data) + elif item.typecode == Broker._q_item.type_v2msg: + self._v2Dispatch(item.data) + + try: + item = self.rcv_queue.get(block=False) + except Empty: + item = None + + # queue drained, age the agents... + if not self.canceled: + self._ageAgents() + +#=================================================================================================== +# Agent +#=================================================================================================== +class Agent: + """ + This class represents a proxy for a remote agent being managed + """ + def __init__(self, broker, agentBank, label, isV2=False, interval=0): + self.broker = broker + self.session = broker.session + self.schemaCache = self.session.schemaCache + self.brokerBank = broker.getBrokerBank() + self.agentBank = str(agentBank) + self.label = label + self.isV2 = isV2 + self.heartbeatInterval = 0 + if interval: + if interval < self.session.agent_heartbeat_min: + self.heartbeatInterval = self.session.agent_heartbeat_min + else: + self.heartbeatInterval = interval + self.lock = Lock() + self.seqMgr = self.session.seqMgr + self.contextMap = {} + self.unsolicitedContext = RequestContext(self, self) + self.lastSeenTime = time() + self.closed = None + self.epoch = 0 + self.schema_timestamp = None + + + def _checkClosed(self): + if self.closed: + raise Exception("Agent is disconnected") + + + def __call__(self, **kwargs): + """ + This is the handler for unsolicited stuff received from the agent + """ + if 'qmf_object' in kwargs: + if self.session.console: + obj = kwargs['qmf_object'] + if len(self.session.class_filter) == 0: + self.session.console.objectProps(self.broker, obj) + elif obj.getClassKey(): + # slow path: check classKey against event_filter + pname = obj.getClassKey().getPackageName() + cname = obj.getClassKey().getClassName() + if ((pname, cname) in self.session.class_filter + or (pname, None) in self.session.class_filter): + self.session.console.objectProps(self.broker, obj) + elif 'qmf_object_stats' in kwargs: + if self.session.console: + obj = kwargs['qmf_object_stats'] + if len(self.session.class_filter) == 0: + self.session.console.objectStats(self.broker, obj) + elif obj.getClassKey(): + # slow path: check classKey against event_filter + pname = obj.getClassKey().getPackageName() + cname = obj.getClassKey().getClassName() + if ((pname, cname) in self.session.class_filter + or (pname, None) in self.session.class_filter): + self.session.console.objectStats(self.broker, obj) + elif 'qmf_event' in kwargs: + if self.session.console: + event = kwargs['qmf_event'] + if len(self.session.event_filter) == 0: + self.session.console.event(self.broker, event) + elif event.classKey: + # slow path: check classKey against event_filter + pname = event.classKey.getPackageName() + ename = event.classKey.getClassName() + if ((pname, ename) in self.session.event_filter + or (pname, None) in self.session.event_filter): + self.session.console.event(self.broker, event) + elif 'qmf_schema_id' in kwargs: + ckey = kwargs['qmf_schema_id'] + new_pkg, new_cls = self.session.schemaCache.declareClass(ckey) + if self.session.console: + if new_pkg: + self.session._newPackageCallback(ckey.getPackageName()) + if new_cls: + # translate V2's string based type value to legacy + # integer value for backward compatibility + cls_type = ckey.getType() + if str(cls_type) == ckey.TYPE_DATA: + cls_type = 1 + elif str(cls_type) == ckey.TYPE_EVENT: + cls_type = 2 + self.session._newClassCallback(cls_type, ckey) + + def touch(self): + if self.heartbeatInterval: + self.lastSeenTime = time() + + + def setEpoch(self, epoch): + self.epoch = epoch + + def update_schema_timestamp(self, timestamp): + """ Check the latest schema timestamp from the agent V2 heartbeat. Issue a + query for all packages & classes should the timestamp change. + """ + self.lock.acquire() + try: + if self.schema_timestamp == timestamp: + return + self.schema_timestamp = timestamp + + context = RequestContext(self, self) + sequence = self.seqMgr._reserve(context) + + self.contextMap[sequence] = context + context.setSequence(sequence) + + finally: + self.lock.release() + + self._v2SendSchemaIdQuery(sequence, {}) + + + def epochMismatch(self, epoch): + if epoch == 0 or self.epoch == 0: + return None + if epoch == self.epoch: + return None + return True + + + def isOld(self): + if self.heartbeatInterval == 0: + return None + if time() - self.lastSeenTime > (self.session.agent_heartbeat_miss * self.heartbeatInterval): + return True + return None + + + def close(self): + self.closed = True + copy = {} + try: + self.lock.acquire() + for seq in self.contextMap: + copy[seq] = self.contextMap[seq] + finally: + self.lock.release() + + for seq in copy: + context = copy[seq] + context.cancel("Agent disconnected") + self.seqMgr._release(seq) + + + def __repr__(self): + if self.isV2: + ver = "v2" + else: + ver = "v1" + return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label) + + + def getBroker(self): + return self.broker + + + def getBrokerBank(self): + return self.brokerBank + + + def getAgentBank(self): + return self.agentBank + + + def getV2RoutingKey(self): + if self.agentBank == '0': + return 'broker' + return self.agentBank + + + def getObjects(self, notifiable=None, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + If 'notifiable' is None (default), this call will block until completion or timeout. + If supplied, notifiable is assumed to be a callable object that will be called when the + list of queried objects arrives. The single argument to the call shall be a list of + the returned objects. + + The class for queried objects may be specified in one of the following ways: + + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = <id> - get the object referenced by the object-id + + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + + If additional arguments are supplied, they are used as property selectors. For example, + if the argument name="test" is supplied, only objects whose "name" property is "test" + will be returned in the result. + """ + self._checkClosed() + if notifiable: + if not callable(notifiable): + raise Exception("notifiable object must be callable") + + # + # Isolate the selectors from the kwargs + # + selectors = {} + for key in kwargs: + value = kwargs[key] + if key[0] != '_': + selectors[key] = value + + # + # Allocate a context to track this asynchronous request. + # + context = RequestContext(self, notifiable, selectors) + sequence = self.seqMgr._reserve(context) + try: + self.lock.acquire() + self.contextMap[sequence] = context + context.setSequence(sequence) + finally: + self.lock.release() + + # + # Compose and send the query message to the agent using the appropriate protocol for the + # agent's QMF version. + # + if self.isV2: + self._v2SendGetQuery(sequence, kwargs) + else: + self.broker._setSequence(sequence, self) + self._v1SendGetQuery(sequence, kwargs) + + # + # If this is a synchronous call, block and wait for completion. + # + if not notifiable: + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + context.waitForSignal(timeout) + if context.exception: + raise Exception(context.exception) + result = context.queryResults + return result + + + def _clearContext(self, sequence): + try: + self.lock.acquire() + try: + self.contextMap.pop(sequence) + except KeyError: + pass # @todo - shouldn't happen, log a warning. + finally: + self.lock.release() + + + def _schemaInfoFromV2Agent(self): + """ + We have just received new schema information from this agent. Check to see if there's + more work that can now be done. + """ + try: + self.lock.acquire() + copy_of_map = {} + for item in self.contextMap: + copy_of_map[item] = self.contextMap[item] + finally: + self.lock.release() + + self.unsolicitedContext.reprocess() + for context in copy_of_map: + copy_of_map[context].reprocess() + + + def _handleV1Completion(self, sequence, code, text): + """ + Called if one of this agent's V1 commands completed + """ + context = None + try: + self.lock.acquire() + if sequence in self.contextMap: + context = self.contextMap[sequence] + finally: + self.lock.release() + + if context: + if code != 0: + ex = "Error %d: %s" % (code, text) + context.setException(ex) + context.signal() + self.broker._clearSequence(sequence) + + + def _v1HandleMethodResp(self, codec, seq): + """ + Handle a QMFv1 method response + """ + code = codec.read_uint32() + text = codec.read_str16() + outArgs = {} + self.broker._clearSequence(seq) + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + if code == 0: + for arg in method.arguments: + if arg.dir.find("O") != -1: + outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker) + result = MethodResult(code, text, outArgs) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + + def _v1HandleEventInd(self, codec, seq): + """ + Handle a QMFv1 event indication + """ + event = Event(self, codec) + self.unsolicitedContext.doEvent(event) + + + def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False): + """ + Handle a QMFv1 content indication + """ + classKey = ClassKey(codec) + schema = self.schemaCache.getSchema(classKey) + if not schema: + return + + obj = Object(self, schema, codec, prop, stat) + if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: + self.broker._updateAgent(obj) + + context = self.unsolicitedContext + try: + self.lock.acquire() + if sequence in self.contextMap: + context = self.contextMap[sequence] + finally: + self.lock.release() + + context.addV1QueryResult(obj, prop, stat) + + + def _v2HandleDataInd(self, mp, ah, content): + """ + Handle a QMFv2 data indication from the agent. Note: called from context + of the Broker thread. + """ + if content.__class__ != list: + return + + if mp.correlation_id: + try: + self.lock.acquire() + sequence = int(mp.correlation_id) + if sequence not in self.contextMap: + return + context = self.contextMap[sequence] + finally: + self.lock.release() + else: + context = self.unsolicitedContext + + kind = "_data" + if "qmf.content" in ah: + kind = ah["qmf.content"] + if kind == "_data": + for omap in content: + context.addV2QueryResult(omap) + context.processV2Data() + if 'partial' not in ah: + context.signal() + + elif kind == "_event": + for omap in content: + event = Event(self, v2Map=omap) + if event.classKey is None or event.schema: + # schema optional or present + context.doEvent(event) + else: + # schema not optional and not present + if context.addPendingEvent(event): + self._v2SendSchemaRequest(event.classKey) + + elif kind == "_schema_id": + for sid in content: + try: + ckey = ClassKey(sid) + except: + # @todo: log error + ckey = None + if ckey is not None: + # @todo: for now, the application cannot directly send a query for + # _schema_id. This request _must_ have been initiated by the framework + # in order to update the schema cache. + context.notifiable(qmf_schema_id=ckey) + + + def _v2HandleMethodResp(self, mp, ah, content): + """ + Handle a QMFv2 method response from the agent + """ + context = None + sequence = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + + result = MethodResult(0, 'OK', content['_arguments']) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + def _v2HandleException(self, mp, ah, content): + """ + Handle a QMFv2 exception + """ + context = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + values = {} + if '_values' in content: + values = content['_values'] + + code = 7 + text = "error" + if 'error_code' in values: + code = values['error_code'] + if 'error_text' in values: + text = values['error_text'] + + pair = self.seqMgr._release(seq) + if pair == None: + return + + if pair.__class__ == RequestContext: + pair.cancel(text) + return + + method, synchronous = pair + + result = MethodResult(code, text, {}) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + + def _v1SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv1 agent. + """ + # + # Build the query map + # + query = {} + if '_class' in kwargs: + query['_class'] = kwargs['_class'] + if '_package' in kwargs: + query['_package'] = kwargs['_package'] + elif '_key' in kwargs: + key = kwargs['_key'] + query['_class'] = key.getClassName() + query['_package'] = key.getPackageName() + elif '_objectId' in kwargs: + query['_objectid'] = kwargs['_objectId'].__repr__() + + # + # Construct and transmit the message + # + sendCodec = Codec() + self.broker._setHeader(sendCodec, 'G', sequence) + sendCodec.write_map(query) + smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank)) + self.broker._send(smsg) + + + def _v2SendQuery(self, query, sequence): + """ + Given a query map, construct and send a V2 Query message. + """ + dp = self.broker.amqpSession.delivery_properties() + dp.routing_key = self.getV2RoutingKey() + mp = self.broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + if self.broker.saslUser: + mp.user_id = self.broker.saslUser + mp.correlation_id = str(sequence) + mp.app_id = "qmf2" + mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue) + mp.application_headers = {'qmf.opcode':'_query_request'} + sendCodec = Codec() + sendCodec.write_map(query) + msg = Message(dp, mp, sendCodec.encoded) + self.broker._send(msg, "qmf.default.direct") + + + def _v2SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv2 agent. + """ + # + # Build the query map + # + query = {'_what': 'OBJECT'} + if '_class' in kwargs: + schemaMap = {'_class_name': kwargs['_class']} + if '_package' in kwargs: + schemaMap['_package_name'] = kwargs['_package'] + query['_schema_id'] = schemaMap + elif '_key' in kwargs: + query['_schema_id'] = kwargs['_key'].asMap() + elif '_objectId' in kwargs: + query['_object_id'] = kwargs['_objectId'].asMap() + + self._v2SendQuery(query, sequence) + + + def _v2SendSchemaIdQuery(self, sequence, kwargs): + """ + Send a query for all schema ids to a QMFv2 agent. + """ + # + # Build the query map + # + query = {'_what': 'SCHEMA_ID'} + # @todo - predicate support. For now, return all known schema ids. + + self._v2SendQuery(query, sequence) + + + def _v2SendSchemaRequest(self, schemaId): + """ + Send a query to an agent to request details on a particular schema class. + IMPORTANT: This function currently sends a QMFv1 schema-request to the address of + the agent. The agent will send its response to amq.direct/<our-key>. + Eventually, this will be converted to a proper QMFv2 schema query. + """ + sendCodec = Codec() + seq = self.seqMgr._reserve(None) + self.broker._setHeader(sendCodec, 'S', seq) + schemaId.encode(sendCodec) + smsg = self.broker._message(sendCodec.encoded, self.agentBank) + self.broker._send(smsg, "qmf.default.direct") + + + def _handleQmfV1Message(self, opcode, seq, mp, ah, codec): + """ + Process QMFv1 messages arriving from an agent. Note well: this method is + called from the context of the Broker thread. + """ + if opcode == 'm': self._v1HandleMethodResp(codec, seq) + elif opcode == 'e': self._v1HandleEventInd(codec, seq) + elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True) + elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True) + elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True) + + + def _handleQmfV2Message(self, opcode, mp, ah, content): + """ + Process QMFv2 messages arriving from an agent. Note well: this method is + called from the context of the Broker thread. + """ + if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content) + elif opcode == '_exception': self._v2HandleException(mp, ah, content) + + +#=================================================================================================== +# RequestContext +#=================================================================================================== +class RequestContext(object): + """ + This class tracks an asynchronous request sent to an agent. + TODO: Add logic for client-side selection and filtering deleted objects from get-queries + """ + def __init__(self, agent, notifiable, selectors={}): + self.sequence = None + self.agent = agent + self.schemaCache = self.agent.schemaCache + self.notifiable = notifiable + self.selectors = selectors + self.startTime = time() + self.rawQueryResults = [] + self.queryResults = [] + self.pendingEvents = {} + self.exception = None + self.waitingForSchema = None + self.pendingSignal = None + self.cv = Condition() + self.blocked = notifiable == None + + + def setSequence(self, sequence): + self.sequence = sequence + + + def addV1QueryResult(self, data, has_props, has_stats): + values = {} + if has_props: + for prop, val in data.getProperties(): + values[prop.name] = val + if has_stats: + for stat, val in data.getStatistics(): + values[stat.name] = val + for key in values: + val = values[key] + if key in self.selectors and val != self.selectors[key]: + return + + if self.notifiable: + if has_props: + self.notifiable(qmf_object=data) + if has_stats: + self.notifiable(qmf_object_stats=data) + else: + self.queryResults.append(data) + + + def addV2QueryResult(self, data): + values = data['_values'] + for key in values: + val = values[key] + if key in self.selectors: + sel_val = self.selectors[key] + if sel_val.__class__ == ObjectId: + val = ObjectId(val, agentName=self.agent.getAgentBank()) + if val != sel_val: + return + self.rawQueryResults.append(data) + + def addPendingEvent(self, event): + """ Stores a received event that is pending a schema. Returns True if this + event is the first instance of a given schema identifier. + """ + self.cv.acquire() + try: + if event.classKey in self.pendingEvents: + self.pendingEvents[event.classKey].append((event, time())) + return False + self.pendingEvents[event.classKey] = [(event, time())] + return True + finally: + self.cv.release() + + def processPendingEvents(self): + """ Walk the pending events looking for schemas that are now + available. Remove any events that now have schema, and process them. + """ + keysToDelete = [] + events = [] + self.cv.acquire() + try: + for key in self.pendingEvents.iterkeys(): + schema = self.schemaCache.getSchema(key) + if schema: + keysToDelete.append(key) + for item in self.pendingEvents[key]: + # item is (timestamp, event-obj) tuple. + # hack: I have no idea what a valid lifetime for an event + # should be. 60 seconds??? + if (time() - item[1]) < 60: + item[0].schema = schema + events.append(item[0]) + for key in keysToDelete: + self.pendingEvents.pop(key) + finally: + self.cv.release() + for event in events: + self.doEvent(event) + + def doEvent(self, data): + if self.notifiable: + self.notifiable(qmf_event=data) + + + def setException(self, ex): + self.exception = ex + + + def getAge(self): + return time() - self.startTime + + + def cancel(self, exception): + self.setException(exception) + try: + self.cv.acquire() + self.blocked = None + self.waitingForSchema = None + self.cv.notify() + finally: + self.cv.release() + self._complete() + + + def waitForSignal(self, timeout): + try: + self.cv.acquire() + while self.blocked: + if (time() - self.startTime) > timeout: + self.exception = "Request timed out after %d seconds" % timeout + return + self.cv.wait(1) + finally: + self.cv.release() + + + def signal(self): + try: + self.cv.acquire() + if self.waitingForSchema: + self.pendingSignal = True + return + else: + self.blocked = None + self.cv.notify() + finally: + self.cv.release() + self._complete() + + + def _complete(self): + if self.notifiable: + if self.exception: + self.notifiable(qmf_exception=self.exception) + else: + self.notifiable(qmf_complete=True) + + if self.sequence: + self.agent._clearContext(self.sequence) + + + def processV2Data(self): + """ + Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema + that is in our schema cache, process it. Otherwise, send a request for the schema information + to the agent that manages the object. + """ + schemaId = None + queryResults = [] + try: + self.cv.acquire() + if self.waitingForSchema: + return + while (not self.waitingForSchema) and len(self.rawQueryResults) > 0: + head = self.rawQueryResults[0] + schemaId = self._getSchemaIdforV2ObjectLH(head) + schema = self.schemaCache.getSchema(schemaId) + if schema: + obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank) + queryResults.append(obj) + self.rawQueryResults.pop(0) + else: + self.waitingForSchema = True + finally: + self.cv.release() + + if self.waitingForSchema: + self.agent._v2SendSchemaRequest(schemaId) + + for result in queryResults: + key = result.getClassKey() + if key.getPackageName() == "org.apache.qpid.broker" and key.getClassName() == "agent": + self.agent.broker._updateAgent(result) + if self.notifiable: + self.notifiable(qmf_object=result) + else: + self.queryResults.append(result) + + complete = None + try: + self.cv.acquire() + if not self.waitingForSchema and self.pendingSignal: + self.blocked = None + self.cv.notify() + complete = True + finally: + self.cv.release() + + if complete: + self._complete() + + + def reprocess(self): + """ + New schema information has been added to the schema-cache. Clear our 'waiting' status + and see if we can make more progress on any pending inbound events/objects. + """ + try: + self.cv.acquire() + self.waitingForSchema = None + finally: + self.cv.release() + self.processV2Data() + self.processPendingEvents() + + def _getSchemaIdforV2ObjectLH(self, data): + """ + Given a data map, extract the schema-identifier. + """ + if data.__class__ != dict: + return None + if '_schema_id' in data: + return ClassKey(data['_schema_id']) + return None + + +#=================================================================================================== +# Event +#=================================================================================================== +class Event: + """ """ + def __init__(self, agent, codec=None, v2Map=None): + self.agent = agent + self.session = agent.session + self.broker = agent.broker + + if isinstance(v2Map,dict): + self.classKey = None + self.schema = None + try: + self.arguments = v2Map["_values"] + self.timestamp = long(v2Map["_timestamp"]) + self.severity = v2Map["_severity"] + if "_schema_id" in v2Map: + self.classKey = ClassKey(v2Map["_schema_id"]) + self.classKey._setType(ClassKey.TYPE_EVENT) + except: + raise Exception("Invalid event object: %s " % str(v2Map)) + if self.classKey is not None: + self.schema = self.session.schemaCache.getSchema(self.classKey) + + elif codec is not None: + self.classKey = ClassKey(codec) + self.classKey._setType(ClassKey.TYPE_EVENT) + self.timestamp = codec.read_int64() + self.severity = codec.read_uint8() + self.arguments = {} + self.schema = self.session.schemaCache.getSchema(self.classKey) + if not self.schema: + return + for arg in self.schema.arguments: + self.arguments[arg.name] = self.session._decodeValue(codec, arg.type, + self.broker) + else: + raise Exception("No constructor for event object.") + + + def __repr__(self): + if self.schema == None: + return "<uninterpretable>" + out = strftime("%c", gmtime(self.timestamp / 1000000000)) + out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName() + out += " broker=" + self.broker.getUrl() + for arg in self.schema.arguments: + disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8") + if " " in disp: + disp = "\"" + disp + "\"" + out += " " + arg.name + "=" + disp + return out + + def _sevName(self): + if self.severity == 0 : return "EMER " + if self.severity == 1 : return "ALERT" + if self.severity == 2 : return "CRIT " + if self.severity == 3 : return "ERROR" + if self.severity == 4 : return "WARN " + if self.severity == 5 : return "NOTIC" + if self.severity == 6 : return "INFO " + if self.severity == 7 : return "DEBUG" + return "INV-%d" % self.severity + + def getClassKey(self): + return self.classKey + + def getArguments(self): + return self.arguments + + def getTimestamp(self): + return self.timestamp + + def getSchema(self): + return self.schema + + +#=================================================================================================== +# SequenceManager +#=================================================================================================== +class SequenceManager: + """ Manage sequence numbers for asynchronous method calls """ + def __init__(self): + self.lock = Lock() + self.sequence = long(time()) # pseudo-randomize the start + self.pending = {} + + def _reserve(self, data): + """ Reserve a unique sequence number """ + try: + self.lock.acquire() + result = self.sequence + self.sequence = self.sequence + 1 + self.pending[result] = data + finally: + self.lock.release() + return result + + def _release(self, seq): + """ Release a reserved sequence number """ + data = None + try: + self.lock.acquire() + if seq in self.pending: + data = self.pending[seq] + del self.pending[seq] + finally: + self.lock.release() + return data + + +#=================================================================================================== +# DebugConsole +#=================================================================================================== +class DebugConsole(Console): + """ """ + def brokerConnected(self, broker): + print "brokerConnected:", broker + + def brokerConnectionFailed(self, broker): + print "brokerConnectionFailed:", broker + + def brokerDisconnected(self, broker): + print "brokerDisconnected:", broker + + def newPackage(self, name): + print "newPackage:", name + + def newClass(self, kind, classKey): + print "newClass:", kind, classKey + + def newAgent(self, agent): + print "newAgent:", agent + + def delAgent(self, agent): + print "delAgent:", agent + + def objectProps(self, broker, record): + print "objectProps:", record + + def objectStats(self, broker, record): + print "objectStats:", record + + def event(self, broker, event): + print "event:", event + + def heartbeat(self, agent, timestamp): + print "heartbeat:", agent + + def brokerInfo(self, broker): + print "brokerInfo:", broker + |