diff options
author | Ted Ross <tross@apache.org> | 2008-11-21 13:53:53 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-11-21 13:53:53 +0000 |
commit | cb2b4af29370fd3cd3494cd58fc0d532c568dc21 (patch) | |
tree | d4414a96651fdebae2495112ba02d7014d9e50f3 /python/qmf/console.py | |
parent | c653274f9dee056e20e32ee7d75150f55315af56 (diff) | |
download | qpid-python-cb2b4af29370fd3cd3494cd58fc0d532c568dc21.tar.gz |
code cleanup on qmf console API
Formalized schema-class as an object with an all-string __repr__
Added additional testing for the console API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719580 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf/console.py')
-rw-r--r-- | python/qmf/console.py | 143 |
1 files changed, 85 insertions, 58 deletions
diff --git a/python/qmf/console.py b/python/qmf/console.py index bdd93e6f94..22c499e40a 100644 --- a/python/qmf/console.py +++ b/python/qmf/console.py @@ -171,11 +171,11 @@ class Session: def __repr__(self): return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers) - def addBroker(self, target="localhost"): + def addBroker(self, target="localhost", initialTopicCredits=0xFFFFFFFF): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass, - ssl = url.scheme == URL.AMQPS) + ssl = url.scheme == URL.AMQPS, topicCredits=initialTopicCredits) if not broker.isConnected and not self.manageConnections: raise Exception(broker.error) @@ -205,18 +205,19 @@ class Session: broker._waitForStable() list = [] if packageName in self.packages: - for cname, hash in self.packages[packageName]: - list.append((packageName, cname, hash)) + for pkey in self.packages[packageName]: + list.append(self.packages[packageName][pkey].getKey()) return list def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() - pname, cname, hash = classKey + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() if pname in self.packages: - if (cname, hash) in self.packages[pname]: - return self.packages[pname][(cname, hash)] + if pkey in self.packages[pname]: + return self.packages[pname][pkey] def bindPackage(self, packageName): """ """ @@ -230,7 +231,8 @@ class Session: """ """ if not self.userBindings or not self.rcvObjects: raise Exception("userBindings option not set for Session") - pname, cname, hash = classKey + pname = classKey.getPackageName() + cname = classKey.getClassName() for broker in self.brokers: broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key="console.obj.*.*.%s.%s.#" % (pname, cname)) @@ -297,14 +299,17 @@ class Session: for agent in broker.getAgents(): agentList.append(agent) + pname = None cname = None - if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey() - elif "_key" in kwargs: pname, cname, hash = kwargs["_key"] + hash = None + classKey = None + if "_schema" in kwargs: classKey = kwargs["_schema"].getKey() + elif "_key" in kwargs: classKey = kwargs["_key"] elif "_class" in kwargs: - pname, cname, hash = None, kwargs["_class"], None + cname = kwargs["_class"] if "_package" in kwargs: pname = kwargs["_package"] - if cname == None and "_objectId" not in kwargs: + if cname == None and classKey == None and "_objectId" not in kwargs: raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") map = {} @@ -312,6 +317,10 @@ class Session: if "_objectId" in kwargs: map["_objectid"] = kwargs["_objectId"].__repr__() else: + if cname == None: + cname = classKey.getClassName() + pname = classKey.getPackageName() + hash = classKey.getHash() map["_class"] = cname if pname != None: map["_package"] = pname if hash != None: map["_hash"] = hash @@ -442,15 +451,13 @@ class Session: def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() - pname = str(codec.read_str8()) - cname = str(codec.read_str8()) - hash = codec.read_bin128() + classKey = ClassKey(codec) unknown = False try: self.cv.acquire() - if pname in self.packages: - if (cname, hash) not in self.packages[pname]: + if classKey.getPackageName() in self.packages: + if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]: unknown = True finally: self.cv.release() @@ -461,9 +468,7 @@ class Session: sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'S', seq) - sendCodec.write_str8(pname) - sendCodec.write_str8(cname) - sendCodec.write_bin128(hash) + classKey.encode(sendCodec) smsg = broker._message(sendCodec.encoded) broker._send(smsg) @@ -512,14 +517,11 @@ class Session: def _handleSchemaResp(self, broker, codec, seq): kind = codec.read_uint8() - pname = str(codec.read_str8()) - cname = str(codec.read_str8()) - hash = codec.read_bin128() - classKey = (pname, cname, hash) + classKey = ClassKey(codec) _class = SchemaClass(kind, classKey, codec) try: self.cv.acquire() - self.packages[pname][(cname, hash)] = _class + self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class finally: self.cv.release() @@ -529,22 +531,21 @@ class Session: self.console.newClass(kind, classKey) def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): - pname = str(codec.read_str8()) - cname = str(codec.read_str8()) - hash = codec.read_bin128() - classKey = (pname, cname, hash) + classKey = ClassKey(codec) try: self.cv.acquire() + pname = classKey.getPackageName() if pname not in self.packages: return - if (cname, hash) not in self.packages[pname]: + pkey = classKey.getPackageKey() + if pkey not in self.packages[pname]: return - schema = self.packages[pname][(cname, hash)] + schema = self.packages[pname][pkey] finally: self.cv.release() object = Object(self, broker, schema, codec, prop, stat) - if pname == "org.apache.qpid.broker" and cname == "agent": + if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent": broker._updateAgent(object) try: @@ -664,10 +665,7 @@ class Session: seq = self.seqMgr._reserve((method, False)) broker._setHeader(sendCodec, 'M', seq) objectId.encode(sendCodec) - pname, cname, hash = schemaKey - sendCodec.write_str8(pname) - sendCodec.write_str8(cname) - sendCodec.write_bin128(hash) + schemaKey.encode(sendCodec) sendCodec.write_str8(name) count = 0 @@ -692,6 +690,36 @@ class Package: def __init__(self, name): self.name = name +class ClassKey: + """ """ + def __init__(self, codec): + self.pname = str(codec.read_str8()) + self.cname = str(codec.read_str8()) + self.hash = codec.read_bin128() + + def encode(self, codec): + codec.write_str8(self.pname) + codec.write_str8(self.cname) + codec.write_bin128(self.hash) + + def getPackageName(self): + return self.pname + + def getClassName(self): + return self.cname + + def getHash(self): + return self.hash + + def getHashString(self): + return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", self.hash) + + def getPackageKey(self): + return (self.cname, self.hash) + + def __repr__(self): + return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + class SchemaClass: """ """ CLASS_KIND_TABLE = 1 @@ -722,15 +750,13 @@ class SchemaClass: self.arguments.append(SchemaArgument(codec, methodArg=False)) def __repr__(self): - pname, cname, hash = self.classKey if self.kind == self.CLASS_KIND_TABLE: kindStr = "Table" elif self.kind == self.CLASS_KIND_EVENT: kindStr = "Event" else: kindStr = "Unsupported" - result = "%s Class: %s:%s " % (kindStr, pname, cname) - result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash) + result = "%s Class: %s " % (kindStr, self.classKey.__repr__()) return result def getKey(self): @@ -1003,10 +1029,7 @@ class Object(object): seq = self._session.seqMgr._reserve((method, synchronous)) self._broker._setHeader(sendCodec, 'M', seq) self._objectId.encode(sendCodec) - pname, cname, hash = self._schema.getKey() - sendCodec.write_str8(pname) - sendCodec.write_str8(cname) - sendCodec.write_bin128(hash) + self._schema.getKey().encode(sendCodec) sendCodec.write_str8(name) count = 0 @@ -1085,14 +1108,16 @@ class Broker: """ """ SYNC_TIME = 60 - def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False): + def __init__(self, session, host, port, authMech, authUser, authPass, + ssl=False, topicCredits=0xFFFFFFFF): self.session = session - self.host = host - self.port = port + self.host = host + self.port = port self.ssl = ssl self.authUser = authUser self.authPass = authPass - self.agents = {} + self.topicCredits = topicCredits + self.agents = {} self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent") self.topicBound = False self.cv = Condition() @@ -1100,8 +1125,8 @@ class Broker: self.syncRequest = 0 self.syncResult = None self.reqsOutstanding = 1 - self.error = None - self.brokerId = None + self.error = None + self.brokerId = None self.isConnected = False self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) self._tryToConnect() @@ -1152,6 +1177,9 @@ class Broker: auth = "" return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672) + def replenishCredits(self, credits): + self.amqpSession.message_flow(destination="tdest", unit=0, value=credits) + def __repr__(self): if self.isConnected: return "Broker connected at: %s" % self.getUrl() @@ -1185,8 +1213,8 @@ class Broker: accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("tdest").listen(self._replyCb) - self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) - self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) + self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=0) + self.amqpSession.message_flow(destination="tdest", unit=0, value=self.topicCredits) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF) self.isConnected = True @@ -1351,16 +1379,15 @@ class Event: def __init__(self, session, broker, codec): self.session = session self.broker = broker - pname = str(codec.read_str8()) - cname = str(codec.read_str8()) - hash = codec.read_bin128() - self.classKey = (pname, cname, hash) + self.classKey = ClassKey(codec) self.timestamp = codec.read_int64() self.severity = codec.read_uint8() self.schema = None + pname = self.classKey.getPackageName() + pkey = self.classKey.getPackageKey() if pname in session.packages: - if (cname, hash) in session.packages[pname]: - self.schema = session.packages[pname][(cname, hash)] + if pkey in session.packages[pname]: + self.schema = session.packages[pname][pkey] self.arguments = {} for arg in self.schema.arguments: self.arguments[arg.name] = session._decodeValue(codec, arg.type) @@ -1369,7 +1396,7 @@ class Event: if self.schema == None: return "<uninterpretable>" out = strftime("%c", gmtime(self.timestamp / 1000000000)) - out += " " + self._sevName() + " " + self.classKey[0] + ":" + self.classKey[1] + out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName() out += " broker=" + self.broker.getUrl() for arg in self.schema.arguments: disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8") |