diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-03 13:44:21 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-03 13:44:21 +0000 |
commit | 15a915878c787e6d5ed8330a8dd4375ec885a6c0 (patch) | |
tree | e580dbbeb7c61c4dc1a1348156c1970e09311b10 /cpp/src | |
parent | 20b96a39f539bf5181a58d1235f521d6a544bc47 (diff) | |
download | qpid-python-15a915878c787e6d5ed8330a8dd4375ec885a6c0.tar.gz |
Added some methods to MessageStore interface and hooked these in where appropriate.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@470810 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerFactoryImpl.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerImpl.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/framing/BasicHeaderProperties.h | 1 |
11 files changed, 87 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e96cc65b95..baa1b0d915 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -32,7 +32,8 @@ Message::Message(const ConnectionToken* const _publisher, mandatory(_mandatory), immediate(_immediate), redelivered(false), - size(0) {} + size(0), + persistenceId(0) {} Message::~Message(){} @@ -92,3 +93,9 @@ const ConnectionToken* const Message::getPublisher(){ return publisher; } +bool Message::isPersistent() +{ + if(!header) return false; + BasicHeaderProperties* props = getHeaderProperties(); + return props && props->getDeliveryMode() == PERSISTENT; +} diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index cfe29bdfcf..f9acdfd0a5 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -48,7 +48,7 @@ namespace qpid { qpid::framing::AMQHeaderBody::shared_ptr header; content_list content; u_int64_t size; - TxBuffer* tx; + u_int64_t persistenceId; void sendContent(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); @@ -78,11 +78,12 @@ namespace qpid { void redeliver(); qpid::framing::BasicHeaderProperties* getHeaderProperties(); + bool isPersistent(); const string& getRoutingKey() const { return routingKey; } const string& getExchange() const { return exchange; } u_int64_t contentSize() const { return size; } - TxBuffer* getTx() const { return tx; } - void setTx(TxBuffer* _tx) { tx = _tx; } + u_int64_t getPersistenceId() const { return persistenceId; } + void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } }; } } diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index af9dd20079..9db7e81ed7 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -24,12 +24,29 @@ namespace qpid { namespace broker { + class Queue; + class QueueRegistry; + /** * An abstraction of the persistent storage for messages. */ class MessageStore : public TransactionalStore{ public: /** + * Record the existance of a durable queue + */ + virtual void create(const Queue& queue) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(const Queue& queue) = 0; + + /** + * Request recovery of queue and message state from store + */ + virtual void recover(QueueRegistry& queues) = 0; + + /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given * message is on the given queue. @@ -40,7 +57,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0; + virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message @@ -52,7 +69,7 @@ namespace qpid { * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0; + virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0; /** * Treat all enqueue/dequeues where this xid was specified as being committed. */ diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d671cea9a5..8a81b07aef 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -23,13 +23,12 @@ using namespace qpid::broker; using namespace qpid::concurrent; -Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, +Queue::Queue(const string& _name, u_int32_t _autodelete, MessageStore* const _store, const ConnectionToken* const _owner) : name(_name), autodelete(_autodelete), - durable(_durable), store(_store), owner(_owner), queueing(false), @@ -166,12 +165,26 @@ bool Queue::canAutoDelete() const{ void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ if(store){ - store->enqueue(msg, name, xid); + store->enqueue(msg, *this, xid); } } void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){ if(store){ - store->dequeue(msg, name, xid); + store->dequeue(msg, *this, xid); + } +} + +void Queue::create() +{ + if(store){ + store->create(*this); + } +} + +void Queue::destroy() +{ + if(store){ + store->destroy(*this); } } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index edc7c99b4f..393ca6b196 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -47,7 +47,6 @@ namespace qpid { class Queue{ const string name; const u_int32_t autodelete; - const bool durable; MessageStore* const store; const ConnectionToken* const owner; std::vector<Consumer*> consumers; @@ -69,10 +68,13 @@ namespace qpid { typedef std::vector<shared_ptr> vector; - Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, + Queue(const string& name, u_int32_t autodelete = 0, MessageStore* const store = 0, const ConnectionToken* const owner = 0); ~Queue(); + + void create(); + void destroy(); /** * Informs the queue of a binding that should be cancelled on * destruction of the queue. diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 949c194bbe..56452ca907 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -24,20 +24,20 @@ using namespace qpid::broker; using namespace qpid::concurrent; -QueueRegistry::QueueRegistry() : counter(1){} +QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} QueueRegistry::~QueueRegistry(){} std::pair<Queue::shared_ptr, bool> -QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, - MessageStore* const store, const ConnectionToken* owner) +QueueRegistry::declare(const string& declareName, bool durable, + u_int32_t autoDelete, const ConnectionToken* owner) { Locker locker(lock); string name = declareName.empty() ? generateName() : declareName; assert(!name.empty()); QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner)); + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); queues[name] = queue; return std::pair<Queue::shared_ptr, bool>(queue, true); } else { diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 4f9e4b882a..fb22ef148a 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -37,7 +37,7 @@ class SessionHandlerImpl; class QueueRegistry{ public: - QueueRegistry(); + QueueRegistry(MessageStore* const store = 0); ~QueueRegistry(); /** @@ -47,7 +47,6 @@ class QueueRegistry{ * was created by this declare call false if it already existed. */ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, - MessageStore* const _store = 0, const ConnectionToken* const owner = 0); /** @@ -79,7 +78,7 @@ class QueueRegistry{ QueueMap queues; qpid::concurrent::Monitor lock; int counter; - + MessageStore* const store; }; diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp index 9883c94a25..76723881dc 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp @@ -33,7 +33,9 @@ const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); } -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : + queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) +{ exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); exchanges.declare(amq_topic, TopicExchange::typeName); @@ -42,10 +44,17 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeo cleaner.start(); } -SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){ +void SessionHandlerFactoryImpl::recover() +{ + if(store.get()) store->recover(queues); +} + +SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt) +{ return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); } -SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){ +SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() +{ cleaner.stop(); } diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h index 5bad81412b..cea5c0fa00 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h @@ -18,27 +18,31 @@ #ifndef _SessionHandlerFactoryImpl_ #define _SessionHandlerFactoryImpl_ -#include "qpid/framing/AMQFrame.h" #include "qpid/broker/AutoDelete.h" #include "qpid/broker/ExchangeRegistry.h" -#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/broker/MessageStore.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/io/SessionHandlerFactory.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ProtocolInitiation.h" #include "qpid/io/SessionContext.h" #include "qpid/io/SessionHandler.h" +#include "qpid/io/SessionHandlerFactory.h" #include "qpid/io/TimeoutHandler.h" +#include <memory> namespace qpid { namespace broker { class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory { + std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; const u_int32_t timeout;//timeout for auto-deleted queues (in ms) AutoDelete cleaner; public: SessionHandlerFactoryImpl(u_int32_t timeout = 30000); + void recover(); virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt); virtual ~SessionHandlerFactoryImpl(); }; diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index 7c94a65d73..7a03132671 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -250,24 +250,28 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = parent->getQueue(name, channel); } else { std::pair<Queue::shared_ptr, bool> queue_created = - parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0); + parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue parent->getChannel(channel)->setDefaultQueue(queue); + + //create persistent record if required + queue_created.first->create(); + //add default binding: parent->exchanges->getDefault()->bind(queue, name, 0); - if(exclusive){ + if (exclusive) { parent->exclusiveQueues.push_back(queue); } else if(autoDelete){ parent->cleaner->add(queue); } } } - if(exclusive && !queue->isExclusiveOwner(parent)){ + if (exclusive && !queue->isExclusiveOwner(parent)) { throw ChannelException(405, "Cannot grant exclusive access to queue"); } - if(!nowait){ + if (!nowait) { name = queue->getName(); parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); } @@ -311,6 +315,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); } count = q->getMessageCount(); + q->destroy(); parent->queues->destroy(queue); } if(!nowait) parent->client.getQueue().deleteOk(channel, count); diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h index e82699753b..3450782875 100644 --- a/cpp/src/qpid/framing/BasicHeaderProperties.h +++ b/cpp/src/qpid/framing/BasicHeaderProperties.h @@ -25,6 +25,7 @@ namespace qpid { namespace framing { + enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2}; //TODO: This could be easily generated from the spec class BasicHeaderProperties : public HeaderProperties |