diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 97 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/Cluster_child.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/ExchangeTest.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/FramingTest.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/HeaderTest.cpp | 27 | ||||
-rw-r--r-- | cpp/src/tests/InMemoryContentTest.cpp | 91 | ||||
-rw-r--r-- | cpp/src/tests/LazyLoadedContentTest.cpp | 113 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/src/tests/MessageBuilderTest.cpp | 284 | ||||
-rw-r--r-- | cpp/src/tests/MessageTest.cpp | 65 | ||||
-rw-r--r-- | cpp/src/tests/MessageUtils.h | 53 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/ReferenceTest.cpp | 94 | ||||
-rw-r--r-- | cpp/src/tests/TxAckTest.cpp | 12 | ||||
-rw-r--r-- | cpp/src/tests/TxPublishTest.cpp | 7 |
16 files changed, 302 insertions, 572 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 3253a3d27a..1e5a30f157 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -19,12 +19,14 @@ * */ #include "qpid/broker/BrokerChannel.h" -#include "qpid/broker/BrokerMessage.h" #include "qpid/broker/BrokerQueue.h" #include "qpid/broker/FanOutExchange.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageDelivery.h" #include "qpid/broker/NullMessageStore.h" #include "qpid_test_plugin.h" #include <iostream> +#include <sstream> #include <memory> #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" @@ -72,7 +74,6 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE(BrokerChannelTest); CPPUNIT_TEST(testConsumerMgmt);; CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST(testFlow); CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue); @@ -155,7 +156,16 @@ class BrokerChannelTest : public CppUnit::TestCase void check() { - CPPUNIT_ASSERT(expected.empty()); + if (!expected.empty()) { + std::stringstream error; + error << "Expected: "; + while (!expected.empty()) { + MethodCall& m = expected.front(); + error << m.name << "(" << m.msg << ", '" << m.data << "'); "; + expected.pop(); + } + CPPUNIT_FAIL(error.str()); + } } }; @@ -173,7 +183,7 @@ class BrokerChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, recorder, 0, 0); + Channel channel(connection, recorder, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); @@ -203,7 +213,7 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); Queue::shared_ptr queue(new Queue("my_queue")); string tag("test"); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); channel.consume(token, tag, queue, false, false, 0); queue->deliver(msg); sleep(2); @@ -213,48 +223,6 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } - void testStaging(){ - MockMessageStore store; - connection.setFrameMax(1000); - connection.setStagingThreshold(10); - Channel channel(connection, recorder, 1, &store); - const string data[] = {"abcde", "fghij", "klmno"}; - - Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); - - store.expect(); - store.stage(*msg); - for (int i = 0; i < 3; i++) { - store.appendContent(*msg, data[i]); - } - store.destroy(*msg); - store.test(); - - Exchange::shared_ptr exchange = - broker->getExchanges().declare("my_exchange", "fanout").first; - Queue::shared_ptr queue(new Queue("my_queue")); - exchange->bind(queue, "", 0); - - AMQHeaderBody header(BASIC); - uint64_t contentSize(0); - for (int i = 0; i < 3; i++) { - contentSize += data[i].size(); - } - header.setContentSize(contentSize); - channel.handlePublish(msg); - channel.handleHeader(&header); - - for (int i = 0; i < 3; i++) { - AMQContentBody body(data[i]); - channel.handleContent(&body); - } - Message::shared_ptr msg2 = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); - msg2.reset();//should trigger destroy call - - store.check(); - } - //NOTE: strictly speaking this should/could be part of QueueTest, //but as it can usefully use the same utility classes as this @@ -279,7 +247,6 @@ class BrokerChannelTest : public CppUnit::TestCase store.expect(); store.stage(*msg3); - store.destroy(*msg3); store.test(); Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); @@ -348,16 +315,17 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody())); - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); string tag("test"); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); channel.consume(token, tag, queue, false, false, 0); channel.flow(false); + + //'publish' a message + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, "abcdefghijklmn"); queue->deliver(msg); + //ensure no messages have been delivered CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); @@ -369,21 +337,26 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } - Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) + Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) { - BasicMessage* msg = new BasicMessage( - 0, exchange, routingKey, false, false); - AMQHeaderBody header(BASIC); - header.setContentSize(contentSize); - msg->setHeader(&header); - msg->getHeaderProperties()->setMessageId(messageId); + Message::shared_ptr msg(new Message()); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + props->setMessageId(messageId); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); return msg; } void addContent(Message::shared_ptr msg, const string& data) { - AMQContentBody body(data); - msg->addContent(&body); + AMQFrame content(0, AMQContentBody(data)); + msg->getFrames().append(content); } }; diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp index a9caa89321..b3a6a745b8 100644 --- a/cpp/src/tests/Cluster.cpp +++ b/cpp/src/tests/Cluster.cpp @@ -34,7 +34,7 @@ static const ProtocolVersion VER; /** Verify membership in a cluster with one member. */ BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); - AMQFrame send(VER, 1, SessionOpenBody(VER)); + AMQFrame send(1, SessionOpenBody(VER)); cluster.handle(send); AMQFrame received; BOOST_REQUIRE(cluster.received.waitPop(received)); @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. // Exchange frames with child. - AMQFrame send(VER, 1, SessionOpenBody(VER)); + AMQFrame send(1, SessionOpenBody(VER)); cluster.handle(send); AMQFrame received; BOOST_REQUIRE(cluster.received.waitPop(received)); @@ -91,8 +91,8 @@ struct CountHandler : public FrameHandler { /** Test the ClassifierHandler */ BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) { - AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER)); - AMQFrame messageTrans(VER, 0, MessageTransferBody(VER)); + AMQFrame queueDecl(0, QueueDeclareBody(VER)); + AMQFrame messageTrans(0, MessageTransferBody(VER)); shared_ptr<CountHandler> wiring(new CountHandler()); shared_ptr<CountHandler> other(new CountHandler()); diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index bd76e58127..c03d7396f0 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -40,7 +40,7 @@ void clusterTwo() { BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent - AMQFrame send(VER, 1, SessionAttachedBody(VER)); + AMQFrame send(1, SessionAttachedBody(VER)); cluster.handle(send); BOOST_REQUIRE(cluster.received.waitPop(frame)); BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody()); diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index ef2646519d..59941864e2 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -31,6 +31,7 @@ #include "qpid_test_plugin.h" #include <iostream> #include "qpid/framing/BasicGetBody.h" +#include "MessageUtils.h" using namespace qpid::broker; using namespace qpid::framing; @@ -63,7 +64,7 @@ class ExchangeTest : public CppUnit::TestCase queue.reset(); queue2.reset(); - Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true)); + Message::shared_ptr msgPtr(MessageUtils::createMessage("exchange", "key", "id")); DeliverableMessage msg(msgPtr); topic.route(msg, "abc", 0); direct.route(msg, "abc", 0); diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index a0dd8d37f6..1b843defc1 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -137,8 +137,7 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - AMQFrame in(version, 999, - ConnectionRedirectBody(version, a, b)); + AMQFrame in(999, ConnectionRedirectBody(version, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -149,7 +148,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(version, 999, BasicConsumeOkBody(version, s)); + AMQFrame in(999, BasicConsumeOkBody(version, s)); in.encode(buffer); buffer.flip(); AMQFrame out; diff --git a/cpp/src/tests/HeaderTest.cpp b/cpp/src/tests/HeaderTest.cpp index 17381cc868..df2230342c 100644 --- a/cpp/src/tests/HeaderTest.cpp +++ b/cpp/src/tests/HeaderTest.cpp @@ -36,8 +36,8 @@ public: void testGenericProperties() { - AMQHeaderBody body(BASIC); - dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE"); + AMQHeaderBody body; + body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", "BCDE"); Buffer buffer(100); body.encode(buffer); @@ -45,7 +45,7 @@ public: AMQHeaderBody body2; body2.decode(buffer, body.size()); BasicHeaderProperties* props = - dynamic_cast<BasicHeaderProperties*>(body2.getProperties()); + body2.get<BasicHeaderProperties>(true); CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), props->getHeaders().getString("A")); } @@ -64,10 +64,11 @@ public: string userId("guest"); string appId("just testing"); string clusterId("no clustering required"); + uint64_t contentLength(54321); - AMQHeaderBody body(BASIC); + AMQFrame out(0, AMQHeaderBody()); BasicHeaderProperties* properties = - dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + out.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true); properties->setContentType(contentType); properties->getHeaders().setString("A", "BCDE"); properties->setDeliveryMode(deliveryMode); @@ -81,13 +82,14 @@ public: properties->setUserId(userId); properties->setAppId(appId); properties->setClusterId(clusterId); + properties->setContentLength(contentLength); Buffer buffer(10000); - body.encode(buffer); + out.encode(buffer); buffer.flip(); - AMQHeaderBody temp; - temp.decode(buffer, body.size()); - properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + AMQFrame in; + in.decode(buffer); + properties = in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true); CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A")); @@ -102,6 +104,7 @@ public: CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId()); CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId()); CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId()); + CPPUNIT_ASSERT_EQUAL(contentLength, properties->getContentLength()); } void testSomeSpecificProperties(){ @@ -111,9 +114,9 @@ public: string expiration("Z"); uint64_t timestamp(0xabe4a34a); - AMQHeaderBody body(BASIC); + AMQHeaderBody body; BasicHeaderProperties* properties = - dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + body.get<BasicHeaderProperties>(true); properties->setContentType(contentType); properties->setDeliveryMode(deliveryMode); properties->setPriority(priority); @@ -125,7 +128,7 @@ public: buffer.flip(); AMQHeaderBody temp; temp.decode(buffer, body.size()); - properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + properties = temp.get<BasicHeaderProperties>(true); CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode()); diff --git a/cpp/src/tests/InMemoryContentTest.cpp b/cpp/src/tests/InMemoryContentTest.cpp deleted file mode 100644 index bc95548d45..0000000000 --- a/cpp/src/tests/InMemoryContentTest.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/InMemoryContent.h" -#include "qpid_test_plugin.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include <iostream> -#include <list> -#include "qpid/framing/AMQFrame.h" -#include "MockChannel.h" - -using std::list; -using std::string; -using boost::dynamic_pointer_cast; -using namespace qpid::broker; -using namespace qpid::framing; - - -class InMemoryContentTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(InMemoryContentTest); - CPPUNIT_TEST(testRefragmentation); - CPPUNIT_TEST_SUITE_END(); - -public: - void testRefragmentation() - { - {//no remainder - string out[] = {"abcde", "fghij", "klmno", "pqrst"}; - string in[] = {out[0] + out[1], out[2] + out[3]}; - refragment(2, in, 4, out); - } - {//remainder for last frame - string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"}; - string in[] = {out[0] + out[1], out[2] + out[3] + out[4]}; - refragment(2, in, 5, out); - } - } - - - void refragment(size_t inCount, string* in, size_t outCount, string* out, uint32_t framesize = 5) - { - InMemoryContent content; - MockChannel channel(3); - - addframes(content, inCount, in); - content.send(channel, framesize); - CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); - - for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody* chunk = dynamic_cast<AMQContentBody*>( - channel.out.frames[i].getBody()); - CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL( - ChannelId(3), channel.out.frames[i].getChannel()); - } - } - - void addframes(InMemoryContent& content, size_t frameCount, string* frameData) - { - for (unsigned int i = 0; i < frameCount; i++) { - AMQContentBody frame(frameData[i]); - content.add(&frame); - } - } - - -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest); - diff --git a/cpp/src/tests/LazyLoadedContentTest.cpp b/cpp/src/tests/LazyLoadedContentTest.cpp deleted file mode 100644 index df46f6b48e..0000000000 --- a/cpp/src/tests/LazyLoadedContentTest.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/LazyLoadedContent.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid_test_plugin.h" -#include <iostream> -#include <list> -#include <sstream> -#include "qpid/framing/AMQFrame.h" -#include "MockChannel.h" -using std::list; -using std::string; -using boost::dynamic_pointer_cast; -using namespace qpid::broker; -using namespace qpid::framing; - - - -class LazyLoadedContentTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(LazyLoadedContentTest); - CPPUNIT_TEST(testFragmented); - CPPUNIT_TEST(testWhole); - CPPUNIT_TEST(testHalved); - CPPUNIT_TEST_SUITE_END(); - - class TestMessageStore : public NullMessageStore - { - const string content; - - public: - TestMessageStore(const string& _content) : content(_content) {} - - void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length) - { - if (offset + length <= content.size()) { - data = content.substr(offset, length); - } else{ - std::stringstream error; - error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size(); - throw qpid::Exception(error.str()); - } - } - }; - - -public: - void testFragmented() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 5; - string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"}; - load(data, 6, out, framesize); - } - - void testWhole() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 50; - string out[] = {data}; - load(data, 1, out, framesize); - } - - void testHalved() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 13; - string out[] = {"abcdefghijklm", "nopqrstuvwxyz"}; - load(data, 2, out, framesize); - } - - void load(string& in, size_t outCount, string* out, uint32_t framesize) - { - TestMessageStore store(in); - LazyLoadedContent content(&store, 0, in.size()); - MockChannel channel(3); - content.send(channel, framesize); - CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); - - for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody* chunk(dynamic_cast<AMQContentBody*>( - channel.out.frames[i].getBody())); - CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL( - ChannelId(3), channel.out.frames[i].getChannel()); - } - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest); - diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 34e7e973ac..7ff6a843a9 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -82,11 +82,8 @@ broker_unit_tests = \ DtxWorkRecordTest \ ExchangeTest \ HeadersExchangeTest \ - InMemoryContentTest \ - LazyLoadedContentTest \ MessageBuilderTest \ MessageTest \ - ReferenceTest \ QueueRegistryTest \ QueueTest \ QueuePolicyTest \ @@ -142,6 +139,7 @@ EXTRA_DIST += \ .valgrind.supp-default \ .valgrindrc-default \ InProcessBroker.h \ + MessageUtils.h \ MockChannel.h \ MockConnectionInputHandler.h \ TxMocks.h \ diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index a12fc603ce..341fdf56f5 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -18,15 +18,13 @@ * under the License. * */ -#include "qpid/Exception.h" -#include "qpid/broker/BrokerMessage.h" +#include "qpid/broker/Message.h" #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NullMessageStore.h" -#include "qpid/framing/Buffer.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/TypeFilter.h" #include "qpid_test_plugin.h" -#include <iostream> -#include <memory> -#include "MockChannel.h" +#include <list> using namespace boost; using namespace qpid::broker; @@ -35,72 +33,55 @@ using namespace qpid::sys; class MessageBuilderTest : public CppUnit::TestCase { - struct MockHandler : CompletionHandler { - Message::shared_ptr msg; + class MockMessageStore : public NullMessageStore + { + enum Op {STAGE=1, APPEND=2}; - virtual void complete(Message::shared_ptr _msg){ - msg = _msg; + uint64_t id; + PersistableMessage* expectedMsg; + string expectedData; + std::list<Op> ops; + + void checkExpectation(Op actual) + { + CPPUNIT_ASSERT_EQUAL(ops.front(), actual); + ops.pop_front(); } - }; - class TestMessageStore : public NullMessageStore - { - Buffer* header; - Buffer* content; - const uint32_t contentBufferSize; - - public: + public: + MockMessageStore() : id(0), expectedMsg(0) {} - void stage(PersistableMessage& msg) - { - if (msg.getPersistenceId() == 0) { - header = new Buffer(msg.encodedSize()); - msg.encode(*header); - content = new Buffer(contentBufferSize); - msg.setPersistenceId(1); - } else { - throw qpid::Exception("Message already staged!"); - } + void expectStage(PersistableMessage& msg) + { + expectedMsg = &msg; + ops.push_back(STAGE); } - void appendContent(PersistableMessage& msg, const string& data) - { - if (msg.getPersistenceId() == 1) { - content->putRawData(data); - } else { - throw qpid::Exception("Invalid message id!"); - } + void expectAppendContent(PersistableMessage& msg, const string& data) + { + expectedMsg = &msg; + expectedData = data; + ops.push_back(APPEND); } - using NullMessageStore::destroy; + void stage(PersistableMessage& msg) + { + checkExpectation(STAGE); + CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg); + msg.setPersistenceId(++id); + } - void destroy(PersistableMessage& msg) + void appendContent(PersistableMessage& msg, const string& data) { - CPPUNIT_ASSERT(msg.getPersistenceId()); + checkExpectation(APPEND); + CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg); + CPPUNIT_ASSERT_EQUAL(expectedData, data); } - BasicMessage::shared_ptr getRestoredMessage() + bool expectationsMet() { - BasicMessage::shared_ptr msg(new BasicMessage()); - if (header) { - header->flip(); - msg->decodeHeader(*header); - delete header; - header = 0; - if (content) { - content->flip(); - msg->decodeContent(*content); - delete content; - content = 0; - } - } - return msg; + return ops.empty(); } - - //dont care about any of the other methods: - TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0), - contentBufferSize(_contentBufferSize) {} - ~TestMessageStore(){} }; CPPUNIT_TEST_SUITE(MessageBuilderTest); @@ -113,106 +94,115 @@ class MessageBuilderTest : public CppUnit::TestCase public: void testHeaderOnly(){ - MockHandler handler; - MessageBuilder builder(&handler); - - Message::shared_ptr message( - new BasicMessage( - 0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(0); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + MessageBuilder builder; + builder.start(SequenceNumber()); + + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT_EQUAL(exchange, builder.getMessage()->getExchangeName()); + CPPUNIT_ASSERT_EQUAL(key, builder.getMessage()->getRoutingKey()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void test1ContentFrame(){ - MockHandler handler; - MessageBuilder builder(&handler); + MessageBuilder builder; + builder.start(SequenceNumber()); - string data1("abcdefg"); + std::string data("abcdefg"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(7); - AMQContentBody part1(data1); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part1); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content(0, AMQContentBody(data)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(header); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(content); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void test2ContentFrames(){ - MockHandler handler; - MessageBuilder builder(&handler); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - AMQContentBody part1(data1); - AMQContentBody part2(data2); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part1); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + MessageBuilder builder; + builder.start(SequenceNumber()); + + std::string data1("abcdefg"); + std::string data2("hijklmn"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + builder.handle(content1); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(content2); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void testStaging(){ - //store must be the last thing to be destroyed or destructor - //of Message fails (it uses the store to call destroy if lazy - //loaded content is in use) - TestMessageStore store(14); - { - MockHandler handler; - MessageBuilder builder(&handler, &store, 5); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties()); - properties->setMessageId("MyMessage"); - properties->getHeaders().setString("abc", "xyz"); - - AMQContentBody part1(data1); - AMQContentBody part2(data2); - - builder.initialise(message); - builder.setHeader(&header); - builder.addContent(&part1); - builder.addContent(&part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - - BasicMessage::shared_ptr restored = store.getRestoredMessage(); - CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); - CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), - restored->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize()); - } + MockMessageStore store; + MessageBuilder builder(&store, 5); + builder.start(SequenceNumber()); + + std::string data1("abcdefg"); + std::string data2("hijklmn"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + + store.expectStage(*builder.getMessage()); + builder.handle(content1); + CPPUNIT_ASSERT(store.expectationsMet()); + CPPUNIT_ASSERT_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId()); + + store.expectAppendContent(*builder.getMessage(), data2); + builder.handle(content2); + CPPUNIT_ASSERT(store.expectationsMet()); + + //were the content frames dropped? + CPPUNIT_ASSERT_EQUAL((uint64_t) 0, builder.getMessage()->contentSize()); } }; diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index 1fbb18b7d3..3d080ef3dc 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include "qpid/broker/BrokerMessage.h" +#include "qpid/broker/Message.h" #include "qpid_test_plugin.h" #include <iostream> #include "qpid/framing/AMQP_HighestVersion.h" @@ -45,40 +45,45 @@ class MessageTest : public CppUnit::TestCase string data1("abcdefg"); string data2("hijklmn"); - BasicMessage::shared_ptr msg( - new BasicMessage(0, exchange, routingKey, false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - AMQContentBody part1(data1); - AMQContentBody part2(data2); - msg->setHeader(&header); - msg->addContent(&part1); - msg->addContent(&part2); + Message::shared_ptr msg(new Message()); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().append(content1); + msg->getFrames().append(content2); + + MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true); + mProps->setContentLength(data1.size() + data2.size()); + mProps->setMessageId(messageId); + FieldTable applicationHeaders; + applicationHeaders.setString("abc", "xyz"); + mProps->setApplicationHeaders(applicationHeaders); + DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + dProps->setRoutingKey(routingKey); + dProps->setDeliveryMode(PERSISTENT); + CPPUNIT_ASSERT(msg->isPersistent()); - msg->getHeaderProperties()->setMessageId(messageId); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); - msg->getHeaderProperties()->getHeaders().setString("abc", "xyz"); Buffer buffer(msg->encodedSize()); msg->encode(buffer); - buffer.flip(); - - msg.reset(new BasicMessage()); - msg->decode(buffer); - CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); - CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(PERSISTENT, msg->getHeaderProperties()->getDeliveryMode()); - CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((uint64_t) 14, msg->contentSize()); + buffer.flip(); + msg.reset(new Message()); + msg->decodeHeader(buffer); + msg->decodeContent(buffer); - MockChannel channel(1); - msg->deliver(channel, "ignore", 0, 100); - CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); - AMQContentBody* contentBody( - dynamic_cast<AMQContentBody*>(channel.out.frames[2].getBody())); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); + CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName()); + CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize()); + CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength()); + CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); + CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc")); + CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode()); + CPPUNIT_ASSERT(msg->isPersistent()); } }; diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h new file mode 100644 index 0000000000..7fb1755c4b --- /dev/null +++ b/cpp/src/tests/MessageUtils.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageDelivery.h" +#include "qpid/framing/AMQFrame.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +struct MessageUtils +{ + static Message::shared_ptr createMessage(const string& exchange, const string& routingKey, + const string& messageId, uint64_t contentSize = 0) + { + Message::shared_ptr msg(new Message()); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + props->setMessageId(messageId); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; + } + + static void addContent(Message::shared_ptr msg, const string& data) + { + AMQFrame content(0, AMQContentBody(data)); + msg->getFrames().append(content); + } +}; diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index e7ca124631..ef1518af4c 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -70,8 +70,13 @@ class QueueTest : public CppUnit::TestCase public: Message::shared_ptr message(std::string exchange, std::string routingKey) { - return Message::shared_ptr( - new BasicMessage(0, exchange, routingKey, false, false)); + Message::shared_ptr msg(new Message()); + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; } diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp deleted file mode 100644 index 411462564a..0000000000 --- a/cpp/src/tests/ReferenceTest.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <memory> -#include "qpid_test_plugin.h" -#include "qpid/broker/Reference.h" -#include "qpid/broker/BrokerMessageMessage.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageAppendBody.h" -#include "qpid/broker/CompletionHandler.h" - -using namespace boost; -using namespace qpid; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace std; - -class ReferenceTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ReferenceTest); - CPPUNIT_TEST(testRegistry); - CPPUNIT_TEST(testReference); - CPPUNIT_TEST_SUITE_END(); - - ProtocolVersion v; - ReferenceRegistry registry; - - public: - void testRegistry() { - Reference::shared_ptr ref = registry.open("foo"); - CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId()); - CPPUNIT_ASSERT(ref == registry.get("foo")); - try { - registry.get("none"); - CPPUNIT_FAIL("Expected exception"); - } catch (...) {} - try { - registry.open("foo"); - CPPUNIT_FAIL("Expected exception"); - } catch(...) {} - ref->close(); - try { - registry.get("foo"); - CPPUNIT_FAIL("Expected exception"); - } catch(...) {} - } - - void testReference() { - - Reference::shared_ptr r1(registry.open("bar")); - - MessageTransferBody t1(v); - // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up. - const_cast<framing::Content&>(t1.getBody()) = framing::Content(REFERENCE,"bar"); - MessageMessage::shared_ptr m1(new MessageMessage(0, &t1, r1)); - - MessageTransferBody t2(v); - const_cast<framing::Content&>(t2.getBody()) = framing::Content(REFERENCE,"bar"); - MessageMessage::shared_ptr m2(new MessageMessage(0, &t2, r1)); - - MessageAppendBody a1(v); - MessageAppendBody a2(v); - - r1->addMessage(m1); - r1->addMessage(m2); - CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size()); - r1->append(a1); - r1->append(a2); - CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size()); - r1->close(); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest); diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index 24e8aac701..89a907d495 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -68,11 +68,13 @@ public: TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) { for(int i = 0; i < 10; i++){ - Message::shared_ptr msg( - new BasicMessage(0, "exchange", "routing_key", false, false)); - AMQHeaderBody body(BASIC); - msg->setHeader(&body); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); + Message::shared_ptr msg(new Message()); + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, "exchange", 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); + msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key"); messages.push_back(msg); deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); } diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index d009dd9112..5628cf1d1c 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -26,6 +26,7 @@ #include <list> #include <vector> #include "MockChannel.h" +#include "MessageUtils.h" using std::list; using std::pair; @@ -70,12 +71,10 @@ public: TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(new BasicMessage(0, "exchange", "routing_key", false, false)), + msg(MessageUtils::createMessage("exchange", "routing_key", "id")), op(msg) { - AMQHeaderBody body(BASIC); - msg->setHeader(&body); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); + msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); op.deliverTo(queue1); op.deliverTo(queue2); } |