summaryrefslogtreecommitdiff
path: root/qpid/ruby/tests/qmf.rb
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/ruby/tests/qmf.rb')
-rw-r--r--qpid/ruby/tests/qmf.rb248
1 files changed, 248 insertions, 0 deletions
diff --git a/qpid/ruby/tests/qmf.rb b/qpid/ruby/tests/qmf.rb
new file mode 100644
index 0000000000..274e38416e
--- /dev/null
+++ b/qpid/ruby/tests/qmf.rb
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid"
+require "tests/util"
+require "socket"
+require "monitor.rb"
+
+class QmfTest < Test::Unit::TestCase
+
+ class Handler < Qpid::Qmf::Console
+ include MonitorMixin
+
+ def initialize
+ super()
+ @xmt_list = {}
+ @rcv_list = {}
+ end
+
+ def method_response(broker, seq, response)
+ synchronize do
+ @rcv_list[seq] = response
+ end
+ end
+
+ def request(broker, count)
+ @count = count
+ for idx in 0...count
+ synchronize do
+ seq = broker.echo(idx, "Echo Message", :async => true)
+ @xmt_list[seq] = idx
+ end
+ end
+ end
+
+ def check
+ return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size
+ lost = 0
+ mismatched = 0
+ @xmt_list.each do |seq, value|
+ if @rcv_list.include?(seq)
+ result = @rcv_list.delete(seq)
+ mismatch += 1 unless result.sequence == value
+ else
+ lost += 1
+ end
+ end
+ spurious = @rcv_list.size
+ if lost == 0 and mismatched == 0 and spurious == 0
+ return "pass"
+ else
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious]
+ end
+ end
+ end
+
+ def setup()
+ # Make sure errors in threads lead to a noisy death of the test
+ Thread.abort_on_exception = true
+
+ @host = ENV.fetch("QMF_TEST_HOST", 'localhost')
+ @port = ENV.fetch("QMF_TEST_PORT", 5672)
+
+ sock = TCPSocket.new(@host, @port)
+
+ @conn = Qpid::Connection.new(sock)
+ @conn.start()
+
+ @session = @conn.session("test-session")
+ end
+
+ def teardown
+ unless @session.error?
+ @session.close(10)
+ end
+ @conn.close(10)
+ if @qmf
+ @qmf.del_broker(@qmf_broker)
+ end
+ end
+
+ def start_qmf(kwargs = {})
+ @qmf = Qpid::Qmf::Session.new(kwargs)
+ @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port])
+
+ brokers = @qmf.objects(:class => "broker")
+ assert_equal(1, brokers.length)
+ @broker = brokers[0]
+ end
+
+ def test_methods_sync()
+ start_qmf
+ body = "Echo Message Body"
+ for seq in 1..10
+ res = @broker.echo(seq, body, :timeout => 10)
+ assert_equal(0, res.status)
+ assert_equal("OK", res.text)
+ assert_equal(seq, res.sequence)
+ assert_equal(body, res.body)
+ end
+ end
+
+ def test_methods_async()
+ handler = Handler.new
+ start_qmf(:console => handler)
+ handler.request(@broker, 20)
+ sleep(1)
+ assert_equal("pass", handler.check)
+ end
+
+ def test_move_queued_messages()
+ """
+ Test ability to move messages from the head of one queue to another.
+ Need to test moveing all and N messages.
+ """
+
+ "Set up source queue"
+ start_qmf
+ @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true)
+ @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key")
+
+ props = @session.delivery_properties(:routing_key => "routing_key")
+ for count in 1..20
+ body = "Move Message %d" % count
+ src_msg = Qpid::Message.new(props, body)
+ @session.message_transfer(:destination => "amq.direct", :message => src_msg)
+ end
+
+ "Set up destination queue"
+ @session.queue_declare(:queue => "dest-queue", :exclusive => true, :auto_delete => true)
+ @session.exchange_bind(:queue => "dest-queue", :exchange => "amq.direct")
+
+ queues = @qmf.objects(:class => "queue")
+
+ "Move 10 messages from src-queue to dest-queue"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ assert_equal(0, result.status)
+
+ sq = @qmf.objects(:class => "queue", "name" => "src-queue")[0]
+ dq = @qmf.objects(:class => "queue", "name" => "dest-queue")[0]
+
+ assert_equal(10, sq.msgDepth)
+ assert_equal(10, dq.msgDepth)
+
+ "Move all remaining messages to destination"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ assert_equal(0, result.status)
+
+ sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0]
+ dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0]
+
+ assert_equal(0, sq.msgDepth)
+ assert_equal(20, dq.msgDepth)
+
+ "Use a bad source queue name"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ assert_equal(4, result.status)
+
+ "Use a bad destination queue name"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ assert_equal(4, result.status)
+
+ " Use a large qty (40) to move from dest-queue back to "
+ " src-queue- should move all "
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ assert_equal(0, result.status)
+
+ sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0]
+ dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0]
+
+ assert_equal(20, sq.msgDepth)
+ assert_equal(0, dq.msgDepth)
+
+ "Consume the messages of the queue and check they are all there in order"
+ @session.message_subscribe(:queue => "src-queue",
+ :destination => "tag")
+ @session.message_flow(:destination => "tag",
+ :unit => @session.message_credit_unit.message,
+ :value => 0xFFFFFFFF)
+ @session.message_flow(:destination => "tag",
+ :unit => @session.message_credit_unit.byte,
+ :value => 0xFFFFFFFF)
+ queue = @session.incoming("tag")
+ for count in 1..20
+ consumed_msg = queue.get(timeout=1)
+ body = "Move Message %d" % count
+ assert_equal(body, consumed_msg.body)
+ end
+ end
+
+ # Test ability to purge messages from the head of a queue. Need to test
+ # moveing all, 1 (top message) and N messages.
+ def test_purge_queue
+ start_qmf
+ # Set up purge queue"
+ @session.queue_declare(:queue => "purge-queue",
+ :exclusive => true,
+ :auto_delete => true)
+ @session.exchange_bind(:queue => "purge-queue",
+ :exchange => "amq.direct",
+ :binding_key => "routing_key")
+
+ props = @session.delivery_properties(:routing_key => "routing_key")
+ 20.times do |count|
+ body = "Purge Message %d" % count
+ msg = Qpid::Message.new(props, body)
+ @session.message_transfer(:destination => "amq.direct",
+ :message => msg)
+ end
+
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+
+ "Purge top message from purge-queue"
+ result = pq.purge(1)
+ assert_equal(0, result.status)
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+ assert_equal(19, pq.msgDepth)
+
+ "Purge top 9 messages from purge-queue"
+ result = pq.purge(9)
+ assert_equal(0, result.status)
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+ assert_equal(10, pq.msgDepth)
+
+ "Purge all messages from purge-queue"
+ result = pq.purge(0)
+ assert_equal(0, result.status)
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+ assert_equal(0, pq.msgDepth)
+ end
+end