summaryrefslogtreecommitdiff
path: root/python/qpid/qmfconsole.py
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-09-24 18:03:01 +0000
committerTed Ross <tross@apache.org>2008-09-24 18:03:01 +0000
commitb80fd373fbee66c47245d62889ecf962de077b3e (patch)
treec6a5c97eafd05ebb492e44d7ce91b2879d40daaa /python/qpid/qmfconsole.py
parenta2a56cf9a7483e165fb579d0b519b284d02009e3 (diff)
downloadqpid-python-b80fd373fbee66c47245d62889ecf962de077b3e.tar.gz
Added event handling, did some code cleanup and fixed some small bugs
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/qmfconsole.py')
-rw-r--r--python/qpid/qmfconsole.py375
1 files changed, 258 insertions, 117 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index 435da475d7..4d06e4a725 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -26,18 +26,25 @@ import socket
import re
from qpid.peer import Closed
from qpid.connection import Connection, ConnectionFailed
-from qpid.datatypes import uuid4
+from qpid.datatypes import uuid4, Message, RangedSet
from qpid.util import connect
-from datatypes import Message, RangedSet
+from qpid.codec010 import StringCodec as Codec
from threading import Lock, Condition
-from codec010 import StringCodec as Codec
-from time import time
+from time import time, strftime, gmtime
from cStringIO import StringIO
class Console:
""" To access the asynchronous operations, a class must be derived from
Console with overrides of any combination of the available methods. """
+ def brokerConnected(self, broker):
+ """ Invoked when a connection is established to a broker """
+ pass
+
+ def brokerDisconnected(self, broker):
+ """ Invoked when the connection to a broker is lost """
+ pass
+
def newPackage(self, name):
""" Invoked when a QMF package is discovered. """
pass
@@ -106,20 +113,38 @@ class Session:
GET_WAIT_TIME = 10
- def __init__(self, console=None):
+ def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
+ manageConnections=False):
"""
Initialize a session. If the console argument is provided, the
more advanced asynchronous features are available. If console is
defaulted, the session will operate in a simpler, synchronous manner.
+
+ The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console'
+ is provided. They control whether object updates, events, and agent-heartbeats are
+ subscribed to. If the console is not interested in receiving one or more of the above,
+ setting the argument to False will reduce tha bandwidth used by the API.
+
+ If manageConnections is set to True, the Session object will manage connections to
+ the brokers. This means that if a broker is unreachable, it will retry until a connection
+ can be established. If a connection is lost, the Session will attempt to reconnect.
+
+ If manageConnections is set to False, the user is responsible for handing failures. In
+ this case, an unreachable broker will cause addBroker to raise an exception.
"""
- self.console = console
- self.brokers = []
- self.packages = {}
- self.seqMgr = SequenceManager()
- self.cv = Condition()
- self.syncSequenceList = []
- self.getResult = []
- self.error = None
+ self.console = console
+ self.brokers = []
+ self.packages = {}
+ self.seqMgr = SequenceManager()
+ self.cv = Condition()
+ self.syncSequenceList = []
+ self.getResult = []
+ self.error = None
+ self.bindingKeyList = self._bindingKeys(rcvObjects, rcvEvents, rcvHeartbeats)
+ self.manageConnections = manageConnections
+
+ if (manageConnections):
+ raise Exception("manageConnections - not yet implemented")
def __repr__(self):
return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
@@ -128,6 +153,9 @@ class Session:
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass)
+ if not broker.isConnected and not self.manageConnections:
+ raise Exception(broker.error)
+
self.brokers.append(broker)
self.getObjects(broker=broker, name="agent")
return broker
@@ -272,6 +300,26 @@ class Session:
""" """
pass
+ def _bindingKeys(self, rcvObjects, rcvEvents, rcvHeartbeats):
+ keyList = []
+ if rcvObjects and rcvEvents and rcvHeartbeats:
+ keyList.append("mgmt.#")
+ else:
+ if rcvObjects:
+ keyList.append("mgmt.*.prop.#")
+ keyList.append("mgmt.*.stat.#")
+ if rcvEvents:
+ keyList.append("mgmt.event")
+ if rcvHeartbeats:
+ keyList.append("mgmt.*.heartbeat.#")
+ return keyList
+
+ def _handleBrokerConnect(self, broker):
+ pass
+
+ def _handleBrokerDisconnect(self, broker):
+ pass
+
def _handleBrokerResp(self, broker, codec, seq):
broker.brokerId = codec.read_uuid()
if self.console != None:
@@ -355,7 +403,7 @@ class Session:
if code == 0:
for arg in method.arguments:
if arg.dir.find("O") != -1:
- outArgs[arg.name] = obj._decodeValue(codec, arg.type)
+ outArgs[arg.name] = self._decodeValue(codec, arg.type)
broker.cv.acquire()
broker.syncResult = MethodResult(code, text, outArgs)
broker.syncInFlight = False
@@ -364,10 +412,13 @@ class Session:
def _handleHeartbeatInd(self, broker, codec, seq):
timestamp = codec.read_uint64()
- pass
+ if self.console != None:
+ self.console.heartbeat(None, timestamp)
def _handleEventInd(self, broker, codec, seq):
- pass
+ if self.console != None:
+ event = Event(self, codec)
+ self.console.event(broker, event)
def _handleSchemaResp(self, broker, codec, seq):
pname = str(codec.read_str8())
@@ -402,7 +453,8 @@ class Session:
self.cv.acquire()
if seq in self.syncSequenceList:
- self.getResult.append(object)
+ if object.getTimestamps()[2] == 0:
+ self.getResult.append(object)
self.cv.release()
return
self.cv.release()
@@ -420,6 +472,79 @@ class Session:
self.cv.notify()
self.cv.release()
+ def _decodeValue(self, codec, typecode):
+ """ Decode, from the codec, a value based on its typecode. """
+ if typecode == 1: data = codec.read_uint8() # U8
+ elif typecode == 2: data = codec.read_uint16() # U16
+ elif typecode == 3: data = codec.read_uint32() # U32
+ elif typecode == 4: data = codec.read_uint64() # U64
+ elif typecode == 6: data = str(codec.read_str8()) # SSTR
+ elif typecode == 7: data = codec.read_vbin32() # LSTR
+ elif typecode == 8: data = codec.read_int64() # ABSTIME
+ elif typecode == 9: data = codec.read_uint64() # DELTATIME
+ elif typecode == 10: data = ObjectId(codec) # REF
+ elif typecode == 11: data = codec.read_uint8() # BOOL
+ elif typecode == 12: data = codec.read_float() # FLOAT
+ elif typecode == 13: data = codec.read_double() # DOUBLE
+ elif typecode == 14: data = codec.read_uuid() # UUID
+ elif typecode == 15: data = codec.read_map() # FTABLE
+ elif typecode == 16: data = codec.read_int8() # S8
+ elif typecode == 17: data = codec.read_int16() # S16
+ elif typecode == 18: data = codec.read_int32() # S32
+ elif typecode == 19: data = codec.read_int64() # S63
+ else:
+ raise ValueError("Invalid type code: %d" % typecode)
+ return data
+
+ def _encodeValue(self, codec, value, typecode):
+ """ Encode, into the codec, a value based on its typecode. """
+ if typecode == 1: codec.write_uint8 (int(value)) # U8
+ elif typecode == 2: codec.write_uint16 (int(value)) # U16
+ elif typecode == 3: codec.write_uint32 (long(value)) # U32
+ elif typecode == 4: codec.write_uint64 (long(value)) # U64
+ elif typecode == 6: codec.write_str8 (value) # SSTR
+ elif typecode == 7: codec.write_vbin32 (value) # LSTR
+ elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME
+ elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME
+ elif typecode == 10: value.encode (codec) # REF
+ elif typecode == 11: codec.write_uint8 (int(value)) # BOOL
+ elif typecode == 12: codec.write_float (float(value)) # FLOAT
+ elif typecode == 13: codec.write_double (double(value)) # DOUBLE
+ elif typecode == 14: codec.write_uuid (value) # UUID
+ elif typecode == 15: codec.write_map (value) # FTABLE
+ elif typecode == 16: codec.write_int8 (int(value)) # S8
+ elif typecode == 17: codec.write_int16 (int(value)) # S16
+ elif typecode == 18: codec.write_int32 (int(value)) # S32
+ elif typecode == 19: codec.write_int64 (int(value)) # S64
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def _displayValue(self, value, typecode):
+ """ """
+ if typecode == 1: return str(value)
+ elif typecode == 2: return str(value)
+ elif typecode == 3: return str(value)
+ elif typecode == 4: return str(value)
+ elif typecode == 6: return str(value)
+ elif typecode == 7: return str(value)
+ elif typecode == 8: return strftime("%c", gmtime(value / 1000000000))
+ elif typecode == 9: return str(value)
+ elif typecode == 10: return value.__repr__()
+ elif typecode == 11:
+ if value: return 'T'
+ else: return 'F'
+ elif typecode == 12: return str(value)
+ elif typecode == 13: return str(value)
+ elif typecode == 14: return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", value)
+ elif typecode == 15: return value.__repr__()
+ elif typecode == 16: return str(value)
+ elif typecode == 17: return str(value)
+ elif typecode == 18: return str(value)
+ elif typecode == 19: return str(value)
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+
class Package:
""" """
def __init__(self, name):
@@ -672,10 +797,10 @@ class Object(object):
if property.name in notPresent:
self.properties.append((property, None))
else:
- self.properties.append((property, self._decodeValue(codec, property.type)))
+ self.properties.append((property, self.session._decodeValue(codec, property.type)))
if stat:
for statistic in schema.getStatistics():
- self.statistics.append((statistic, self._decodeValue(codec, statistic.type)))
+ self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type)))
def getObjectId(self):
""" Return the object identifier for this object """
@@ -735,9 +860,17 @@ class Object(object):
sendCodec.write_str8(cname)
sendCodec.write_bin128(hash)
sendCodec.write_str8(name)
+
+ count = 0
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ count += 1
+ if count != len(args):
+ raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
+
for arg in method.arguments:
if arg.dir.find("I") != -1:
- self._encodeValue(sendCodec, args[aIdx], arg.type)
+ self.session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
self.broker.cv.acquire()
@@ -777,53 +910,6 @@ class Object(object):
bit = 0
return excludeList
- def _decodeValue(self, codec, typecode):
- """ Decode, from the codec, a value based on its typecode. """
- if typecode == 1: data = codec.read_uint8() # U8
- elif typecode == 2: data = codec.read_uint16() # U16
- elif typecode == 3: data = codec.read_uint32() # U32
- elif typecode == 4: data = codec.read_uint64() # U64
- elif typecode == 6: data = str(codec.read_str8()) # SSTR
- elif typecode == 7: data = codec.read_vbin32() # LSTR
- elif typecode == 8: data = codec.read_int64() # ABSTIME
- elif typecode == 9: data = codec.read_uint64() # DELTATIME
- elif typecode == 10: data = ObjectId(codec) # REF
- elif typecode == 11: data = codec.read_uint8() # BOOL
- elif typecode == 12: data = codec.read_float() # FLOAT
- elif typecode == 13: data = codec.read_double() # DOUBLE
- elif typecode == 14: data = codec.read_uuid() # UUID
- elif typecode == 15: data = codec.read_map() # FTABLE
- elif typecode == 16: data = codec.read_int8() # S8
- elif typecode == 17: data = codec.read_int16() # S16
- elif typecode == 18: data = codec.read_int32() # S32
- elif typecode == 19: data = codec.read_int64() # S63
- else:
- raise ValueError("Invalid type code: %d" % typecode)
- return data
-
- def _encodeValue(self, codec, value, typecode):
- """ Encode, into the codec, a value based on its typecode. """
- if typecode == 1: codec.write_uint8 (int(value)) # U8
- elif typecode == 2: codec.write_uint16 (int(value)) # U16
- elif typecode == 3: codec.write_uint32 (long(value)) # U32
- elif typecode == 4: codec.write_uint64 (long(value)) # U64
- elif typecode == 6: codec.write_str8 (value) # SSTR
- elif typecode == 7: codec.write_vbin32 (value) # LSTR
- elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME
- elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME
- elif typecode == 10: value.encode (codec) # REF
- elif typecode == 11: codec.write_uint8 (int(value)) # BOOL
- elif typecode == 12: codec.write_float (float(value)) # FLOAT
- elif typecode == 13: codec.write_double (double(value)) # DOUBLE
- elif typecode == 14: codec.write_uuid (value) # UUID
- elif typecode == 15: codec.write_map (value) # FTABLE
- elif typecode == 16: codec.write_int8 (int(value)) # S8
- elif typecode == 17: codec.write_int16 (int(value)) # S16
- elif typecode == 18: codec.write_int32 (int(value)) # S32
- elif typecode == 19: codec.write_int64 (int(value)) # S64
- else:
- raise ValueError ("Invalid type code: %d" % typecode)
-
class MethodResult(object):
""" """
def __init__(self, status, text, outArgs):
@@ -844,10 +930,12 @@ class Broker:
SYNC_TIME = 10
def __init__(self, session, host, port, authMech, authUser, authPass):
- self.session = session
- self.host = host
- self.port = port
- self.agents = {}
+ self.session = session
+ self.host = host
+ self.port = port
+ self.authUser = authUser
+ self.authPass = authPass
+ self.agents = {}
self.agents[0] = Agent(self, 0, "BrokerAgent")
self.topicBound = False
self.cv = Condition()
@@ -857,17 +945,52 @@ class Broker:
self.reqsOutstanding = 1
self.error = None
self.brokerId = None
- err = None
+ self.isConnected = False
+ self._tryToConnect()
+
+ def isConnected(self):
+ return self.isConnected
+
+ def getError(self):
+ return self.error
+
+ def getBrokerId(self):
+ """ Get broker's unique identifier (UUID) """
+ return self.brokerId
+
+ def getSessionId(self):
+ """ Get the identifier of the AMQP session to the broker """
+ return self.amqpSessionId
+
+ def getAgents(self):
+ """ Get the list of agents reachable via this broker """
+ return self.agents.values()
+
+ def getAmqpSession(self):
+ """ Get the AMQP session object for this connected broker. """
+ return self.amqpSession
+
+ def __repr__(self):
+ if self.isConnected:
+ if self.port == 5672:
+ port = ""
+ else:
+ port = ":%d" % self.port
+ return "Broker connected at: amqp://%s%s" % (self.host, port)
+ else:
+ return "Disconnected Broker"
+
+ def _tryToConnect(self):
try:
self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self.conn = Connection(connect(host, port), username=authUser, password=authPass)
+ self.conn = Connection(connect(self.host, self.port), username=self.authUser, password=self.authPass)
self.conn.start()
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
self.amqpSession.auto_sync = True
self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
self.amqpSession.exchange_bind(exchange="amq.direct",
- queue=self.replyName, binding_key=self.replyName)
+ queue=self.replyName, binding_key=self.replyName)
self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest")
self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
@@ -883,50 +1006,20 @@ class Broker:
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF)
+ self.isConnected = True
+ self.session._handleBrokerConnect(self)
+
codec = Codec(self.conn.spec)
self._setHeader(codec, 'B')
msg = self._message(codec.encoded)
self._send(msg)
except socket.error, e:
- err = "Socket Error %s - %s" % (e[0], e[1])
+ self.error = "Socket Error %s - %s" % (e[0], e[1])
except Closed, e:
- err = "Connect Failed %d - %s" % (e[0], e[1])
+ self.error = "Connect Failed %d - %s" % (e[0], e[1])
except ConnectionFailed, e:
- err = "Connect Failed %d - %s" % (e[0], e[1])
-
- self.active = True
- if err != None:
- raise Exception(err)
-
- def getBrokerId(self):
- """ Get broker's unique identifier (UUID) """
- return self.brokerId
-
- def getSessionId(self):
- """ Get the identifier of the AMQP session to the broker """
- return self.amqpSessionId
-
- def getAgents(self):
- """ Get the list of agents reachable via this broker """
- return self.agents.values()
-
- def getAmqpSession(self):
- """ Get the AMQP session object for this connected broker. """
- return self.amqpSession
-
- def isConnected(self):
- return self.active
-
- def __repr__(self):
- if self.active:
- if self.port == 5672:
- port = ""
- else:
- port = ":%d" % self.port
- return "Broker connected at: amqp://%s%s" % (self.host, port)
- else:
- return "Disconnected Broker"
+ self.error = "Connect Failed %d - %s" % (e[0], e[1])
def _updateAgent(self, obj):
if obj.deleteTime == 0:
@@ -975,12 +1068,12 @@ class Broker:
self.amqpSession.message_transfer(destination=dest, message=msg)
def _shutdown(self):
- if self.active:
+ if self.isConnected:
self.amqpSession.incoming("rdest").stop()
if self.session.console != None:
self.amqpSession.incoming("tdest").stop()
self.amqpSession.close()
- self.active = False
+ self.isConnected = False
else:
raise Exception("Broker already disconnected")
@@ -1008,18 +1101,21 @@ class Broker:
self.reqsOutstanding -= 1
if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None:
self.topicBound = True
- self.amqpSession.exchange_bind(exchange="qpid.management",
- queue=self.topicName, binding_key="mgmt.#")
+ for key in self.session.bindingKeyList:
+ self.amqpSession.exchange_bind(exchange="qpid.management",
+ queue=self.topicName, binding_key=key)
if self.reqsOutstanding == 0 and self.syncInFlight:
self.syncInFlight = False
self.cv.notify()
self.cv.release()
def _replyCb(self, msg):
+ self.amqpSession.message_accept(RangedSet(msg.id))
codec = Codec(self.conn.spec, msg.body)
opcode, seq = self._checkHeader(codec)
if opcode == None:
return
+
if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
@@ -1033,13 +1129,14 @@ class Broker:
elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
def _exceptionCb(self, data):
- self.active = False
+ self.isConnected = False
self.error = data
self.cv.acquire()
if self.syncInFlight:
self.cv.notify()
self.cv.release()
self.session._handleError(self.error)
+ self.session._handleBrokerDisconnect(self)
class Agent:
""" """
@@ -1053,8 +1150,46 @@ class Agent:
class Event:
""" """
- def __init__(self):
- pass
+ def __init__(self, session, codec):
+ self.session = session
+ self.timestamp = codec.read_int64()
+ self.objectId = ObjectId(codec)
+ pname = codec.read_str8()
+ cname = codec.read_str8()
+ hash = codec.read_bin128()
+ self.classKey = (pname, cname, hash)
+ self.name = codec.read_str8()
+ if pname in session.packages:
+ if (cname, hash) in session.packages[pname]:
+ schema = session.packages[pname][(cname, hash)]
+ for event in schema.getEvents():
+ if event.name == self.name:
+ self.schemaEvent = event
+ self.arguments = {}
+ for arg in event.arguments:
+ self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+
+ def __repr__(self):
+ return self.getSyslogText()
+
+ def getClassKey(self):
+ return self.classKey
+
+ def getArguments(self):
+ return self.arguments
+
+ def getTimestamp(self):
+ return self.timerstamp
+
+ def getName(self):
+ return self.name
+
+ def getSyslogText(self):
+ out = strftime("%c", gmtime(self.timestamp / 1000000000))
+ out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name
+ for arg in self.schemaEvent.arguments:
+ out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type)
+ return out
class SequenceManager:
""" Manage sequence numbers for asynchronous method calls """
@@ -1085,6 +1220,12 @@ class SequenceManager:
class DebugConsole(Console):
""" """
+ def brokerConnected(self, broker):
+ print "brokerConnected:", broker
+
+ def brokerDisconnected(self, broker):
+ print "brokerDisconnected:", broker
+
def newPackage(self, name):
print "newPackage:", name