diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-01-20 14:35:45 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-01-20 14:35:45 +0000 |
commit | b8f785cb02c988135f84128b9f930d79827270d4 (patch) | |
tree | 024236e7892561499f30976e226d01b06a07dbcb | |
parent | 9ebba0e9a3c32d9263365140ae7066ff6f3ef38c (diff) | |
download | qpid-python-b8f785cb02c988135f84128b9f930d79827270d4.tar.gz |
JIRA QPID-2261
Checkpoint:
*) added to python test infrastructure:
agent discovery tests
basic query tests
basic method call tests
You can run the tests from the python directory using the following command:
./qpid-python-test -m qmf.test
*) misc fixes to bugs flushed out by tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@901217 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qmf/qmfAgent.py | 76 | ||||
-rw-r--r-- | qpid/python/qmf/qmfCommon.py | 20 | ||||
-rw-r--r-- | qpid/python/qmf/qmfConsole.py | 137 | ||||
-rw-r--r-- | qpid/python/qmf/test/agent_test.py | 229 | ||||
-rw-r--r-- | qpid/python/qmf/test/console_test.py | 21 |
5 files changed, 351 insertions, 132 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index 89be722833..59382f36ab 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -100,17 +100,32 @@ class Agent(Thread): self._work_q = Queue.Queue() self._work_q_put = False + + def destroy(self, timeout=None): + """ + Must be called before the Agent is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + logging.debug("Destroying Agent %s" % self.name) + if self._conn: + self.remove_connection(timeout) + logging.debug("Agent Destroyed") + + def get_name(self): return self.name - def setConnection(self, conn): + def set_connection(self, conn): my_addr = QmfAddress.direct(self.name, self._domain) locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - logging.error("my direct addr=%s" % my_addr) - logging.error("agent.locate addr=%s" % locate_addr) - logging.error("agent.ind addr=%s" % ind_addr) + logging.debug("my direct addr=%s" % my_addr) + logging.debug("agent.locate addr=%s" % locate_addr) + logging.debug("agent.ind addr=%s" % ind_addr) self._conn = conn self._session = self._conn.session() @@ -127,7 +142,32 @@ class Agent(Thread): self._running = True self.start() - + + def remove_connection(self, timeout=None): + # tell connection thread to shutdown + self._running = False + if self.isAlive(): + # kick my thread to wake it up + my_addr = QmfAddress.direct(self.name, self._domain) + logging.debug("Making temp sender for [%s]" % str(my_addr)) + tmp_sender = self._session.sender(str(my_addr)) + try: + msg = Message(subject=makeSubject(OpCode.noop)) + tmp_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + logging.debug("waiting for agent receiver thread to exit") + self.join(timeout) + if self.isAlive(): + logging.error( "Agent thread '%s' is hung..." % self.name) + self._direct_receiver.close() + self._locate_receiver.close() + self._ind_sender.close() + self._session.close() + self._session = None + self._conn = None + logging.debug("agent connection removal complete") + def register_object_class(self, schema): """ Register an instance of a SchemaClass with this agent @@ -177,6 +217,14 @@ class Agent(Thread): finally: self._lock.release() + def get_object(self, id): + self._lock.acquire() + try: + data = self._agent_data.get(id) + finally: + self._lock.release() + return data + def method_response(self, handle, _out_args=None, _error=None): """ @@ -233,6 +281,7 @@ class Agent(Thread): def run(self): global _callback_thread next_heartbeat = datetime.datetime.utcnow() + batch_limit = 10 # a guess while self._running: now = datetime.datetime.utcnow() @@ -249,7 +298,7 @@ class Agent(Thread): except Empty: continue - while True: + for i in range(batch_limit): try: msg = self._locate_receiver.fetch(timeout=0) except Empty: @@ -257,7 +306,7 @@ class Agent(Thread): if msg and msg.content_type == "amqp/map": self._dispatch(msg, _direct=False) - while True: + for i in range(batch_limit): try: msg = self._direct_receiver.fetch(timeout=0) except Empty: @@ -294,7 +343,7 @@ class Agent(Thread): @param _direct: True if msg directly addressed to this agent. """ - logging.error( "Message received from Console! [%s]" % msg ) + logging.debug( "Message received from Console! [%s]" % msg ) try: version,opcode = parseSubject(msg.subject) except: @@ -564,12 +613,17 @@ class QmfAgentData(QmfData): super(QmfAgentData, self).set_value(_name, _value, _subType) # @todo: publish change - def inc_value(self, name, delta): + def inc_value(self, name, delta=1): """ add the delta to the property """ # @todo: need to take write-lock - logging.error(" TBD!!!") + val = self.get_value(name) + try: + val += delta + except: + raise + self.set_value(name, val) - def dec_value(self, name, delta): + def dec_value(self, name, delta=1): """ subtract the delta from the property """ # @todo: need to take write-lock logging.error(" TBD!!!") diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py index 580d86b7a7..24a54691fc 100644 --- a/qpid/python/qmf/qmfCommon.py +++ b/qpid/python/qmf/qmfCommon.py @@ -1143,24 +1143,44 @@ class qmfTypes(object): TYPE_UINT16 = 2 TYPE_UINT32 = 3 TYPE_UINT64 = 4 + TYPE_SSTR = 6 TYPE_LSTR = 7 + TYPE_ABSTIME = 8 TYPE_DELTATIME = 9 + TYPE_REF = 10 + TYPE_BOOL = 11 + TYPE_FLOAT = 12 TYPE_DOUBLE = 13 + TYPE_UUID = 14 + TYPE_MAP = 15 + TYPE_INT8 = 16 TYPE_INT16 = 17 TYPE_INT32 = 18 TYPE_INT64 = 19 + TYPE_OBJECT = 20 + TYPE_LIST = 21 + TYPE_ARRAY = 22 +# New subtypes: +# integer (for time, duration, signed/unsigned) +# double (float) +# bool +# string +# map (ref, qmfdata) +# list +# uuid + class qmfAccess(object): READ_CREATE = 1 diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py index 97acdd767e..291f596400 100644 --- a/qpid/python/qmf/qmfConsole.py +++ b/qpid/python/qmf/qmfConsole.py @@ -64,7 +64,6 @@ class _Mailbox(object): self._msgs.append(obj) # if was empty, notify waiters if len(self._msgs) == 1: - logging.error("Delivering @ %s" % time.time()) self._cv.notify() finally: self._cv.release() @@ -118,7 +117,7 @@ class SequencedWaiter(object): self.lock.acquire() try: if seq in self.pending: - logging.error("Putting seq %d @ %s" % (seq,time.time())) + # logging.error("Putting seq %d @ %s" % (seq,time.time())) self.pending[seq].deliver(new_data) else: logging.error( "seq %d not found!" % seq ) @@ -242,8 +241,29 @@ class QmfConsoleData(QmfData): request that the Agent update the value of this object's contents. """ - logging.error(" TBD!!!") - return None + if _reply_handle is not None: + logging.error(" ASYNC REFRESH TBD!!!") + return None + + assert self._agent + assert self._agent._console + + if _timeout is None: + _timeout = self._agent._console._reply_timeout + + + # create query to agent using this objects ID + oid = self.get_object_id() + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, + self.get_object_id()) + obj_list = self._agent._console.doQuery(self._agent, query, + timeout=_timeout) + if obj_list is None or len(obj_list) != 1: + return None + + self._update(obj_list[0]) + return self + def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): @@ -267,7 +287,7 @@ class QmfConsoleData(QmfData): # validate ms = self._schema.get_method(name) if ms is None: - raise ValueError("Method '%s' is undefined." % ms) + raise ValueError("Method '%s' is undefined." % name) for aname,prop in ms.get_arguments().iteritems(): if aname not in _in_args: @@ -326,7 +346,12 @@ class QmfConsoleData(QmfData): else: return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) - + def _update(self, newer): + super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes, + _tag=newer._tag, _object_id=newer._object_id, + _ctime=newer._ctime, _utime=newer._utime, + _dtime=newer._dtime, + _schema=newer._schema, _const=True) class QmfLocalData(QmfData): """ @@ -637,15 +662,15 @@ class Console(Thread): " x-properties:" " {type:direct}}}", capacity=1) - logging.error("local addr=%s" % self._address) + logging.debug("local addr=%s" % self._address) ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - logging.error("agent.ind addr=%s" % ind_addr) + logging.debug("agent.ind addr=%s" % ind_addr) self._announce_recvr = self._session.receiver(str(ind_addr) + ";{create:always," " node-properties:{type:topic}}", capacity=1) locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) - logging.error("agent.locate addr=%s" % locate_addr) + logging.debug("agent.locate addr=%s" % locate_addr) self._locate_sender = self._session.sender(str(locate_addr) + ";{create:always," " node-properties:{type:topic}}") @@ -713,9 +738,9 @@ class Console(Thread): finally: self._lock.release() - def findAgent(self, name, timeout=None ): + def find_agent(self, name, timeout=None ): """ - Given the id of a particular agent, return an instance of class Agent + Given the name of a particular agent, return an instance of class Agent representing that agent. Return None if the agent does not exist. """ @@ -891,7 +916,7 @@ class Console(Thread): if self._next_agent_expire > now: timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds try: - logging.error("waiting for next rcvr (timeout=%s)..." % timeout) + logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) self._session.next_receiver(timeout = timeout) except Empty: pass @@ -899,6 +924,78 @@ class Console(Thread): logging.debug("Shutting down Console thread") + def get_objects(self, + _schema_id=None, + _pname=None, _cname=None, + _object_id=None, + _agents=None, + _timeout=None): + """ + @todo + """ + if _object_id is not None: + # query by object id + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id) + elif _schema_id is not None: + pred = QmfQueryPredicate({QmfQuery.CMP_EQ: + [QmfData.KEY_SCHEMA_ID, + _schema_id.map_encode()]}) + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) + elif _pname is not None: + # query by package name (and maybe class name) + if _cname is not None: + pred = QmfQueryPredicate({QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EQ: + [SchemaClassId.KEY_PACKAGE, + _pname]}, + {QmfQuery.CMP_EQ: + [SchemaClassId.KEY_CLASS, + _cname]}]}) + else: + pred = QmfQueryPredicate({QmfQuery.CMP_EQ: + [SchemaClassId.KEY_PACKAGE, + _pname]}) + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) + + else: + raise Exception("invalid arguments") + + if _agents is None: + # use copy of current agent list + self._lock.acquire() + try: + agent_list = self._agent_map.values() + finally: + self._lock.release() + elif isinstance(_agents, Agent): + agent_list = [_agents] + else: + agent_list = _agents + # @todo validate this list! + + # @todo: fix when async doQuery done - query all agents at once, then + # wait for replies, instead of per-agent querying.... + + if _timeout is None: + _timeout = self._reply_timeout + + obj_list = [] + expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) + for agent in agent_list: + if not agent.isActive(): + continue + now = datetime.datetime.utcnow() + if now >= expired: + break + timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds + reply = self.doQuery(agent, query, timeout) + if reply: + obj_list = obj_list + reply + + if obj_list: + return obj_list + return None + # called by run() thread ONLY @@ -908,7 +1005,7 @@ class Console(Thread): PRIVATE: Process a message received from an Agent """ - logging.error( "Message received from Agent! [%s]" % msg ) + logging.debug( "Message received from Agent! [%s]" % msg ) try: version,opcode = parseSubject(msg.subject) @@ -995,14 +1092,14 @@ class Console(Thread): self._lock.release() if old_timestamp == None and matched: - logging.error("AGENT_ADDED for %s (%s)" % (agent, time.time())) + logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) self._work_q.put(wi) self._work_q_put = True if correlated: # wake up all waiters - logging.error("waking waiters for correlation id %s" % msg.correlation_id) + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) @@ -1015,11 +1112,12 @@ class Console(Thread): logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time())) if not self._req_correlation.isValid(msg.correlation_id): - logging.error("FIXME: uncorrelated data indicate??? msg='%s'" % str(msg)) + logging.debug("Data indicate received with unknown correlation_id" + " msg='%s'" % str(msg)) return # wake up all waiters - logging.error("waking waiters for correlation id %s" % msg.correlation_id) + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) @@ -1031,11 +1129,12 @@ class Console(Thread): logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time())) if not self._req_correlation.isValid(msg.correlation_id): - logging.error("FIXME: uncorrelated response??? msg='%s'" % str(msg)) + logging.debug("Response msg received with unknown correlation_id" + " msg='%s'" % str(msg)) return # wake up all waiters - logging.error("waking waiters for correlation id %s" % msg.correlation_id) + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) diff --git a/qpid/python/qmf/test/agent_test.py b/qpid/python/qmf/test/agent_test.py index dd55b49316..127fbe2080 100644 --- a/qpid/python/qmf/test/agent_test.py +++ b/qpid/python/qmf/test/agent_test.py @@ -1,13 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# import logging import time +import unittest from threading import Semaphore from qpid.messaging import * -from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, +from qmf.qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, QmfEvent, SchemaMethod, Notifier, SchemaClassId, WorkItem) -from qmfAgent import (Agent, QmfAgentData) +from qmf.qmfAgent import (Agent, QmfAgentData) @@ -25,114 +43,125 @@ class ExampleNotifier(Notifier): + +class QmfTest(unittest.TestCase): + def test_begin(self): + print("!!! being test") + + def test_end(self): + print("!!! end test") + + # # An example agent application # -_notifier = ExampleNotifier() -_agent = Agent( "qmf.testAgent", _notifier=_notifier ) - -# Dynamically construct a class schema - -_schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) -# add properties -_schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) -_schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - -# these two properties are statistics -_schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) -_schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - -# These two properties can be set via the method call -_schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) -_schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - -# add method -_meth = SchemaMethod( _desc="Method to set string and int in object." ) -_meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) -_meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) -_schema.add_method( "set_meth", _meth ) - -# Add schema to Agent - -_agent.register_object_class(_schema) - -# instantiate managed data objects matching the schema - -_obj1 = QmfAgentData( _agent, _schema=_schema ) -_obj1.set_value("index1", 100) -_obj1.set_value("index2", "a name" ) -_obj1.set_value("set_string", "UNSET") -_obj1.set_value("set_int", 0) -_obj1.set_value("query_count", 0) -_obj1.set_value("method_call_count", 0) -_agent.add_object( _obj1 ) - -_agent.add_object( QmfAgentData( _agent, _schema=_schema, - _values={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - -# add an "unstructured" object to the Agent -_obj2 = QmfAgentData(_agent, _object_id="01545") -_obj2.set_value("field1", "a value") -_obj2.set_value("field2", 2) -_obj2.set_value("field3", {"a":1, "map":2, "value":3}) -_obj2.set_value("field4", ["a", "list", "value"]) -_agent.add_object(_obj2) - - -## Now connect to the broker - -_c = Connection("localhost") -_c.connect() -_agent.setConnection(_c) - -_error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."}) - -_done = False -while not _done: - # try: - _notifier.waitForWork() - - _wi = _agent.get_next_workitem(timeout=0) - while _wi: - - if _wi.get_type() == WorkItem.METHOD_CALL: - mc = _wi.get_params() + +if __name__ == '__main__': + _notifier = ExampleNotifier() + _agent = Agent( "qmf.testAgent", _notifier=_notifier ) + + # Dynamically construct a class schema + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( _agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + _agent.add_object( _obj1 ) + + _agent.add_object( QmfAgentData( _agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(_agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _agent.add_object(_obj2) + + + ## Now connect to the broker + + _c = Connection("localhost") + _c.connect() + _agent.setConnection(_c) + + _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."}) + + _done = False + while not _done: + # try: + _notifier.waitForWork() + + _wi = _agent.get_next_workitem(timeout=0) + while _wi: + + if _wi.get_type() == WorkItem.METHOD_CALL: + mc = _wi.get_params() - if mc.get_name() == "set_meth": - print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id()) - print("!!! args='%s'" % str(mc.get_args())) - print("!!! userid=%s" % str(mc.get_user_id())) - print("!!! handle=%s" % _wi.get_handle()) - _agent.method_response(_wi.get_handle(), - {"rc1": 100, "rc2": "Success"}) + if mc.get_name() == "set_meth": + print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id()) + print("!!! args='%s'" % str(mc.get_args())) + print("!!! userid=%s" % str(mc.get_user_id())) + print("!!! handle=%s" % _wi.get_handle()) + _agent.method_response(_wi.get_handle(), + {"rc1": 100, "rc2": "Success"}) + else: + print("!!! Unknown Method name = %s" % mc.get_name()) + _agent.method_response(_wi.get_handle(), _error=_error_data) else: - print("!!! Unknown Method name = %s" % mc.get_name()) - _agent.method_response(_wi.get_handle(), _error=_error_data) - else: - print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params()))) + print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params()))) - _agent.release_workitem(_wi) - _wi = _agent.get_next_workitem(timeout=0) - # except: - # print( "shutting down...") - # _done = True + _agent.release_workitem(_wi) + _wi = _agent.get_next_workitem(timeout=0) + # except: + # print( "shutting down...") + # _done = True -print( "Removing connection... TBD!!!" ) -#_myConsole.remove_connection( _c, 10 ) + print( "Removing connection... TBD!!!" ) + #_myConsole.remove_connection( _c, 10 ) -print( "Destroying agent... TBD!!!" ) -#_myConsole.destroy( 10 ) + print( "Destroying agent... TBD!!!" ) + #_myConsole.destroy( 10 ) -print( "******** agent test done ********" ) + print( "******** agent test done ********" ) diff --git a/qpid/python/qmf/test/console_test.py b/qpid/python/qmf/test/console_test.py index a7703e24a6..7077f32507 100644 --- a/qpid/python/qmf/test/console_test.py +++ b/qpid/python/qmf/test/console_test.py @@ -1,12 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# import logging import time from threading import Semaphore from qpid.messaging import * -from qmfCommon import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey, +from qmf.qmfCommon import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey, SchemaClassId, SchemaClass, QmfData) -from qmfConsole import Console +from qmf.qmfConsole import Console class ExampleNotifier(Notifier): |