summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-03 13:44:21 +0000
committerGordon Sim <gsim@apache.org>2006-11-03 13:44:21 +0000
commit15a915878c787e6d5ed8330a8dd4375ec885a6c0 (patch)
treee580dbbeb7c61c4dc1a1348156c1970e09311b10
parent20b96a39f539bf5181a58d1235f521d6a544bc47 (diff)
downloadqpid-python-15a915878c787e6d5ed8330a8dd4375ec885a6c0.tar.gz
Added some methods to MessageStore interface and hooked these in where appropriate.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@470810 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Message.cpp9
-rw-r--r--cpp/src/qpid/broker/Message.h7
-rw-r--r--cpp/src/qpid/broker/MessageStore.h21
-rw-r--r--cpp/src/qpid/broker/Queue.cpp21
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp8
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h5
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp15
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.h10
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp13
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.h1
-rw-r--r--cpp/test/unit/qpid/broker/ConfigurationTest.cpp4
-rw-r--r--cpp/test/unit/qpid/broker/ExchangeTest.cpp4
-rw-r--r--cpp/test/unit/qpid/broker/QueueTest.cpp6
-rw-r--r--cpp/test/unit/qpid/broker/TxAckTest.cpp142
-rw-r--r--cpp/test/unit/qpid/broker/TxPublishTest.cpp128
16 files changed, 233 insertions, 167 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e96cc65b95..baa1b0d915 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -32,7 +32,8 @@ Message::Message(const ConnectionToken* const _publisher,
mandatory(_mandatory),
immediate(_immediate),
redelivered(false),
- size(0) {}
+ size(0),
+ persistenceId(0) {}
Message::~Message(){}
@@ -92,3 +93,9 @@ const ConnectionToken* const Message::getPublisher(){
return publisher;
}
+bool Message::isPersistent()
+{
+ if(!header) return false;
+ BasicHeaderProperties* props = getHeaderProperties();
+ return props && props->getDeliveryMode() == PERSISTENT;
+}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index cfe29bdfcf..f9acdfd0a5 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -48,7 +48,7 @@ namespace qpid {
qpid::framing::AMQHeaderBody::shared_ptr header;
content_list content;
u_int64_t size;
- TxBuffer* tx;
+ u_int64_t persistenceId;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize);
@@ -78,11 +78,12 @@ namespace qpid {
void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
const string& getRoutingKey() const { return routingKey; }
const string& getExchange() const { return exchange; }
u_int64_t contentSize() const { return size; }
- TxBuffer* getTx() const { return tx; }
- void setTx(TxBuffer* _tx) { tx = _tx; }
+ u_int64_t getPersistenceId() const { return persistenceId; }
+ void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
};
}
}
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index af9dd20079..9db7e81ed7 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -24,12 +24,29 @@
namespace qpid {
namespace broker {
+ class Queue;
+ class QueueRegistry;
+
/**
* An abstraction of the persistent storage for messages.
*/
class MessageStore : public TransactionalStore{
public:
/**
+ * Record the existance of a durable queue
+ */
+ virtual void create(const Queue& queue) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(const Queue& queue) = 0;
+
+ /**
+ * Request recovery of queue and message state from store
+ */
+ virtual void recover(QueueRegistry& queues) = 0;
+
+ /**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
* message is on the given queue.
@@ -40,7 +57,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
@@ -52,7 +69,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Treat all enqueue/dequeues where this xid was specified as being committed.
*/
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index d671cea9a5..8a81b07aef 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -23,13 +23,12 @@
using namespace qpid::broker;
using namespace qpid::concurrent;
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete,
+Queue::Queue(const string& _name, u_int32_t _autodelete,
MessageStore* const _store,
const ConnectionToken* const _owner) :
name(_name),
autodelete(_autodelete),
- durable(_durable),
store(_store),
owner(_owner),
queueing(false),
@@ -166,12 +165,26 @@ bool Queue::canAutoDelete() const{
void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
if(store){
- store->enqueue(msg, name, xid);
+ store->enqueue(msg, *this, xid);
}
}
void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
if(store){
- store->dequeue(msg, name, xid);
+ store->dequeue(msg, *this, xid);
+ }
+}
+
+void Queue::create()
+{
+ if(store){
+ store->create(*this);
+ }
+}
+
+void Queue::destroy()
+{
+ if(store){
+ store->destroy(*this);
}
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index edc7c99b4f..393ca6b196 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -47,7 +47,6 @@ namespace qpid {
class Queue{
const string name;
const u_int32_t autodelete;
- const bool durable;
MessageStore* const store;
const ConnectionToken* const owner;
std::vector<Consumer*> consumers;
@@ -69,10 +68,13 @@ namespace qpid {
typedef std::vector<shared_ptr> vector;
- Queue(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ Queue(const string& name, u_int32_t autodelete = 0,
MessageStore* const store = 0,
const ConnectionToken* const owner = 0);
~Queue();
+
+ void create();
+ void destroy();
/**
* Informs the queue of a binding that should be cancelled on
* destruction of the queue.
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 949c194bbe..56452ca907 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -24,20 +24,20 @@
using namespace qpid::broker;
using namespace qpid::concurrent;
-QueueRegistry::QueueRegistry() : counter(1){}
+QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete,
- MessageStore* const store, const ConnectionToken* owner)
+QueueRegistry::declare(const string& declareName, bool durable,
+ u_int32_t autoDelete, const ConnectionToken* owner)
{
Locker locker(lock);
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner));
+ Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
queues[name] = queue;
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 4f9e4b882a..fb22ef148a 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -37,7 +37,7 @@ class SessionHandlerImpl;
class QueueRegistry{
public:
- QueueRegistry();
+ QueueRegistry(MessageStore* const store = 0);
~QueueRegistry();
/**
@@ -47,7 +47,6 @@ class QueueRegistry{
* was created by this declare call false if it already existed.
*/
std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0,
- MessageStore* const _store = 0,
const ConnectionToken* const owner = 0);
/**
@@ -79,7 +78,7 @@ class QueueRegistry{
QueueMap queues;
qpid::concurrent::Monitor lock;
int counter;
-
+ MessageStore* const store;
};
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
index 9883c94a25..76723881dc 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -33,7 +33,9 @@ const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
}
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) :
+ queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+{
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
exchanges.declare(amq_topic, TopicExchange::typeName);
@@ -42,10 +44,17 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeo
cleaner.start();
}
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
+void SessionHandlerFactoryImpl::recover()
+{
+ if(store.get()) store->recover(queues);
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+{
return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
}
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+{
cleaner.stop();
}
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
index 5bad81412b..cea5c0fa00 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
@@ -18,27 +18,31 @@
#ifndef _SessionHandlerFactoryImpl_
#define _SessionHandlerFactoryImpl_
-#include "qpid/framing/AMQFrame.h"
#include "qpid/broker/AutoDelete.h"
#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/broker/MessageStore.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/io/SessionContext.h"
#include "qpid/io/SessionHandler.h"
+#include "qpid/io/SessionHandlerFactory.h"
#include "qpid/io/TimeoutHandler.h"
+#include <memory>
namespace qpid {
namespace broker {
class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
{
+ std::auto_ptr<MessageStore> store;
QueueRegistry queues;
ExchangeRegistry exchanges;
const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
AutoDelete cleaner;
public:
SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+ void recover();
virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
virtual ~SessionHandlerFactoryImpl();
};
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index 7c94a65d73..7a03132671 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -250,24 +250,28 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue = parent->getQueue(name, channel);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0);
+ parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
parent->getChannel(channel)->setDefaultQueue(queue);
+
+ //create persistent record if required
+ queue_created.first->create();
+
//add default binding:
parent->exchanges->getDefault()->bind(queue, name, 0);
- if(exclusive){
+ if (exclusive) {
parent->exclusiveQueues.push_back(queue);
} else if(autoDelete){
parent->cleaner->add(queue);
}
}
}
- if(exclusive && !queue->isExclusiveOwner(parent)){
+ if (exclusive && !queue->isExclusiveOwner(parent)) {
throw ChannelException(405, "Cannot grant exclusive access to queue");
}
- if(!nowait){
+ if (!nowait) {
name = queue->getName();
parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
}
@@ -311,6 +315,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
}
count = q->getMessageCount();
+ q->destroy();
parent->queues->destroy(queue);
}
if(!nowait) parent->client.getQueue().deleteOk(channel, count);
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h
index e82699753b..3450782875 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.h
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.h
@@ -25,6 +25,7 @@
namespace qpid {
namespace framing {
+ enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2};
//TODO: This could be easily generated from the spec
class BasicHeaderProperties : public HeaderProperties
diff --git a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
index 7acee7c8b9..8fd252a9a9 100644
--- a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
+++ b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
@@ -60,8 +60,8 @@ class ConfigurationTest : public CppUnit::TestCase
void testVarious()
{
Configuration conf;
- char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", "blocking"};
- conf.parse(6, argv);
+ char* argv[] = {"ignore", "-t", "--worker-threads", "10"};
+ conf.parse(4, argv);
CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default
CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads());
CPPUNIT_ASSERT(conf.isTrace());
diff --git a/cpp/test/unit/qpid/broker/ExchangeTest.cpp b/cpp/test/unit/qpid/broker/ExchangeTest.cpp
index 2fb525312b..14fb6fc097 100644
--- a/cpp/test/unit/qpid/broker/ExchangeTest.cpp
+++ b/cpp/test/unit/qpid/broker/ExchangeTest.cpp
@@ -37,8 +37,8 @@ class ExchangeTest : public CppUnit::TestCase
void testMe()
{
- Queue::shared_ptr queue(new Queue("queue", true, true));
- Queue::shared_ptr queue2(new Queue("queue2", true, true));
+ Queue::shared_ptr queue(new Queue("queue", true));
+ Queue::shared_ptr queue2(new Queue("queue2", true));
TopicExchange topic("topic");
topic.bind(queue, "abc", 0);
diff --git a/cpp/test/unit/qpid/broker/QueueTest.cpp b/cpp/test/unit/qpid/broker/QueueTest.cpp
index 5b06cb93ca..ba1427a087 100644
--- a/cpp/test/unit/qpid/broker/QueueTest.cpp
+++ b/cpp/test/unit/qpid/broker/QueueTest.cpp
@@ -52,7 +52,7 @@ class QueueTest : public CppUnit::TestCase
public:
void testConsumers(){
- Queue::shared_ptr queue(new Queue("my_queue", true, true));
+ Queue::shared_ptr queue(new Queue("my_queue", true));
//Test adding consumers:
TestConsumer c1;
@@ -84,7 +84,7 @@ class QueueTest : public CppUnit::TestCase
}
void testBinding(){
- Queue::shared_ptr queue(new Queue("my_queue", true, true));
+ Queue::shared_ptr queue(new Queue("my_queue", true));
//Test bindings:
TestBinding a;
TestBinding b;
@@ -118,7 +118,7 @@ class QueueTest : public CppUnit::TestCase
}
void testDequeue(){
- Queue::shared_ptr queue(new Queue("my_queue", true, true));
+ Queue::shared_ptr queue(new Queue("my_queue", true));
Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp
index b787c5793b..ab1e607e87 100644
--- a/cpp/test/unit/qpid/broker/TxAckTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp
@@ -16,6 +16,7 @@
*
*/
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/TxAck.h"
#include <qpid_test_plugin.h>
#include <iostream>
@@ -29,81 +30,84 @@ using namespace qpid::broker;
class TxAckTest : public CppUnit::TestCase
{
- class TestMessageStore : public MessageStore
- {
- public:
- vector<Message::shared_ptr> dequeued;
-
- void dequeue(Message::shared_ptr& msg, const string& /*queue*/, const string * const /*xid*/)
- {
- dequeued.push_back(msg);
- }
-
- //dont care about any of the other methods:
- void enqueue(Message::shared_ptr&, const string&, const string * const){}
- void committed(const string * const){}
- void aborted(const string * const){}
- void begin(){}
- void commit(){}
- void abort(){}
- ~TestMessageStore(){}
- };
-
- CPPUNIT_TEST_SUITE(TxAckTest);
- CPPUNIT_TEST(testPrepare);
- CPPUNIT_TEST(testCommit);
- CPPUNIT_TEST_SUITE_END();
-
-
- AccumulatedAck acked;
- TestMessageStore store;
- Queue::shared_ptr queue;
- vector<Message::shared_ptr> messages;
- list<DeliveryRecord> deliveries;
- TxAck op;
-
-
+ class TestMessageStore : public MessageStore
+ {
public:
+ vector<Message::shared_ptr> dequeued;
- TxAckTest() : queue(new Queue("my_queue", true, false, &store, 0)), op(acked, deliveries)
+ void dequeue(Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/)
{
- for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false));
- messages.push_back(msg);
- deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
- }
-
- //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
- acked.range = 5;
- acked.individual.push_back(7);
- acked.individual.push_back(9);
- }
-
- void testPrepare()
- {
- //ensure acked messages are discarded, i.e. dequeued from store
- op.prepare();
- CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
- CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
- CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
- CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2
- CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3
- CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4
- CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5
- CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7
- CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9
+ dequeued.push_back(msg);
}
- void testCommit()
- {
- //emsure acked messages are removed from list
- op.commit();
- CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size());
- list<DeliveryRecord>::iterator i = deliveries.begin();
- CPPUNIT_ASSERT(i->matches(6));//msg 6
- CPPUNIT_ASSERT((++i)->matches(8));//msg 8
- CPPUNIT_ASSERT((++i)->matches(10));//msg 10
+ //dont care about any of the other methods:
+ void create(const Queue&){}
+ void destroy(const Queue&){}
+ void recover(QueueRegistry&){}
+ void enqueue(Message::shared_ptr&, const Queue&, const string * const){}
+ void committed(const string * const){}
+ void aborted(const string * const){}
+ void begin(){}
+ void commit(){}
+ void abort(){}
+ ~TestMessageStore(){}
+ };
+
+ CPPUNIT_TEST_SUITE(TxAckTest);
+ CPPUNIT_TEST(testPrepare);
+ CPPUNIT_TEST(testCommit);
+ CPPUNIT_TEST_SUITE_END();
+
+
+ AccumulatedAck acked;
+ TestMessageStore store;
+ Queue::shared_ptr queue;
+ vector<Message::shared_ptr> messages;
+ list<DeliveryRecord> deliveries;
+ TxAck op;
+
+
+public:
+
+ TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
+ {
+ for(int i = 0; i < 10; i++){
+ Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false));
+ messages.push_back(msg);
+ deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
}
+
+ //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
+ acked.range = 5;
+ acked.individual.push_back(7);
+ acked.individual.push_back(9);
+ }
+
+ void testPrepare()
+ {
+ //ensure acked messages are discarded, i.e. dequeued from store
+ op.prepare();
+ CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
+ CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
+ CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
+ CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2
+ CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3
+ CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4
+ CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5
+ CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7
+ CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9
+ }
+
+ void testCommit()
+ {
+ //emsure acked messages are removed from list
+ op.commit();
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size());
+ list<DeliveryRecord>::iterator i = deliveries.begin();
+ CPPUNIT_ASSERT(i->matches(6));//msg 6
+ CPPUNIT_ASSERT((++i)->matches(8));//msg 8
+ CPPUNIT_ASSERT((++i)->matches(10));//msg 10
+ }
};
// Make this test suite a plugin.
diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
index b8d3c99cb9..4fe6c7497a 100644
--- a/cpp/test/unit/qpid/broker/TxPublishTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
@@ -16,6 +16,7 @@
*
*/
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/TxPublish.h"
#include <qpid_test_plugin.h>
#include <iostream>
@@ -30,71 +31,74 @@ using namespace qpid::broker;
class TxPublishTest : public CppUnit::TestCase
{
- class TestMessageStore : public MessageStore
- {
- public:
- vector< pair<string, Message::shared_ptr> > enqueued;
-
- void enqueue(Message::shared_ptr& msg, const string& queue, const string * const /*xid*/)
- {
- enqueued.push_back(pair<string, Message::shared_ptr>(queue,msg));
- }
-
- //dont care about any of the other methods:
- void dequeue(Message::shared_ptr&, const string&, const string * const){}
- void committed(const string * const){}
- void aborted(const string * const){}
- void begin(){}
- void commit(){}
- void abort(){}
- ~TestMessageStore(){}
- };
-
- CPPUNIT_TEST_SUITE(TxPublishTest);
- CPPUNIT_TEST(testPrepare);
- CPPUNIT_TEST(testCommit);
- CPPUNIT_TEST_SUITE_END();
-
-
- TestMessageStore store;
- Queue::shared_ptr queue1;
- Queue::shared_ptr queue2;
- Message::shared_ptr msg;
- TxPublish op;
-
-
+ class TestMessageStore : public MessageStore
+ {
public:
-
- TxPublishTest() : queue1(new Queue("queue1", true, false, &store, 0)),
- queue2(new Queue("queue2", true, false, &store, 0)),
- msg(new Message(0, "exchange", "routing_key", false, false)),
- op(msg)
- {
- op.deliverTo(queue1);
- op.deliverTo(queue2);
- }
-
- void testPrepare()
+ vector< pair<string, Message::shared_ptr> > enqueued;
+
+ void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/)
{
- //ensure messages are enqueued in store
- op.prepare();
- CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
- CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
- CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
- CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
- CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
- }
-
- void testCommit()
- {
- //ensure messages are delivered to queue
- op.commit();
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount());
- CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
-
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount());
- CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());
+ enqueued.push_back(pair<string, Message::shared_ptr>(queue.getName(),msg));
}
+
+ //dont care about any of the other methods:
+ void create(const Queue&){}
+ void destroy(const Queue&){}
+ void recover(QueueRegistry&){}
+ void dequeue(Message::shared_ptr&, const Queue&, const string * const){}
+ void committed(const string * const){}
+ void aborted(const string * const){}
+ void begin(){}
+ void commit(){}
+ void abort(){}
+ ~TestMessageStore(){}
+ };
+
+ CPPUNIT_TEST_SUITE(TxPublishTest);
+ CPPUNIT_TEST(testPrepare);
+ CPPUNIT_TEST(testCommit);
+ CPPUNIT_TEST_SUITE_END();
+
+
+ TestMessageStore store;
+ Queue::shared_ptr queue1;
+ Queue::shared_ptr queue2;
+ Message::shared_ptr msg;
+ TxPublish op;
+
+
+public:
+
+ TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)),
+ queue2(new Queue("queue2", false, &store, 0)),
+ msg(new Message(0, "exchange", "routing_key", false, false)),
+ op(msg)
+ {
+ op.deliverTo(queue1);
+ op.deliverTo(queue2);
+ }
+
+ void testPrepare()
+ {
+ //ensure messages are enqueued in store
+ op.prepare();
+ CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
+ CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
+ CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
+ CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
+ CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
+ }
+
+ void testCommit()
+ {
+ //ensure messages are delivered to queue
+ op.commit();
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount());
+ CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount());
+ CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());
+ }
};
// Make this test suite a plugin.