summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNuno Santos <nsantos@apache.org>2008-03-27 20:51:42 +0000
committerNuno Santos <nsantos@apache.org>2008-03-27 20:51:42 +0000
commit3ebe0d3e1499243c6517aa69e22faea00facf7be (patch)
tree08ee62a7f598c88d82865f5170b532c02d9e10f6
parentc9bf075ee735b96e0c59ae306fc447d6ad73a544 (diff)
downloadqpid-python-3ebe0d3e1499243c6517aa69e22faea00facf7be.tar.gz
QPID-883: applying patch supplied by Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641976 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--python/mgmt-cli/managementdata.py3
-rw-r--r--python/qpid/management.py131
-rw-r--r--python/tests_0-10_preview/__init__.py1
-rw-r--r--python/tests_0-10_preview/management.py89
-rw-r--r--specs/management-schema.xml16
5 files changed, 215 insertions, 25 deletions
diff --git a/python/mgmt-cli/managementdata.py b/python/mgmt-cli/managementdata.py
index adff05a710..badd8cd9d1 100644
--- a/python/mgmt-cli/managementdata.py
+++ b/python/mgmt-cli/managementdata.py
@@ -156,8 +156,7 @@ class ManagementData:
self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
self.instHandler, self.methodReply)
self.mclient.schemaListener (self.schemaHandler)
- self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb)
- self.mclient.addChannel (self.mch)
+ self.mch = self.mclient.addChannel (self.channel)
def close (self):
self.mclient.removeChannel (self.mch)
diff --git a/python/qpid/management.py b/python/qpid/management.py
index 33679cf0da..b3bc068166 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -26,12 +26,12 @@ import base64
import socket
from threading import Thread
from message import Message
-from time import sleep
+from time import time
from qpid.client import Client
from qpid.content import Content
from cStringIO import StringIO
from codec import Codec, EOF
-from threading import Lock
+from threading import Lock, Condition
class SequenceManager:
@@ -61,10 +61,29 @@ class SequenceManager:
return data
+class mgmtObject (object):
+ """ Generic object that holds the contents of a management object with its
+ attributes set as object attributes. """
+
+ def __init__ (self, classKey, timestamps, row):
+ self.classKey = classKey
+ self.timestamps = timestamps
+ for cell in row:
+ setattr (self, cell[0], cell[1])
+
+class methodResult:
+ """ Object that contains the result of a method call """
+
+ def __init__ (self, status, sText, args):
+ self.status = status
+ self.statusText = sText
+ for arg in args:
+ setattr (self, arg, args[arg])
+
class managementChannel:
""" This class represents a connection to an AMQP broker. """
- def __init__ (self, ch, topicCb, replyCb, cbContext=None):
+ def __init__ (self, ch, topicCb, replyCb, cbContext):
""" Given a channel on an established AMQP broker connection, this method
opens a session and performs all of the declarations and bindings needed
to participate in the management protocol. """
@@ -120,10 +139,12 @@ class managementClient:
CTRL_SCHEMA_LOADED = 2
CTRL_USER = 3
+ SYNC_TIME = 10.0
+
#========================================================
# User API - interacts with the class's user
#========================================================
- def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None):
+ def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, methodCb=None):
self.spec = amqpSpec
self.ctrlCb = ctrlCb
self.configCb = configCb
@@ -135,6 +156,10 @@ class managementClient:
self.seqMgr = SequenceManager ()
self.schema = {}
self.packages = {}
+ self.cv = Condition ()
+ self.syncInFlight = False
+ self.syncSequence = 0
+ self.syncResult = None
def schemaListener (self, schemaCb):
""" Optionally register a callback to receive details of the schema of
@@ -146,9 +171,11 @@ class managementClient:
in the network. """
self.eventCb = eventCb
- def addChannel (self, channel):
+ def addChannel (self, channel, cbContext=None):
""" Register a new channel. """
- self.channels.append (channel)
+ mch = managementChannel (channel, self.topicCb, self.replyCb, cbContext)
+
+ self.channels.append (mch)
codec = Codec (StringIO (), self.spec)
self.setHeader (codec, ord ('B'))
msg = Content (codec.stream.getvalue ())
@@ -156,12 +183,13 @@ class managementClient:
msg["routing_key"] = "agent"
msg["reply_to"] = self.spec.struct ("reply_to")
msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = channel.replyName
- channel.send ("qpid.management", msg)
+ msg["reply_to"]["routing_key"] = mch.replyName
+ mch.send ("qpid.management", msg)
+ return mch
- def removeChannel (self, channel):
+ def removeChannel (self, mch):
""" Remove a previously added channel from management. """
- self.channels.remove (channel)
+ self.channels.remove (mch)
def callMethod (self, channel, userSequence, objId, className, methodName, args=None):
""" Invoke a method on a managed object. """
@@ -182,6 +210,55 @@ class managementClient:
msg["reply_to"]["routing_key"] = channel.replyName
channel.send ("qpid.management", msg)
+ def syncWaitForStable (self, channel):
+ """ Synchronous (blocking) call to wait for schema stability on a channel """
+ self.cv.acquire ()
+ self.syncInFlight = True
+ starttime = time ()
+ while channel.reqsOutstanding != 0:
+ self.cv.wait (self.SYNC_TIME)
+ if time () - starttime > self.SYNC_TIME:
+ self.cv.release ()
+ raise RuntimeError ("Timed out waiting for response on channel")
+ self.cv.release ()
+
+ def syncCallMethod (self, channel, objId, className, methodName, args=None):
+ """ Synchronous (blocking) method call """
+ self.cv.acquire ()
+ self.syncInFlight = True
+ self.syncResult = None
+ self.syncSequence = self.seqMgr.reserve ("sync")
+ self.cv.release ()
+ self.callMethod (channel, self.syncSequence, objId, className, methodName, args)
+ self.cv.acquire ()
+ starttime = time ()
+ while self.syncInFlight:
+ self.cv.wait (self.SYNC_TIME)
+ if time () - starttime > self.SYNC_TIME:
+ self.cv.release ()
+ raise RuntimeError ("Timed out waiting for response on channel")
+ result = self.syncResult
+ self.cv.release ()
+ return result
+
+ def syncGetObjects (self, channel, className):
+ """ Synchronous (blocking) get call """
+ self.cv.acquire ()
+ self.syncInFlight = True
+ self.syncResult = []
+ self.syncSequence = self.seqMgr.reserve ("sync")
+ self.cv.release ()
+ self.getObjects (channel, self.syncSequence, className)
+ self.cv.acquire ()
+ starttime = time ()
+ while self.syncInFlight:
+ self.cv.wait (self.SYNC_TIME)
+ if time () - starttime > self.SYNC_TIME:
+ self.cv.release ()
+ raise RuntimeError ("Timed out waiting for response on channel")
+ result = self.syncResult
+ self.cv.release ()
+ return result
#========================================================
# Channel API - interacts with registered channel objects
@@ -312,10 +389,18 @@ class managementClient:
return data
def incOutstanding (self, ch):
+ self.cv.acquire ()
ch.reqsOutstanding = ch.reqsOutstanding + 1
+ self.cv.release ()
def decOutstanding (self, ch):
+ self.cv.acquire ()
ch.reqsOutstanding = ch.reqsOutstanding - 1
+ if ch.reqsOutstanding == 0 and self.syncInFlight:
+ self.syncInFlight = False
+ self.cv.notify ()
+ self.cv.release ()
+
if ch.reqsOutstanding == 0:
if self.ctrlCb != None:
self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
@@ -330,6 +415,7 @@ class managementClient:
(userSequence, classId, methodName) = data
args = {}
+ context = self.seqMgr.release (userSequence)
if status == 0:
schemaClass = self.schema[classId]
@@ -346,7 +432,13 @@ class managementClient:
if arg[2].find("O") != -1:
args[arg[0]] = self.decodeValue (codec, arg[1])
- if self.methodCb != None:
+ if context == "sync" and userSequence == self.syncSequence:
+ self.cv.acquire ()
+ self.syncInFlight = False
+ self.syncResult = methodResult (status, sText, args)
+ self.cv.notify ()
+ self.cv.release ()
+ elif self.methodCb != None:
self.methodCb (ch.context, userSequence, status, sText, args)
def handleCommandComplete (self, ch, codec, seq):
@@ -356,6 +448,11 @@ class managementClient:
context = self.seqMgr.release (seq)
if context == "outstanding":
self.decOutstanding (ch)
+ elif context == "sync" and seq == self.syncSequence:
+ self.cv.acquire ()
+ self.syncInFlight = False
+ self.cv.notify ()
+ self.cv.release ()
elif self.ctrlCb != None:
self.ctrlCb (ch.context, self.CTRL_USER, data)
@@ -541,9 +638,9 @@ class managementClient:
if self.schemaCb != None:
self.schemaCb (ch.context, classKey, configs, insts, methods, events)
- def parseContent (self, ch, cls, codec):
+ def parseContent (self, ch, cls, codec, seq=0):
""" Parse a received content message. """
- if (cls == 'C' or cls == 'B') and self.configCb == None:
+ if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None:
return
if cls == 'I' and self.instCb == None:
return
@@ -582,8 +679,12 @@ class managementClient:
data = self.decodeValue (codec, tc)
row.append ((name, data))
- if cls == 'C' or cls == 'B':
+ if cls == 'C' or (cls == 'B' and seq != self.syncSequence):
self.configCb (ch.context, classKey, row, timestamps)
+ elif cls == 'B' and seq == self.syncSequence:
+ if timestamps[2] == 0:
+ obj = mgmtObject (classKey, timestamps, row)
+ self.syncResult.append (obj)
elif cls == 'I':
self.instCb (ch.context, classKey, row, timestamps)
@@ -596,7 +697,7 @@ class managementClient:
elif opcode == 'i':
self.parseContent (ch, 'I', codec)
elif opcode == 'g':
- self.parseContent (ch, 'B', codec)
+ self.parseContent (ch, 'B', codec, seq)
else:
raise ValueError ("Unknown opcode: %c" % opcode);
diff --git a/python/tests_0-10_preview/__init__.py b/python/tests_0-10_preview/__init__.py
index fe96d9e122..f0acf9c632 100644
--- a/python/tests_0-10_preview/__init__.py
+++ b/python/tests_0-10_preview/__init__.py
@@ -25,6 +25,7 @@ from dtx import *
from example import *
from exchange import *
from execution import *
+from management import *
from message import *
from query import *
from queue import *
diff --git a/python/tests_0-10_preview/management.py b/python/tests_0-10_preview/management.py
new file mode 100644
index 0000000000..de6161ae96
--- /dev/null
+++ b/python/tests_0-10_preview/management.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase
+from qpid.management import managementChannel, managementClient
+
+class ManagementTest (TestBase):
+ """
+ Tests for the management hooks
+ """
+
+ def test_broker_connectivity (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
+ channel = self.client.channel(2)
+
+ mc = managementClient (channel.spec)
+ mch = mc.addChannel (channel)
+
+ mc.syncWaitForStable (mch)
+ brokers = mc.syncGetObjects (mch, "broker")
+ self.assertEqual (len (brokers), 1)
+ broker = brokers[0]
+ args = {}
+ body = "Echo Message Body"
+ args["body"] = body
+
+ for seq in range (1, 5):
+ args["sequence"] = seq
+ res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args)
+ self.assertEqual (res.status, 0)
+ self.assertEqual (res.statusText, "OK")
+ self.assertEqual (res.sequence, seq)
+ self.assertEqual (res.body, body)
+
+ def test_system_object (self):
+ channel = self.client.channel(2)
+
+ mc = managementClient (channel.spec)
+ mch = mc.addChannel (channel)
+
+ mc.syncWaitForStable (mch)
+ systems = mc.syncGetObjects (mch, "system")
+ self.assertEqual (len (systems), 1)
+
+ def test_standard_exchanges (self):
+ channel = self.client.channel(2)
+
+ mc = managementClient (channel.spec)
+ mch = mc.addChannel (channel)
+
+ mc.syncWaitForStable (mch)
+ exchanges = mc.syncGetObjects (mch, "exchange")
+ exchange = self.findExchange (exchanges, "")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.direct")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.topic")
+ self.assertEqual (exchange.type, "topic")
+ exchange = self.findExchange (exchanges, "amq.fanout")
+ self.assertEqual (exchange.type, "fanout")
+ exchange = self.findExchange (exchanges, "amq.match")
+ self.assertEqual (exchange.type, "headers")
+ exchange = self.findExchange (exchanges, "qpid.management")
+ self.assertEqual (exchange.type, "topic")
+
+ def findExchange (self, exchanges, name):
+ for exchange in exchanges:
+ if exchange.name == name:
+ return exchange
+ return None
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index a704a95a2c..edf14e2789 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -47,11 +47,11 @@
<class name="system">
<configElement name="sysId" index="y" type="sstr" access="RC"/>
- <instElement name="osName" type="sstr" desc="Operating System Name"/>
- <instElement name="nodeName" type="sstr" desc="Node Name"/>
- <instElement name="release" type="sstr"/>
- <instElement name="version" type="sstr"/>
- <instElement name="machine" type="sstr"/>
+ <configElement name="osName" type="sstr" access="RO" desc="Operating System Name"/>
+ <configElement name="nodeName" type="sstr" access="RO" desc="Node Name"/>
+ <configElement name="release" type="sstr" access="RO"/>
+ <configElement name="version" type="sstr" access="RO"/>
+ <configElement name="machine" type="sstr" access="RO"/>
</class>
@@ -215,11 +215,11 @@
This class represents an inter-broker connection.
- <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/>
- <configElement name="address" type="sstr" access="RC" index="y"/>
+ <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/>
+ <configElement name="address" type="sstr" access="RC" index="y"/>
+ <configElement name="authIdentity" type="sstr" access="RO"/>
<instElement name="closing" type="bool" desc="This link is closing by management request"/>
- <instElement name="authIdentity" type="sstr"/>
<instElement name="framesFromPeer" type="count64"/>
<instElement name="framesToPeer" type="count64"/>
<instElement name="bytesFromPeer" type="count64"/>