summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-01-20 14:35:45 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-01-20 14:35:45 +0000
commitb8f785cb02c988135f84128b9f930d79827270d4 (patch)
tree024236e7892561499f30976e226d01b06a07dbcb
parent9ebba0e9a3c32d9263365140ae7066ff6f3ef38c (diff)
downloadqpid-python-b8f785cb02c988135f84128b9f930d79827270d4.tar.gz
JIRA QPID-2261
Checkpoint: *) added to python test infrastructure: agent discovery tests basic query tests basic method call tests You can run the tests from the python directory using the following command: ./qpid-python-test -m qmf.test *) misc fixes to bugs flushed out by tests git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@901217 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qmf/qmfAgent.py76
-rw-r--r--qpid/python/qmf/qmfCommon.py20
-rw-r--r--qpid/python/qmf/qmfConsole.py137
-rw-r--r--qpid/python/qmf/test/agent_test.py229
-rw-r--r--qpid/python/qmf/test/console_test.py21
5 files changed, 351 insertions, 132 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py
index 89be722833..59382f36ab 100644
--- a/qpid/python/qmf/qmfAgent.py
+++ b/qpid/python/qmf/qmfAgent.py
@@ -100,17 +100,32 @@ class Agent(Thread):
self._work_q = Queue.Queue()
self._work_q_put = False
+
+ def destroy(self, timeout=None):
+ """
+ Must be called before the Agent is deleted.
+ Frees up all resources and shuts down all background threads.
+
+ @type timeout: float
+ @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
+ """
+ logging.debug("Destroying Agent %s" % self.name)
+ if self._conn:
+ self.remove_connection(timeout)
+ logging.debug("Agent Destroyed")
+
+
def get_name(self):
return self.name
- def setConnection(self, conn):
+ def set_connection(self, conn):
my_addr = QmfAddress.direct(self.name, self._domain)
locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- logging.error("my direct addr=%s" % my_addr)
- logging.error("agent.locate addr=%s" % locate_addr)
- logging.error("agent.ind addr=%s" % ind_addr)
+ logging.debug("my direct addr=%s" % my_addr)
+ logging.debug("agent.locate addr=%s" % locate_addr)
+ logging.debug("agent.ind addr=%s" % ind_addr)
self._conn = conn
self._session = self._conn.session()
@@ -127,7 +142,32 @@ class Agent(Thread):
self._running = True
self.start()
-
+
+ def remove_connection(self, timeout=None):
+ # tell connection thread to shutdown
+ self._running = False
+ if self.isAlive():
+ # kick my thread to wake it up
+ my_addr = QmfAddress.direct(self.name, self._domain)
+ logging.debug("Making temp sender for [%s]" % str(my_addr))
+ tmp_sender = self._session.sender(str(my_addr))
+ try:
+ msg = Message(subject=makeSubject(OpCode.noop))
+ tmp_sender.send( msg, sync=True )
+ except SendError, e:
+ logging.error(str(e))
+ logging.debug("waiting for agent receiver thread to exit")
+ self.join(timeout)
+ if self.isAlive():
+ logging.error( "Agent thread '%s' is hung..." % self.name)
+ self._direct_receiver.close()
+ self._locate_receiver.close()
+ self._ind_sender.close()
+ self._session.close()
+ self._session = None
+ self._conn = None
+ logging.debug("agent connection removal complete")
+
def register_object_class(self, schema):
"""
Register an instance of a SchemaClass with this agent
@@ -177,6 +217,14 @@ class Agent(Thread):
finally:
self._lock.release()
+ def get_object(self, id):
+ self._lock.acquire()
+ try:
+ data = self._agent_data.get(id)
+ finally:
+ self._lock.release()
+ return data
+
def method_response(self, handle, _out_args=None, _error=None):
"""
@@ -233,6 +281,7 @@ class Agent(Thread):
def run(self):
global _callback_thread
next_heartbeat = datetime.datetime.utcnow()
+ batch_limit = 10 # a guess
while self._running:
now = datetime.datetime.utcnow()
@@ -249,7 +298,7 @@ class Agent(Thread):
except Empty:
continue
- while True:
+ for i in range(batch_limit):
try:
msg = self._locate_receiver.fetch(timeout=0)
except Empty:
@@ -257,7 +306,7 @@ class Agent(Thread):
if msg and msg.content_type == "amqp/map":
self._dispatch(msg, _direct=False)
- while True:
+ for i in range(batch_limit):
try:
msg = self._direct_receiver.fetch(timeout=0)
except Empty:
@@ -294,7 +343,7 @@ class Agent(Thread):
@param _direct: True if msg directly addressed to this agent.
"""
- logging.error( "Message received from Console! [%s]" % msg )
+ logging.debug( "Message received from Console! [%s]" % msg )
try:
version,opcode = parseSubject(msg.subject)
except:
@@ -564,12 +613,17 @@ class QmfAgentData(QmfData):
super(QmfAgentData, self).set_value(_name, _value, _subType)
# @todo: publish change
- def inc_value(self, name, delta):
+ def inc_value(self, name, delta=1):
""" add the delta to the property """
# @todo: need to take write-lock
- logging.error(" TBD!!!")
+ val = self.get_value(name)
+ try:
+ val += delta
+ except:
+ raise
+ self.set_value(name, val)
- def dec_value(self, name, delta):
+ def dec_value(self, name, delta=1):
""" subtract the delta from the property """
# @todo: need to take write-lock
logging.error(" TBD!!!")
diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py
index 580d86b7a7..24a54691fc 100644
--- a/qpid/python/qmf/qmfCommon.py
+++ b/qpid/python/qmf/qmfCommon.py
@@ -1143,24 +1143,44 @@ class qmfTypes(object):
TYPE_UINT16 = 2
TYPE_UINT32 = 3
TYPE_UINT64 = 4
+
TYPE_SSTR = 6
TYPE_LSTR = 7
+
TYPE_ABSTIME = 8
TYPE_DELTATIME = 9
+
TYPE_REF = 10
+
TYPE_BOOL = 11
+
TYPE_FLOAT = 12
TYPE_DOUBLE = 13
+
TYPE_UUID = 14
+
TYPE_MAP = 15
+
TYPE_INT8 = 16
TYPE_INT16 = 17
TYPE_INT32 = 18
TYPE_INT64 = 19
+
TYPE_OBJECT = 20
+
TYPE_LIST = 21
+
TYPE_ARRAY = 22
+# New subtypes:
+# integer (for time, duration, signed/unsigned)
+# double (float)
+# bool
+# string
+# map (ref, qmfdata)
+# list
+# uuid
+
class qmfAccess(object):
READ_CREATE = 1
diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py
index 97acdd767e..291f596400 100644
--- a/qpid/python/qmf/qmfConsole.py
+++ b/qpid/python/qmf/qmfConsole.py
@@ -64,7 +64,6 @@ class _Mailbox(object):
self._msgs.append(obj)
# if was empty, notify waiters
if len(self._msgs) == 1:
- logging.error("Delivering @ %s" % time.time())
self._cv.notify()
finally:
self._cv.release()
@@ -118,7 +117,7 @@ class SequencedWaiter(object):
self.lock.acquire()
try:
if seq in self.pending:
- logging.error("Putting seq %d @ %s" % (seq,time.time()))
+ # logging.error("Putting seq %d @ %s" % (seq,time.time()))
self.pending[seq].deliver(new_data)
else:
logging.error( "seq %d not found!" % seq )
@@ -242,8 +241,29 @@ class QmfConsoleData(QmfData):
request that the Agent update the value of this object's
contents.
"""
- logging.error(" TBD!!!")
- return None
+ if _reply_handle is not None:
+ logging.error(" ASYNC REFRESH TBD!!!")
+ return None
+
+ assert self._agent
+ assert self._agent._console
+
+ if _timeout is None:
+ _timeout = self._agent._console._reply_timeout
+
+
+ # create query to agent using this objects ID
+ oid = self.get_object_id()
+ query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
+ self.get_object_id())
+ obj_list = self._agent._console.doQuery(self._agent, query,
+ timeout=_timeout)
+ if obj_list is None or len(obj_list) != 1:
+ return None
+
+ self._update(obj_list[0])
+ return self
+
def invoke_method(self, name, _in_args={}, _reply_handle=None,
_timeout=None):
@@ -267,7 +287,7 @@ class QmfConsoleData(QmfData):
# validate
ms = self._schema.get_method(name)
if ms is None:
- raise ValueError("Method '%s' is undefined." % ms)
+ raise ValueError("Method '%s' is undefined." % name)
for aname,prop in ms.get_arguments().iteritems():
if aname not in _in_args:
@@ -326,7 +346,12 @@ class QmfConsoleData(QmfData):
else:
return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
-
+ def _update(self, newer):
+ super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes,
+ _tag=newer._tag, _object_id=newer._object_id,
+ _ctime=newer._ctime, _utime=newer._utime,
+ _dtime=newer._dtime,
+ _schema=newer._schema, _const=True)
class QmfLocalData(QmfData):
"""
@@ -637,15 +662,15 @@ class Console(Thread):
" x-properties:"
" {type:direct}}}",
capacity=1)
- logging.error("local addr=%s" % self._address)
+ logging.debug("local addr=%s" % self._address)
ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- logging.error("agent.ind addr=%s" % ind_addr)
+ logging.debug("agent.ind addr=%s" % ind_addr)
self._announce_recvr = self._session.receiver(str(ind_addr) +
";{create:always,"
" node-properties:{type:topic}}",
capacity=1)
locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- logging.error("agent.locate addr=%s" % locate_addr)
+ logging.debug("agent.locate addr=%s" % locate_addr)
self._locate_sender = self._session.sender(str(locate_addr) +
";{create:always,"
" node-properties:{type:topic}}")
@@ -713,9 +738,9 @@ class Console(Thread):
finally:
self._lock.release()
- def findAgent(self, name, timeout=None ):
+ def find_agent(self, name, timeout=None ):
"""
- Given the id of a particular agent, return an instance of class Agent
+ Given the name of a particular agent, return an instance of class Agent
representing that agent. Return None if the agent does not exist.
"""
@@ -891,7 +916,7 @@ class Console(Thread):
if self._next_agent_expire > now:
timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
try:
- logging.error("waiting for next rcvr (timeout=%s)..." % timeout)
+ logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
self._session.next_receiver(timeout = timeout)
except Empty:
pass
@@ -899,6 +924,78 @@ class Console(Thread):
logging.debug("Shutting down Console thread")
+ def get_objects(self,
+ _schema_id=None,
+ _pname=None, _cname=None,
+ _object_id=None,
+ _agents=None,
+ _timeout=None):
+ """
+ @todo
+ """
+ if _object_id is not None:
+ # query by object id
+ query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id)
+ elif _schema_id is not None:
+ pred = QmfQueryPredicate({QmfQuery.CMP_EQ:
+ [QmfData.KEY_SCHEMA_ID,
+ _schema_id.map_encode()]})
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
+ elif _pname is not None:
+ # query by package name (and maybe class name)
+ if _cname is not None:
+ pred = QmfQueryPredicate({QmfQuery.LOGIC_AND:
+ [{QmfQuery.CMP_EQ:
+ [SchemaClassId.KEY_PACKAGE,
+ _pname]},
+ {QmfQuery.CMP_EQ:
+ [SchemaClassId.KEY_CLASS,
+ _cname]}]})
+ else:
+ pred = QmfQueryPredicate({QmfQuery.CMP_EQ:
+ [SchemaClassId.KEY_PACKAGE,
+ _pname]})
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
+
+ else:
+ raise Exception("invalid arguments")
+
+ if _agents is None:
+ # use copy of current agent list
+ self._lock.acquire()
+ try:
+ agent_list = self._agent_map.values()
+ finally:
+ self._lock.release()
+ elif isinstance(_agents, Agent):
+ agent_list = [_agents]
+ else:
+ agent_list = _agents
+ # @todo validate this list!
+
+ # @todo: fix when async doQuery done - query all agents at once, then
+ # wait for replies, instead of per-agent querying....
+
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ obj_list = []
+ expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
+ for agent in agent_list:
+ if not agent.isActive():
+ continue
+ now = datetime.datetime.utcnow()
+ if now >= expired:
+ break
+ timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds
+ reply = self.doQuery(agent, query, timeout)
+ if reply:
+ obj_list = obj_list + reply
+
+ if obj_list:
+ return obj_list
+ return None
+
# called by run() thread ONLY
@@ -908,7 +1005,7 @@ class Console(Thread):
PRIVATE: Process a message received from an Agent
"""
- logging.error( "Message received from Agent! [%s]" % msg )
+ logging.debug( "Message received from Agent! [%s]" % msg )
try:
version,opcode = parseSubject(msg.subject)
@@ -995,14 +1092,14 @@ class Console(Thread):
self._lock.release()
if old_timestamp == None and matched:
- logging.error("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+ logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
self._work_q.put(wi)
self._work_q_put = True
if correlated:
# wake up all waiters
- logging.error("waking waiters for correlation id %s" % msg.correlation_id)
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
self._req_correlation.put_data(msg.correlation_id, msg)
@@ -1015,11 +1112,12 @@ class Console(Thread):
logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time()))
if not self._req_correlation.isValid(msg.correlation_id):
- logging.error("FIXME: uncorrelated data indicate??? msg='%s'" % str(msg))
+ logging.debug("Data indicate received with unknown correlation_id"
+ " msg='%s'" % str(msg))
return
# wake up all waiters
- logging.error("waking waiters for correlation id %s" % msg.correlation_id)
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
self._req_correlation.put_data(msg.correlation_id, msg)
@@ -1031,11 +1129,12 @@ class Console(Thread):
logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time()))
if not self._req_correlation.isValid(msg.correlation_id):
- logging.error("FIXME: uncorrelated response??? msg='%s'" % str(msg))
+ logging.debug("Response msg received with unknown correlation_id"
+ " msg='%s'" % str(msg))
return
# wake up all waiters
- logging.error("waking waiters for correlation id %s" % msg.correlation_id)
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
self._req_correlation.put_data(msg.correlation_id, msg)
diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py
index dd55b49316..127fbe2080 100644
--- a/qpid/python/qmf/test/agent_test.py
+++ b/qpid/python/qmf/test/agent_test.py
@@ -1,13 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
import logging
import time
+import unittest
from threading import Semaphore
from qpid.messaging import *
-from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
+from qmf.qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
QmfEvent, SchemaMethod, Notifier, SchemaClassId,
WorkItem)
-from qmfAgent import (Agent, QmfAgentData)
+from qmf.qmfAgent import (Agent, QmfAgentData)
@@ -25,114 +43,125 @@ class ExampleNotifier(Notifier):
+
+class QmfTest(unittest.TestCase):
+ def test_begin(self):
+ print("!!! being test")
+
+ def test_end(self):
+ print("!!! end test")
+
+
#
# An example agent application
#
-_notifier = ExampleNotifier()
-_agent = Agent( "qmf.testAgent", _notifier=_notifier )
-
-# Dynamically construct a class schema
-
-_schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"] )
-# add properties
-_schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
-_schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
-# these two properties are statistics
-_schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-_schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-
-# These two properties can be set via the method call
-_schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
-_schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
-
-# add method
-_meth = SchemaMethod( _desc="Method to set string and int in object." )
-_meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
-_meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
-_schema.add_method( "set_meth", _meth )
-
-# Add schema to Agent
-
-_agent.register_object_class(_schema)
-
-# instantiate managed data objects matching the schema
-
-_obj1 = QmfAgentData( _agent, _schema=_schema )
-_obj1.set_value("index1", 100)
-_obj1.set_value("index2", "a name" )
-_obj1.set_value("set_string", "UNSET")
-_obj1.set_value("set_int", 0)
-_obj1.set_value("query_count", 0)
-_obj1.set_value("method_call_count", 0)
-_agent.add_object( _obj1 )
-
-_agent.add_object( QmfAgentData( _agent, _schema=_schema,
- _values={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
-# add an "unstructured" object to the Agent
-_obj2 = QmfAgentData(_agent, _object_id="01545")
-_obj2.set_value("field1", "a value")
-_obj2.set_value("field2", 2)
-_obj2.set_value("field3", {"a":1, "map":2, "value":3})
-_obj2.set_value("field4", ["a", "list", "value"])
-_agent.add_object(_obj2)
-
-
-## Now connect to the broker
-
-_c = Connection("localhost")
-_c.connect()
-_agent.setConnection(_c)
-
-_error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."})
-
-_done = False
-while not _done:
- # try:
- _notifier.waitForWork()
-
- _wi = _agent.get_next_workitem(timeout=0)
- while _wi:
-
- if _wi.get_type() == WorkItem.METHOD_CALL:
- mc = _wi.get_params()
+
+if __name__ == '__main__':
+ _notifier = ExampleNotifier()
+ _agent = Agent( "qmf.testAgent", _notifier=_notifier )
+
+ # Dynamically construct a class schema
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
+ # add properties
+ _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # These two properties can be set via the method call
+ _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+
+ # add method
+ _meth = SchemaMethod( _desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ _agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ _obj1 = QmfAgentData( _agent, _schema=_schema )
+ _obj1.set_value("index1", 100)
+ _obj1.set_value("index2", "a name" )
+ _obj1.set_value("set_string", "UNSET")
+ _obj1.set_value("set_int", 0)
+ _obj1.set_value("query_count", 0)
+ _obj1.set_value("method_call_count", 0)
+ _agent.add_object( _obj1 )
+
+ _agent.add_object( QmfAgentData( _agent, _schema=_schema,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+ # add an "unstructured" object to the Agent
+ _obj2 = QmfAgentData(_agent, _object_id="01545")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 2)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _agent.add_object(_obj2)
+
+
+ ## Now connect to the broker
+
+ _c = Connection("localhost")
+ _c.connect()
+ _agent.setConnection(_c)
+
+ _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."})
+
+ _done = False
+ while not _done:
+ # try:
+ _notifier.waitForWork()
+
+ _wi = _agent.get_next_workitem(timeout=0)
+ while _wi:
+
+ if _wi.get_type() == WorkItem.METHOD_CALL:
+ mc = _wi.get_params()
- if mc.get_name() == "set_meth":
- print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id())
- print("!!! args='%s'" % str(mc.get_args()))
- print("!!! userid=%s" % str(mc.get_user_id()))
- print("!!! handle=%s" % _wi.get_handle())
- _agent.method_response(_wi.get_handle(),
- {"rc1": 100, "rc2": "Success"})
+ if mc.get_name() == "set_meth":
+ print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id())
+ print("!!! args='%s'" % str(mc.get_args()))
+ print("!!! userid=%s" % str(mc.get_user_id()))
+ print("!!! handle=%s" % _wi.get_handle())
+ _agent.method_response(_wi.get_handle(),
+ {"rc1": 100, "rc2": "Success"})
+ else:
+ print("!!! Unknown Method name = %s" % mc.get_name())
+ _agent.method_response(_wi.get_handle(), _error=_error_data)
else:
- print("!!! Unknown Method name = %s" % mc.get_name())
- _agent.method_response(_wi.get_handle(), _error=_error_data)
- else:
- print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params())))
+ print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params())))
- _agent.release_workitem(_wi)
- _wi = _agent.get_next_workitem(timeout=0)
- # except:
- # print( "shutting down...")
- # _done = True
+ _agent.release_workitem(_wi)
+ _wi = _agent.get_next_workitem(timeout=0)
+ # except:
+ # print( "shutting down...")
+ # _done = True
-print( "Removing connection... TBD!!!" )
-#_myConsole.remove_connection( _c, 10 )
+ print( "Removing connection... TBD!!!" )
+ #_myConsole.remove_connection( _c, 10 )
-print( "Destroying agent... TBD!!!" )
-#_myConsole.destroy( 10 )
+ print( "Destroying agent... TBD!!!" )
+ #_myConsole.destroy( 10 )
-print( "******** agent test done ********" )
+ print( "******** agent test done ********" )
diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py
index a7703e24a6..7077f32507 100644
--- a/qpid/python/qmf/test/console_test.py
+++ b/qpid/python/qmf/test/console_test.py
@@ -1,12 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
import logging
import time
from threading import Semaphore
from qpid.messaging import *
-from qmfCommon import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey,
+from qmf.qmfCommon import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey,
SchemaClassId, SchemaClass, QmfData)
-from qmfConsole import Console
+from qmf.qmfConsole import Console
class ExampleNotifier(Notifier):