diff options
-rw-r--r-- | qpid/python/qmf/console.py | 60 | ||||
-rw-r--r-- | qpid/python/qpid/testlib.py | 4 | ||||
-rw-r--r-- | qpid/python/tests_0-10/management.py | 71 | ||||
-rw-r--r-- | qpid/ruby/lib/qpid/delegates.rb | 14 | ||||
-rw-r--r-- | qpid/ruby/lib/qpid/qmf.rb | 61 | ||||
-rw-r--r-- | qpid/ruby/tests/qmf.rb | 86 |
6 files changed, 243 insertions, 53 deletions
diff --git a/qpid/python/qmf/console.py b/qpid/python/qmf/console.py index 3b99595f1f..ef2ab264eb 100644 --- a/qpid/python/qmf/console.py +++ b/qpid/python/qmf/console.py @@ -77,15 +77,15 @@ class Console: pass def heartbeat(self, agent, timestamp): - """ """ + """ Invoked when an agent heartbeat is received. """ pass def brokerInfo(self, broker): - """ """ + """ Invoked when the connection sequence reaches the point where broker information is available. """ pass def methodResponse(self, broker, seq, response): - """ """ + """ Invoked when a method response from an asynchronous method call is received. """ pass class BrokerURL(URL): @@ -117,7 +117,7 @@ class Session: _CONTEXT_STARTUP = 2 _CONTEXT_MULTIGET = 3 - GET_WAIT_TIME = 60 + DEFAULT_GET_WAIT_TIME = 60 def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): @@ -284,6 +284,11 @@ class Session: _broker = <broker> - supply a broker as returned by addBroker. + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + If additional arguments are supplied, they are used as property selectors. For example, if the argument name="test" is supplied, only objects whose "name" property is "test" will be returned in the result. @@ -365,11 +370,15 @@ class Session: starttime = time() timeout = False + if "_timeout" in kwargs: + waitTime = kwargs["_timeout"] + else: + waitTime = self.DEFAULT_GET_WAIT_TIME try: self.cv.acquire() while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(self.GET_WAIT_TIME) - if time() - starttime > self.GET_WAIT_TIME: + self.cv.wait(waitTime) + if time() - starttime > waitTime: for pendingSeq in self.syncSequenceList: self.seqMgr._release(pendingSeq) self.syncSequenceList = [] @@ -498,7 +507,10 @@ class Session: code = codec.read_uint32() text = codec.read_str16() outArgs = {} - method, synchronous = self.seqMgr._release(seq) + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: @@ -1083,7 +1095,7 @@ class Object(object): return value raise Exception("Type Object has no attribute '%s'" % name) - def _sendMethodRequest(self, name, args, kwargs, synchronous=False): + def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): for method in self._schema.getMethods(): if name == method.name: aIdx = 0 @@ -1105,8 +1117,13 @@ class Object(object): if arg.dir.find("I") != -1: self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 + if timeWait: + ttl = timeWait * 1000 + else: + ttl = None smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), + ttl=ttl) if synchronous: try: self._broker.cv.acquire() @@ -1118,13 +1135,28 @@ class Object(object): return None def _invoke(self, name, args, kwargs): - if self._sendMethodRequest(name, args, kwargs, True): + if "_timeout" in kwargs: + timeout = kwargs["_timeout"] + else: + timeout = self._broker.SYNC_TIME + + if "_async" in kwargs and kwargs["_async"]: + sync = False + if "_timeout" not in kwargs: + timeout = None + else: + sync = True + + seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) + if seq: + if not sync: + return seq try: self._broker.cv.acquire() starttime = time() while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(self._broker.SYNC_TIME) - if time() - starttime > self._broker.SYNC_TIME: + self._broker.cv.wait(timeout) + if time() - starttime > timeout: self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") finally: @@ -1407,9 +1439,11 @@ class Broker: except: return None, None - def _message (self, body, routing_key="broker"): + def _message (self, body, routing_key="broker", ttl=None): dp = self.amqpSession.delivery_properties() dp.routing_key = routing_key + if ttl: + dp.ttl = ttl mp = self.amqpSession.message_properties() mp.content_type = "x-application/qmf" mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 7f5ac1fcd2..114a56de08 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -365,8 +365,8 @@ class TestBase010(unittest.TestCase): self.session = self.conn.session("test-session", timeout=10) self.qmf = None - def startQmf(self): - self.qmf = qmf.console.Session() + def startQmf(self, handler=None): + self.qmf = qmf.console.Session(handler) self.qmf_broker = self.qmf.addBroker(str(testrunner.url)) def connect(self, host=None, port=None): diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py index 545dc3db3b..29394c6a7d 100644 --- a/qpid/python/tests_0-10/management.py +++ b/qpid/python/tests_0-10/management.py @@ -20,6 +20,9 @@ from qpid.datatypes import Message, RangedSet from qpid.testlib import TestBase010 from qpid.management import managementChannel, managementClient +from threading import Condition +from time import sleep +import qmf.console class ManagementTest (TestBase010): """ @@ -52,7 +55,7 @@ class ManagementTest (TestBase010): self.assertEqual (res.body, body) mc.removeChannel (mch) - def test_broker_connectivity (self): + def test_methods_sync (self): """ Call the "echo" method on the broker to verify it is alive and talking. """ @@ -60,16 +63,16 @@ class ManagementTest (TestBase010): self.startQmf() brokers = self.qmf.getObjects(_class="broker") - self.assertEqual (len(brokers), 1) + self.assertEqual(len(brokers), 1) broker = brokers[0] body = "Echo Message Body" - for seq in range (1, 10): + for seq in range(1, 20): res = broker.echo(seq, body) - self.assertEqual (res.status, 0) - self.assertEqual (res.text, "OK") - self.assertEqual (res.sequence, seq) - self.assertEqual (res.body, body) + self.assertEqual(res.status, 0) + self.assertEqual(res.text, "OK") + self.assertEqual(res.sequence, seq) + self.assertEqual(res.body, body) def test_get_objects(self): self.startQmf() @@ -238,3 +241,57 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) + def test_methods_async (self): + """ + """ + class Handler (qmf.console.Console): + def __init__(self): + self.cv = Condition() + self.xmtList = {} + self.rcvList = {} + + def methodResponse(self, broker, seq, response): + self.cv.acquire() + try: + self.rcvList[seq] = response + finally: + self.cv.release() + + def request(self, broker, count): + self.count = count + for idx in range(count): + self.cv.acquire() + try: + seq = broker.echo(idx, "Echo Message", _async = True) + self.xmtList[seq] = idx + finally: + self.cv.release() + + def check(self): + if self.count != len(self.xmtList): + return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList)) + lost = 0 + mismatched = 0 + for seq in self.xmtList: + value = self.xmtList[seq] + if seq in self.rcvList: + result = self.rcvList.pop(seq) + if result.sequence != value: + mismatched += 1 + else: + lost += 1 + spurious = len(self.rcvList) + if lost == 0 and mismatched == 0 and spurious == 0: + return "pass" + else: + return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) + + handler = Handler() + self.startQmf(handler) + brokers = self.qmf.getObjects(_class="broker") + self.assertEqual(len(brokers), 1) + broker = brokers[0] + handler.request(broker, 20) + sleep(1) + self.assertEqual(handler.check(), "pass") + diff --git a/qpid/ruby/lib/qpid/delegates.rb b/qpid/ruby/lib/qpid/delegates.rb index 171f310e48..8d866e895f 100644 --- a/qpid/ruby/lib/qpid/delegates.rb +++ b/qpid/ruby/lib/qpid/delegates.rb @@ -200,10 +200,16 @@ module Qpid start.mechanisms.each do |m| mech_list += m + " " end - resp = Sasl.client_start(@saslConn, mech_list) - ch.connection_start_ok(:client_properties => PROPERTIES, - :mechanism => resp[2], - :response => resp[1]) + begin + resp = Sasl.client_start(@saslConn, mech_list) + ch.connection_start_ok(:client_properties => PROPERTIES, + :mechanism => resp[2], + :response => resp[1]) + rescue exception + ch.connection_close(:message => $!.message) + @connection.failed = true + @connection.signal + end end def connection_secure(ch, secure) diff --git a/qpid/ruby/lib/qpid/qmf.rb b/qpid/ruby/lib/qpid/qmf.rb index ee165305c3..ebca7ee5ab 100644 --- a/qpid/ruby/lib/qpid/qmf.rb +++ b/qpid/ruby/lib/qpid/qmf.rb @@ -58,9 +58,14 @@ module Qpid::Qmf # Invoked when an event is raised def event(broker, event); end + # Invoked when an agent heartbeat is received. def heartbeat(agent, timestamp); end + # Invoked when the connection sequence reaches the point where broker information is available. def broker_info(broker); end + + # Invoked when a method response from an asynchronous method call is received. + def method_response(broker, seq, response); end end class BrokerURL @@ -105,7 +110,7 @@ module Qpid::Qmf CONTEXT_STARTUP = 2 CONTEXT_MULTIGET = 3 - GET_WAIT_TIME = 60 + DEFAULT_GET_WAIT_TIME = 60 include MonitorMixin @@ -305,11 +310,17 @@ module Qpid::Qmf # Otherwise, the query will go to all agents. # # :agent = <agent> - supply an agent from the list returned by getAgents. + # # If the get query is to be restricted to one broker (as opposed to # all connected brokers), add the following argument: # # :broker = <broker> - supply a broker as returned by addBroker. # + # The default timeout for this synchronous operation is 60 seconds. To change the timeout, + # use the following argument: + # + # :_timeout = <time in seconds> + # # If additional arguments are supplied, they are used as property # selectors, as long as their keys are strings. For example, if # the argument "name" => "test" is supplied, only objects whose @@ -389,9 +400,13 @@ module Qpid::Qmf end timeout = false + if kwargs.include?(:_timeout) + wait_time = kwargs[:_timeout] + else + wait_time = DEFAULT_GET_WAIT_TIME + end synchronize do - unless @cv.wait_for(GET_WAIT_TIME) { - @sync_sequence_list.empty? || @error } + unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error } @sync_sequence_list.each do |pending_seq| @seq_mgr.release(pending_seq) end @@ -504,10 +519,11 @@ module Qpid::Qmf def handle_method_resp(broker, codec, seq) code = codec.read_uint32 - text = codec.read_str16 out_args = {} - method, synchronous = @seq_mgr.release(seq) + pair = @seq_mgr.release(seq) + return unless pair + method, synchronous = pair if code == 0 method.arguments.each do |arg| if arg.dir.index(?O) @@ -1054,7 +1070,7 @@ module Qpid::Qmf private - def send_method_request(method, name, args, synchronous = false) + def send_method_request(method, name, args, synchronous = false, time_wait = nil) @schema.methods.each do |schema_method| if name == schema_method.name send_codec = Qpid::StringCodec.new(@broker.conn.spec) @@ -1077,9 +1093,9 @@ module Qpid::Qmf @session.encode_value(send_codec, actual, formal.type) end + ttl = time_wait ? time_wait * 1000 : nil smsg = @broker.message(send_codec.encoded, - "agent.#{object_id.broker_bank}.#{object_id.agent_bank}") - + "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl) @broker.sync_start if synchronous @broker.emit(smsg) @@ -1089,8 +1105,25 @@ module Qpid::Qmf end def invoke(method, name, args) - if send_method_request(method, name, args, synchronous = true) - unless @broker.wait_for_sync_done + kwargs = args[args.size - 1] + sync = true + timeout = nil + + if kwargs.class == Hash + if kwargs.include?(:_timeout) + timeout = kwargs[:_timeout] + end + + if kwargs.include?(:_async) + sync = !kwargs[:_async] + end + args.pop + end + + seq = send_method_request(method, name, args, synchronous = sync) + if seq + return seq unless sync + unless @broker.wait_for_sync_done(timeout) @session.seq_mgr.release(seq) raise "Timed out waiting for method to respond" end @@ -1284,9 +1317,10 @@ module Qpid::Qmf end end - def wait_for_sync_done + def wait_for_sync_done(timeout=nil) + wait_time = timeout ? timeout : SYNC_TIME synchronize do - return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error } + return @cv.wait_for(wait_time) { ! @sync_in_flight || @error } end end @@ -1309,9 +1343,10 @@ module Qpid::Qmf codec.write_uint32(seq) end - def message(body, routing_key="broker") + def message(body, routing_key="broker", ttl=nil) dp = @amqp_session.delivery_properties dp.routing_key = routing_key + dp.ttl = ttl if ttl mp = @amqp_session.message_properties mp.content_type = "x-application/qmf" mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name) diff --git a/qpid/ruby/tests/qmf.rb b/qpid/ruby/tests/qmf.rb index b0ee19c3d5..06371a5a82 100644 --- a/qpid/ruby/tests/qmf.rb +++ b/qpid/ruby/tests/qmf.rb @@ -21,31 +21,69 @@ require "test/unit" require "qpid" require "tests/util" require "socket" +require "monitor.rb" class QmfTest < Test::Unit::TestCase + class Handler < Qpid::Qmf::Console + include MonitorMixin + + def initialize + super() + @xmt_list = {} + @rcv_list = {} + end + + def method_response(broker, seq, response) + synchronize do + @rcv_list[seq] = response + end + end + + def request(broker, count) + @count = count + for idx in 0...count + synchronize do + seq = broker.echo(idx, "Echo Message", :_async => true) + @xmt_list[seq] = idx + end + end + end + + def check + return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size + lost = 0 + mismatched = 0 + @xmt_list.each do |seq, value| + if @rcv_list.include?(seq) + result = @rcv_list.delete(seq) + mismatch += 1 unless result.sequence == value + else + lost += 1 + end + end + spurious = @rcv_list.size + if lost == 0 and mismatched == 0 and spurious == 0 + return "pass" + else + return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious] + end + end + end + def setup() # Make sure errors in threads lead to a noisy death of the test Thread.abort_on_exception = true - host = ENV.fetch("QMF_TEST_HOST", 'localhost') - port = ENV.fetch("QMF_TEST_PORT", 5672) + @host = ENV.fetch("QMF_TEST_HOST", 'localhost') + @port = ENV.fetch("QMF_TEST_PORT", 5672) - sock = TCPSocket.new(host, port) + sock = TCPSocket.new(@host, @port) @conn = Qpid::Connection.new(sock) @conn.start() @session = @conn.session("test-session") - - # It's a bit odd that we're using two connections but that's the way - # the python one works afaict. - @qmf = Qpid::Qmf::Session.new() - @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [host, port]) - - brokers = @qmf.objects(:class => "broker") - assert_equal(1, brokers.length) - @broker = brokers[0] end def teardown @@ -58,10 +96,20 @@ class QmfTest < Test::Unit::TestCase end end - def test_broker_connectivity() + def start_qmf(kwargs = {}) + @qmf = Qpid::Qmf::Session.new(kwargs) + @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port]) + + brokers = @qmf.objects(:class => "broker") + assert_equal(1, brokers.length) + @broker = brokers[0] + end + + def test_methods_sync() + start_qmf body = "Echo Message Body" for seq in 1..10 - res = @broker.echo(seq, body) + res = @broker.echo(seq, body, :_timeout => 10) assert_equal(0, res.status) assert_equal("OK", res.text) assert_equal(seq, res.sequence) @@ -69,6 +117,14 @@ class QmfTest < Test::Unit::TestCase end end + def test_methods_async() + handler = Handler.new + start_qmf(:console => handler) + handler.request(@broker, 20) + sleep(1) + assert_equal("pass", handler.check) + end + def test_move_queued_messages() """ Test ability to move messages from the head of one queue to another. @@ -76,6 +132,7 @@ class QmfTest < Test::Unit::TestCase """ "Set up source queue" + start_qmf @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true) @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key") @@ -151,6 +208,7 @@ class QmfTest < Test::Unit::TestCase # Test ability to purge messages from the head of a queue. Need to test # moveing all, 1 (top message) and N messages. def test_purge_queue + start_qmf # Set up purge queue" @session.queue_declare(:queue => "purge-queue", :exclusive => true, |