diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-03 13:44:21 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-03 13:44:21 +0000 |
commit | 15a915878c787e6d5ed8330a8dd4375ec885a6c0 (patch) | |
tree | e580dbbeb7c61c4dc1a1348156c1970e09311b10 | |
parent | 20b96a39f539bf5181a58d1235f521d6a544bc47 (diff) | |
download | qpid-python-15a915878c787e6d5ed8330a8dd4375ec885a6c0.tar.gz |
Added some methods to MessageStore interface and hooked these in where appropriate.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@470810 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerImpl.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/framing/BasicHeaderProperties.h | 1 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/ConfigurationTest.cpp | 4 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/ExchangeTest.cpp | 4 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/QueueTest.cpp | 6 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxAckTest.cpp | 142 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxPublishTest.cpp | 128 |
16 files changed, 233 insertions, 167 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e96cc65b95..baa1b0d915 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -32,7 +32,8 @@ Message::Message(const ConnectionToken* const _publisher, mandatory(_mandatory), immediate(_immediate), redelivered(false), - size(0) {} + size(0), + persistenceId(0) {} Message::~Message(){} @@ -92,3 +93,9 @@ const ConnectionToken* const Message::getPublisher(){ return publisher; } +bool Message::isPersistent() +{ + if(!header) return false; + BasicHeaderProperties* props = getHeaderProperties(); + return props && props->getDeliveryMode() == PERSISTENT; +} diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index cfe29bdfcf..f9acdfd0a5 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -48,7 +48,7 @@ namespace qpid { qpid::framing::AMQHeaderBody::shared_ptr header; content_list content; u_int64_t size; - TxBuffer* tx; + u_int64_t persistenceId; void sendContent(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); @@ -78,11 +78,12 @@ namespace qpid { void redeliver(); qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); const string& getRoutingKey() const { return routingKey; } const string& getExchange() const { return exchange; } u_int64_t contentSize() const { return size; } - TxBuffer* getTx() const { return tx; } - void setTx(TxBuffer* _tx) { tx = _tx; } + u_int64_t getPersistenceId() const { return persistenceId; } + void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } }; } } diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index af9dd20079..9db7e81ed7 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -24,12 +24,29 @@ namespace qpid { namespace broker { + class Queue; + class QueueRegistry; + /** * An abstraction of the persistent storage for messages. */ class MessageStore : public TransactionalStore{ public: /** + * Record the existance of a durable queue + */ + virtual void create(const Queue& queue) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(const Queue& queue) = 0; + + /** + * Request recovery of queue and message state from store + */ + virtual void recover(QueueRegistry& queues) = 0; + + /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given * message is on the given queue. @@ -40,7 +57,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0; + virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message @@ -52,7 +69,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0; + virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; /** * Treat all enqueue/dequeues where this xid was specified as being committed. */ diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d671cea9a5..8a81b07aef 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -23,13 +23,12 @@ using namespace qpid::broker; using namespace qpid::concurrent; -Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, +Queue::Queue(const string& _name, u_int32_t _autodelete, MessageStore* const _store, const ConnectionToken* const _owner) : name(_name), autodelete(_autodelete), - durable(_durable), store(_store), owner(_owner), queueing(false), @@ -166,12 +165,26 @@ bool Queue::canAutoDelete() const{ void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ if(store){ - store->enqueue(msg, name, xid); + store->enqueue(msg, *this, xid); } } void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){ if(store){ - store->dequeue(msg, name, xid); + store->dequeue(msg, *this, xid); + } +} + +void Queue::create() +{ + if(store){ + store->create(*this); + } +} + +void Queue::destroy() +{ + if(store){ + store->destroy(*this); } } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index edc7c99b4f..393ca6b196 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -47,7 +47,6 @@ namespace qpid { class Queue{ const string name; const u_int32_t autodelete; - const bool durable; MessageStore* const store; const ConnectionToken* const owner; std::vector<Consumer*> consumers; @@ -69,10 +68,13 @@ namespace qpid { typedef std::vector<shared_ptr> vector; - Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, + Queue(const string& name, u_int32_t autodelete = 0, MessageStore* const store = 0, const ConnectionToken* const owner = 0); ~Queue(); + + void create(); + void destroy(); /** * Informs the queue of a binding that should be cancelled on * destruction of the queue. diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 949c194bbe..56452ca907 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -24,20 +24,20 @@ using namespace qpid::broker; using namespace qpid::concurrent; -QueueRegistry::QueueRegistry() : counter(1){} +QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} QueueRegistry::~QueueRegistry(){} std::pair<Queue::shared_ptr, bool> -QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, - MessageStore* const store, const ConnectionToken* owner) +QueueRegistry::declare(const string& declareName, bool durable, + u_int32_t autoDelete, const ConnectionToken* owner) { Locker locker(lock); string name = declareName.empty() ? generateName() : declareName; assert(!name.empty()); QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner)); + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); queues[name] = queue; return std::pair<Queue::shared_ptr, bool>(queue, true); } else { diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 4f9e4b882a..fb22ef148a 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -37,7 +37,7 @@ class SessionHandlerImpl; class QueueRegistry{ public: - QueueRegistry(); + QueueRegistry(MessageStore* const store = 0); ~QueueRegistry(); /** @@ -47,7 +47,6 @@ class QueueRegistry{ * was created by this declare call false if it already existed. */ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, - MessageStore* const _store = 0, const ConnectionToken* const owner = 0); /** @@ -79,7 +78,7 @@ class QueueRegistry{ QueueMap queues; qpid::concurrent::Monitor lock; int counter; - + MessageStore* const store; }; diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp index 9883c94a25..76723881dc 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp @@ -33,7 +33,9 @@ const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); } -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : + queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) +{ exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); exchanges.declare(amq_topic, TopicExchange::typeName); @@ -42,10 +44,17 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeo cleaner.start(); } -SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){ +void SessionHandlerFactoryImpl::recover() +{ + if(store.get()) store->recover(queues); +} + +SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt) +{ return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); } -SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){ +SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() +{ cleaner.stop(); } diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h index 5bad81412b..cea5c0fa00 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h @@ -18,27 +18,31 @@ #ifndef _SessionHandlerFactoryImpl_ #define _SessionHandlerFactoryImpl_ -#include "qpid/framing/AMQFrame.h" #include "qpid/broker/AutoDelete.h" #include "qpid/broker/ExchangeRegistry.h" -#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/broker/MessageStore.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/io/SessionHandlerFactory.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ProtocolInitiation.h" #include "qpid/io/SessionContext.h" #include "qpid/io/SessionHandler.h" +#include "qpid/io/SessionHandlerFactory.h" #include "qpid/io/TimeoutHandler.h" +#include <memory> namespace qpid { namespace broker { class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory { + std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; const u_int32_t timeout;//timeout for auto-deleted queues (in ms) AutoDelete cleaner; public: SessionHandlerFactoryImpl(u_int32_t timeout = 30000); + void recover(); virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt); virtual ~SessionHandlerFactoryImpl(); }; diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index 7c94a65d73..7a03132671 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -250,24 +250,28 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = parent->getQueue(name, channel); } else { std::pair<Queue::shared_ptr, bool> queue_created = - parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0); + parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue parent->getChannel(channel)->setDefaultQueue(queue); + + //create persistent record if required + queue_created.first->create(); + //add default binding: parent->exchanges->getDefault()->bind(queue, name, 0); - if(exclusive){ + if (exclusive) { parent->exclusiveQueues.push_back(queue); } else if(autoDelete){ parent->cleaner->add(queue); } } } - if(exclusive && !queue->isExclusiveOwner(parent)){ + if (exclusive && !queue->isExclusiveOwner(parent)) { throw ChannelException(405, "Cannot grant exclusive access to queue"); } - if(!nowait){ + if (!nowait) { name = queue->getName(); parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); } @@ -311,6 +315,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); } count = q->getMessageCount(); + q->destroy(); parent->queues->destroy(queue); } if(!nowait) parent->client.getQueue().deleteOk(channel, count); diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h index e82699753b..3450782875 100644 --- a/cpp/src/qpid/framing/BasicHeaderProperties.h +++ b/cpp/src/qpid/framing/BasicHeaderProperties.h @@ -25,6 +25,7 @@ namespace qpid { namespace framing { + enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2}; //TODO: This could be easily generated from the spec class BasicHeaderProperties : public HeaderProperties diff --git a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp index 7acee7c8b9..8fd252a9a9 100644 --- a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp +++ b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp @@ -60,8 +60,8 @@ class ConfigurationTest : public CppUnit::TestCase void testVarious() { Configuration conf; - char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", "blocking"}; - conf.parse(6, argv); + char* argv[] = {"ignore", "-t", "--worker-threads", "10"}; + conf.parse(4, argv); CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads()); CPPUNIT_ASSERT(conf.isTrace()); diff --git a/cpp/test/unit/qpid/broker/ExchangeTest.cpp b/cpp/test/unit/qpid/broker/ExchangeTest.cpp index 2fb525312b..14fb6fc097 100644 --- a/cpp/test/unit/qpid/broker/ExchangeTest.cpp +++ b/cpp/test/unit/qpid/broker/ExchangeTest.cpp @@ -37,8 +37,8 @@ class ExchangeTest : public CppUnit::TestCase void testMe() { - Queue::shared_ptr queue(new Queue("queue", true, true)); - Queue::shared_ptr queue2(new Queue("queue2", true, true)); + Queue::shared_ptr queue(new Queue("queue", true)); + Queue::shared_ptr queue2(new Queue("queue2", true)); TopicExchange topic("topic"); topic.bind(queue, "abc", 0); diff --git a/cpp/test/unit/qpid/broker/QueueTest.cpp b/cpp/test/unit/qpid/broker/QueueTest.cpp index 5b06cb93ca..ba1427a087 100644 --- a/cpp/test/unit/qpid/broker/QueueTest.cpp +++ b/cpp/test/unit/qpid/broker/QueueTest.cpp @@ -52,7 +52,7 @@ class QueueTest : public CppUnit::TestCase public: void testConsumers(){ - Queue::shared_ptr queue(new Queue("my_queue", true, true)); + Queue::shared_ptr queue(new Queue("my_queue", true)); //Test adding consumers: TestConsumer c1; @@ -84,7 +84,7 @@ class QueueTest : public CppUnit::TestCase } void testBinding(){ - Queue::shared_ptr queue(new Queue("my_queue", true, true)); + Queue::shared_ptr queue(new Queue("my_queue", true)); //Test bindings: TestBinding a; TestBinding b; @@ -118,7 +118,7 @@ class QueueTest : public CppUnit::TestCase } void testDequeue(){ - Queue::shared_ptr queue(new Queue("my_queue", true, true)); + Queue::shared_ptr queue(new Queue("my_queue", true)); Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp index b787c5793b..ab1e607e87 100644 --- a/cpp/test/unit/qpid/broker/TxAckTest.cpp +++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp @@ -16,6 +16,7 @@ * */ #include "qpid/broker/MessageStore.h" +#include "qpid/broker/QueueRegistry.h" #include "qpid/broker/TxAck.h" #include <qpid_test_plugin.h> #include <iostream> @@ -29,81 +30,84 @@ using namespace qpid::broker; class TxAckTest : public CppUnit::TestCase { - class TestMessageStore : public MessageStore - { - public: - vector<Message::shared_ptr> dequeued; - - void dequeue(Message::shared_ptr& msg, const string& /*queue*/, const string * const /*xid*/) - { - dequeued.push_back(msg); - } - - //dont care about any of the other methods: - void enqueue(Message::shared_ptr&, const string&, const string * const){} - void committed(const string * const){} - void aborted(const string * const){} - void begin(){} - void commit(){} - void abort(){} - ~TestMessageStore(){} - }; - - CPPUNIT_TEST_SUITE(TxAckTest); - CPPUNIT_TEST(testPrepare); - CPPUNIT_TEST(testCommit); - CPPUNIT_TEST_SUITE_END(); - - - AccumulatedAck acked; - TestMessageStore store; - Queue::shared_ptr queue; - vector<Message::shared_ptr> messages; - list<DeliveryRecord> deliveries; - TxAck op; - - + class TestMessageStore : public MessageStore + { public: + vector<Message::shared_ptr> dequeued; - TxAckTest() : queue(new Queue("my_queue", true, false, &store, 0)), op(acked, deliveries) + void dequeue(Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/) { - for(int i = 0; i < 10; i++){ - Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); - messages.push_back(msg); - deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); - } - - //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) - acked.range = 5; - acked.individual.push_back(7); - acked.individual.push_back(9); - } - - void testPrepare() - { - //ensure acked messages are discarded, i.e. dequeued from store - op.prepare(); - CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); - CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); - CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1 - CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2 - CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3 - CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4 - CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5 - CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7 - CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9 + dequeued.push_back(msg); } - void testCommit() - { - //emsure acked messages are removed from list - op.commit(); - CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size()); - list<DeliveryRecord>::iterator i = deliveries.begin(); - CPPUNIT_ASSERT(i->matches(6));//msg 6 - CPPUNIT_ASSERT((++i)->matches(8));//msg 8 - CPPUNIT_ASSERT((++i)->matches(10));//msg 10 + //dont care about any of the other methods: + void create(const Queue&){} + void destroy(const Queue&){} + void recover(QueueRegistry&){} + void enqueue(Message::shared_ptr&, const Queue&, const string * const){} + void committed(const string * const){} + void aborted(const string * const){} + void begin(){} + void commit(){} + void abort(){} + ~TestMessageStore(){} + }; + + CPPUNIT_TEST_SUITE(TxAckTest); + CPPUNIT_TEST(testPrepare); + CPPUNIT_TEST(testCommit); + CPPUNIT_TEST_SUITE_END(); + + + AccumulatedAck acked; + TestMessageStore store; + Queue::shared_ptr queue; + vector<Message::shared_ptr> messages; + list<DeliveryRecord> deliveries; + TxAck op; + + +public: + + TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) + { + for(int i = 0; i < 10; i++){ + Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); + messages.push_back(msg); + deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); } + + //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) + acked.range = 5; + acked.individual.push_back(7); + acked.individual.push_back(9); + } + + void testPrepare() + { + //ensure acked messages are discarded, i.e. dequeued from store + op.prepare(); + CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); + CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); + CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1 + CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2 + CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3 + CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4 + CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5 + CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7 + CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9 + } + + void testCommit() + { + //emsure acked messages are removed from list + op.commit(); + CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size()); + list<DeliveryRecord>::iterator i = deliveries.begin(); + CPPUNIT_ASSERT(i->matches(6));//msg 6 + CPPUNIT_ASSERT((++i)->matches(8));//msg 8 + CPPUNIT_ASSERT((++i)->matches(10));//msg 10 + } }; // Make this test suite a plugin. diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp index b8d3c99cb9..4fe6c7497a 100644 --- a/cpp/test/unit/qpid/broker/TxPublishTest.cpp +++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp @@ -16,6 +16,7 @@ * */ #include "qpid/broker/MessageStore.h" +#include "qpid/broker/QueueRegistry.h" #include "qpid/broker/TxPublish.h" #include <qpid_test_plugin.h> #include <iostream> @@ -30,71 +31,74 @@ using namespace qpid::broker; class TxPublishTest : public CppUnit::TestCase { - class TestMessageStore : public MessageStore - { - public: - vector< pair<string, Message::shared_ptr> > enqueued; - - void enqueue(Message::shared_ptr& msg, const string& queue, const string * const /*xid*/) - { - enqueued.push_back(pair<string, Message::shared_ptr>(queue,msg)); - } - - //dont care about any of the other methods: - void dequeue(Message::shared_ptr&, const string&, const string * const){} - void committed(const string * const){} - void aborted(const string * const){} - void begin(){} - void commit(){} - void abort(){} - ~TestMessageStore(){} - }; - - CPPUNIT_TEST_SUITE(TxPublishTest); - CPPUNIT_TEST(testPrepare); - CPPUNIT_TEST(testCommit); - CPPUNIT_TEST_SUITE_END(); - - - TestMessageStore store; - Queue::shared_ptr queue1; - Queue::shared_ptr queue2; - Message::shared_ptr msg; - TxPublish op; - - + class TestMessageStore : public MessageStore + { public: - - TxPublishTest() : queue1(new Queue("queue1", true, false, &store, 0)), - queue2(new Queue("queue2", true, false, &store, 0)), - msg(new Message(0, "exchange", "routing_key", false, false)), - op(msg) - { - op.deliverTo(queue1); - op.deliverTo(queue2); - } - - void testPrepare() + vector< pair<string, Message::shared_ptr> > enqueued; + + void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/) { - //ensure messages are enqueued in store - op.prepare(); - CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); - CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); - CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second); - CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); - CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second); - } - - void testCommit() - { - //ensure messages are delivered to queue - op.commit(); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount()); - CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue()); - - CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount()); - CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue()); + enqueued.push_back(pair<string, Message::shared_ptr>(queue.getName(),msg)); } + + //dont care about any of the other methods: + void create(const Queue&){} + void destroy(const Queue&){} + void recover(QueueRegistry&){} + void dequeue(Message::shared_ptr&, const Queue&, const string * const){} + void committed(const string * const){} + void aborted(const string * const){} + void begin(){} + void commit(){} + void abort(){} + ~TestMessageStore(){} + }; + + CPPUNIT_TEST_SUITE(TxPublishTest); + CPPUNIT_TEST(testPrepare); + CPPUNIT_TEST(testCommit); + CPPUNIT_TEST_SUITE_END(); + + + TestMessageStore store; + Queue::shared_ptr queue1; + Queue::shared_ptr queue2; + Message::shared_ptr msg; + TxPublish op; + + +public: + + TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), + queue2(new Queue("queue2", false, &store, 0)), + msg(new Message(0, "exchange", "routing_key", false, false)), + op(msg) + { + op.deliverTo(queue1); + op.deliverTo(queue2); + } + + void testPrepare() + { + //ensure messages are enqueued in store + op.prepare(); + CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); + CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); + CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second); + CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); + CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second); + } + + void testCommit() + { + //ensure messages are delivered to queue + op.commit(); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount()); + CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue()); + + CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount()); + CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue()); + } }; // Make this test suite a plugin. |