diff options
Diffstat (limited to 'qpid/ruby/tests/qmf.rb')
-rw-r--r-- | qpid/ruby/tests/qmf.rb | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/qpid/ruby/tests/qmf.rb b/qpid/ruby/tests/qmf.rb new file mode 100644 index 0000000000..274e38416e --- /dev/null +++ b/qpid/ruby/tests/qmf.rb @@ -0,0 +1,248 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require "tests/util" +require "socket" +require "monitor.rb" + +class QmfTest < Test::Unit::TestCase + + class Handler < Qpid::Qmf::Console + include MonitorMixin + + def initialize + super() + @xmt_list = {} + @rcv_list = {} + end + + def method_response(broker, seq, response) + synchronize do + @rcv_list[seq] = response + end + end + + def request(broker, count) + @count = count + for idx in 0...count + synchronize do + seq = broker.echo(idx, "Echo Message", :async => true) + @xmt_list[seq] = idx + end + end + end + + def check + return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size + lost = 0 + mismatched = 0 + @xmt_list.each do |seq, value| + if @rcv_list.include?(seq) + result = @rcv_list.delete(seq) + mismatch += 1 unless result.sequence == value + else + lost += 1 + end + end + spurious = @rcv_list.size + if lost == 0 and mismatched == 0 and spurious == 0 + return "pass" + else + return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious] + end + end + end + + def setup() + # Make sure errors in threads lead to a noisy death of the test + Thread.abort_on_exception = true + + @host = ENV.fetch("QMF_TEST_HOST", 'localhost') + @port = ENV.fetch("QMF_TEST_PORT", 5672) + + sock = TCPSocket.new(@host, @port) + + @conn = Qpid::Connection.new(sock) + @conn.start() + + @session = @conn.session("test-session") + end + + def teardown + unless @session.error? + @session.close(10) + end + @conn.close(10) + if @qmf + @qmf.del_broker(@qmf_broker) + end + end + + def start_qmf(kwargs = {}) + @qmf = Qpid::Qmf::Session.new(kwargs) + @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port]) + + brokers = @qmf.objects(:class => "broker") + assert_equal(1, brokers.length) + @broker = brokers[0] + end + + def test_methods_sync() + start_qmf + body = "Echo Message Body" + for seq in 1..10 + res = @broker.echo(seq, body, :timeout => 10) + assert_equal(0, res.status) + assert_equal("OK", res.text) + assert_equal(seq, res.sequence) + assert_equal(body, res.body) + end + end + + def test_methods_async() + handler = Handler.new + start_qmf(:console => handler) + handler.request(@broker, 20) + sleep(1) + assert_equal("pass", handler.check) + end + + def test_move_queued_messages() + """ + Test ability to move messages from the head of one queue to another. + Need to test moveing all and N messages. + """ + + "Set up source queue" + start_qmf + @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true) + @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key") + + props = @session.delivery_properties(:routing_key => "routing_key") + for count in 1..20 + body = "Move Message %d" % count + src_msg = Qpid::Message.new(props, body) + @session.message_transfer(:destination => "amq.direct", :message => src_msg) + end + + "Set up destination queue" + @session.queue_declare(:queue => "dest-queue", :exclusive => true, :auto_delete => true) + @session.exchange_bind(:queue => "dest-queue", :exchange => "amq.direct") + + queues = @qmf.objects(:class => "queue") + + "Move 10 messages from src-queue to dest-queue" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", "name" => "src-queue")[0] + dq = @qmf.objects(:class => "queue", "name" => "dest-queue")[0] + + assert_equal(10, sq.msgDepth) + assert_equal(10, dq.msgDepth) + + "Move all remaining messages to destination" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] + dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] + + assert_equal(0, sq.msgDepth) + assert_equal(20, dq.msgDepth) + + "Use a bad source queue name" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + assert_equal(4, result.status) + + "Use a bad destination queue name" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + assert_equal(4, result.status) + + " Use a large qty (40) to move from dest-queue back to " + " src-queue- should move all " + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] + dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] + + assert_equal(20, sq.msgDepth) + assert_equal(0, dq.msgDepth) + + "Consume the messages of the queue and check they are all there in order" + @session.message_subscribe(:queue => "src-queue", + :destination => "tag") + @session.message_flow(:destination => "tag", + :unit => @session.message_credit_unit.message, + :value => 0xFFFFFFFF) + @session.message_flow(:destination => "tag", + :unit => @session.message_credit_unit.byte, + :value => 0xFFFFFFFF) + queue = @session.incoming("tag") + for count in 1..20 + consumed_msg = queue.get(timeout=1) + body = "Move Message %d" % count + assert_equal(body, consumed_msg.body) + end + end + + # Test ability to purge messages from the head of a queue. Need to test + # moveing all, 1 (top message) and N messages. + def test_purge_queue + start_qmf + # Set up purge queue" + @session.queue_declare(:queue => "purge-queue", + :exclusive => true, + :auto_delete => true) + @session.exchange_bind(:queue => "purge-queue", + :exchange => "amq.direct", + :binding_key => "routing_key") + + props = @session.delivery_properties(:routing_key => "routing_key") + 20.times do |count| + body = "Purge Message %d" % count + msg = Qpid::Message.new(props, body) + @session.message_transfer(:destination => "amq.direct", + :message => msg) + end + + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + + "Purge top message from purge-queue" + result = pq.purge(1) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(19, pq.msgDepth) + + "Purge top 9 messages from purge-queue" + result = pq.purge(9) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(10, pq.msgDepth) + + "Purge all messages from purge-queue" + result = pq.purge(0) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(0, pq.msgDepth) + end +end |