diff options
author | Nuno Santos <nsantos@apache.org> | 2008-03-27 20:51:42 +0000 |
---|---|---|
committer | Nuno Santos <nsantos@apache.org> | 2008-03-27 20:51:42 +0000 |
commit | 3ebe0d3e1499243c6517aa69e22faea00facf7be (patch) | |
tree | 08ee62a7f598c88d82865f5170b532c02d9e10f6 | |
parent | c9bf075ee735b96e0c59ae306fc447d6ad73a544 (diff) | |
download | qpid-python-3ebe0d3e1499243c6517aa69e22faea00facf7be.tar.gz |
QPID-883: applying patch supplied by Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641976 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/mgmt-cli/managementdata.py | 3 | ||||
-rw-r--r-- | python/qpid/management.py | 131 | ||||
-rw-r--r-- | python/tests_0-10_preview/__init__.py | 1 | ||||
-rw-r--r-- | python/tests_0-10_preview/management.py | 89 | ||||
-rw-r--r-- | specs/management-schema.xml | 16 |
5 files changed, 215 insertions, 25 deletions
diff --git a/python/mgmt-cli/managementdata.py b/python/mgmt-cli/managementdata.py index adff05a710..badd8cd9d1 100644 --- a/python/mgmt-cli/managementdata.py +++ b/python/mgmt-cli/managementdata.py @@ -156,8 +156,7 @@ class ManagementData: self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler, self.instHandler, self.methodReply) self.mclient.schemaListener (self.schemaHandler) - self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb) - self.mclient.addChannel (self.mch) + self.mch = self.mclient.addChannel (self.channel) def close (self): self.mclient.removeChannel (self.mch) diff --git a/python/qpid/management.py b/python/qpid/management.py index 33679cf0da..b3bc068166 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -26,12 +26,12 @@ import base64 import socket from threading import Thread from message import Message -from time import sleep +from time import time from qpid.client import Client from qpid.content import Content from cStringIO import StringIO from codec import Codec, EOF -from threading import Lock +from threading import Lock, Condition class SequenceManager: @@ -61,10 +61,29 @@ class SequenceManager: return data +class mgmtObject (object): + """ Generic object that holds the contents of a management object with its + attributes set as object attributes. """ + + def __init__ (self, classKey, timestamps, row): + self.classKey = classKey + self.timestamps = timestamps + for cell in row: + setattr (self, cell[0], cell[1]) + +class methodResult: + """ Object that contains the result of a method call """ + + def __init__ (self, status, sText, args): + self.status = status + self.statusText = sText + for arg in args: + setattr (self, arg, args[arg]) + class managementChannel: """ This class represents a connection to an AMQP broker. """ - def __init__ (self, ch, topicCb, replyCb, cbContext=None): + def __init__ (self, ch, topicCb, replyCb, cbContext): """ Given a channel on an established AMQP broker connection, this method opens a session and performs all of the declarations and bindings needed to participate in the management protocol. """ @@ -120,10 +139,12 @@ class managementClient: CTRL_SCHEMA_LOADED = 2 CTRL_USER = 3 + SYNC_TIME = 10.0 + #======================================================== # User API - interacts with the class's user #======================================================== - def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None): + def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, methodCb=None): self.spec = amqpSpec self.ctrlCb = ctrlCb self.configCb = configCb @@ -135,6 +156,10 @@ class managementClient: self.seqMgr = SequenceManager () self.schema = {} self.packages = {} + self.cv = Condition () + self.syncInFlight = False + self.syncSequence = 0 + self.syncResult = None def schemaListener (self, schemaCb): """ Optionally register a callback to receive details of the schema of @@ -146,9 +171,11 @@ class managementClient: in the network. """ self.eventCb = eventCb - def addChannel (self, channel): + def addChannel (self, channel, cbContext=None): """ Register a new channel. """ - self.channels.append (channel) + mch = managementChannel (channel, self.topicCb, self.replyCb, cbContext) + + self.channels.append (mch) codec = Codec (StringIO (), self.spec) self.setHeader (codec, ord ('B')) msg = Content (codec.stream.getvalue ()) @@ -156,12 +183,13 @@ class managementClient: msg["routing_key"] = "agent" msg["reply_to"] = self.spec.struct ("reply_to") msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = channel.replyName - channel.send ("qpid.management", msg) + msg["reply_to"]["routing_key"] = mch.replyName + mch.send ("qpid.management", msg) + return mch - def removeChannel (self, channel): + def removeChannel (self, mch): """ Remove a previously added channel from management. """ - self.channels.remove (channel) + self.channels.remove (mch) def callMethod (self, channel, userSequence, objId, className, methodName, args=None): """ Invoke a method on a managed object. """ @@ -182,6 +210,55 @@ class managementClient: msg["reply_to"]["routing_key"] = channel.replyName channel.send ("qpid.management", msg) + def syncWaitForStable (self, channel): + """ Synchronous (blocking) call to wait for schema stability on a channel """ + self.cv.acquire () + self.syncInFlight = True + starttime = time () + while channel.reqsOutstanding != 0: + self.cv.wait (self.SYNC_TIME) + if time () - starttime > self.SYNC_TIME: + self.cv.release () + raise RuntimeError ("Timed out waiting for response on channel") + self.cv.release () + + def syncCallMethod (self, channel, objId, className, methodName, args=None): + """ Synchronous (blocking) method call """ + self.cv.acquire () + self.syncInFlight = True + self.syncResult = None + self.syncSequence = self.seqMgr.reserve ("sync") + self.cv.release () + self.callMethod (channel, self.syncSequence, objId, className, methodName, args) + self.cv.acquire () + starttime = time () + while self.syncInFlight: + self.cv.wait (self.SYNC_TIME) + if time () - starttime > self.SYNC_TIME: + self.cv.release () + raise RuntimeError ("Timed out waiting for response on channel") + result = self.syncResult + self.cv.release () + return result + + def syncGetObjects (self, channel, className): + """ Synchronous (blocking) get call """ + self.cv.acquire () + self.syncInFlight = True + self.syncResult = [] + self.syncSequence = self.seqMgr.reserve ("sync") + self.cv.release () + self.getObjects (channel, self.syncSequence, className) + self.cv.acquire () + starttime = time () + while self.syncInFlight: + self.cv.wait (self.SYNC_TIME) + if time () - starttime > self.SYNC_TIME: + self.cv.release () + raise RuntimeError ("Timed out waiting for response on channel") + result = self.syncResult + self.cv.release () + return result #======================================================== # Channel API - interacts with registered channel objects @@ -312,10 +389,18 @@ class managementClient: return data def incOutstanding (self, ch): + self.cv.acquire () ch.reqsOutstanding = ch.reqsOutstanding + 1 + self.cv.release () def decOutstanding (self, ch): + self.cv.acquire () ch.reqsOutstanding = ch.reqsOutstanding - 1 + if ch.reqsOutstanding == 0 and self.syncInFlight: + self.syncInFlight = False + self.cv.notify () + self.cv.release () + if ch.reqsOutstanding == 0: if self.ctrlCb != None: self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) @@ -330,6 +415,7 @@ class managementClient: (userSequence, classId, methodName) = data args = {} + context = self.seqMgr.release (userSequence) if status == 0: schemaClass = self.schema[classId] @@ -346,7 +432,13 @@ class managementClient: if arg[2].find("O") != -1: args[arg[0]] = self.decodeValue (codec, arg[1]) - if self.methodCb != None: + if context == "sync" and userSequence == self.syncSequence: + self.cv.acquire () + self.syncInFlight = False + self.syncResult = methodResult (status, sText, args) + self.cv.notify () + self.cv.release () + elif self.methodCb != None: self.methodCb (ch.context, userSequence, status, sText, args) def handleCommandComplete (self, ch, codec, seq): @@ -356,6 +448,11 @@ class managementClient: context = self.seqMgr.release (seq) if context == "outstanding": self.decOutstanding (ch) + elif context == "sync" and seq == self.syncSequence: + self.cv.acquire () + self.syncInFlight = False + self.cv.notify () + self.cv.release () elif self.ctrlCb != None: self.ctrlCb (ch.context, self.CTRL_USER, data) @@ -541,9 +638,9 @@ class managementClient: if self.schemaCb != None: self.schemaCb (ch.context, classKey, configs, insts, methods, events) - def parseContent (self, ch, cls, codec): + def parseContent (self, ch, cls, codec, seq=0): """ Parse a received content message. """ - if (cls == 'C' or cls == 'B') and self.configCb == None: + if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None: return if cls == 'I' and self.instCb == None: return @@ -582,8 +679,12 @@ class managementClient: data = self.decodeValue (codec, tc) row.append ((name, data)) - if cls == 'C' or cls == 'B': + if cls == 'C' or (cls == 'B' and seq != self.syncSequence): self.configCb (ch.context, classKey, row, timestamps) + elif cls == 'B' and seq == self.syncSequence: + if timestamps[2] == 0: + obj = mgmtObject (classKey, timestamps, row) + self.syncResult.append (obj) elif cls == 'I': self.instCb (ch.context, classKey, row, timestamps) @@ -596,7 +697,7 @@ class managementClient: elif opcode == 'i': self.parseContent (ch, 'I', codec) elif opcode == 'g': - self.parseContent (ch, 'B', codec) + self.parseContent (ch, 'B', codec, seq) else: raise ValueError ("Unknown opcode: %c" % opcode); diff --git a/python/tests_0-10_preview/__init__.py b/python/tests_0-10_preview/__init__.py index fe96d9e122..f0acf9c632 100644 --- a/python/tests_0-10_preview/__init__.py +++ b/python/tests_0-10_preview/__init__.py @@ -25,6 +25,7 @@ from dtx import * from example import * from exchange import * from execution import * +from management import * from message import * from query import * from queue import * diff --git a/python/tests_0-10_preview/management.py b/python/tests_0-10_preview/management.py new file mode 100644 index 0000000000..de6161ae96 --- /dev/null +++ b/python/tests_0-10_preview/management.py @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase +from qpid.management import managementChannel, managementClient + +class ManagementTest (TestBase): + """ + Tests for the management hooks + """ + + def test_broker_connectivity (self): + """ + Call the "echo" method on the broker to verify it is alive and talking. + """ + channel = self.client.channel(2) + + mc = managementClient (channel.spec) + mch = mc.addChannel (channel) + + mc.syncWaitForStable (mch) + brokers = mc.syncGetObjects (mch, "broker") + self.assertEqual (len (brokers), 1) + broker = brokers[0] + args = {} + body = "Echo Message Body" + args["body"] = body + + for seq in range (1, 5): + args["sequence"] = seq + res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args) + self.assertEqual (res.status, 0) + self.assertEqual (res.statusText, "OK") + self.assertEqual (res.sequence, seq) + self.assertEqual (res.body, body) + + def test_system_object (self): + channel = self.client.channel(2) + + mc = managementClient (channel.spec) + mch = mc.addChannel (channel) + + mc.syncWaitForStable (mch) + systems = mc.syncGetObjects (mch, "system") + self.assertEqual (len (systems), 1) + + def test_standard_exchanges (self): + channel = self.client.channel(2) + + mc = managementClient (channel.spec) + mch = mc.addChannel (channel) + + mc.syncWaitForStable (mch) + exchanges = mc.syncGetObjects (mch, "exchange") + exchange = self.findExchange (exchanges, "") + self.assertEqual (exchange.type, "direct") + exchange = self.findExchange (exchanges, "amq.direct") + self.assertEqual (exchange.type, "direct") + exchange = self.findExchange (exchanges, "amq.topic") + self.assertEqual (exchange.type, "topic") + exchange = self.findExchange (exchanges, "amq.fanout") + self.assertEqual (exchange.type, "fanout") + exchange = self.findExchange (exchanges, "amq.match") + self.assertEqual (exchange.type, "headers") + exchange = self.findExchange (exchanges, "qpid.management") + self.assertEqual (exchange.type, "topic") + + def findExchange (self, exchanges, name): + for exchange in exchanges: + if exchange.name == name: + return exchange + return None diff --git a/specs/management-schema.xml b/specs/management-schema.xml index a704a95a2c..edf14e2789 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -47,11 +47,11 @@ <class name="system"> <configElement name="sysId" index="y" type="sstr" access="RC"/> - <instElement name="osName" type="sstr" desc="Operating System Name"/> - <instElement name="nodeName" type="sstr" desc="Node Name"/> - <instElement name="release" type="sstr"/> - <instElement name="version" type="sstr"/> - <instElement name="machine" type="sstr"/> + <configElement name="osName" type="sstr" access="RO" desc="Operating System Name"/> + <configElement name="nodeName" type="sstr" access="RO" desc="Node Name"/> + <configElement name="release" type="sstr" access="RO"/> + <configElement name="version" type="sstr" access="RO"/> + <configElement name="machine" type="sstr" access="RO"/> </class> @@ -215,11 +215,11 @@ This class represents an inter-broker connection. - <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="address" type="sstr" access="RC" index="y"/> + <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> + <configElement name="address" type="sstr" access="RC" index="y"/> + <configElement name="authIdentity" type="sstr" access="RO"/> <instElement name="closing" type="bool" desc="This link is closing by management request"/> - <instElement name="authIdentity" type="sstr"/> <instElement name="framesFromPeer" type="count64"/> <instElement name="framesToPeer" type="count64"/> <instElement name="bytesFromPeer" type="count64"/> |