# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # """ Console API for Qpid Management Framework """ import os import platform import qpid import struct import socket import re from qpid.datatypes import UUID from qpid.datatypes import timestamp from qpid.datatypes import datetime from qpid.peer import Closed from qpid.session import SessionDetached from qpid.connection import Connection, ConnectionFailed, Timeout from qpid.datatypes import Message, RangedSet, UUID from qpid.util import connect, ssl, URL from qpid.codec010 import StringCodec as Codec from threading import Lock, Condition, Thread from time import time, strftime, gmtime from cStringIO import StringIO class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ def brokerConnected(self, broker): """ Invoked when a connection is established to a broker """ pass def brokerDisconnected(self, broker): """ Invoked when the connection to a broker is lost """ pass def newPackage(self, name): """ Invoked when a QMF package is discovered. """ pass def newClass(self, kind, classKey): """ Invoked when a new class is discovered. Session.getSchema can be used to obtain details about the class.""" pass def newAgent(self, agent): """ Invoked when a QMF agent is discovered. """ pass def delAgent(self, agent): """ Invoked when a QMF agent disconects. """ pass def objectProps(self, broker, record): """ Invoked when an object is updated. """ pass def objectStats(self, broker, record): """ Invoked when an object is updated. """ pass def event(self, broker, event): """ Invoked when an event is raised. """ pass def heartbeat(self, agent, timestamp): """ Invoked when an agent heartbeat is received. """ pass def brokerInfo(self, broker): """ Invoked when the connection sequence reaches the point where broker information is available. """ pass def methodResponse(self, broker, seq, response): """ Invoked when a method response from an asynchronous method call is received. """ pass class BrokerURL(URL): def __init__(self, text): URL.__init__(self, text) socket.gethostbyname(self.host) if self.port is None: if self.scheme == URL.AMQPS: self.port = 5671 else: self.port = 5672 self.authName = str(self.user or "guest") self.authPass = str(self.password or "guest") self.authMech = "PLAIN" def name(self): return self.host + ":" + str(self.port) def match(self, host, port): return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port class Object(object): """ This class defines a 'proxy' object representing a real managed object on an agent. Actions taken on this proxy are remotely affected on the real managed object. """ def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}): self._session = session self._broker = broker self._schema = schema self._managed = managed if self._managed: self._currentTime = codec.read_uint64() self._createTime = codec.read_uint64() self._deleteTime = codec.read_uint64() self._objectId = ObjectId(codec) else: self._currentTime = None self._createTime = None self._deleteTime = None self._objectId = None self._properties = [] self._statistics = [] if codec: if prop: notPresent = self._parsePresenceMasks(codec, schema) for property in schema.getProperties(): if property.name in notPresent: self._properties.append((property, None)) else: self._properties.append((property, self._session._decodeValue(codec, property.type, broker))) if stat: for statistic in schema.getStatistics(): self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker))) else: for property in schema.getProperties(): if property.optional: self._properties.append((property, None)) else: self._properties.append((property, self._session._defaultValue(property, broker, kwargs))) for statistic in schema.getStatistics(): self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs))) def getBroker(self): """ Return the broker from which this object was sent """ return self._broker def getObjectId(self): """ Return the object identifier for this object """ return self._objectId def getClassKey(self): """ Return the class-key that references the schema describing this object. """ return self._schema.getKey() def getSchema(self): """ Return the schema that describes this object. """ return self._schema def getMethods(self): """ Return a list of methods available for this object. """ return self._schema.getMethods() def getTimestamps(self): """ Return the current, creation, and deletion times for this object. """ return self._currentTime, self._createTime, self._deleteTime def isDeleted(self): """ Return True iff this object has been deleted. """ return self._deleteTime != 0 def isManaged(self): """ Return True iff this object is a proxy for a managed object on an agent. """ return self._managed def getIndex(self): """ Return a string describing this object's primary key. """ result = u"" for property, value in self._properties: if property.index: if result != u"": result += u":" try: valstr = unicode(self._session._displayValue(value, property.type)) except: valstr = u"" result += valstr return result def getProperties(self): """ Return a list of object properties """ return self._properties def getStatistics(self): """ Return a list of object statistics """ return self._statistics def mergeUpdate(self, newer): """ Replace properties and/or statistics with a newly received update """ if not self.isManaged(): raise Exception("Object is not managed") if self._objectId != newer._objectId: raise Exception("Objects with different object-ids") if len(newer.getProperties()) > 0: self._properties = newer.getProperties() if len(newer.getStatistics()) > 0: self._statistics = newer.getStatistics() def update(self): """ Contact the agent and retrieve the lastest property and statistic values for this object. """ if not self.isManaged(): raise Exception("Object is not managed") obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) if obj: self.mergeUpdate(obj[0]) else: raise Exception("Underlying object no longer exists") def __repr__(self): if self.isManaged(): id = self.getObjectId().__repr__() else: id = "unmanaged" key = self.getClassKey() return key.getPackageName() + ":" + key.getClassName() +\ "[" + id + "] " + self.getIndex().encode("utf8") def __getattr__(self, name): for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) for property, value in self._properties: if name == property.name: return value if name == "_" + property.name + "_" and property.type == 10: # Dereference references deref = self._session.getObjects(_objectId=value, _broker=self._broker) if len(deref) != 1: return None else: return deref[0] for statistic, value in self._statistics: if name == statistic.name: return value raise Exception("Type Object has no attribute '%s'" % name) def __setattr__(self, name, value): if name[0] == '_': super.__setattr__(self, name, value) return for prop, unusedValue in self._properties: if name == prop.name: newprop = (prop, value) newlist = [] for old, val in self._properties: if name == old.name: newlist.append(newprop) else: newlist.append((old, val)) self._properties = newlist return super.__setattr__(self, name, value) def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): for method in self._schema.getMethods(): if name == method.name: aIdx = 0 sendCodec = Codec() seq = self._session.seqMgr._reserve((method, synchronous)) self._broker._setHeader(sendCodec, 'M', seq) self._objectId.encode(sendCodec) self._schema.getKey().encode(sendCodec) sendCodec.write_str8(name) count = 0 for arg in method.arguments: if arg.dir.find("I") != -1: count += 1 if count != len(args): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) for arg in method.arguments: if arg.dir.find("I") != -1: self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 if timeWait: ttl = timeWait * 1000 else: ttl = None smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), ttl=ttl) if synchronous: try: self._broker.cv.acquire() self._broker.syncInFlight = True finally: self._broker.cv.release() self._broker._send(smsg) return seq return None def _invoke(self, name, args, kwargs): if not self.isManaged(): raise Exception("Object is not managed") if "_timeout" in kwargs: timeout = kwargs["_timeout"] else: timeout = self._broker.SYNC_TIME if "_async" in kwargs and kwargs["_async"]: sync = False if "_timeout" not in kwargs: timeout = None else: sync = True seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) if seq: if not sync: return seq try: self._broker.cv.acquire() starttime = time() while self._broker.syncInFlight and self._broker.error == None: self._broker.cv.wait(timeout) if time() - starttime > timeout: self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") finally: self._broker.cv.release() if self._broker.error != None: errorText = self._broker.error self._broker.error = None raise Exception(errorText) return self._broker.syncResult raise Exception("Invalid Method (software defect) [%s]" % name) def _encodeUnmanaged(self, codec): codec.write_uint8(20) codec.write_str8(self._schema.getKey().getPackageName()) codec.write_str8(self._schema.getKey().getClassName()) codec.write_bin128(self._schema.getKey().getHash()) # emit presence masks for optional properties mask = 0 bit = 0 for prop, value in self._properties: if prop.optional: if bit == 0: bit = 1 if value: mask |= bit bit = bit << 1 if bit == 256: bit = 0 codec.write_uint8(mask) mask = 0 if bit != 0: codec.write_uint8(mask) # encode properties for prop, value in self._properties: if value != None: self._session._encodeValue(codec, value, prop.type) # encode statistics for stat, value in self._statistics: self._session._encodeValue(codec, value, stat.type) def _parsePresenceMasks(self, codec, schema): excludeList = [] bit = 0 for property in schema.getProperties(): if property.optional: if bit == 0: mask = codec.read_uint8() bit = 1 if (mask & bit) == 0: excludeList.append(property.name) bit *= 2 if bit == 256: bit = 0 return excludeList class Session: """ An instance of the Session class represents a console session running against one or more QMF brokers. A single instance of Session is needed to interact with the management framework as a console. """ _CONTEXT_SYNC = 1 _CONTEXT_STARTUP = 2 _CONTEXT_MULTIGET = 3 DEFAULT_GET_WAIT_TIME = 60 ENCODINGS = { str: 7, timestamp: 8, datetime: 8, int: 9, long: 9, float: 13, UUID: 14, Object: 20, list: 21 } def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): """ Initialize a session. If the console argument is provided, the more advanced asynchronous features are available. If console is defaulted, the session will operate in a simpler, synchronous manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' is provided. They control whether object updates, events, and agent-heartbeats are subscribed to. If the console is not interested in receiving one or more of the above, setting the argument to False will reduce tha bandwidth used by the API. If manageConnections is set to True, the Session object will manage connections to the brokers. This means that if a broker is unreachable, it will retry until a connection can be established. If a connection is lost, the Session will attempt to reconnect. If manageConnections is set to False, the user is responsible for handing failures. In this case, an unreachable broker will cause addBroker to raise an exception. If userBindings is set to False (the default) and rcvObjects is True, the console will receive data for all object classes. If userBindings is set to True, the user must select which classes the console shall receive by invoking the bindPackage or bindClass methods. This allows the console to be configured to receive only information that is relavant to a particular application. If rcvObjects id False, userBindings has no meaning. """ self.console = console self.brokers = [] self.packages = {} self.seqMgr = SequenceManager() self.cv = Condition() self.syncSequenceList = [] self.getResult = [] self.getSelect = [] self.error = None self.rcvObjects = rcvObjects self.rcvEvents = rcvEvents self.rcvHeartbeats = rcvHeartbeats self.userBindings = userBindings if self.console == None: self.rcvObjects = False self.rcvEvents = False self.rcvHeartbeats = False self.bindingKeyList = self._bindingKeys() self.manageConnections = manageConnections if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") def __repr__(self): return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) def addBroker(self, target="localhost", timeout=None): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass, ssl = url.scheme == URL.AMQPS, connTimeout=timeout) self.brokers.append(broker) if not self.manageConnections: self.getObjects(broker=broker, _class="agent") return broker def delBroker(self, broker): """ Disconnect from a broker. The 'broker' argument is the object returned from the addBroker call """ if self.console: for agent in broker.getAgents(): self.console.delAgent(agent) broker._shutdown() self.brokers.remove(broker) del broker def getPackages(self): """ Get the list of known QMF packages """ for broker in self.brokers: broker._waitForStable() list = [] for package in self.packages: list.append(package) return list def getClasses(self, packageName): """ Get the list of known classes within a QMF package """ for broker in self.brokers: broker._waitForStable() list = [] if packageName in self.packages: for pkey in self.packages[packageName]: list.append(self.packages[packageName][pkey].getKey()) return list def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() pname = classKey.getPackageName() pkey = classKey.getPackageKey() if pname in self.packages: if pkey in self.packages[pname]: return self.packages[pname][pkey] def bindPackage(self, packageName): """ Request object updates for all table classes within a package. """ if not self.userBindings or not self.rcvObjects: raise Exception("userBindings option not set for Session") key = "console.obj.*.*.%s.#" % packageName self.bindingKeyList.append(key) for broker in self.brokers: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) def bindClass(self, pname, cname): """ Request object updates for a particular table class by package and class name. """ if not self.userBindings or not self.rcvObjects: raise Exception("userBindings option not set for Session") key = "console.obj.*.*.%s.%s.#" % (pname, cname) self.bindingKeyList.append(key) for broker in self.brokers: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) def bindClassKey(self, classKey): """ Request object updates for a particular table class by class key. """ pname = classKey.getPackageName() cname = classKey.getClassName() self.bindClass(pname, cname) def getAgents(self, broker=None): """ Get a list of currently known agents """ brokerList = [] if broker == None: for b in self.brokers: brokerList.append(b) else: brokerList.append(broker) for b in brokerList: b._waitForStable() agentList = [] for b in brokerList: for a in b.getAgents(): agentList.append(a) return agentList def makeObject(self, classKey, broker=None, **kwargs): """ Create a new, unmanaged object of the schema indicated by classKey """ schema = self.getSchema(classKey) if schema == None: raise Exception("Schema not found for classKey") return Object(self, broker, schema, None, True, True, False, kwargs) def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. All arguments are passed by name(keyword). The class for queried objects may be specified in one of the following ways: _schema = - supply a schema object returned from getSchema. _key = - supply a classKey from the list returned by getClasses. _class = - supply a class name as a string. If the class name exists in multiple packages, a _package argument may also be supplied. _objectId = - get the object referenced by the object-id If objects should be obtained from only one agent, use the following argument. Otherwise, the query will go to all agents. _agent = - supply an agent from the list returned by getAgents. If the get query is to be restricted to one broker (as opposed to all connected brokers), add the following argument: _broker = - supply a broker as returned by addBroker. The default timeout for this synchronous operation is 60 seconds. To change the timeout, use the following argument: _timeout =