From 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 28 Feb 2013 16:14:30 +0000 Subject: Update from trunk r1375509 through r1450773 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/legacystore/SimpleTest.cpp | 497 +++++++++++++++++++++++++++++++ 1 file changed, 497 insertions(+) create mode 100644 cpp/src/tests/legacystore/SimpleTest.cpp (limited to 'cpp/src/tests/legacystore/SimpleTest.cpp') diff --git a/cpp/src/tests/legacystore/SimpleTest.cpp b/cpp/src/tests/legacystore/SimpleTest.cpp new file mode 100644 index 0000000000..a49333d876 --- /dev/null +++ b/cpp/src/tests/legacystore/SimpleTest.cpp @@ -0,0 +1,497 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" + +#include "qpid/legacystore/MessageStoreImpl.h" +#include +#include "tests/legacystore/MessageUtils.h" +#include "qpid/legacystore/StoreException.h" +#include "qpid/broker/DirectExchange.h" +#include +#include +#include +#include +#include +#include +#include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +qpid::broker::Broker::Options opts; +qpid::broker::Broker br(opts); + +#define SET_LOG_LEVEL(level) \ + qpid::log::Options opts(""); \ + opts.selectors.clear(); \ + opts.selectors.push_back(level); \ + qpid::log::Logger::instance().configure(opts); + + +using boost::intrusive_ptr; +using boost::static_pointer_cast; +using namespace qpid; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace mrg::msgstore; +using namespace std; + +QPID_AUTO_TEST_SUITE(SimpleTest) + +const string test_filename("SimpleTest"); +const char* tdp = getenv("TMP_DATA_DIR"); +const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest"); + +// === Helper fns === + +struct DummyHandler : OutputHandler +{ + std::vector frames; + + virtual void send(AMQFrame& frame){ + frames.push_back(frame); + } +}; + +void recover(MessageStoreImpl& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links) +{ + sys::Timer t; + DtxManager mgr(t); + mgr.setStore (&store); + RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry()); + store.recover(recovery); +} + +void recover(MessageStoreImpl& store, ExchangeRegistry& exchanges) +{ + QueueRegistry queues; + LinkRegistry links; + recover(store, queues, exchanges, links); +} + +void recover(MessageStoreImpl& store, QueueRegistry& queues) +{ + ExchangeRegistry exchanges; + LinkRegistry links; + recover(store, queues, exchanges, links); +} + +void bindAndUnbind(const string& exchangeName, const string& queueName, + const string& key, const FieldTable& args) +{ + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); + Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0)); + store.create(*exchange, qpid::framing::FieldTable()); + store.create(*queue, qpid::framing::FieldTable()); + BOOST_REQUIRE(exchange->bind(queue, key, &args)); + store.bind(*exchange, *queue, key, args); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + recover(store, queues, exchanges, links); + + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + Queue::shared_ptr queue = queues.find(queueName); + // check exchange args are still set + for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) { + BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData()); + } + //check it is bound by unbinding + BOOST_REQUIRE(exchange->unbind(queue, key, &args)); + store.unbind(*exchange, *queue, key, args); + } + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + recover(store, queues, exchanges, links); + + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + Queue::shared_ptr queue = queues.find(queueName); + // check exchange args are still set + for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) { + BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData()); + } + //make sure it is no longer bound + BOOST_REQUIRE(!exchange->unbind(queue, key, &args)); + } +} + + +// === Test suite === + +QPID_AUTO_TEST_CASE(CreateDelete) +{ + SET_LOG_LEVEL("error+"); // This only needs to be set once. + + cout << test_filename << ".CreateDelete: " << flush; + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + string name("CreateDeleteQueue"); + Queue queue(name, 0, &store, 0); + store.create(queue, qpid::framing::FieldTable()); +// TODO - check dir exists + BOOST_REQUIRE(queue.getPersistenceId()); + store.destroy(queue); +// TODO - check dir is deleted + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(EmptyRecover) +{ + cout << test_filename << ".EmptyRecover: " << flush; + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + //nothing to assert, just testing it doesn't blow up + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(QueueCreate) +{ + cout << test_filename << ".QueueCreate: " << flush; + + uint64_t id(0); + string name("MyDurableQueue"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue queue(name, 0, &store, 0); + store.create(queue, qpid::framing::FieldTable()); + BOOST_REQUIRE(queue.getPersistenceId()); + id = queue.getPersistenceId(); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue.get()); + BOOST_CHECK_EQUAL(id, queue->getPersistenceId()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(QueueCreateWithSettings) +{ + cout << test_filename << ".QueueCreateWithSettings: " << flush; + + FieldTable arguments; + arguments.setInt("qpid.max_count", 202); + arguments.setInt("qpid.max_size", 1003); + QueueSettings settings; + settings.populate(arguments, settings.storeSettings); + string name("MyDurableQueue"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue queue(name, settings, &store, 0); + queue.create(); + BOOST_REQUIRE(queue.getPersistenceId()); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202); + BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003); + BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount()); + BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(QueueDestroy) +{ + cout << test_filename << ".QueueDestroy: " << flush; + + string name("MyDurableQueue"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue queue(name, 0, &store, 0); + store.create(queue, qpid::framing::FieldTable()); + store.destroy(queue); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + BOOST_REQUIRE(!registry.find(name)); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(Enqueue) +{ + cout << test_filename << ".Enqueue: " << flush; + + //TODO: this is largely copy & paste'd from MessageTest in + //qpid tree. ideally need some helper routines for reducing + //this to a simpler less duplicated form + + string name("MyDurableQueue"); + string exchange("MyExchange"); + string routingKey("MyRoutingKey"); + Uuid messageId(true); + string data1("abcdefg"); + string data2("hijklmn"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue::shared_ptr queue(new Queue(name, 0, &store, 0)); + queue->create(); + + Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 14); + MessageUtils::addContent(msg, data1); + MessageUtils::addContent(msg, data2); + + msg.addAnnotation("abc", "xyz"); + + queue->deliver(msg); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount()); + Message msg = MessageUtils::get(*queue); + + BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey()); + BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg)); + BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc")); + BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize()); + + DummyHandler handler; + MessageUtils::deliver(msg, handler, 100); + BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size()); + AMQContentBody* contentBody(dynamic_cast(handler.frames[1].getBody())); + BOOST_REQUIRE(contentBody); + BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size()); + BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(Dequeue) +{ + cout << test_filename << ".Dequeue: " << flush; + + //TODO: reduce the duplication in these tests + string name("MyDurableQueue"); + { + string exchange("MyExchange"); + string routingKey("MyRoutingKey"); + Uuid messageId(true); + string data("abcdefg"); + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Queue::shared_ptr queue(new Queue(name, 0, &store, 0)); + queue->create(); + + Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 7); + MessageUtils::addContent(msg, data); + + queue->deliver(msg); + + QueueCursor cursor; + MessageUtils::get(*queue, &cursor); + queue->dequeue(0, cursor); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + QueueRegistry registry; + registry.setStore (&store); + recover(store, registry); + Queue::shared_ptr queue = registry.find(name); + BOOST_REQUIRE(queue); + BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount()); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy) +{ + cout << test_filename << ".ExchangeCreateAndDestroy: " << flush; + + uint64_t id(0); + string name("MyDurableExchange"); + string type("direct"); + FieldTable args; + args.setString("a", "A"); + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + ExchangeRegistry registry; + Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first; + store.create(*exchange, qpid::framing::FieldTable()); + id = exchange->getPersistenceId(); + BOOST_REQUIRE(id); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry registry; + + recover(store, registry); + + Exchange::shared_ptr exchange = registry.get(name); + BOOST_CHECK_EQUAL(id, exchange->getPersistenceId()); + BOOST_CHECK_EQUAL(type, exchange->getType()); + BOOST_REQUIRE(exchange->isDurable()); + BOOST_CHECK_EQUAL(*args.get("a"), *exchange->getArgs().get("a")); + store.destroy(*exchange); + } + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry registry; + + recover(store, registry); + + try { + Exchange::shared_ptr exchange = registry.get(name); + BOOST_FAIL("Expected exchange not to be found"); + } catch (const SessionException& e) { + BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code); + } + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind) +{ + cout << test_filename << ".ExchangeBindAndUnbind: " << flush; + + bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable()); + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs) +{ + cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush; + + FieldTable args; + args.setString("a", "A"); + args.setString("b", "B"); + bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args); + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind) +{ + cout << test_filename << ".ExchangeImplicitUnbind: " << flush; + + string exchangeName("MyDurableExchange"); + string queueName1("MyDurableQueue1"); + string queueName2("MyDurableQueue2"); + string key("my-routing-key"); + FieldTable args; + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1, true); // truncate store + Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); + Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0)); + Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0)); + store.create(*exchange, qpid::framing::FieldTable()); + store.create(*queue1, qpid::framing::FieldTable()); + store.create(*queue2, qpid::framing::FieldTable()); + store.bind(*exchange, *queue1, key, args); + store.bind(*exchange, *queue2, key, args); + //delete queue1: + store.destroy(*queue1); + }//db will be closed + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + //ensure recovery works ok: + recover(store, queues, exchanges, links); + + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + BOOST_REQUIRE(!queues.find(queueName1).get()); + BOOST_REQUIRE(queues.find(queueName2).get()); + + //delete exchange: + store.destroy(*exchange); + } + { + MessageStoreImpl store(&br); + store.init(test_dir, 4, 1); + ExchangeRegistry exchanges; + QueueRegistry queues; + LinkRegistry links; + + //ensure recovery works ok: + recover(store, queues, exchanges, links); + + try { + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + BOOST_FAIL("Expected exchange not to be found"); + } catch (const SessionException& e) { + BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code); + } + Queue::shared_ptr queue = queues.find(queueName2); + store.destroy(*queue); + } + + cout << "ok" << endl; +} + +QPID_AUTO_TEST_SUITE_END() -- cgit v1.2.1