summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-03 13:44:21 +0000
committerGordon Sim <gsim@apache.org>2006-11-03 13:44:21 +0000
commit15a915878c787e6d5ed8330a8dd4375ec885a6c0 (patch)
treee580dbbeb7c61c4dc1a1348156c1970e09311b10 /cpp/src
parent20b96a39f539bf5181a58d1235f521d6a544bc47 (diff)
downloadqpid-python-15a915878c787e6d5ed8330a8dd4375ec885a6c0.tar.gz
Added some methods to MessageStore interface and hooked these in where appropriate.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@470810 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Message.cpp9
-rw-r--r--cpp/src/qpid/broker/Message.h7
-rw-r--r--cpp/src/qpid/broker/MessageStore.h21
-rw-r--r--cpp/src/qpid/broker/Queue.cpp21
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp8
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h5
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp15
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.h10
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp13
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.h1
11 files changed, 87 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e96cc65b95..baa1b0d915 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -32,7 +32,8 @@ Message::Message(const ConnectionToken* const _publisher,
mandatory(_mandatory),
immediate(_immediate),
redelivered(false),
- size(0) {}
+ size(0),
+ persistenceId(0) {}
Message::~Message(){}
@@ -92,3 +93,9 @@ const ConnectionToken* const Message::getPublisher(){
return publisher;
}
+bool Message::isPersistent()
+{
+ if(!header) return false;
+ BasicHeaderProperties* props = getHeaderProperties();
+ return props && props->getDeliveryMode() == PERSISTENT;
+}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index cfe29bdfcf..f9acdfd0a5 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -48,7 +48,7 @@ namespace qpid {
qpid::framing::AMQHeaderBody::shared_ptr header;
content_list content;
u_int64_t size;
- TxBuffer* tx;
+ u_int64_t persistenceId;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize);
@@ -78,11 +78,12 @@ namespace qpid {
void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
const string& getRoutingKey() const { return routingKey; }
const string& getExchange() const { return exchange; }
u_int64_t contentSize() const { return size; }
- TxBuffer* getTx() const { return tx; }
- void setTx(TxBuffer* _tx) { tx = _tx; }
+ u_int64_t getPersistenceId() const { return persistenceId; }
+ void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
};
}
}
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index af9dd20079..9db7e81ed7 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -24,12 +24,29 @@
namespace qpid {
namespace broker {
+ class Queue;
+ class QueueRegistry;
+
/**
* An abstraction of the persistent storage for messages.
*/
class MessageStore : public TransactionalStore{
public:
/**
+ * Record the existance of a durable queue
+ */
+ virtual void create(const Queue& queue) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(const Queue& queue) = 0;
+
+ /**
+ * Request recovery of queue and message state from store
+ */
+ virtual void recover(QueueRegistry& queues) = 0;
+
+ /**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
* message is on the given queue.
@@ -40,7 +57,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
@@ -52,7 +69,7 @@ namespace qpid {
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+ virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Treat all enqueue/dequeues where this xid was specified as being committed.
*/
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index d671cea9a5..8a81b07aef 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -23,13 +23,12 @@
using namespace qpid::broker;
using namespace qpid::concurrent;
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete,
+Queue::Queue(const string& _name, u_int32_t _autodelete,
MessageStore* const _store,
const ConnectionToken* const _owner) :
name(_name),
autodelete(_autodelete),
- durable(_durable),
store(_store),
owner(_owner),
queueing(false),
@@ -166,12 +165,26 @@ bool Queue::canAutoDelete() const{
void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
if(store){
- store->enqueue(msg, name, xid);
+ store->enqueue(msg, *this, xid);
}
}
void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
if(store){
- store->dequeue(msg, name, xid);
+ store->dequeue(msg, *this, xid);
+ }
+}
+
+void Queue::create()
+{
+ if(store){
+ store->create(*this);
+ }
+}
+
+void Queue::destroy()
+{
+ if(store){
+ store->destroy(*this);
}
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index edc7c99b4f..393ca6b196 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -47,7 +47,6 @@ namespace qpid {
class Queue{
const string name;
const u_int32_t autodelete;
- const bool durable;
MessageStore* const store;
const ConnectionToken* const owner;
std::vector<Consumer*> consumers;
@@ -69,10 +68,13 @@ namespace qpid {
typedef std::vector<shared_ptr> vector;
- Queue(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ Queue(const string& name, u_int32_t autodelete = 0,
MessageStore* const store = 0,
const ConnectionToken* const owner = 0);
~Queue();
+
+ void create();
+ void destroy();
/**
* Informs the queue of a binding that should be cancelled on
* destruction of the queue.
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 949c194bbe..56452ca907 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -24,20 +24,20 @@
using namespace qpid::broker;
using namespace qpid::concurrent;
-QueueRegistry::QueueRegistry() : counter(1){}
+QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete,
- MessageStore* const store, const ConnectionToken* owner)
+QueueRegistry::declare(const string& declareName, bool durable,
+ u_int32_t autoDelete, const ConnectionToken* owner)
{
Locker locker(lock);
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner));
+ Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
queues[name] = queue;
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 4f9e4b882a..fb22ef148a 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -37,7 +37,7 @@ class SessionHandlerImpl;
class QueueRegistry{
public:
- QueueRegistry();
+ QueueRegistry(MessageStore* const store = 0);
~QueueRegistry();
/**
@@ -47,7 +47,6 @@ class QueueRegistry{
* was created by this declare call false if it already existed.
*/
std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0,
- MessageStore* const _store = 0,
const ConnectionToken* const owner = 0);
/**
@@ -79,7 +78,7 @@ class QueueRegistry{
QueueMap queues;
qpid::concurrent::Monitor lock;
int counter;
-
+ MessageStore* const store;
};
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
index 9883c94a25..76723881dc 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -33,7 +33,9 @@ const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
}
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) :
+ queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+{
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
exchanges.declare(amq_topic, TopicExchange::typeName);
@@ -42,10 +44,17 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeo
cleaner.start();
}
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
+void SessionHandlerFactoryImpl::recover()
+{
+ if(store.get()) store->recover(queues);
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+{
return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
}
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+{
cleaner.stop();
}
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
index 5bad81412b..cea5c0fa00 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
@@ -18,27 +18,31 @@
#ifndef _SessionHandlerFactoryImpl_
#define _SessionHandlerFactoryImpl_
-#include "qpid/framing/AMQFrame.h"
#include "qpid/broker/AutoDelete.h"
#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/broker/MessageStore.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/io/SessionContext.h"
#include "qpid/io/SessionHandler.h"
+#include "qpid/io/SessionHandlerFactory.h"
#include "qpid/io/TimeoutHandler.h"
+#include <memory>
namespace qpid {
namespace broker {
class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
{
+ std::auto_ptr<MessageStore> store;
QueueRegistry queues;
ExchangeRegistry exchanges;
const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
AutoDelete cleaner;
public:
SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+ void recover();
virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
virtual ~SessionHandlerFactoryImpl();
};
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index 7c94a65d73..7a03132671 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -250,24 +250,28 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue = parent->getQueue(name, channel);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0);
+ parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
parent->getChannel(channel)->setDefaultQueue(queue);
+
+ //create persistent record if required
+ queue_created.first->create();
+
//add default binding:
parent->exchanges->getDefault()->bind(queue, name, 0);
- if(exclusive){
+ if (exclusive) {
parent->exclusiveQueues.push_back(queue);
} else if(autoDelete){
parent->cleaner->add(queue);
}
}
}
- if(exclusive && !queue->isExclusiveOwner(parent)){
+ if (exclusive && !queue->isExclusiveOwner(parent)) {
throw ChannelException(405, "Cannot grant exclusive access to queue");
}
- if(!nowait){
+ if (!nowait) {
name = queue->getName();
parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
}
@@ -311,6 +315,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
}
count = q->getMessageCount();
+ q->destroy();
parent->queues->destroy(queue);
}
if(!nowait) parent->client.getQueue().deleteOk(channel, count);
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h
index e82699753b..3450782875 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.h
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.h
@@ -25,6 +25,7 @@
namespace qpid {
namespace framing {
+ enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2};
//TODO: This could be easily generated from the spec
class BasicHeaderProperties : public HeaderProperties