summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/python/qmf/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/python/qmf/console.py')
-rw-r--r--M4-RCs/qpid/python/qmf/console.py1625
1 files changed, 0 insertions, 1625 deletions
diff --git a/M4-RCs/qpid/python/qmf/console.py b/M4-RCs/qpid/python/qmf/console.py
deleted file mode 100644
index 0009726fe7..0000000000
--- a/M4-RCs/qpid/python/qmf/console.py
+++ /dev/null
@@ -1,1625 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-""" Console API for Qpid Management Framework """
-
-import os
-import qpid
-import struct
-import socket
-import re
-from qpid.peer import Closed
-from qpid.connection import Connection, ConnectionFailed
-from qpid.datatypes import UUID, uuid4, Message, RangedSet
-from qpid.util import connect, ssl, URL
-from qpid.codec010 import StringCodec as Codec
-from threading import Lock, Condition, Thread
-from time import time, strftime, gmtime
-from cStringIO import StringIO
-
-class Console:
- """ To access the asynchronous operations, a class must be derived from
- Console with overrides of any combination of the available methods. """
-
- def brokerConnected(self, broker):
- """ Invoked when a connection is established to a broker """
- pass
-
- def brokerDisconnected(self, broker):
- """ Invoked when the connection to a broker is lost """
- pass
-
- def newPackage(self, name):
- """ Invoked when a QMF package is discovered. """
- pass
-
- def newClass(self, kind, classKey):
- """ Invoked when a new class is discovered. Session.getSchema can be
- used to obtain details about the class."""
- pass
-
- def newAgent(self, agent):
- """ Invoked when a QMF agent is discovered. """
- pass
-
- def delAgent(self, agent):
- """ Invoked when a QMF agent disconects. """
- pass
-
- def objectProps(self, broker, record):
- """ Invoked when an object is updated. """
- pass
-
- def objectStats(self, broker, record):
- """ Invoked when an object is updated. """
- pass
-
- def event(self, broker, event):
- """ Invoked when an event is raised. """
- pass
-
- def heartbeat(self, agent, timestamp):
- """ """
- pass
-
- def brokerInfo(self, broker):
- """ """
- pass
-
- def methodResponse(self, broker, seq, response):
- """ """
- pass
-
-class BrokerURL(URL):
- def __init__(self, text):
- URL.__init__(self, text)
- socket.gethostbyname(self.host)
- if self.port is None:
- if self.scheme == URL.AMQPS:
- self.port = 5671
- else:
- self.port = 5672
- self.authName = self.user or "guest"
- self.authPass = self.password or "guest"
- self.authMech = "PLAIN"
-
- def name(self):
- return self.host + ":" + str(self.port)
-
- def match(self, host, port):
- return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port
-
-class Session:
- """
- An instance of the Session class represents a console session running
- against one or more QMF brokers. A single instance of Session is needed
- to interact with the management framework as a console.
- """
- _CONTEXT_SYNC = 1
- _CONTEXT_STARTUP = 2
- _CONTEXT_MULTIGET = 3
-
- GET_WAIT_TIME = 60
-
- def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
- manageConnections=False, userBindings=False):
- """
- Initialize a session. If the console argument is provided, the
- more advanced asynchronous features are available. If console is
- defaulted, the session will operate in a simpler, synchronous manner.
-
- The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console'
- is provided. They control whether object updates, events, and agent-heartbeats are
- subscribed to. If the console is not interested in receiving one or more of the above,
- setting the argument to False will reduce tha bandwidth used by the API.
-
- If manageConnections is set to True, the Session object will manage connections to
- the brokers. This means that if a broker is unreachable, it will retry until a connection
- can be established. If a connection is lost, the Session will attempt to reconnect.
-
- If manageConnections is set to False, the user is responsible for handing failures. In
- this case, an unreachable broker will cause addBroker to raise an exception.
-
- If userBindings is set to False (the default) and rcvObjects is True, the console will
- receive data for all object classes. If userBindings is set to True, the user must select
- which classes the console shall receive by invoking the bindPackage or bindClass methods.
- This allows the console to be configured to receive only information that is relavant to
- a particular application. If rcvObjects id False, userBindings has no meaning.
- """
- self.console = console
- self.brokers = []
- self.packages = {}
- self.seqMgr = SequenceManager()
- self.cv = Condition()
- self.syncSequenceList = []
- self.getResult = []
- self.getSelect = []
- self.error = None
- self.rcvObjects = rcvObjects
- self.rcvEvents = rcvEvents
- self.rcvHeartbeats = rcvHeartbeats
- self.userBindings = userBindings
- if self.console == None:
- self.rcvObjects = False
- self.rcvEvents = False
- self.rcvHeartbeats = False
- self.bindingKeyList = self._bindingKeys()
- self.manageConnections = manageConnections
-
- if self.userBindings and not self.rcvObjects:
- raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
-
- def __repr__(self):
- return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
-
- def addBroker(self, target="localhost"):
- """ Connect to a Qpid broker. Returns an object of type Broker. """
- url = BrokerURL(target)
- broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
- ssl = url.scheme == URL.AMQPS)
-
- self.brokers.append(broker)
- if not self.manageConnections:
- self.getObjects(broker=broker, _class="agent")
- return broker
-
- def delBroker(self, broker):
- """ Disconnect from a broker. The 'broker' argument is the object
- returned from the addBroker call """
- broker._shutdown()
- self.brokers.remove(broker)
- del broker
-
- def getPackages(self):
- """ Get the list of known QMF packages """
- for broker in self.brokers:
- broker._waitForStable()
- list = []
- for package in self.packages:
- list.append(package)
- return list
-
- def getClasses(self, packageName):
- """ Get the list of known classes within a QMF package """
- for broker in self.brokers:
- broker._waitForStable()
- list = []
- if packageName in self.packages:
- for pkey in self.packages[packageName]:
- list.append(self.packages[packageName][pkey].getKey())
- return list
-
- def getSchema(self, classKey):
- """ Get the schema for a QMF class """
- for broker in self.brokers:
- broker._waitForStable()
- pname = classKey.getPackageName()
- pkey = classKey.getPackageKey()
- if pname in self.packages:
- if pkey in self.packages[pname]:
- return self.packages[pname][pkey]
-
- def bindPackage(self, packageName):
- """ Request object updates for all table classes within a package. """
- if not self.userBindings or not self.rcvObjects:
- raise Exception("userBindings option not set for Session")
- key = "console.obj.*.*.%s.#" % packageName
- self.bindingKeyList.append(key)
- for broker in self.brokers:
- if broker.isConnected():
- broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key=key)
-
- def bindClass(self, pname, cname):
- """ Request object updates for a particular table class by package and class name. """
- if not self.userBindings or not self.rcvObjects:
- raise Exception("userBindings option not set for Session")
- key = "console.obj.*.*.%s.%s.#" % (pname, cname)
- self.bindingKeyList.append(key)
- for broker in self.brokers:
- if broker.isConnected():
- broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key=key)
-
- def bindClassKey(self, classKey):
- """ Request object updates for a particular table class by class key. """
- pname = classKey.getPackageName()
- cname = classKey.getClassName()
- self.bindClass(pname, cname)
-
- def getAgents(self, broker=None):
- """ Get a list of currently known agents """
- brokerList = []
- if broker == None:
- for b in self.brokers:
- brokerList.append(b)
- else:
- brokerList.append(broker)
-
- for b in brokerList:
- b._waitForStable()
- agentList = []
- for b in brokerList:
- for a in b.getAgents():
- agentList.append(a)
- return agentList
-
- def getObjects(self, **kwargs):
- """ Get a list of objects from QMF agents.
- All arguments are passed by name(keyword).
-
- The class for queried objects may be specified in one of the following ways:
-
- _schema = <schema> - supply a schema object returned from getSchema.
- _key = <key> - supply a classKey from the list returned by getClasses.
- _class = <name> - supply a class name as a string. If the class name exists
- in multiple packages, a _package argument may also be supplied.
- _objectId = <id> - get the object referenced by the object-id
-
- If objects should be obtained from only one agent, use the following argument.
- Otherwise, the query will go to all agents.
-
- _agent = <agent> - supply an agent from the list returned by getAgents.
-
- If the get query is to be restricted to one broker (as opposed to all connected brokers),
- add the following argument:
-
- _broker = <broker> - supply a broker as returned by addBroker.
-
- If additional arguments are supplied, they are used as property selectors. For example,
- if the argument name="test" is supplied, only objects whose "name" property is "test"
- will be returned in the result.
- """
- if "_broker" in kwargs:
- brokerList = []
- brokerList.append(kwargs["_broker"])
- else:
- brokerList = self.brokers
- for broker in brokerList:
- broker._waitForStable()
-
- agentList = []
- if "_agent" in kwargs:
- agent = kwargs["_agent"]
- if agent.broker not in brokerList:
- raise Exception("Supplied agent is not accessible through the supplied broker")
- if agent.broker.isConnected():
- agentList.append(agent)
- else:
- if "_objectId" in kwargs:
- oid = kwargs["_objectId"]
- for broker in brokerList:
- for agent in broker.getAgents():
- if agent.getBrokerBank() == oid.getBrokerBank() and agent.getAgentBank() == oid.getAgentBank():
- agentList.append(agent)
- else:
- for broker in brokerList:
- for agent in broker.getAgents():
- if agent.broker.isConnected():
- agentList.append(agent)
-
- if len(agentList) == 0:
- return []
-
- pname = None
- cname = None
- hash = None
- classKey = None
- if "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
- elif "_key" in kwargs: classKey = kwargs["_key"]
- elif "_class" in kwargs:
- cname = kwargs["_class"]
- if "_package" in kwargs:
- pname = kwargs["_package"]
- if cname == None and classKey == None and "_objectId" not in kwargs:
- raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
-
- map = {}
- self.getSelect = []
- if "_objectId" in kwargs:
- map["_objectid"] = kwargs["_objectId"].__repr__()
- else:
- if cname == None:
- cname = classKey.getClassName()
- pname = classKey.getPackageName()
- hash = classKey.getHash()
- map["_class"] = cname
- if pname != None: map["_package"] = pname
- if hash != None: map["_hash"] = hash
- for item in kwargs:
- if item[0] != '_':
- self.getSelect.append((item, kwargs[item]))
-
- self.getResult = []
- for agent in agentList:
- broker = agent.broker
- sendCodec = Codec(broker.conn.spec)
- try:
- self.cv.acquire()
- seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
- self.syncSequenceList.append(seq)
- finally:
- self.cv.release()
- broker._setHeader(sendCodec, 'G', seq)
- sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
- broker._send(smsg)
-
- starttime = time()
- timeout = False
- try:
- self.cv.acquire()
- while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(self.GET_WAIT_TIME)
- if time() - starttime > self.GET_WAIT_TIME:
- for pendingSeq in self.syncSequenceList:
- self.seqMgr._release(pendingSeq)
- self.syncSequenceList = []
- timeout = True
- finally:
- self.cv.release()
-
- if self.error:
- errorText = self.error
- self.error = None
- raise Exception(errorText)
-
- if len(self.getResult) == 0 and timeout:
- raise RuntimeError("No agent responded within timeout period")
- return self.getResult
-
- def setEventFilter(self, **kwargs):
- """ """
- pass
-
- def _bindingKeys(self):
- keyList = []
- keyList.append("schema.#")
- if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings:
- keyList.append("console.#")
- else:
- if self.rcvObjects and not self.userBindings:
- keyList.append("console.obj.#")
- else:
- keyList.append("console.obj.*.*.org.apache.qpid.broker.agent")
- if self.rcvEvents:
- keyList.append("console.event.#")
- if self.rcvHeartbeats:
- keyList.append("console.heartbeat.#")
- return keyList
-
- def _handleBrokerConnect(self, broker):
- if self.console:
- self.console.brokerConnected(broker)
-
- def _handleBrokerDisconnect(self, broker):
- if self.console:
- self.console.brokerDisconnected(broker)
-
- def _handleBrokerResp(self, broker, codec, seq):
- broker.brokerId = UUID(codec.read_uuid())
- if self.console != None:
- self.console.brokerInfo(broker)
-
- # Send a package request
- # (effectively inc and dec outstanding by not doing anything)
- sendCodec = Codec(broker.conn.spec)
- seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
- broker._setHeader(sendCodec, 'P', seq)
- smsg = broker._message(sendCodec.encoded)
- broker._send(smsg)
-
- def _handlePackageInd(self, broker, codec, seq):
- pname = str(codec.read_str8())
- notify = False
- try:
- self.cv.acquire()
- if pname not in self.packages:
- self.packages[pname] = {}
- notify = True
- finally:
- self.cv.release()
- if notify and self.console != None:
- self.console.newPackage(pname)
-
- # Send a class request
- broker._incOutstanding()
- sendCodec = Codec(broker.conn.spec)
- seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
- broker._setHeader(sendCodec, 'Q', seq)
- sendCodec.write_str8(pname)
- smsg = broker._message(sendCodec.encoded)
- broker._send(smsg)
-
- def _handleCommandComplete(self, broker, codec, seq):
- code = codec.read_uint32()
- text = codec.read_str8()
- context = self.seqMgr._release(seq)
- if context == self._CONTEXT_STARTUP:
- broker._decOutstanding()
- elif context == self._CONTEXT_SYNC and seq == broker.syncSequence:
- try:
- broker.cv.acquire()
- broker.syncInFlight = False
- broker.cv.notify()
- finally:
- broker.cv.release()
- elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList:
- try:
- self.cv.acquire()
- self.syncSequenceList.remove(seq)
- if len(self.syncSequenceList) == 0:
- self.cv.notify()
- finally:
- self.cv.release()
-
- def _handleClassInd(self, broker, codec, seq):
- kind = codec.read_uint8()
- classKey = ClassKey(codec)
- unknown = False
-
- try:
- self.cv.acquire()
- if classKey.getPackageName() in self.packages:
- if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
- unknown = True
- finally:
- self.cv.release()
-
- if unknown:
- # Send a schema request for the unknown class
- broker._incOutstanding()
- sendCodec = Codec(broker.conn.spec)
- seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
- broker._setHeader(sendCodec, 'S', seq)
- classKey.encode(sendCodec)
- smsg = broker._message(sendCodec.encoded)
- broker._send(smsg)
-
- def _handleMethodResp(self, broker, codec, seq):
- code = codec.read_uint32()
- text = codec.read_str16()
- outArgs = {}
- method, synchronous = self.seqMgr._release(seq)
- if code == 0:
- for arg in method.arguments:
- if arg.dir.find("O") != -1:
- outArgs[arg.name] = self._decodeValue(codec, arg.type)
- result = MethodResult(code, text, outArgs)
- if synchronous:
- try:
- broker.cv.acquire()
- broker.syncResult = result
- broker.syncInFlight = False
- broker.cv.notify()
- finally:
- broker.cv.release()
- else:
- if self.console:
- self.console.methodResponse(broker, seq, result)
-
- def _handleHeartbeatInd(self, broker, codec, seq, msg):
- brokerBank = 1
- agentBank = 0
- dp = msg.get("delivery_properties")
- if dp:
- key = dp["routing_key"]
- keyElements = key.split(".")
- if len(keyElements) == 4:
- brokerBank = int(keyElements[2])
- agentBank = int(keyElements[3])
-
- agent = broker.getAgent(brokerBank, agentBank)
- timestamp = codec.read_uint64()
- if self.console != None and agent != None:
- self.console.heartbeat(agent, timestamp)
-
- def _handleEventInd(self, broker, codec, seq):
- if self.console != None:
- event = Event(self, broker, codec)
- self.console.event(broker, event)
-
- def _handleSchemaResp(self, broker, codec, seq):
- kind = codec.read_uint8()
- classKey = ClassKey(codec)
- _class = SchemaClass(kind, classKey, codec)
- try:
- self.cv.acquire()
- self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
- finally:
- self.cv.release()
-
- self.seqMgr._release(seq)
- broker._decOutstanding()
- if self.console != None:
- self.console.newClass(kind, classKey)
-
- def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
- classKey = ClassKey(codec)
- try:
- self.cv.acquire()
- pname = classKey.getPackageName()
- if pname not in self.packages:
- return
- pkey = classKey.getPackageKey()
- if pkey not in self.packages[pname]:
- return
- schema = self.packages[pname][pkey]
- finally:
- self.cv.release()
-
- object = Object(self, broker, schema, codec, prop, stat)
- if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
- broker._updateAgent(object)
-
- try:
- self.cv.acquire()
- if seq in self.syncSequenceList:
- if object.getTimestamps()[2] == 0 and self._selectMatch(object):
- self.getResult.append(object)
- return
- finally:
- self.cv.release()
-
- if self.console and self.rcvObjects:
- if prop:
- self.console.objectProps(broker, object)
- if stat:
- self.console.objectStats(broker, object)
-
- def _handleError(self, error):
- self.error = error
- try:
- self.cv.acquire()
- self.syncSequenceList = []
- self.cv.notify()
- finally:
- self.cv.release()
-
- def _selectMatch(self, object):
- """ Check the object against self.getSelect to check for a match """
- for key, value in self.getSelect:
- for prop, propval in object.getProperties():
- if key == prop.name and value != propval:
- return False
- return True
-
- def _decodeValue(self, codec, typecode):
- """ Decode, from the codec, a value based on its typecode. """
- if typecode == 1: data = codec.read_uint8() # U8
- elif typecode == 2: data = codec.read_uint16() # U16
- elif typecode == 3: data = codec.read_uint32() # U32
- elif typecode == 4: data = codec.read_uint64() # U64
- elif typecode == 6: data = codec.read_str8() # SSTR
- elif typecode == 7: data = codec.read_str16() # LSTR
- elif typecode == 8: data = codec.read_int64() # ABSTIME
- elif typecode == 9: data = codec.read_uint64() # DELTATIME
- elif typecode == 10: data = ObjectId(codec) # REF
- elif typecode == 11: data = codec.read_uint8() != 0 # BOOL
- elif typecode == 12: data = codec.read_float() # FLOAT
- elif typecode == 13: data = codec.read_double() # DOUBLE
- elif typecode == 14: data = UUID(codec.read_uuid()) # UUID
- elif typecode == 15: data = codec.read_map() # FTABLE
- elif typecode == 16: data = codec.read_int8() # S8
- elif typecode == 17: data = codec.read_int16() # S16
- elif typecode == 18: data = codec.read_int32() # S32
- elif typecode == 19: data = codec.read_int64() # S63
- else:
- raise ValueError("Invalid type code: %d" % typecode)
- return data
-
- def _encodeValue(self, codec, value, typecode):
- """ Encode, into the codec, a value based on its typecode. """
- if typecode == 1: codec.write_uint8 (int(value)) # U8
- elif typecode == 2: codec.write_uint16 (int(value)) # U16
- elif typecode == 3: codec.write_uint32 (long(value)) # U32
- elif typecode == 4: codec.write_uint64 (long(value)) # U64
- elif typecode == 6: codec.write_str8 (value) # SSTR
- elif typecode == 7: codec.write_str16 (value) # LSTR
- elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME
- elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME
- elif typecode == 10: value.encode (codec) # REF
- elif typecode == 11: codec.write_uint8 (int(value)) # BOOL
- elif typecode == 12: codec.write_float (float(value)) # FLOAT
- elif typecode == 13: codec.write_double (float(value)) # DOUBLE
- elif typecode == 14: codec.write_uuid (value.bytes) # UUID
- elif typecode == 15: codec.write_map (value) # FTABLE
- elif typecode == 16: codec.write_int8 (int(value)) # S8
- elif typecode == 17: codec.write_int16 (int(value)) # S16
- elif typecode == 18: codec.write_int32 (int(value)) # S32
- elif typecode == 19: codec.write_int64 (int(value)) # S64
- else:
- raise ValueError ("Invalid type code: %d" % typecode)
-
- def _displayValue(self, value, typecode):
- """ """
- if typecode == 1: return unicode(value)
- elif typecode == 2: return unicode(value)
- elif typecode == 3: return unicode(value)
- elif typecode == 4: return unicode(value)
- elif typecode == 6: return value
- elif typecode == 7: return value
- elif typecode == 8: return unicode(strftime("%c", gmtime(value / 1000000000)))
- elif typecode == 9: return unicode(value)
- elif typecode == 10: return unicode(value.__repr__())
- elif typecode == 11:
- if value: return u"T"
- else: return u"F"
- elif typecode == 12: return unicode(value)
- elif typecode == 13: return unicode(value)
- elif typecode == 14: return unicode(value.__repr__())
- elif typecode == 15: return unicode(value.__repr__())
- elif typecode == 16: return unicode(value)
- elif typecode == 17: return unicode(value)
- elif typecode == 18: return unicode(value)
- elif typecode == 19: return unicode(value)
- else:
- raise ValueError ("Invalid type code: %d" % typecode)
-
- def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
- """ This function can be used to send a method request to an object given only the
- broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are
- normally invoked on the object itself.
- """
- schema = self.getSchema(schemaKey)
- for method in schema.getMethods():
- if name == method.name:
- aIdx = 0
- sendCodec = Codec(broker.conn.spec)
- seq = self.seqMgr._reserve((method, False))
- broker._setHeader(sendCodec, 'M', seq)
- objectId.encode(sendCodec)
- schemaKey.encode(sendCodec)
- sendCodec.write_str8(name)
-
- count = 0
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- count += 1
- if count != len(argList):
- raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
-
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._encodeValue(sendCodec, argList[aIdx], arg.type)
- aIdx += 1
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
- (objectId.getBrokerBank(), objectId.getAgentBank()))
- broker._send(smsg)
- return seq
- return None
-
-class Package:
- """ """
- def __init__(self, name):
- self.name = name
-
-class ClassKey:
- """ A ClassKey uniquely identifies a class from the schema. """
- def __init__(self, constructor):
- if type(constructor) == str:
- # construct from __repr__ string
- try:
- self.pname, cls = constructor.split(":")
- self.cname, hsh = cls.split("(")
- hsh = hsh.strip(")")
- hexValues = hsh.split("-")
- h0 = int(hexValues[0], 16)
- h1 = int(hexValues[1], 16)
- h2 = int(hexValues[2], 16)
- h3 = int(hexValues[3], 16)
- self.hash = struct.pack("!LLLL", h0, h1, h2, h3)
- except:
- raise Exception("Invalid ClassKey format")
- else:
- # construct from codec
- codec = constructor
- self.pname = str(codec.read_str8())
- self.cname = str(codec.read_str8())
- self.hash = codec.read_bin128()
-
- def encode(self, codec):
- codec.write_str8(self.pname)
- codec.write_str8(self.cname)
- codec.write_bin128(self.hash)
-
- def getPackageName(self):
- return self.pname
-
- def getClassName(self):
- return self.cname
-
- def getHash(self):
- return self.hash
-
- def getHashString(self):
- return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)
-
- def getPackageKey(self):
- return (self.cname, self.hash)
-
- def __repr__(self):
- return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
-
-class SchemaClass:
- """ """
- CLASS_KIND_TABLE = 1
- CLASS_KIND_EVENT = 2
-
- def __init__(self, kind, key, codec):
- self.kind = kind
- self.classKey = key
- self.properties = []
- self.statistics = []
- self.methods = []
- self.arguments = []
-
- if self.kind == self.CLASS_KIND_TABLE:
- propCount = codec.read_uint16()
- statCount = codec.read_uint16()
- methodCount = codec.read_uint16()
- for idx in range(propCount):
- self.properties.append(SchemaProperty(codec))
- for idx in range(statCount):
- self.statistics.append(SchemaStatistic(codec))
- for idx in range(methodCount):
- self.methods.append(SchemaMethod(codec))
-
- elif self.kind == self.CLASS_KIND_EVENT:
- argCount = codec.read_uint16()
- for idx in range(argCount):
- self.arguments.append(SchemaArgument(codec, methodArg=False))
-
- def __repr__(self):
- if self.kind == self.CLASS_KIND_TABLE:
- kindStr = "Table"
- elif self.kind == self.CLASS_KIND_EVENT:
- kindStr = "Event"
- else:
- kindStr = "Unsupported"
- result = "%s Class: %s " % (kindStr, self.classKey.__repr__())
- return result
-
- def getKey(self):
- """ Return the class-key for this class. """
- return self.classKey
-
- def getProperties(self):
- """ Return the list of properties for the class. """
- return self.properties
-
- def getStatistics(self):
- """ Return the list of statistics for the class. """
- return self.statistics
-
- def getMethods(self):
- """ Return the list of methods for the class. """
- return self.methods
-
- def getArguments(self):
- """ Return the list of events for the class. """
- return self.arguments
-
-class SchemaProperty:
- """ """
- def __init__(self, codec):
- map = codec.read_map()
- self.name = str(map["name"])
- self.type = map["type"]
- self.access = str(map["access"])
- self.index = map["index"] != 0
- self.optional = map["optional"] != 0
- self.unit = None
- self.min = None
- self.max = None
- self.maxlen = None
- self.desc = None
-
- for key, value in map.items():
- if key == "unit" : self.unit = value
- elif key == "min" : self.min = value
- elif key == "max" : self.max = value
- elif key == "maxlen" : self.maxlen = value
- elif key == "desc" : self.desc = value
-
- def __repr__(self):
- return self.name
-
-class SchemaStatistic:
- """ """
- def __init__(self, codec):
- map = codec.read_map()
- self.name = str(map["name"])
- self.type = map["type"]
- self.unit = None
- self.desc = None
-
- for key, value in map.items():
- if key == "unit" : self.unit = value
- elif key == "desc" : self.desc = value
-
- def __repr__(self):
- return self.name
-
-class SchemaMethod:
- """ """
- def __init__(self, codec):
- map = codec.read_map()
- self.name = str(map["name"])
- argCount = map["argCount"]
- if "desc" in map:
- self.desc = map["desc"]
- else:
- self.desc = None
- self.arguments = []
-
- for idx in range(argCount):
- self.arguments.append(SchemaArgument(codec, methodArg=True))
-
- def __repr__(self):
- result = self.name + "("
- first = True
- for arg in self.arguments:
- if arg.dir.find("I") != -1:
- if first:
- first = False
- else:
- result += ", "
- result += arg.name
- result += ")"
- return result
-
-class SchemaArgument:
- """ """
- def __init__(self, codec, methodArg):
- map = codec.read_map()
- self.name = str(map["name"])
- self.type = map["type"]
- if methodArg:
- self.dir = str(map["dir"]).upper()
- self.unit = None
- self.min = None
- self.max = None
- self.maxlen = None
- self.desc = None
- self.default = None
-
- for key, value in map.items():
- if key == "unit" : self.unit = value
- elif key == "min" : self.min = value
- elif key == "max" : self.max = value
- elif key == "maxlen" : self.maxlen = value
- elif key == "desc" : self.desc = value
- elif key == "default" : self.default = value
-
-class ObjectId:
- """ Object that represents QMF object identifiers """
- def __init__(self, codec, first=0, second=0):
- if codec:
- self.first = codec.read_uint64()
- self.second = codec.read_uint64()
- else:
- self.first = first
- self.second = second
-
- def __cmp__(self, other):
- if other == None or not isinstance(other, ObjectId) :
- return 1
- if self.first < other.first:
- return -1
- if self.first > other.first:
- return 1
- if self.second < other.second:
- return -1
- if self.second > other.second:
- return 1
- return 0
-
- def __repr__(self):
- return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
- self.getBrokerBank(), self.getAgentBank(), self.getObject())
-
- def index(self):
- return (self.first, self.second)
-
- def getFlags(self):
- return (self.first & 0xF000000000000000) >> 60
-
- def getSequence(self):
- return (self.first & 0x0FFF000000000000) >> 48
-
- def getBrokerBank(self):
- return (self.first & 0x0000FFFFF0000000) >> 28
-
- def getAgentBank(self):
- return self.first & 0x000000000FFFFFFF
-
- def getObject(self):
- return self.second
-
- def isDurable(self):
- return self.getSequence() == 0
-
- def encode(self, codec):
- codec.write_uint64(self.first)
- codec.write_uint64(self.second)
-
- def __hash__(self):
- return (self.first, self.second).__hash__()
-
- def __eq__(self, other):
- return (self.first, self.second).__eq__(other)
-
-class Object(object):
- """ """
- def __init__(self, session, broker, schema, codec, prop, stat):
- """ """
- self._session = session
- self._broker = broker
- self._schema = schema
- self._currentTime = codec.read_uint64()
- self._createTime = codec.read_uint64()
- self._deleteTime = codec.read_uint64()
- self._objectId = ObjectId(codec)
- self._properties = []
- self._statistics = []
- if prop:
- notPresent = self._parsePresenceMasks(codec, schema)
- for property in schema.getProperties():
- if property.name in notPresent:
- self._properties.append((property, None))
- else:
- self._properties.append((property, self._session._decodeValue(codec, property.type)))
- if stat:
- for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
-
- def getBroker(self):
- """ Return the broker from which this object was sent """
- return self._broker
-
- def getObjectId(self):
- """ Return the object identifier for this object """
- return self._objectId
-
- def getClassKey(self):
- """ Return the class-key that references the schema describing this object. """
- return self._schema.getKey()
-
- def getSchema(self):
- """ Return the schema that describes this object. """
- return self._schema
-
- def getMethods(self):
- """ Return a list of methods available for this object. """
- return self._schema.getMethods()
-
- def getTimestamps(self):
- """ Return the current, creation, and deletion times for this object. """
- return self._currentTime, self._createTime, self._deleteTime
-
- def getIndex(self):
- """ Return a string describing this object's primary key. """
- result = u""
- for property, value in self._properties:
- if property.index:
- if result != u"":
- result += u":"
- try:
- valstr = unicode(self._session._displayValue(value, property.type))
- except:
- valstr = u"<undecodable>"
- result += valstr
- return result
-
- def getProperties(self):
- return self._properties
-
- def getStatistics(self):
- return self._statistics
-
- def mergeUpdate(self, newer):
- """ Replace properties and/or statistics with a newly received update """
- if self._objectId != newer._objectId:
- raise Exception("Objects with different object-ids")
- if len(newer.getProperties()) > 0:
- self.properties = newer.getProperties()
- if len(newer.getStatistics()) > 0:
- self.statistics = newer.getStatistics()
-
- def __repr__(self):
- key = self.getClassKey()
- return key.getPackageName() + ":" + key.getClassName() +\
- "[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8")
-
- def __getattr__(self, name):
- for method in self._schema.getMethods():
- if name == method.name:
- return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self._properties:
- if name == property.name:
- return value
- if name == "_" + property.name + "_" and property.type == 10: # Dereference references
- deref = self._session.getObjects(_objectId=value, _broker=self._broker)
- if len(deref) != 1:
- return None
- else:
- return deref[0]
- for statistic, value in self._statistics:
- if name == statistic.name:
- return value
- raise Exception("Type Object has no attribute '%s'" % name)
-
- def _sendMethodRequest(self, name, args, kwargs, synchronous=False):
- for method in self._schema.getMethods():
- if name == method.name:
- aIdx = 0
- sendCodec = Codec(self._broker.conn.spec)
- seq = self._session.seqMgr._reserve((method, synchronous))
- self._broker._setHeader(sendCodec, 'M', seq)
- self._objectId.encode(sendCodec)
- self._schema.getKey().encode(sendCodec)
- sendCodec.write_str8(name)
-
- count = 0
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- count += 1
- if count != len(args):
- raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
-
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._session._encodeValue(sendCodec, args[aIdx], arg.type)
- aIdx += 1
- smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
- if synchronous:
- try:
- self._broker.cv.acquire()
- self._broker.syncInFlight = True
- finally:
- self._broker.cv.release()
- self._broker._send(smsg)
- return seq
- return None
-
- def _invoke(self, name, args, kwargs):
- if self._sendMethodRequest(name, args, kwargs, True):
- try:
- self._broker.cv.acquire()
- starttime = time()
- while self._broker.syncInFlight and self._broker.error == None:
- self._broker.cv.wait(self._broker.SYNC_TIME)
- if time() - starttime > self._broker.SYNC_TIME:
- self._session.seqMgr._release(seq)
- raise RuntimeError("Timed out waiting for method to respond")
- finally:
- self._broker.cv.release()
- if self._broker.error != None:
- errorText = self._broker.error
- self._broker.error = None
- raise Exception(errorText)
- return self._broker.syncResult
- raise Exception("Invalid Method (software defect) [%s]" % name)
-
- def _parsePresenceMasks(self, codec, schema):
- excludeList = []
- bit = 0
- for property in schema.getProperties():
- if property.optional:
- if bit == 0:
- mask = codec.read_uint8()
- bit = 1
- if (mask & bit) == 0:
- excludeList.append(property.name)
- bit *= 2
- if bit == 256:
- bit = 0
- return excludeList
-
-class MethodResult(object):
- """ """
- def __init__(self, status, text, outArgs):
- """ """
- self.status = status
- self.text = text
- self.outArgs = outArgs
-
- def __getattr__(self, name):
- if name in self.outArgs:
- return self.outArgs[name]
-
- def __repr__(self):
- return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
-
-class ManagedConnection(Thread):
- """ Thread class for managing a connection. """
- DELAY_MIN = 1
- DELAY_MAX = 128
- DELAY_FACTOR = 2
-
- def __init__(self, broker):
- Thread.__init__(self)
- self.broker = broker
- self.cv = Condition()
- self.canceled = False
-
- def stop(self):
- """ Tell this thread to stop running and return. """
- try:
- self.cv.acquire()
- self.canceled = True
- self.cv.notify()
- finally:
- self.cv.release()
-
- def disconnected(self):
- """ Notify the thread that the connection was lost. """
- try:
- self.cv.acquire()
- self.cv.notify()
- finally:
- self.cv.release()
-
- def run(self):
- """ Main body of the running thread. """
- delay = self.DELAY_MIN
- while True:
- try:
- self.broker._tryToConnect()
- try:
- self.cv.acquire()
- while (not self.canceled) and self.broker.connected:
- self.cv.wait()
- if self.canceled:
- return
- delay = self.DELAY_MIN
- finally:
- self.cv.release()
- except socket.error:
- if delay < self.DELAY_MAX:
- delay *= self.DELAY_FACTOR
- except SessionDetached:
- if delay < self.DELAY_MAX:
- delay *= self.DELAY_FACTOR
- except Closed:
- if delay < self.DELAY_MAX:
- delay *= self.DELAY_FACTOR
-
- try:
- self.cv.acquire()
- self.cv.wait(delay)
- if self.canceled:
- return
- finally:
- self.cv.release()
-
-class Broker:
- """ This object represents a connection (or potential connection) to a QMF broker. """
- SYNC_TIME = 60
-
- def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
- self.session = session
- self.host = host
- self.port = port
- self.ssl = ssl
- self.authUser = authUser
- self.authPass = authPass
- self.cv = Condition()
- self.error = None
- self.brokerId = None
- self.connected = False
- self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
- if self.session.manageConnections:
- self.thread = ManagedConnection(self)
- self.thread.start()
- else:
- self.thread = None
- self._tryToConnect()
-
- def isConnected(self):
- """ Return True if there is an active connection to the broker. """
- return self.connected
-
- def getError(self):
- """ Return the last error message seen while trying to connect to the broker. """
- return self.error
-
- def getBrokerId(self):
- """ Get broker's unique identifier (UUID) """
- return self.brokerId
-
- def getBrokerBank(self):
- """ Return the broker-bank value. This is the value that the broker assigns to
- objects within its control. This value appears as a field in the ObjectId
- of objects created by agents controlled by this broker. """
- return 1
-
- def getAgent(self, brokerBank, agentBank):
- """ Return the agent object associated with a particular broker and agent bank value."""
- bankKey = (brokerBank, agentBank)
- if bankKey in self.agents:
- return self.agents[bankKey]
- return None
-
- def getSessionId(self):
- """ Get the identifier of the AMQP session to the broker """
- return self.amqpSessionId
-
- def getAgents(self):
- """ Get the list of agents reachable via this broker """
- return self.agents.values()
-
- def getAmqpSession(self):
- """ Get the AMQP session object for this connected broker. """
- return self.amqpSession
-
- def getUrl(self):
- """ """
- return "%s:%d" % (self.host, self.port)
-
- def getFullUrl(self, noAuthIfGuestDefault=True):
- """ """
- ssl = ""
- if self.ssl:
- ssl = "s"
- auth = "%s/%s@" % (self.authUser, self.authPass)
- if self.authUser == "" or \
- (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"):
- auth = ""
- return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672)
-
- def __repr__(self):
- if self.connected:
- return "Broker connected at: %s" % self.getUrl()
- else:
- return "Disconnected Broker"
-
- def _tryToConnect(self):
- try:
- self.agents = {}
- self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
- self.topicBound = False
- self.syncInFlight = False
- self.syncRequest = 0
- self.syncResult = None
- self.reqsOutstanding = 1
-
- sock = connect(self.host, self.port)
- if self.ssl:
- sock = ssl(sock)
- self.conn = Connection(sock, username=self.authUser, password=self.authPass)
- self.conn.start()
- self.replyName = "reply-%s" % self.amqpSessionId
- self.amqpSession = self.conn.session(self.amqpSessionId)
- self.amqpSession.auto_sync = True
- self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
- self.amqpSession.exchange_bind(exchange="amq.direct",
- queue=self.replyName, binding_key=self.replyName)
- self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
- accept_mode=self.amqpSession.accept_mode.none,
- acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
- self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
- self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF)
- self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFF)
-
- self.topicName = "topic-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
- self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
- accept_mode=self.amqpSession.accept_mode.none,
- acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("tdest").listen(self._replyCb)
- self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
- self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
- self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF)
-
- self.connected = True
- self.session._handleBrokerConnect(self)
-
- codec = Codec(self.conn.spec)
- self._setHeader(codec, 'B')
- msg = self._message(codec.encoded)
- self._send(msg)
-
- except socket.error, e:
- self.error = "Socket Error %s - %s" % (e[0], e[1])
- raise
- except Closed, e:
- self.error = "Connect Failed %d - %s" % (e[0], e[1])
- raise
- except ConnectionFailed, e:
- self.error = "Connect Failed %d - %s" % (e[0], e[1])
- raise
-
- def _updateAgent(self, obj):
- bankKey = (obj.brokerBank, obj.agentBank)
- if obj._deleteTime == 0:
- if bankKey not in self.agents:
- agent = Agent(self, obj.agentBank, obj.label)
- self.agents[bankKey] = agent
- if self.session.console != None:
- self.session.console.newAgent(agent)
- else:
- agent = self.agents.pop(bankKey, None)
- if agent != None and self.session.console != None:
- self.session.console.delAgent(agent)
-
- def _setHeader(self, codec, opcode, seq=0):
- """ Compose the header of a management message. """
- codec.write_uint8(ord('A'))
- codec.write_uint8(ord('M'))
- codec.write_uint8(ord('2'))
- codec.write_uint8(ord(opcode))
- codec.write_uint32(seq)
-
- def _checkHeader(self, codec):
- """ Check the header of a management message and extract the opcode and class. """
- try:
- octet = chr(codec.read_uint8())
- if octet != 'A':
- return None, None
- octet = chr(codec.read_uint8())
- if octet != 'M':
- return None, None
- octet = chr(codec.read_uint8())
- if octet != '2':
- return None, None
- opcode = chr(codec.read_uint8())
- seq = codec.read_uint32()
- return opcode, seq
- except:
- return None, None
-
- def _message (self, body, routing_key="broker"):
- dp = self.amqpSession.delivery_properties()
- dp.routing_key = routing_key
- mp = self.amqpSession.message_properties()
- mp.content_type = "x-application/qmf"
- mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)
- return Message(dp, mp, body)
-
- def _send(self, msg, dest="qpid.management"):
- self.amqpSession.message_transfer(destination=dest, message=msg)
-
- def _shutdown(self):
- if self.thread:
- self.thread.stop()
- self.thread.join()
- if self.connected:
- self.amqpSession.incoming("rdest").stop()
- if self.session.console != None:
- self.amqpSession.incoming("tdest").stop()
- self.amqpSession.close()
- self.conn.close()
- self.connected = False
-
- def _waitForStable(self):
- try:
- self.cv.acquire()
- if not self.connected:
- return
- if self.reqsOutstanding == 0:
- return
- self.syncInFlight = True
- starttime = time()
- while self.reqsOutstanding != 0:
- self.cv.wait(self.SYNC_TIME)
- if time() - starttime > self.SYNC_TIME:
- raise RuntimeError("Timed out waiting for broker to synchronize")
- finally:
- self.cv.release()
-
- def _incOutstanding(self):
- try:
- self.cv.acquire()
- self.reqsOutstanding += 1
- finally:
- self.cv.release()
-
- def _decOutstanding(self):
- try:
- self.cv.acquire()
- self.reqsOutstanding -= 1
- if self.reqsOutstanding == 0 and not self.topicBound:
- self.topicBound = True
- for key in self.session.bindingKeyList:
- self.amqpSession.exchange_bind(exchange="qpid.management",
- queue=self.topicName, binding_key=key)
- if self.reqsOutstanding == 0 and self.syncInFlight:
- self.syncInFlight = False
- self.cv.notify()
- finally:
- self.cv.release()
-
- def _replyCb(self, msg):
- codec = Codec(self.conn.spec, msg.body)
- while True:
- opcode, seq = self._checkHeader(codec)
- if opcode == None: return
- if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
- elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
- elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
- elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
- elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
- elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
- elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
- elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
- elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
- elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
-
- def _exceptionCb(self, data):
- self.connected = False
- self.error = data
- try:
- self.cv.acquire()
- if self.syncInFlight:
- self.cv.notify()
- finally:
- self.cv.release()
- self.session._handleError(self.error)
- self.session._handleBrokerDisconnect(self)
- if self.thread:
- self.thread.disconnected()
-
-class Agent:
- """ """
- def __init__(self, broker, agentBank, label):
- self.broker = broker
- self.brokerBank = broker.getBrokerBank()
- self.agentBank = agentBank
- self.label = label
-
- def __repr__(self):
- return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label)
-
- def getBroker(self):
- return self.broker
-
- def getBrokerBank(self):
- return self.brokerBank
-
- def getAgentBank(self):
- return self.agentBank
-
-class Event:
- """ """
- def __init__(self, session, broker, codec):
- self.session = session
- self.broker = broker
- self.classKey = ClassKey(codec)
- self.timestamp = codec.read_int64()
- self.severity = codec.read_uint8()
- self.schema = None
- pname = self.classKey.getPackageName()
- pkey = self.classKey.getPackageKey()
- if pname in session.packages:
- if pkey in session.packages[pname]:
- self.schema = session.packages[pname][pkey]
- self.arguments = {}
- for arg in self.schema.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type)
-
- def __repr__(self):
- if self.schema == None:
- return "<uninterpretable>"
- out = strftime("%c", gmtime(self.timestamp / 1000000000))
- out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName()
- out += " broker=" + self.broker.getUrl()
- for arg in self.schema.arguments:
- disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8")
- if " " in disp:
- disp = "\"" + disp + "\""
- out += " " + arg.name + "=" + disp
- return out
-
- def _sevName(self):
- if self.severity == 0 : return "EMER "
- if self.severity == 1 : return "ALERT"
- if self.severity == 2 : return "CRIT "
- if self.severity == 3 : return "ERROR"
- if self.severity == 4 : return "WARN "
- if self.severity == 5 : return "NOTIC"
- if self.severity == 6 : return "INFO "
- if self.severity == 7 : return "DEBUG"
- return "INV-%d" % self.severity
-
- def getClassKey(self):
- return self.classKey
-
- def getArguments(self):
- return self.arguments
-
- def getTimestamp(self):
- return self.timestamp
-
- def getName(self):
- return self.name
-
- def getSchema(self):
- return self.schema
-
-class SequenceManager:
- """ Manage sequence numbers for asynchronous method calls """
- def __init__(self):
- self.lock = Lock()
- self.sequence = 0
- self.pending = {}
-
- def _reserve(self, data):
- """ Reserve a unique sequence number """
- try:
- self.lock.acquire()
- result = self.sequence
- self.sequence = self.sequence + 1
- self.pending[result] = data
- finally:
- self.lock.release()
- return result
-
- def _release(self, seq):
- """ Release a reserved sequence number """
- data = None
- try:
- self.lock.acquire()
- if seq in self.pending:
- data = self.pending[seq]
- del self.pending[seq]
- finally:
- self.lock.release()
- return data
-
-
-class DebugConsole(Console):
- """ """
- def brokerConnected(self, broker):
- print "brokerConnected:", broker
-
- def brokerDisconnected(self, broker):
- print "brokerDisconnected:", broker
-
- def newPackage(self, name):
- print "newPackage:", name
-
- def newClass(self, kind, classKey):
- print "newClass:", kind, classKey
-
- def newAgent(self, agent):
- print "newAgent:", agent
-
- def delAgent(self, agent):
- print "delAgent:", agent
-
- def objectProps(self, broker, record):
- print "objectProps:", record
-
- def objectStats(self, broker, record):
- print "objectStats:", record
-
- def event(self, broker, event):
- print "event:", event
-
- def heartbeat(self, agent, timestamp):
- print "heartbeat:", agent
-
- def brokerInfo(self, broker):
- print "brokerInfo:", broker
-