diff options
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf/console.py')
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 855 |
1 files changed, 588 insertions, 267 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 0b0ec417d0..4bbe69655e 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -41,6 +41,9 @@ from cStringIO import StringIO #import qpid.log #qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) +#=================================================================================================== +# CONSOLE +#=================================================================================================== class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ @@ -94,6 +97,10 @@ class Console: """ Invoked when a method response from an asynchronous method call is received. """ pass + +#=================================================================================================== +# BrokerURL +#=================================================================================================== class BrokerURL(URL): def __init__(self, text): URL.__init__(self, text) @@ -115,13 +122,22 @@ class BrokerURL(URL): def match(self, host, port): return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4] + +#=================================================================================================== +# Object +#=================================================================================================== class Object(object): - """ This class defines a 'proxy' object representing a real managed object on an agent. - Actions taken on this proxy are remotely affected on the real managed object. """ - def __init__(self, session, broker, schema, codec=None, prop=None, stat=None, managed=True, v2Map=None, agentName=None, kwargs={}): - self._session = session - self._broker = broker + This class defines a 'proxy' object representing a real managed object on an agent. + Actions taken on this proxy are remotely affected on the real managed object. + """ + def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}): + self._agent = agent + self._session = None + self._broker = None + if agent: + self._session = agent.session + self._broker = agent.broker self._schema = schema self._properties = [] self._statistics = [] @@ -129,8 +145,7 @@ class Object(object): self.v2Init(v2Map, agentName) return - self._managed = managed - if self._managed: + if self._agent: self._currentTime = codec.read_uint64() self._createTime = codec.read_uint64() self._deleteTime = codec.read_uint64() @@ -176,10 +191,9 @@ class Object(object): if '_subtypes' in omap: self._subtypes = omap['_subtypes'] if '_object_id' in omap: - self._managed = True self._objectId = ObjectId(omap['_object_id'], agentName=agentName) else: - self._managed = None + self._objectId = None def getBroker(self): """ Return the broker from which this object was sent """ @@ -211,7 +225,7 @@ class Object(object): def isManaged(self): """ Return True iff this object is a proxy for a managed object on an agent. """ - return self._managed + return self._objectId and self._agent def getIndex(self): """ Return a string describing this object's primary key. """ @@ -250,7 +264,7 @@ class Object(object): """ Contact the agent and retrieve the lastest property and statistic values for this object. """ if not self.isManaged(): raise Exception("Object is not managed") - obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) + obj = self._agent.getObjects(_objectId=self._objectId) if obj: self.mergeUpdate(obj[0]) else: @@ -423,6 +437,10 @@ class Object(object): bit = 0 return excludeList + +#=================================================================================================== +# Session +#=================================================================================================== class Session: """ An instance of the Session class represents a console session running @@ -495,132 +513,26 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") - """ - ## - ## v2_data_queues is used to store object data received from QMFv2 agents. - ## It is stored here in case we need to go and query schema data from the - ## agent before reporting to the user. - ## - ## v2_data_queues is a map, keyed by agent address of queues of entries - ## The format of entries in the queue is a data map - ## This list must be protected by self.cv - ## - """ - self.v2_data_queues = {} - self.v2_pending_queues = {} - def _getBrokerForAgentAddr(self, agent_addr): - broker = None try: self.cv.acquire() key = (1, agent_addr) for b in self.brokers: if key in b.agents: - broker = b - finally: - self.cv.release() - return broker - - def _processV2Data(self): - """ - Attempt to make progress on the entries in the v2_data_queue. If an entry has a schema - that is in our schema cache, process it. Otherwise, send a request for the schema information - to the agent that manages the object. - """ - try: - self.cv.acquire() - pop_list = [] - for agent_addr in self.v2_data_queues: - entries = self.v2_data_queues[agent_addr] - keep_going = True - while keep_going and len(entries) > 0: - schemaId = self._getSchemaIdforV2ObjectLH(entries[0]) - schema = self.schemaCache.getSchema(schemaId) - if schema: - broker = self._getBrokerForAgentAddr(agent_addr) - obj = Object(self, broker, schema, v2Map=entries[0], agentName=agent_addr) - entries.pop(0) - - """ - TODO: This following code assumes that the data indication came unsolicited. - This needs to be enhanced to handle the case of a query response. - """ - if self.console: - self.console.objectProps(broker, obj) - - else: - """ - We have no schema for this data object, move the queue to the pending map and request - schema data from the agent - """ - self.v2_pending_queues[agent_addr] = self.v2_data_queues[agent_addr] - pop_list.append(agent_addr) - self._v2SendSchemaRequest(agent_addr, schemaId) - keep_going = None - for agent_addr in pop_list: - self.v2_data_queues.pop(agent_addr) - finally: - self.cv.release() - - def _addV2Data(self, agent_addr, data_map): - """ - Add data-for-processing to the work queue - """ - process = None - try: - self.cv.acquire() - if agent_addr in self.v2_pending_queues: - self.v2_pending_queues[agent_addr].append(data_map) - else: - if agent_addr not in self.v2_data_queues: - self.v2_data_queues[agent_addr] = [] - self.v2_data_queues[agent_addr].append(data_map) - process = True - finally: - self.cv.release() - - if process: - self._processV2Data() - - def _removeV2Agent(self, agent): - """ - Remove entries in the data queues related to a lost agent. - """ - agent_name = agent.getAgentBank() - try: - self.cv.acquire() - if agent_name in self.v2_data_queues: - self.v2_data_queues.pop(agent_name) - if agent_name in self.v2_pending_queues: - self.v2_pending_queues.pop(agent_name) + return b finally: self.cv.release() + return None - def _schemaInfoFromV2Agent(self, agent_addr): - """ - We have just received new schema information from an agent. Check to see if there's - more work that can now be done. - """ - re_process = None + def _getAgentForAgentAddr(self, agent_addr): try: self.cv.acquire() - if agent_addr in self.v2_pending_queues: - self.v2_data_queues[agent_addr] = self.v2_pending_queues.pop(agent_addr) - re_process = True + key = agent_addr + for b in self.brokers: + if key in b.agents: + return b.agents[key] finally: self.cv.release() - - if re_process: - self._processV2Data() - - def _getSchemaIdforV2ObjectLH(self, data): - """ - Given a data map, extract the schema-identifier. - """ - if data.__class__ != dict: - return None - if '_schema_id' in data: - return ClassKey(data['_schema_id']) return None def __repr__(self): @@ -642,7 +554,6 @@ class Session: returned from the addBroker call """ if self.console: for agent in broker.getAgents(): - self.console.removev2Agent(agent) self.console.delAgent(agent) broker._shutdown() self.brokers.remove(broker) @@ -711,12 +622,12 @@ class Session: agentList.append(a) return agentList - def makeObject(self, classKey, broker=None, **kwargs): + def makeObject(self, classKey, **kwargs): """ Create a new, unmanaged object of the schema indicated by classKey """ schema = self.getSchema(classKey) if schema == None: raise Exception("Schema not found for classKey") - return Object(self, broker, schema, None, True, True, False, kwargs) + return Object(None, schema, None, True, True, kwargs) def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. @@ -886,7 +797,6 @@ class Session: def _handleBrokerDisconnect(self, broker): if self.console: for agent in broker.getAgents(): - self.session._removeV2Agent(agent) self.console.delAgent(agent) self.console.brokerDisconnected(broker) @@ -955,31 +865,6 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) - def _handleMethodResp(self, broker, codec, seq): - code = codec.read_uint32() - text = codec.read_str16() - outArgs = {} - pair = self.seqMgr._release(seq) - if pair == None: - return - method, synchronous = pair - if code == 0: - for arg in method.arguments: - if arg.dir.find("O") != -1: - outArgs[arg.name] = self._decodeValue(codec, arg.type, broker) - result = MethodResult(code, text, outArgs) - if synchronous: - try: - broker.cv.acquire() - broker.syncResult = result - broker.syncInFlight = False - broker.cv.notify() - finally: - broker.cv.release() - else: - if self.console: - self.console.methodResponse(broker, seq, result) - def _handleHeartbeatInd(self, broker, codec, seq, msg): brokerBank = 1 agentBank = 0 @@ -1003,58 +888,32 @@ class Session: self.console.heartbeat(agent, timestamp) broker._ageAgents() - def _handleEventInd(self, broker, codec, seq): - if self.console != None: - event = Event(self, broker, codec) - self.console.event(broker, event) - def _handleSchemaResp(self, broker, codec, seq, agent_addr): kind = codec.read_uint8() classKey = ClassKey(codec) _class = SchemaClass(kind, classKey, codec, self) self.schemaCache.declareClass(classKey, _class) - self.seqMgr._release(seq) - broker._decOutstanding() + ctx = self.seqMgr._release(seq) + if ctx: + broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) - if agent_addr: - self._schemaInfoFromV2Agent(agent_addr) - - def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): - classKey = ClassKey(codec) - schema = self.schemaCache.getSchema(classKey) - if not schema: - return - - object = Object(self, broker, schema, codec, prop, stat) - if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: - broker._updateAgent(object) - - try: - self.cv.acquire() - if seq in self.syncSequenceList: - if object.getTimestamps()[2] == 0 and self._selectMatch(object): - self.getResult.append(object) - return - finally: - self.cv.release() - - if self.console and self.rcvObjects: - if prop: - self.console.objectProps(broker, object) - if stat: - self.console.objectStats(broker, object) + if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): + agent = self._getAgentForAgentAddr(agent_addr) + if agent: + agent._schemaInfoFromV2Agent() def _v2HandleHeartbeatInd(self, broker, mp, ah, content): - brokerBank = 1 - agentName = ah["qmf.agent"] - values = content["_values"] - timestamp = values["timestamp"] - interval = values["heartbeat_interval"] - if agentName == None: + try: + agentName = ah["qmf.agent"] + values = content["_values"] + timestamp = values["timestamp"] + interval = values["heartbeat_interval"] + except: return - agent = broker.getAgent(brokerBank, agentName) + + agent = broker.getAgent(1, agentName) if agent == None: agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) broker._addAgent(agentName, agent) @@ -1067,44 +926,6 @@ class Session: def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): self._v2HandleHeartbeatInd(broker, mp, ah, content) - def _v2HandleDataInd(self, broker, mp, ah, content): - kind = "_data" - if "qmf.content" in ah: - kind = ah["qmf.content"] - agent_addr = ah["qmf.agent"] - if content.__class__ != list: - return - if kind == "_data": - for omap in content: - self._addV2Data(agent_addr, omap) - - def _v2HandleQueryRsp(self, broker, mp, ah, content): - pass - - def _v2HandleMethodRsp(self, broker, mp, ah, content): - pass - - def _v2HandleException(self, broker, mp, ah, content): - pass - - def _v2SendSchemaRequest(self, agent_addr, schemaId): - """ - Send a query to an agent to request details on a particular schema class. - IMPORTANT: This function currently sends a QMFv1 schema-request to the address of - the agent. The agent will send its response to amq.direct/<our-key>. - Eventually, this will be converted to a proper QMFv2 schema query. - """ - broker = self._getBrokerForAgentAddr(agent_addr) - if not broker: - return - - sendCodec = Codec() - seq = self.seqMgr._reserve(None) - broker._setHeader(sendCodec, 'S', seq) - schemaId.encode(sendCodec) - smsg = broker._message(sendCodec.encoded, agent_addr) - broker._send(smsg, "qmf.default.direct") - def _handleError(self, error): try: self.cv.acquire() @@ -1343,6 +1164,10 @@ class Session: return seq return None + +#=================================================================================================== +# SchemaCache +#=================================================================================================== class SchemaCache(object): """ The SchemaCache is a data structure that stores learned schema information. @@ -1419,6 +1244,10 @@ class SchemaCache(object): self.lock.release() return True + +#=================================================================================================== +# ClassKey +#=================================================================================================== class ClassKey: """ A ClassKey uniquely identifies a class from the schema. """ def __init__(self, constructor): @@ -1443,7 +1272,7 @@ class ClassKey: try: self.pname = constructor['_package_name'] self.cname = constructor['_class_name'] - self.hash = constructor['_hash_str'] + self.hash = constructor['_hash'] except: raise Exception("Invalid ClassKey map format") else: @@ -1458,6 +1287,9 @@ class ClassKey: codec.write_str8(self.cname) codec.write_bin128(self.hash.bytes) + def asMap(self): + return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash} + def getPackageName(self): return self.pname @@ -1476,6 +1308,10 @@ class ClassKey: def __repr__(self): return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + +#=================================================================================================== +# SchemaClass +#=================================================================================================== class SchemaClass: """ """ CLASS_KIND_TABLE = 1 @@ -1558,6 +1394,10 @@ class SchemaClass: else: return self.arguments + self.session.getSchema(self.superTypeKey).getArguments() + +#=================================================================================================== +# SchemaProperty +#=================================================================================================== class SchemaProperty: """ """ def __init__(self, codec): @@ -1587,6 +1427,10 @@ class SchemaProperty: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaStatistic +#=================================================================================================== class SchemaStatistic: """ """ def __init__(self, codec): @@ -1603,6 +1447,10 @@ class SchemaStatistic: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaMethod +#=================================================================================================== class SchemaMethod: """ """ def __init__(self, codec): @@ -1631,6 +1479,10 @@ class SchemaMethod: result += ")" return result + +#=================================================================================================== +# SchemaArgument +#=================================================================================================== class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -1658,6 +1510,10 @@ class SchemaArgument: elif key == "refPackage" : self.refPackage = value elif key == "refClass" : self.refClass = value + +#=================================================================================================== +# ObjectId +#=================================================================================================== class ObjectId: """ Object that represents QMF object identifiers """ def __init__(self, constructor, first=0, second=0, agentName=None): @@ -1742,12 +1598,22 @@ class ObjectId: codec.write_uint64(first) codec.write_uint64(second) + def asMap(self): + omap = {'_agent_name': self.agentName, '_object_name': self.objectName} + if self.agentEpoch != 0: + omap['_agent_epoch'] = self.agentEpoch + return omap + def __hash__(self): return self.__repr__().__hash__() def __eq__(self, other): return self.__repr__().__eq__(other) + +#=================================================================================================== +# MethodResult +#=================================================================================================== class MethodResult(object): """ """ def __init__(self, status, text, outArgs): @@ -1763,6 +1629,10 @@ class MethodResult(object): def __repr__(self): return "%s (%d) - %s" % (self.text, self.status, self.outArgs) + +#=================================================================================================== +# ManagedConnection +#=================================================================================================== class ManagedConnection(Thread): """ Thread class for managing a connection. """ DELAY_MIN = 1 @@ -1825,6 +1695,10 @@ class ManagedConnection(Thread): finally: self.cv.release() + +#=================================================================================================== +# Broker +#=================================================================================================== class Broker: """ This object represents a connection (or potential connection) to a QMF broker. """ SYNC_TIME = 60 @@ -1872,7 +1746,7 @@ class Broker: def getAgent(self, brokerBank, agentBank): """ Return the agent object associated with a particular broker and agent bank value.""" - bankKey = (brokerBank, agentBank) + bankKey = agentBank try: self.cv.acquire() if bankKey in self.agents: @@ -1923,7 +1797,7 @@ class Broker: try: self.cv.acquire() self.agents = {} - self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") + self.agents[0] = Agent(self, 0, "BrokerAgent") finally: self.cv.release() @@ -1960,7 +1834,7 @@ class Broker: self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) + self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL) @@ -1970,7 +1844,7 @@ class Broker: self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("tdest").listen(self._replyCb) + self.amqpSession.incoming("tdest").listen(self._v1Cb) self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL) @@ -2013,7 +1887,7 @@ class Broker: raise def _updateAgent(self, obj): - bankKey = (obj.brokerBank, obj.agentBank) + bankKey = obj.agentBank agent = None if obj._deleteTime == 0: try: @@ -2037,7 +1911,7 @@ class Broker: def _addAgent(self, name, agent): try: self.cv.acquire() - self.agents[(1, name)] = agent + self.agents[name] = agent finally: self.cv.release() if self.session.console: @@ -2057,7 +1931,6 @@ class Broker: self.cv.release() if self.session.console: for agent in to_notify: - self.session._removeV2Agent(agent) self.session.console.delAgent(agent) def _v2SendAgentLocate(self, predicate={}): @@ -2167,51 +2040,88 @@ class Broker: finally: self.cv.release() - def _replyCb(self, msg): + def _v1Cb(self, msg): + """ + This is the general message handler for messages received via the QMFv1 exchanges. + """ + agent = None agent_addr = None mp = msg.get("message_properties") ah = mp.application_headers if ah and 'qmf.agent' in ah: agent_addr = ah['qmf.agent'] + + if not agent_addr: + # + # See if we can determine the agent identity from the routing key + # + dp = msg.get("delivery_properties") + rkey = None + if dp.routing_key: + rkey = dp.routing_key + items = rkey.split('.') + if len(items) >= 4: + if items[0] == 'console' and items[3].isdigit(): + agent_addr = int(items[3]) # The QMFv1 Agent Bank + if agent_addr and agent_addr in self.agents: + agent = self.agents[agent_addr] + codec = Codec(msg.body) while True: opcode, seq = self._checkHeader(codec) if opcode == None: return if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'e': self.session._handleEventInd (self, codec, seq) elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) - elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) - elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) - elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) - self.session.receiver._completed.add(msg.id) - self.session.channel.session_completed(self.session.receiver._completed) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) + elif agent: + agent._handleQmfV1Message(opcode, mp, ah, codec) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) def _v2Cb(self, msg): - dp = msg.get("delivery_properties") + """ + This is the general message handler for messages received via QMFv2 exchanges. + """ mp = msg.get("message_properties") ah = mp["application_headers"] - opcode = ah["qmf.opcode"] codec = Codec(msg.body) - if mp.content_type == "amqp/list": - content = codec.read_list() - elif mp.content_type == "amqp/map": - content = codec.read_map() - else: - return - - if opcode == None: return - elif opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) - elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) - elif opcode == '_data_indication': self.session._v2HandleDataInd(self, mp, ah, content) - elif opcode == '_query_response': self.session._v2HandleQueryRsp(self, mp, ah, content) - elif opcode == '_method_response': self.session._v2HandleMethodRsp(self, mp, ah, content) - elif opcode == '_exception': self.session._v2HandleException(self, mp, ah, content) + if 'qmf.opcode' in ah: + opcode = ah['qmf.opcode'] + if mp.content_type == "amqp/list": + content = codec.read_list() + if not content: + content = [] + elif mp.content_type == "amqp/map": + content = codec.read_map() + if not content: + content = {} + else: + content = None + + if content != None: + ## + ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are + ## used to maintain the broker's list of agent proxies. + ## + if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) + elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + else: + ## + ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender + ## of the message. + ## + agent_addr = ah['qmf.agent'] + if agent_addr in self.agents: + agent = self.agents[agent_addr] + agent._handleQmfV2Message(opcode, mp, ah, content) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) def _exceptionCb(self, data): self.connected = False @@ -2227,20 +2137,46 @@ class Broker: if self.thread: self.thread.disconnected() + +#=================================================================================================== +# Agent +#=================================================================================================== class Agent: - """ """ + """ + This class represents a proxy for a remote agent being managed + """ def __init__(self, broker, agentBank, label, isV2=False, interval=0): self.broker = broker + self.session = broker.session + self.schemaCache = self.session.schemaCache self.brokerBank = broker.getBrokerBank() self.agentBank = agentBank self.label = label self.isV2 = isV2 self.heartbeatInterval = interval + self.lock = Lock() + self.seqMgr = self.session.seqMgr + self.contextMap = {} + self.unsolicitedContext = RequestContext(self, self) self.lastSeenTime = time() + + def __call__(self, **kwargs): + """ + This is the handler for unsolicited stuff received from the agent + """ + if 'qmf_object' in kwargs: + if self.session.console: + self.session.console.objectProps(self.broker, kwargs['qmf_object']) + if 'qmf_object_stats' in kwargs: + if self.session.console: + self.session.console.objectStats(self.broker, kwargs['qmf_object_stats']) + + def touch(self): self.lastSeenTime = time() + def isOld(self): if self.heartbeatInterval == 0: return None @@ -2248,6 +2184,7 @@ class Agent: return True return None + def __repr__(self): if self.isV2: ver = "v2" @@ -2255,15 +2192,392 @@ class Agent: ver = "v1" return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label) + def getBroker(self): return self.broker + def getBrokerBank(self): return self.brokerBank + def getAgentBank(self): return self.agentBank + + def getObjects(self, notifiable=None, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + If 'notifiable' is None (default), this call will block until completion or timeout. + If supplied, notifiable is assumed to be a callable object that will be called when the + list of queried objects arrives. The single argument to the call shall be a list of + the returned objects. + + The class for queried objects may be specified in one of the following ways: + + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = <id> - get the object referenced by the object-id + + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + + If additional arguments are supplied, they are used as property selectors. For example, + if the argument name="test" is supplied, only objects whose "name" property is "test" + will be returned in the result. + """ + if notifiable: + if not callable(notifiable): + raise Exception("notifiable object must be callable") + + # + # Allocate a context to track this asynchronous request. + # + context = RequestContext(self, notifiable) + sequence = self.seqMgr._reserve(context) + try: + self.lock.acquire() + self.contextMap[sequence] = context + finally: + self.lock.release() + + # + # Compose and send the query message to the agent using the appropriate protocol for the + # agent's QMF version. + # + if self.isV2: + self._v2SendGetQuery(sequence, kwargs) + else: + self._v1SendGetQuery(sequence, kwargs) + + # + # If this is a synchronous call, block and wait for completion. + # + if not notifiable: + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + context.waitForSignal(timeout) + if context.exception: + raise Exception(context.exception) + result = context.queryResults + self.contextMap.pop(sequence) + return result + + + def _schemaInfoFromV2Agent(self): + """ + We have just received new schema information from this agent. Check to see if there's + more work that can now be done. + """ + try: + self.lock.acquire() + copy_of_map = self.contextMap + finally: + self.lock.release() + + self.unsolicitedContext.reprocess() + for context in copy_of_map: + copy_of_map[context].reprocess() + + + def _v1HandleMethodResp(self, codec, seq): + """ + Handle a QMFv1 method response + """ + code = codec.read_uint32() + text = codec.read_str16() + outArgs = {} + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + if code == 0: + for arg in method.arguments: + if arg.dir.find("O") != -1: + outArgs[arg.name] = self._decodeValue(codec, arg.type, broker) + result = MethodResult(code, text, outArgs) + if synchronous: + try: + broker.cv.acquire() + broker.syncResult = result + broker.syncInFlight = False + broker.cv.notify() + finally: + broker.cv.release() + else: + if self.console: + self.console.methodResponse(broker, seq, result) + + + def _v1HandleEventInd(self, broker, codec, seq): + """ + Handle a QMFv1 event indication + """ + if self.console != None: + event = Event(self, broker, codec) + self.console.event(broker, event) + + + def _v1HandleContentInd(self, broker, codec, seq, prop=False, stat=False): + """ + Handle a QMFv1 content indication + """ + classKey = ClassKey(codec) + schema = self.schemaCache.getSchema(classKey) + if not schema: + return + + obj = Object(self, broker, schema, codec, prop, stat) + if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: + broker._updateAgent(obj) + + try: + self.lock.acquire() + if seq in self.syncSequenceList: + if object.getTimestamps()[2] == 0 and self._selectMatch(object): + self.getResult.append(object) + return + finally: + self.lock.release() + + if self.console and self.rcvObjects: + if prop: + self.console.objectProps(broker, object) + if stat: + self.console.objectStats(broker, object) + + + def _v2HandleDataInd(self, mp, ah, content): + """ + Handle a QMFv2 data indication from the agent + """ + if mp.correlation_id: + sequence = int(mp.correlation_id) + if sequence not in self.contextMap: + return + context = self.contextMap[sequence] + else: + context = self.unsolicitedContext + + kind = "_data" + if "qmf.content" in ah: + kind = ah["qmf.content"] + if kind == "_data": + if content.__class__ != list: + return + for omap in content: + context.addV2QueryResult(omap) + context.processV2Data() + + if 'partial' not in ah: + context.signal() + + + def _v2HandleMethodRsp(self, mp, ah, content): + pass + + + def _v2HandleException(self, mp, ah, content): + pass + + + def _v1SendGetQuery(self, kwargs): + pass + + + def _v2SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv2 agent. + """ + # + # Build the query map + # + query = {'_what': 'OBJECT'} + if '_class' in kwargs: + schemaMap = {'_class_name': kwargs['_class']} + if '_package' in kwargs: + schemaMap['_package_name'] = kwargs['_package'] + query['_schema_id'] = schemaMap + elif '_key' in kwargs: + query['_schema_id'] = kwargs['_key'].asMap() + elif '_objectId' in kwargs: + query['_object_id'] = kwargs['_objectId'].asMap + + # + # Construct and transmit the message + # + dp = self.broker.amqpSession.delivery_properties() + dp.routing_key = self.agentBank + mp = self.broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self.broker.authUser + mp.correlation_id = str(sequence) + mp.app_id = "qmf2" + mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_query_request'} + sendCodec = Codec() + sendCodec.write_map(query) + msg = Message(dp, mp, sendCodec.encoded) + self.broker._send(msg, "qmf.default.direct") + + + def _v2SendSchemaRequest(self, schemaId): + """ + Send a query to an agent to request details on a particular schema class. + IMPORTANT: This function currently sends a QMFv1 schema-request to the address of + the agent. The agent will send its response to amq.direct/<our-key>. + Eventually, this will be converted to a proper QMFv2 schema query. + """ + sendCodec = Codec() + seq = self.seqMgr._reserve(None) + self.broker._setHeader(sendCodec, 'S', seq) + schemaId.encode(sendCodec) + smsg = self.broker._message(sendCodec.encoded, self.agentBank) + self.broker._send(smsg, "qmf.default.direct") + + + def _handleQmfV1Message(self, opcode, mp, ah, codec): + """ + Process QMFv1 messages arriving from an agent. + """ + if opcode == 'm': self._v1HandleMethodResp(codec, seq) + elif opcode == 'e': self._v1HandleEventInd(codec, seq) + elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True) + elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True) + elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True) + + + def _handleQmfV2Message(self, opcode, mp, ah, content): + """ + Process QMFv2 messages arriving from an agent. + """ + if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_method_response': self._v2HandleMethodRsp(mp, ah, content) + elif opcode == '_exception': self._v2HandleException(mp, ah, content) + + +#=================================================================================================== +# RequestContext +#=================================================================================================== +class RequestContext(object): + """ + This class tracks an asynchronous request sent to an agent. + """ + def __init__(self, agent, notifiable): + self.agent = agent + self.schemaCache = self.agent.schemaCache + self.notifiable = notifiable + self.startTime = time() + self.rawQueryResults = [] + self.queryResults = [] + self.exception = None + self.waitingForSchema = None + self.cv = Condition() + self.blocked = notifiable == None + + + def addV2QueryResult(self, data): + self.rawQueryResults.append(data) + + + def setException(self, ex): + self.exception = ex + + + def getAge(self): + return time() - self.startTime + + + def waitForSignal(self, timeout): + try: + self.cv.acquire() + while self.blocked: + if (time() - self.startTime) > timeout: + self.exception = "Request timed out after %d seconds" % timeout + return + self.cv.wait(1) + finally: + self.cv.release() + + + def signal(self): + try: + self.cv.acquire() + self.blocked = None + self.cv.notify() + finally: + self.cv.release() + + + def processV2Data(self): + """ + Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema + that is in our schema cache, process it. Otherwise, send a request for the schema information + to the agent that manages the object. + """ + schemaId = None + queryResults = [] + try: + self.cv.acquire() + if self.waitingForSchema: + return + while (not self.waitingForSchema) and len(self.rawQueryResults) > 0: + head = self.rawQueryResults[0] + schemaId = self._getSchemaIdforV2ObjectLH(head) + schema = self.schemaCache.getSchema(schemaId) + if schema: + obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank) + queryResults.append(obj) + self.rawQueryResults.pop(0) + else: + self.waitingForSchema = True + finally: + self.cv.release() + + if self.waitingForSchema: + self.agent._v2SendSchemaRequest(schemaId) + + for result in queryResults: + if self.notifiable: + self.notifiable(qmf_object=result) + else: + self.queryResults.append(result) + + + def reprocess(self): + """ + New schema information has been added to the schema-cache. Clear our 'waiting' status + and see if we can make more progress on the raw query list. + """ + try: + self.cv.acquire() + self.waitingForSchema = None + finally: + self.cv.release() + self.processV2Data() + + + def _getSchemaIdforV2ObjectLH(self, data): + """ + Given a data map, extract the schema-identifier. + """ + if data.__class__ != dict: + return None + if '_schema_id' in data: + return ClassKey(data['_schema_id']) + return None + + +#=================================================================================================== +# Event +#=================================================================================================== class Event: """ """ def __init__(self, session, broker, codec): @@ -2318,6 +2632,10 @@ class Event: def getSchema(self): return self.schema + +#=================================================================================================== +# SequenceManager +#=================================================================================================== class SequenceManager: """ Manage sequence numbers for asynchronous method calls """ def __init__(self): @@ -2349,6 +2667,9 @@ class SequenceManager: return data +#=================================================================================================== +# DebugConsole +#=================================================================================================== class DebugConsole(Console): """ """ def brokerConnected(self, broker): |