summaryrefslogtreecommitdiff
path: root/cpp/src/tests/legacystore/SimpleTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/legacystore/SimpleTest.cpp')
-rw-r--r--cpp/src/tests/legacystore/SimpleTest.cpp497
1 files changed, 497 insertions, 0 deletions
diff --git a/cpp/src/tests/legacystore/SimpleTest.cpp b/cpp/src/tests/legacystore/SimpleTest.cpp
new file mode 100644
index 0000000000..a49333d876
--- /dev/null
+++ b/cpp/src/tests/legacystore/SimpleTest.cpp
@@ -0,0 +1,497 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "tests/legacystore/MessageUtils.h"
+#include "qpid/legacystore/StoreException.h"
+#include "qpid/broker/DirectExchange.h"
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueSettings.h>
+#include <qpid/broker/RecoveryManagerImpl.h>
+#include <qpid/framing/AMQHeaderBody.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+#define SET_LOG_LEVEL(level) \
+ qpid::log::Options opts(""); \
+ opts.selectors.clear(); \
+ opts.selectors.push_back(level); \
+ qpid::log::Logger::instance().configure(opts);
+
+
+using boost::intrusive_ptr;
+using boost::static_pointer_cast;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace mrg::msgstore;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(SimpleTest)
+
+const string test_filename("SimpleTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest");
+
+// === Helper fns ===
+
+struct DummyHandler : OutputHandler
+{
+ std::vector<AMQFrame> frames;
+
+ virtual void send(AMQFrame& frame){
+ frames.push_back(frame);
+ }
+};
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links)
+{
+ sys::Timer t;
+ DtxManager mgr(t);
+ mgr.setStore (&store);
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry());
+ store.recover(recovery);
+}
+
+void recover(MessageStoreImpl& store, ExchangeRegistry& exchanges)
+{
+ QueueRegistry queues;
+ LinkRegistry links;
+ recover(store, queues, exchanges, links);
+}
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues)
+{
+ ExchangeRegistry exchanges;
+ LinkRegistry links;
+ recover(store, queues, exchanges, links);
+}
+
+void bindAndUnbind(const string& exchangeName, const string& queueName,
+ const string& key, const FieldTable& args)
+{
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(exchange->bind(queue, key, &args));
+ store.bind(*exchange, *queue, key, args);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ // check exchange args are still set
+ for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+ BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+ }
+ //check it is bound by unbinding
+ BOOST_REQUIRE(exchange->unbind(queue, key, &args));
+ store.unbind(*exchange, *queue, key, args);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ // check exchange args are still set
+ for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+ BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+ }
+ //make sure it is no longer bound
+ BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
+ }
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CreateDelete)
+{
+ SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+ cout << test_filename << ".CreateDelete: " << flush;
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ string name("CreateDeleteQueue");
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+// TODO - check dir exists
+ BOOST_REQUIRE(queue.getPersistenceId());
+ store.destroy(queue);
+// TODO - check dir is deleted
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(EmptyRecover)
+{
+ cout << test_filename << ".EmptyRecover: " << flush;
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ //nothing to assert, just testing it doesn't blow up
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreate)
+{
+ cout << test_filename << ".QueueCreate: " << flush;
+
+ uint64_t id(0);
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(queue.getPersistenceId());
+ id = queue.getPersistenceId();
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue.get());
+ BOOST_CHECK_EQUAL(id, queue->getPersistenceId());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
+{
+ cout << test_filename << ".QueueCreateWithSettings: " << flush;
+
+ FieldTable arguments;
+ arguments.setInt("qpid.max_count", 202);
+ arguments.setInt("qpid.max_size", 1003);
+ QueueSettings settings;
+ settings.populate(arguments, settings.storeSettings);
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, settings, &store, 0);
+ queue.create();
+ BOOST_REQUIRE(queue.getPersistenceId());
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount());
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueDestroy)
+{
+ cout << test_filename << ".QueueDestroy: " << flush;
+
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+ store.destroy(queue);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ BOOST_REQUIRE(!registry.find(name));
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Enqueue)
+{
+ cout << test_filename << ".Enqueue: " << flush;
+
+ //TODO: this is largely copy & paste'd from MessageTest in
+ //qpid tree. ideally need some helper routines for reducing
+ //this to a simpler less duplicated form
+
+ string name("MyDurableQueue");
+ string exchange("MyExchange");
+ string routingKey("MyRoutingKey");
+ Uuid messageId(true);
+ string data1("abcdefg");
+ string data2("hijklmn");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+ queue->create();
+
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 14);
+ MessageUtils::addContent(msg, data1);
+ MessageUtils::addContent(msg, data2);
+
+ msg.addAnnotation("abc", "xyz");
+
+ queue->deliver(msg);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ Message msg = MessageUtils::get(*queue);
+
+ BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+ BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg));
+ BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc"));
+ BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize());
+
+ DummyHandler handler;
+ MessageUtils::deliver(msg, handler, 100);
+ BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size());
+ AMQContentBody* contentBody(dynamic_cast<AMQContentBody*>(handler.frames[1].getBody()));
+ BOOST_REQUIRE(contentBody);
+ BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size());
+ BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Dequeue)
+{
+ cout << test_filename << ".Dequeue: " << flush;
+
+ //TODO: reduce the duplication in these tests
+ string name("MyDurableQueue");
+ {
+ string exchange("MyExchange");
+ string routingKey("MyRoutingKey");
+ Uuid messageId(true);
+ string data("abcdefg");
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+ queue->create();
+
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 7);
+ MessageUtils::addContent(msg, data);
+
+ queue->deliver(msg);
+
+ QueueCursor cursor;
+ MessageUtils::get(*queue, &cursor);
+ queue->dequeue(0, cursor);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
+{
+ cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;
+
+ uint64_t id(0);
+ string name("MyDurableExchange");
+ string type("direct");
+ FieldTable args;
+ args.setString("a", "A");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ ExchangeRegistry registry;
+ Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
+ store.create(*exchange, qpid::framing::FieldTable());
+ id = exchange->getPersistenceId();
+ BOOST_REQUIRE(id);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry registry;
+
+ recover(store, registry);
+
+ Exchange::shared_ptr exchange = registry.get(name);
+ BOOST_CHECK_EQUAL(id, exchange->getPersistenceId());
+ BOOST_CHECK_EQUAL(type, exchange->getType());
+ BOOST_REQUIRE(exchange->isDurable());
+ BOOST_CHECK_EQUAL(*args.get("a"), *exchange->getArgs().get("a"));
+ store.destroy(*exchange);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry registry;
+
+ recover(store, registry);
+
+ try {
+ Exchange::shared_ptr exchange = registry.get(name);
+ BOOST_FAIL("Expected exchange not to be found");
+ } catch (const SessionException& e) {
+ BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+ }
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind)
+{
+ cout << test_filename << ".ExchangeBindAndUnbind: " << flush;
+
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable());
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs)
+{
+ cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush;
+
+ FieldTable args;
+ args.setString("a", "A");
+ args.setString("b", "B");
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args);
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
+{
+ cout << test_filename << ".ExchangeImplicitUnbind: " << flush;
+
+ string exchangeName("MyDurableExchange");
+ string queueName1("MyDurableQueue1");
+ string queueName2("MyDurableQueue2");
+ string key("my-routing-key");
+ FieldTable args;
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
+ Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue1, qpid::framing::FieldTable());
+ store.create(*queue2, qpid::framing::FieldTable());
+ store.bind(*exchange, *queue1, key, args);
+ store.bind(*exchange, *queue2, key, args);
+ //delete queue1:
+ store.destroy(*queue1);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ //ensure recovery works ok:
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ BOOST_REQUIRE(!queues.find(queueName1).get());
+ BOOST_REQUIRE(queues.find(queueName2).get());
+
+ //delete exchange:
+ store.destroy(*exchange);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ //ensure recovery works ok:
+ recover(store, queues, exchanges, links);
+
+ try {
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ BOOST_FAIL("Expected exchange not to be found");
+ } catch (const SessionException& e) {
+ BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+ }
+ Queue::shared_ptr queue = queues.find(queueName2);
+ store.destroy(*queue);
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()