summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/python/qmf/console.py60
-rw-r--r--qpid/python/qpid/testlib.py4
-rw-r--r--qpid/python/tests_0-10/management.py71
-rw-r--r--qpid/ruby/lib/qpid/delegates.rb14
-rw-r--r--qpid/ruby/lib/qpid/qmf.rb61
-rw-r--r--qpid/ruby/tests/qmf.rb86
6 files changed, 243 insertions, 53 deletions
diff --git a/qpid/python/qmf/console.py b/qpid/python/qmf/console.py
index 3b99595f1f..ef2ab264eb 100644
--- a/qpid/python/qmf/console.py
+++ b/qpid/python/qmf/console.py
@@ -77,15 +77,15 @@ class Console:
pass
def heartbeat(self, agent, timestamp):
- """ """
+ """ Invoked when an agent heartbeat is received. """
pass
def brokerInfo(self, broker):
- """ """
+ """ Invoked when the connection sequence reaches the point where broker information is available. """
pass
def methodResponse(self, broker, seq, response):
- """ """
+ """ Invoked when a method response from an asynchronous method call is received. """
pass
class BrokerURL(URL):
@@ -117,7 +117,7 @@ class Session:
_CONTEXT_STARTUP = 2
_CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 60
+ DEFAULT_GET_WAIT_TIME = 60
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
manageConnections=False, userBindings=False):
@@ -284,6 +284,11 @@ class Session:
_broker = <broker> - supply a broker as returned by addBroker.
+ The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ use the following argument:
+
+ _timeout = <time in seconds>
+
If additional arguments are supplied, they are used as property selectors. For example,
if the argument name="test" is supplied, only objects whose "name" property is "test"
will be returned in the result.
@@ -365,11 +370,15 @@ class Session:
starttime = time()
timeout = False
+ if "_timeout" in kwargs:
+ waitTime = kwargs["_timeout"]
+ else:
+ waitTime = self.DEFAULT_GET_WAIT_TIME
try:
self.cv.acquire()
while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(self.GET_WAIT_TIME)
- if time() - starttime > self.GET_WAIT_TIME:
+ self.cv.wait(waitTime)
+ if time() - starttime > waitTime:
for pendingSeq in self.syncSequenceList:
self.seqMgr._release(pendingSeq)
self.syncSequenceList = []
@@ -498,7 +507,10 @@ class Session:
code = codec.read_uint32()
text = codec.read_str16()
outArgs = {}
- method, synchronous = self.seqMgr._release(seq)
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
if code == 0:
for arg in method.arguments:
if arg.dir.find("O") != -1:
@@ -1083,7 +1095,7 @@ class Object(object):
return value
raise Exception("Type Object has no attribute '%s'" % name)
- def _sendMethodRequest(self, name, args, kwargs, synchronous=False):
+ def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None):
for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
@@ -1105,8 +1117,13 @@ class Object(object):
if arg.dir.find("I") != -1:
self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
+ if timeWait:
+ ttl = timeWait * 1000
+ else:
+ ttl = None
smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+ (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
+ ttl=ttl)
if synchronous:
try:
self._broker.cv.acquire()
@@ -1118,13 +1135,28 @@ class Object(object):
return None
def _invoke(self, name, args, kwargs):
- if self._sendMethodRequest(name, args, kwargs, True):
+ if "_timeout" in kwargs:
+ timeout = kwargs["_timeout"]
+ else:
+ timeout = self._broker.SYNC_TIME
+
+ if "_async" in kwargs and kwargs["_async"]:
+ sync = False
+ if "_timeout" not in kwargs:
+ timeout = None
+ else:
+ sync = True
+
+ seq = self._sendMethodRequest(name, args, kwargs, sync, timeout)
+ if seq:
+ if not sync:
+ return seq
try:
self._broker.cv.acquire()
starttime = time()
while self._broker.syncInFlight and self._broker.error == None:
- self._broker.cv.wait(self._broker.SYNC_TIME)
- if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.wait(timeout)
+ if time() - starttime > timeout:
self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
finally:
@@ -1407,9 +1439,11 @@ class Broker:
except:
return None, None
- def _message (self, body, routing_key="broker"):
+ def _message (self, body, routing_key="broker", ttl=None):
dp = self.amqpSession.delivery_properties()
dp.routing_key = routing_key
+ if ttl:
+ dp.ttl = ttl
mp = self.amqpSession.message_properties()
mp.content_type = "x-application/qmf"
mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 7f5ac1fcd2..114a56de08 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -365,8 +365,8 @@ class TestBase010(unittest.TestCase):
self.session = self.conn.session("test-session", timeout=10)
self.qmf = None
- def startQmf(self):
- self.qmf = qmf.console.Session()
+ def startQmf(self, handler=None):
+ self.qmf = qmf.console.Session(handler)
self.qmf_broker = self.qmf.addBroker(str(testrunner.url))
def connect(self, host=None, port=None):
diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py
index 545dc3db3b..29394c6a7d 100644
--- a/qpid/python/tests_0-10/management.py
+++ b/qpid/python/tests_0-10/management.py
@@ -20,6 +20,9 @@
from qpid.datatypes import Message, RangedSet
from qpid.testlib import TestBase010
from qpid.management import managementChannel, managementClient
+from threading import Condition
+from time import sleep
+import qmf.console
class ManagementTest (TestBase010):
"""
@@ -52,7 +55,7 @@ class ManagementTest (TestBase010):
self.assertEqual (res.body, body)
mc.removeChannel (mch)
- def test_broker_connectivity (self):
+ def test_methods_sync (self):
"""
Call the "echo" method on the broker to verify it is alive and talking.
"""
@@ -60,16 +63,16 @@ class ManagementTest (TestBase010):
self.startQmf()
brokers = self.qmf.getObjects(_class="broker")
- self.assertEqual (len(brokers), 1)
+ self.assertEqual(len(brokers), 1)
broker = brokers[0]
body = "Echo Message Body"
- for seq in range (1, 10):
+ for seq in range(1, 20):
res = broker.echo(seq, body)
- self.assertEqual (res.status, 0)
- self.assertEqual (res.text, "OK")
- self.assertEqual (res.sequence, seq)
- self.assertEqual (res.body, body)
+ self.assertEqual(res.status, 0)
+ self.assertEqual(res.text, "OK")
+ self.assertEqual(res.sequence, seq)
+ self.assertEqual(res.body, body)
def test_get_objects(self):
self.startQmf()
@@ -238,3 +241,57 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
+ def test_methods_async (self):
+ """
+ """
+ class Handler (qmf.console.Console):
+ def __init__(self):
+ self.cv = Condition()
+ self.xmtList = {}
+ self.rcvList = {}
+
+ def methodResponse(self, broker, seq, response):
+ self.cv.acquire()
+ try:
+ self.rcvList[seq] = response
+ finally:
+ self.cv.release()
+
+ def request(self, broker, count):
+ self.count = count
+ for idx in range(count):
+ self.cv.acquire()
+ try:
+ seq = broker.echo(idx, "Echo Message", _async = True)
+ self.xmtList[seq] = idx
+ finally:
+ self.cv.release()
+
+ def check(self):
+ if self.count != len(self.xmtList):
+ return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
+ lost = 0
+ mismatched = 0
+ for seq in self.xmtList:
+ value = self.xmtList[seq]
+ if seq in self.rcvList:
+ result = self.rcvList.pop(seq)
+ if result.sequence != value:
+ mismatched += 1
+ else:
+ lost += 1
+ spurious = len(self.rcvList)
+ if lost == 0 and mismatched == 0 and spurious == 0:
+ return "pass"
+ else:
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+
+ handler = Handler()
+ self.startQmf(handler)
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+ handler.request(broker, 20)
+ sleep(1)
+ self.assertEqual(handler.check(), "pass")
+
diff --git a/qpid/ruby/lib/qpid/delegates.rb b/qpid/ruby/lib/qpid/delegates.rb
index 171f310e48..8d866e895f 100644
--- a/qpid/ruby/lib/qpid/delegates.rb
+++ b/qpid/ruby/lib/qpid/delegates.rb
@@ -200,10 +200,16 @@ module Qpid
start.mechanisms.each do |m|
mech_list += m + " "
end
- resp = Sasl.client_start(@saslConn, mech_list)
- ch.connection_start_ok(:client_properties => PROPERTIES,
- :mechanism => resp[2],
- :response => resp[1])
+ begin
+ resp = Sasl.client_start(@saslConn, mech_list)
+ ch.connection_start_ok(:client_properties => PROPERTIES,
+ :mechanism => resp[2],
+ :response => resp[1])
+ rescue exception
+ ch.connection_close(:message => $!.message)
+ @connection.failed = true
+ @connection.signal
+ end
end
def connection_secure(ch, secure)
diff --git a/qpid/ruby/lib/qpid/qmf.rb b/qpid/ruby/lib/qpid/qmf.rb
index ee165305c3..ebca7ee5ab 100644
--- a/qpid/ruby/lib/qpid/qmf.rb
+++ b/qpid/ruby/lib/qpid/qmf.rb
@@ -58,9 +58,14 @@ module Qpid::Qmf
# Invoked when an event is raised
def event(broker, event); end
+ # Invoked when an agent heartbeat is received.
def heartbeat(agent, timestamp); end
+ # Invoked when the connection sequence reaches the point where broker information is available.
def broker_info(broker); end
+
+ # Invoked when a method response from an asynchronous method call is received.
+ def method_response(broker, seq, response); end
end
class BrokerURL
@@ -105,7 +110,7 @@ module Qpid::Qmf
CONTEXT_STARTUP = 2
CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 60
+ DEFAULT_GET_WAIT_TIME = 60
include MonitorMixin
@@ -305,11 +310,17 @@ module Qpid::Qmf
# Otherwise, the query will go to all agents.
#
# :agent = <agent> - supply an agent from the list returned by getAgents.
+ #
# If the get query is to be restricted to one broker (as opposed to
# all connected brokers), add the following argument:
#
# :broker = <broker> - supply a broker as returned by addBroker.
#
+ # The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ # use the following argument:
+ #
+ # :_timeout = <time in seconds>
+ #
# If additional arguments are supplied, they are used as property
# selectors, as long as their keys are strings. For example, if
# the argument "name" => "test" is supplied, only objects whose
@@ -389,9 +400,13 @@ module Qpid::Qmf
end
timeout = false
+ if kwargs.include?(:_timeout)
+ wait_time = kwargs[:_timeout]
+ else
+ wait_time = DEFAULT_GET_WAIT_TIME
+ end
synchronize do
- unless @cv.wait_for(GET_WAIT_TIME) {
- @sync_sequence_list.empty? || @error }
+ unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error }
@sync_sequence_list.each do |pending_seq|
@seq_mgr.release(pending_seq)
end
@@ -504,10 +519,11 @@ module Qpid::Qmf
def handle_method_resp(broker, codec, seq)
code = codec.read_uint32
-
text = codec.read_str16
out_args = {}
- method, synchronous = @seq_mgr.release(seq)
+ pair = @seq_mgr.release(seq)
+ return unless pair
+ method, synchronous = pair
if code == 0
method.arguments.each do |arg|
if arg.dir.index(?O)
@@ -1054,7 +1070,7 @@ module Qpid::Qmf
private
- def send_method_request(method, name, args, synchronous = false)
+ def send_method_request(method, name, args, synchronous = false, time_wait = nil)
@schema.methods.each do |schema_method|
if name == schema_method.name
send_codec = Qpid::StringCodec.new(@broker.conn.spec)
@@ -1077,9 +1093,9 @@ module Qpid::Qmf
@session.encode_value(send_codec, actual, formal.type)
end
+ ttl = time_wait ? time_wait * 1000 : nil
smsg = @broker.message(send_codec.encoded,
- "agent.#{object_id.broker_bank}.#{object_id.agent_bank}")
-
+ "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl)
@broker.sync_start if synchronous
@broker.emit(smsg)
@@ -1089,8 +1105,25 @@ module Qpid::Qmf
end
def invoke(method, name, args)
- if send_method_request(method, name, args, synchronous = true)
- unless @broker.wait_for_sync_done
+ kwargs = args[args.size - 1]
+ sync = true
+ timeout = nil
+
+ if kwargs.class == Hash
+ if kwargs.include?(:_timeout)
+ timeout = kwargs[:_timeout]
+ end
+
+ if kwargs.include?(:_async)
+ sync = !kwargs[:_async]
+ end
+ args.pop
+ end
+
+ seq = send_method_request(method, name, args, synchronous = sync)
+ if seq
+ return seq unless sync
+ unless @broker.wait_for_sync_done(timeout)
@session.seq_mgr.release(seq)
raise "Timed out waiting for method to respond"
end
@@ -1284,9 +1317,10 @@ module Qpid::Qmf
end
end
- def wait_for_sync_done
+ def wait_for_sync_done(timeout=nil)
+ wait_time = timeout ? timeout : SYNC_TIME
synchronize do
- return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error }
+ return @cv.wait_for(wait_time) { ! @sync_in_flight || @error }
end
end
@@ -1309,9 +1343,10 @@ module Qpid::Qmf
codec.write_uint32(seq)
end
- def message(body, routing_key="broker")
+ def message(body, routing_key="broker", ttl=nil)
dp = @amqp_session.delivery_properties
dp.routing_key = routing_key
+ dp.ttl = ttl if ttl
mp = @amqp_session.message_properties
mp.content_type = "x-application/qmf"
mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
diff --git a/qpid/ruby/tests/qmf.rb b/qpid/ruby/tests/qmf.rb
index b0ee19c3d5..06371a5a82 100644
--- a/qpid/ruby/tests/qmf.rb
+++ b/qpid/ruby/tests/qmf.rb
@@ -21,31 +21,69 @@ require "test/unit"
require "qpid"
require "tests/util"
require "socket"
+require "monitor.rb"
class QmfTest < Test::Unit::TestCase
+ class Handler < Qpid::Qmf::Console
+ include MonitorMixin
+
+ def initialize
+ super()
+ @xmt_list = {}
+ @rcv_list = {}
+ end
+
+ def method_response(broker, seq, response)
+ synchronize do
+ @rcv_list[seq] = response
+ end
+ end
+
+ def request(broker, count)
+ @count = count
+ for idx in 0...count
+ synchronize do
+ seq = broker.echo(idx, "Echo Message", :_async => true)
+ @xmt_list[seq] = idx
+ end
+ end
+ end
+
+ def check
+ return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size
+ lost = 0
+ mismatched = 0
+ @xmt_list.each do |seq, value|
+ if @rcv_list.include?(seq)
+ result = @rcv_list.delete(seq)
+ mismatch += 1 unless result.sequence == value
+ else
+ lost += 1
+ end
+ end
+ spurious = @rcv_list.size
+ if lost == 0 and mismatched == 0 and spurious == 0
+ return "pass"
+ else
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious]
+ end
+ end
+ end
+
def setup()
# Make sure errors in threads lead to a noisy death of the test
Thread.abort_on_exception = true
- host = ENV.fetch("QMF_TEST_HOST", 'localhost')
- port = ENV.fetch("QMF_TEST_PORT", 5672)
+ @host = ENV.fetch("QMF_TEST_HOST", 'localhost')
+ @port = ENV.fetch("QMF_TEST_PORT", 5672)
- sock = TCPSocket.new(host, port)
+ sock = TCPSocket.new(@host, @port)
@conn = Qpid::Connection.new(sock)
@conn.start()
@session = @conn.session("test-session")
-
- # It's a bit odd that we're using two connections but that's the way
- # the python one works afaict.
- @qmf = Qpid::Qmf::Session.new()
- @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [host, port])
-
- brokers = @qmf.objects(:class => "broker")
- assert_equal(1, brokers.length)
- @broker = brokers[0]
end
def teardown
@@ -58,10 +96,20 @@ class QmfTest < Test::Unit::TestCase
end
end
- def test_broker_connectivity()
+ def start_qmf(kwargs = {})
+ @qmf = Qpid::Qmf::Session.new(kwargs)
+ @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port])
+
+ brokers = @qmf.objects(:class => "broker")
+ assert_equal(1, brokers.length)
+ @broker = brokers[0]
+ end
+
+ def test_methods_sync()
+ start_qmf
body = "Echo Message Body"
for seq in 1..10
- res = @broker.echo(seq, body)
+ res = @broker.echo(seq, body, :_timeout => 10)
assert_equal(0, res.status)
assert_equal("OK", res.text)
assert_equal(seq, res.sequence)
@@ -69,6 +117,14 @@ class QmfTest < Test::Unit::TestCase
end
end
+ def test_methods_async()
+ handler = Handler.new
+ start_qmf(:console => handler)
+ handler.request(@broker, 20)
+ sleep(1)
+ assert_equal("pass", handler.check)
+ end
+
def test_move_queued_messages()
"""
Test ability to move messages from the head of one queue to another.
@@ -76,6 +132,7 @@ class QmfTest < Test::Unit::TestCase
"""
"Set up source queue"
+ start_qmf
@session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true)
@session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key")
@@ -151,6 +208,7 @@ class QmfTest < Test::Unit::TestCase
# Test ability to purge messages from the head of a queue. Need to test
# moveing all, 1 (top message) and N messages.
def test_purge_queue
+ start_qmf
# Set up purge queue"
@session.queue_declare(:queue => "purge-queue",
:exclusive => true,