diff options
author | Ted Ross <tross@apache.org> | 2008-09-24 18:03:01 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-09-24 18:03:01 +0000 |
commit | b80fd373fbee66c47245d62889ecf962de077b3e (patch) | |
tree | c6a5c97eafd05ebb492e44d7ce91b2879d40daaa /python/qpid/qmfconsole.py | |
parent | a2a56cf9a7483e165fb579d0b519b284d02009e3 (diff) | |
download | qpid-python-b80fd373fbee66c47245d62889ecf962de077b3e.tar.gz |
Added event handling, did some code cleanup and fixed some small bugs
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/qmfconsole.py')
-rw-r--r-- | python/qpid/qmfconsole.py | 375 |
1 files changed, 258 insertions, 117 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index 435da475d7..4d06e4a725 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -26,18 +26,25 @@ import socket import re from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import uuid4 +from qpid.datatypes import uuid4, Message, RangedSet from qpid.util import connect -from datatypes import Message, RangedSet +from qpid.codec010 import StringCodec as Codec from threading import Lock, Condition -from codec010 import StringCodec as Codec -from time import time +from time import time, strftime, gmtime from cStringIO import StringIO class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ + def brokerConnected(self, broker): + """ Invoked when a connection is established to a broker """ + pass + + def brokerDisconnected(self, broker): + """ Invoked when the connection to a broker is lost """ + pass + def newPackage(self, name): """ Invoked when a QMF package is discovered. """ pass @@ -106,20 +113,38 @@ class Session: GET_WAIT_TIME = 10 - def __init__(self, console=None): + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, + manageConnections=False): """ Initialize a session. If the console argument is provided, the more advanced asynchronous features are available. If console is defaulted, the session will operate in a simpler, synchronous manner. + + The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' + is provided. They control whether object updates, events, and agent-heartbeats are + subscribed to. If the console is not interested in receiving one or more of the above, + setting the argument to False will reduce tha bandwidth used by the API. + + If manageConnections is set to True, the Session object will manage connections to + the brokers. This means that if a broker is unreachable, it will retry until a connection + can be established. If a connection is lost, the Session will attempt to reconnect. + + If manageConnections is set to False, the user is responsible for handing failures. In + this case, an unreachable broker will cause addBroker to raise an exception. """ - self.console = console - self.brokers = [] - self.packages = {} - self.seqMgr = SequenceManager() - self.cv = Condition() - self.syncSequenceList = [] - self.getResult = [] - self.error = None + self.console = console + self.brokers = [] + self.packages = {} + self.seqMgr = SequenceManager() + self.cv = Condition() + self.syncSequenceList = [] + self.getResult = [] + self.error = None + self.bindingKeyList = self._bindingKeys(rcvObjects, rcvEvents, rcvHeartbeats) + self.manageConnections = manageConnections + + if (manageConnections): + raise Exception("manageConnections - not yet implemented") def __repr__(self): return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers) @@ -128,6 +153,9 @@ class Session: """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass) + if not broker.isConnected and not self.manageConnections: + raise Exception(broker.error) + self.brokers.append(broker) self.getObjects(broker=broker, name="agent") return broker @@ -272,6 +300,26 @@ class Session: """ """ pass + def _bindingKeys(self, rcvObjects, rcvEvents, rcvHeartbeats): + keyList = [] + if rcvObjects and rcvEvents and rcvHeartbeats: + keyList.append("mgmt.#") + else: + if rcvObjects: + keyList.append("mgmt.*.prop.#") + keyList.append("mgmt.*.stat.#") + if rcvEvents: + keyList.append("mgmt.event") + if rcvHeartbeats: + keyList.append("mgmt.*.heartbeat.#") + return keyList + + def _handleBrokerConnect(self, broker): + pass + + def _handleBrokerDisconnect(self, broker): + pass + def _handleBrokerResp(self, broker, codec, seq): broker.brokerId = codec.read_uuid() if self.console != None: @@ -355,7 +403,7 @@ class Session: if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: - outArgs[arg.name] = obj._decodeValue(codec, arg.type) + outArgs[arg.name] = self._decodeValue(codec, arg.type) broker.cv.acquire() broker.syncResult = MethodResult(code, text, outArgs) broker.syncInFlight = False @@ -364,10 +412,13 @@ class Session: def _handleHeartbeatInd(self, broker, codec, seq): timestamp = codec.read_uint64() - pass + if self.console != None: + self.console.heartbeat(None, timestamp) def _handleEventInd(self, broker, codec, seq): - pass + if self.console != None: + event = Event(self, codec) + self.console.event(broker, event) def _handleSchemaResp(self, broker, codec, seq): pname = str(codec.read_str8()) @@ -402,7 +453,8 @@ class Session: self.cv.acquire() if seq in self.syncSequenceList: - self.getResult.append(object) + if object.getTimestamps()[2] == 0: + self.getResult.append(object) self.cv.release() return self.cv.release() @@ -420,6 +472,79 @@ class Session: self.cv.notify() self.cv.release() + def _decodeValue(self, codec, typecode): + """ Decode, from the codec, a value based on its typecode. """ + if typecode == 1: data = codec.read_uint8() # U8 + elif typecode == 2: data = codec.read_uint16() # U16 + elif typecode == 3: data = codec.read_uint32() # U32 + elif typecode == 4: data = codec.read_uint64() # U64 + elif typecode == 6: data = str(codec.read_str8()) # SSTR + elif typecode == 7: data = codec.read_vbin32() # LSTR + elif typecode == 8: data = codec.read_int64() # ABSTIME + elif typecode == 9: data = codec.read_uint64() # DELTATIME + elif typecode == 10: data = ObjectId(codec) # REF + elif typecode == 11: data = codec.read_uint8() # BOOL + elif typecode == 12: data = codec.read_float() # FLOAT + elif typecode == 13: data = codec.read_double() # DOUBLE + elif typecode == 14: data = codec.read_uuid() # UUID + elif typecode == 15: data = codec.read_map() # FTABLE + elif typecode == 16: data = codec.read_int8() # S8 + elif typecode == 17: data = codec.read_int16() # S16 + elif typecode == 18: data = codec.read_int32() # S32 + elif typecode == 19: data = codec.read_int64() # S63 + else: + raise ValueError("Invalid type code: %d" % typecode) + return data + + def _encodeValue(self, codec, value, typecode): + """ Encode, into the codec, a value based on its typecode. """ + if typecode == 1: codec.write_uint8 (int(value)) # U8 + elif typecode == 2: codec.write_uint16 (int(value)) # U16 + elif typecode == 3: codec.write_uint32 (long(value)) # U32 + elif typecode == 4: codec.write_uint64 (long(value)) # U64 + elif typecode == 6: codec.write_str8 (value) # SSTR + elif typecode == 7: codec.write_vbin32 (value) # LSTR + elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME + elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME + elif typecode == 10: value.encode (codec) # REF + elif typecode == 11: codec.write_uint8 (int(value)) # BOOL + elif typecode == 12: codec.write_float (float(value)) # FLOAT + elif typecode == 13: codec.write_double (double(value)) # DOUBLE + elif typecode == 14: codec.write_uuid (value) # UUID + elif typecode == 15: codec.write_map (value) # FTABLE + elif typecode == 16: codec.write_int8 (int(value)) # S8 + elif typecode == 17: codec.write_int16 (int(value)) # S16 + elif typecode == 18: codec.write_int32 (int(value)) # S32 + elif typecode == 19: codec.write_int64 (int(value)) # S64 + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def _displayValue(self, value, typecode): + """ """ + if typecode == 1: return str(value) + elif typecode == 2: return str(value) + elif typecode == 3: return str(value) + elif typecode == 4: return str(value) + elif typecode == 6: return str(value) + elif typecode == 7: return str(value) + elif typecode == 8: return strftime("%c", gmtime(value / 1000000000)) + elif typecode == 9: return str(value) + elif typecode == 10: return value.__repr__() + elif typecode == 11: + if value: return 'T' + else: return 'F' + elif typecode == 12: return str(value) + elif typecode == 13: return str(value) + elif typecode == 14: return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", value) + elif typecode == 15: return value.__repr__() + elif typecode == 16: return str(value) + elif typecode == 17: return str(value) + elif typecode == 18: return str(value) + elif typecode == 19: return str(value) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + class Package: """ """ def __init__(self, name): @@ -672,10 +797,10 @@ class Object(object): if property.name in notPresent: self.properties.append((property, None)) else: - self.properties.append((property, self._decodeValue(codec, property.type))) + self.properties.append((property, self.session._decodeValue(codec, property.type))) if stat: for statistic in schema.getStatistics(): - self.statistics.append((statistic, self._decodeValue(codec, statistic.type))) + self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type))) def getObjectId(self): """ Return the object identifier for this object """ @@ -735,9 +860,17 @@ class Object(object): sendCodec.write_str8(cname) sendCodec.write_bin128(hash) sendCodec.write_str8(name) + + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(args): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) + for arg in method.arguments: if arg.dir.find("I") != -1: - self._encodeValue(sendCodec, args[aIdx], arg.type) + self.session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank())) self.broker.cv.acquire() @@ -777,53 +910,6 @@ class Object(object): bit = 0 return excludeList - def _decodeValue(self, codec, typecode): - """ Decode, from the codec, a value based on its typecode. """ - if typecode == 1: data = codec.read_uint8() # U8 - elif typecode == 2: data = codec.read_uint16() # U16 - elif typecode == 3: data = codec.read_uint32() # U32 - elif typecode == 4: data = codec.read_uint64() # U64 - elif typecode == 6: data = str(codec.read_str8()) # SSTR - elif typecode == 7: data = codec.read_vbin32() # LSTR - elif typecode == 8: data = codec.read_int64() # ABSTIME - elif typecode == 9: data = codec.read_uint64() # DELTATIME - elif typecode == 10: data = ObjectId(codec) # REF - elif typecode == 11: data = codec.read_uint8() # BOOL - elif typecode == 12: data = codec.read_float() # FLOAT - elif typecode == 13: data = codec.read_double() # DOUBLE - elif typecode == 14: data = codec.read_uuid() # UUID - elif typecode == 15: data = codec.read_map() # FTABLE - elif typecode == 16: data = codec.read_int8() # S8 - elif typecode == 17: data = codec.read_int16() # S16 - elif typecode == 18: data = codec.read_int32() # S32 - elif typecode == 19: data = codec.read_int64() # S63 - else: - raise ValueError("Invalid type code: %d" % typecode) - return data - - def _encodeValue(self, codec, value, typecode): - """ Encode, into the codec, a value based on its typecode. """ - if typecode == 1: codec.write_uint8 (int(value)) # U8 - elif typecode == 2: codec.write_uint16 (int(value)) # U16 - elif typecode == 3: codec.write_uint32 (long(value)) # U32 - elif typecode == 4: codec.write_uint64 (long(value)) # U64 - elif typecode == 6: codec.write_str8 (value) # SSTR - elif typecode == 7: codec.write_vbin32 (value) # LSTR - elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME - elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME - elif typecode == 10: value.encode (codec) # REF - elif typecode == 11: codec.write_uint8 (int(value)) # BOOL - elif typecode == 12: codec.write_float (float(value)) # FLOAT - elif typecode == 13: codec.write_double (double(value)) # DOUBLE - elif typecode == 14: codec.write_uuid (value) # UUID - elif typecode == 15: codec.write_map (value) # FTABLE - elif typecode == 16: codec.write_int8 (int(value)) # S8 - elif typecode == 17: codec.write_int16 (int(value)) # S16 - elif typecode == 18: codec.write_int32 (int(value)) # S32 - elif typecode == 19: codec.write_int64 (int(value)) # S64 - else: - raise ValueError ("Invalid type code: %d" % typecode) - class MethodResult(object): """ """ def __init__(self, status, text, outArgs): @@ -844,10 +930,12 @@ class Broker: SYNC_TIME = 10 def __init__(self, session, host, port, authMech, authUser, authPass): - self.session = session - self.host = host - self.port = port - self.agents = {} + self.session = session + self.host = host + self.port = port + self.authUser = authUser + self.authPass = authPass + self.agents = {} self.agents[0] = Agent(self, 0, "BrokerAgent") self.topicBound = False self.cv = Condition() @@ -857,17 +945,52 @@ class Broker: self.reqsOutstanding = 1 self.error = None self.brokerId = None - err = None + self.isConnected = False + self._tryToConnect() + + def isConnected(self): + return self.isConnected + + def getError(self): + return self.error + + def getBrokerId(self): + """ Get broker's unique identifier (UUID) """ + return self.brokerId + + def getSessionId(self): + """ Get the identifier of the AMQP session to the broker """ + return self.amqpSessionId + + def getAgents(self): + """ Get the list of agents reachable via this broker """ + return self.agents.values() + + def getAmqpSession(self): + """ Get the AMQP session object for this connected broker. """ + return self.amqpSession + + def __repr__(self): + if self.isConnected: + if self.port == 5672: + port = "" + else: + port = ":%d" % self.port + return "Broker connected at: amqp://%s%s" % (self.host, port) + else: + return "Disconnected Broker" + + def _tryToConnect(self): try: self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection(connect(host, port), username=authUser, password=authPass) + self.conn = Connection(connect(self.host, self.port), username=self.authUser, password=self.authPass) self.conn.start() self.replyName = "reply-%s" % self.amqpSessionId self.amqpSession = self.conn.session(self.amqpSessionId) self.amqpSession.auto_sync = True self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) self.amqpSession.exchange_bind(exchange="amq.direct", - queue=self.replyName, binding_key=self.replyName) + queue=self.replyName, binding_key=self.replyName) self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest") self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) @@ -883,50 +1006,20 @@ class Broker: self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF) + self.isConnected = True + self.session._handleBrokerConnect(self) + codec = Codec(self.conn.spec) self._setHeader(codec, 'B') msg = self._message(codec.encoded) self._send(msg) except socket.error, e: - err = "Socket Error %s - %s" % (e[0], e[1]) + self.error = "Socket Error %s - %s" % (e[0], e[1]) except Closed, e: - err = "Connect Failed %d - %s" % (e[0], e[1]) + self.error = "Connect Failed %d - %s" % (e[0], e[1]) except ConnectionFailed, e: - err = "Connect Failed %d - %s" % (e[0], e[1]) - - self.active = True - if err != None: - raise Exception(err) - - def getBrokerId(self): - """ Get broker's unique identifier (UUID) """ - return self.brokerId - - def getSessionId(self): - """ Get the identifier of the AMQP session to the broker """ - return self.amqpSessionId - - def getAgents(self): - """ Get the list of agents reachable via this broker """ - return self.agents.values() - - def getAmqpSession(self): - """ Get the AMQP session object for this connected broker. """ - return self.amqpSession - - def isConnected(self): - return self.active - - def __repr__(self): - if self.active: - if self.port == 5672: - port = "" - else: - port = ":%d" % self.port - return "Broker connected at: amqp://%s%s" % (self.host, port) - else: - return "Disconnected Broker" + self.error = "Connect Failed %d - %s" % (e[0], e[1]) def _updateAgent(self, obj): if obj.deleteTime == 0: @@ -975,12 +1068,12 @@ class Broker: self.amqpSession.message_transfer(destination=dest, message=msg) def _shutdown(self): - if self.active: + if self.isConnected: self.amqpSession.incoming("rdest").stop() if self.session.console != None: self.amqpSession.incoming("tdest").stop() self.amqpSession.close() - self.active = False + self.isConnected = False else: raise Exception("Broker already disconnected") @@ -1008,18 +1101,21 @@ class Broker: self.reqsOutstanding -= 1 if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None: self.topicBound = True - self.amqpSession.exchange_bind(exchange="qpid.management", - queue=self.topicName, binding_key="mgmt.#") + for key in self.session.bindingKeyList: + self.amqpSession.exchange_bind(exchange="qpid.management", + queue=self.topicName, binding_key=key) if self.reqsOutstanding == 0 and self.syncInFlight: self.syncInFlight = False self.cv.notify() self.cv.release() def _replyCb(self, msg): + self.amqpSession.message_accept(RangedSet(msg.id)) codec = Codec(self.conn.spec, msg.body) opcode, seq = self._checkHeader(codec) if opcode == None: return + if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) @@ -1033,13 +1129,14 @@ class Broker: elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) def _exceptionCb(self, data): - self.active = False + self.isConnected = False self.error = data self.cv.acquire() if self.syncInFlight: self.cv.notify() self.cv.release() self.session._handleError(self.error) + self.session._handleBrokerDisconnect(self) class Agent: """ """ @@ -1053,8 +1150,46 @@ class Agent: class Event: """ """ - def __init__(self): - pass + def __init__(self, session, codec): + self.session = session + self.timestamp = codec.read_int64() + self.objectId = ObjectId(codec) + pname = codec.read_str8() + cname = codec.read_str8() + hash = codec.read_bin128() + self.classKey = (pname, cname, hash) + self.name = codec.read_str8() + if pname in session.packages: + if (cname, hash) in session.packages[pname]: + schema = session.packages[pname][(cname, hash)] + for event in schema.getEvents(): + if event.name == self.name: + self.schemaEvent = event + self.arguments = {} + for arg in event.arguments: + self.arguments[arg.name] = session._decodeValue(codec, arg.type) + + def __repr__(self): + return self.getSyslogText() + + def getClassKey(self): + return self.classKey + + def getArguments(self): + return self.arguments + + def getTimestamp(self): + return self.timerstamp + + def getName(self): + return self.name + + def getSyslogText(self): + out = strftime("%c", gmtime(self.timestamp / 1000000000)) + out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name + for arg in self.schemaEvent.arguments: + out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type) + return out class SequenceManager: """ Manage sequence numbers for asynchronous method calls """ @@ -1085,6 +1220,12 @@ class SequenceManager: class DebugConsole(Console): """ """ + def brokerConnected(self, broker): + print "brokerConnected:", broker + + def brokerDisconnected(self, broker): + print "brokerDisconnected:", broker + def newPackage(self, name): print "newPackage:", name |