summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-17 11:03:55 +0000
committerGordon Sim <gsim@apache.org>2007-05-17 11:03:55 +0000
commit9a6c0d41b19744c8e4dc4711d13a5a0afa2f7ed2 (patch)
tree539a8102197fa119c7efb77056841932e2eb5c1a /cpp/src
parentdecfd77364e211bc8f8784e15f54e06a79e16675 (diff)
downloadqpid-python-9a6c0d41b19744c8e4dc4711d13a5a0afa2f7ed2.tar.gz
Changes to support durable exchanges.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@538872 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp26
-rw-r--r--cpp/src/qpid/broker/BrokerExchange.cpp60
-rw-r--r--cpp/src/qpid/broker/BrokerExchange.h35
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h1
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp19
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h8
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp24
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h10
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp11
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h8
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp22
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h8
-rw-r--r--cpp/src/qpid/broker/MessageStore.h14
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp12
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h4
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp9
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h5
-rw-r--r--cpp/src/qpid/broker/PersistableExchange.h1
-rw-r--r--cpp/src/qpid/broker/RecoverableExchange.h49
-rw-r--r--cpp/src/qpid/broker/RecoveryManager.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp25
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.h2
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp25
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h11
25 files changed, 333 insertions, 60 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index f75b1c8ac9..f1dd6b9dd8 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -124,6 +124,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/BrokerAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/BrokerChannel.cpp \
+ qpid/broker/BrokerExchange.cpp \
qpid/broker/BrokerMessage.cpp \
qpid/broker/BrokerMessageMessage.cpp \
qpid/broker/BrokerQueue.cpp \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 36232339e5..3c742b8d2d 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -118,8 +118,8 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+ bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& args){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -127,8 +127,10 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u
}
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
- if(!response.second && response.first->getType() != type){
+ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
+ if (response.second) {
+ if (durable) broker.getStore().create(*response.first);
+ } else if (response.first->getType() != type) {
throw ConnectionException(
530,
"Exchange already declared to be of type "
@@ -145,10 +147,12 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u
}
void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+ const string& name, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- broker.getExchanges().destroy(exchange);
+ Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+ broker.getExchanges().destroy(name);
if(!nowait) client.deleteOk(context.getRequestId());
}
@@ -174,6 +178,8 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
//add default binding:
broker.getExchanges().getDefault()->bind(queue, name, 0);
+
+ //handle automatic cleanup:
if (exclusive) {
connection.exclusiveQueues.push_back(queue);
} else if(autoDelete){
@@ -202,7 +208,9 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- exchange->bind(queue, exchangeRoutingKey, &arguments);
+ if (exchange->bind(queue, exchangeRoutingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
+ broker.getStore().bind(*exchange, *queue, routingKey, arguments);
+ }
if(!nowait) client.bindOk(context.getRequestId());
}else{
throw ChannelException(
@@ -225,7 +233,9 @@ BrokerAdapter::QueueHandlerImpl::unbind(
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
- exchange->unbind(queue, routingKey, &arguments);
+ if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
+ broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
+ }
client.unbindOk(context.getRequestId());
}
diff --git a/cpp/src/qpid/broker/BrokerExchange.cpp b/cpp/src/qpid/broker/BrokerExchange.cpp
new file mode 100644
index 0000000000..4eaf40dbc8
--- /dev/null
+++ b/cpp/src/qpid/broker/BrokerExchange.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerExchange.h"
+#include "ExchangeRegistry.h"
+
+using namespace qpid::broker;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+
+Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
+{
+ string name;
+ string type;
+ FieldTable args;
+
+ buffer.getShortString(name);
+ bool durable(buffer.getOctet());
+ buffer.getShortString(type);
+ buffer.getFieldTable(args);
+
+ return exchanges.declare(name, type, durable, args).first;
+}
+
+void Exchange::encode(Buffer& buffer) const
+{
+ buffer.putShortString(name);
+ buffer.putOctet(durable);
+ buffer.putShortString(getType());
+ buffer.putFieldTable(args);
+}
+
+uint32_t Exchange::encodedSize() const
+{
+ return name.size() + 1/*short string size*/
+ + 1 /*durable*/
+ + getType().size() + 1/*short string size*/
+ + args.size();
+}
+
+
+
diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h
index d4877a5110..62c82aa935 100644
--- a/cpp/src/qpid/broker/BrokerExchange.h
+++ b/cpp/src/qpid/broker/BrokerExchange.h
@@ -25,24 +25,47 @@
#include <boost/shared_ptr.hpp>
#include "Deliverable.h"
#include "BrokerQueue.h"
+#include "MessageStore.h"
+#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
namespace qpid {
namespace broker {
using std::string;
+ class ExchangeRegistry;
- class Exchange{
+ class Exchange : public PersistableExchange{
+ private:
const string name;
+ const bool durable;
+ qpid::framing::FieldTable args;
+ mutable uint64_t persistenceId;
+
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
- explicit Exchange(const string& _name) : name(_name){}
+ explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
+ Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args)
+ : name(_name), durable(_durable), args(_args), persistenceId(0){}
virtual ~Exchange(){}
- string getName() { return name; }
- virtual string getType() = 0;
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+
+ string getName() const { return name; }
+ bool isDurable() { return durable; }
+ qpid::framing::FieldTable& getArgs() { return args; }
+
+ virtual string getType() const = 0;
+ virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+
+ //PersistableExchange:
+ void setPersistenceId(uint64_t id) const { persistenceId = id; }
+ uint64_t getPersistenceId() const { return persistenceId; }
+ uint32_t encodedSize() const;
+ void encode(framing::Buffer& buffer) const;
+
+ static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
+
};
}
}
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 28f6cfce8f..c45b35566e 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -122,6 +122,7 @@ namespace qpid {
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline bool isDurable() const { return store != 0; }
bool canAutoDelete() const;
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 7d15410374..ec77efa0f3 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -25,29 +25,34 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
+DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
+DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-}
-
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i == queues.end()){
+ if (i == queues.end()) {
bindings[routingKey].push_back(queue);
+ return true;
+ } else{
+ return false;
}
}
-void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i < queues.end()){
+ if (i < queues.end()) {
queues.erase(i);
if(queues.empty()){
bindings.erase(routingKey);
}
+ return true;
+ } else {
+ return false;
}
}
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 886b59be30..a06da10f6f 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -39,12 +39,14 @@ namespace broker {
static const std::string typeName;
DirectExchange(const std::string& name);
+ DirectExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
+ virtual std::string getType() const { return typeName; }
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 03863673df..3bf211b960 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -27,21 +27,30 @@
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
+using qpid::framing::FieldTable;
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type)
+ throw(UnknownExchangeTypeException){
+
+ return declare(name, type, false, FieldTable());
+}
+
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
+ bool durable, const FieldTable& args)
+ throw(UnknownExchangeTypeException){
Mutex::ScopedLock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end()) {
Exchange::shared_ptr exchange;
if(type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name));
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
}else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name));
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args));
}else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name));
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
}else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name));
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
}else{
throw UnknownExchangeTypeException();
}
@@ -54,7 +63,10 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
void ExchangeRegistry::destroy(const string& name){
Mutex::ScopedLock locker(lock);
- exchanges.erase(name);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i != exchanges.end()) {
+ exchanges.erase(i);
+ }
}
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index ff7399ba22..59fe51691b 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -24,6 +24,8 @@
#include <map>
#include "BrokerExchange.h"
+#include "MessageStore.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
namespace qpid {
@@ -34,8 +36,12 @@ namespace broker {
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
qpid::sys::Mutex lock;
- public:
- std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException);
+ public:
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
+ throw(UnknownExchangeTypeException);
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
+ bool durable, const qpid::framing::FieldTable& args)
+ throw(UnknownExchangeTypeException);
void destroy(const std::string& name);
Exchange::shared_ptr get(const std::string& name);
Exchange::shared_ptr getDefault();
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 1ac92c89e2..5f3a66d115 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -26,21 +26,28 @@ using namespace qpid::framing;
using namespace qpid::sys;
FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
Mutex::ScopedLock locker(lock);
// Add if not already present.
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i == bindings.end()) {
bindings.push_back(queue);
+ return true;
+ } else {
+ return false;
}
}
-void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
Mutex::ScopedLock locker(lock);
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i != bindings.end()) {
bindings.erase(i);
+ return true;
+ } else {
+ return false;
}
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index b6a803673f..cfab710a35 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -40,12 +40,14 @@ class FanOutExchange : public virtual Exchange {
static const std::string typeName;
FanOutExchange(const std::string& name);
+ FanOutExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
+ virtual std::string getType() const { return typeName; }
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index fef1f3fa4b..c33d638fce 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -41,21 +41,35 @@ namespace {
}
HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
Mutex::ScopedLock locker(lock);
std::string what = args->getString("x-match");
if (what != all && what != any) {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
}
- bindings.push_back(Binding(*args, queue));
+ Binding binding(*args, queue);
+ Bindings::iterator i =
+ std::find(bindings.begin(),bindings.end(), binding);
+ if (i == bindings.end()) {
+ bindings.push_back(binding);
+ return true;
+ } else {
+ return false;
+ }
}
-void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
Mutex::ScopedLock locker(lock);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
- if (i != bindings.end()) bindings.erase(i);
+ if (i != bindings.end()) {
+ bindings.erase(i);
+ return true;
+ } else {
+ return false;
+ }
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index e92b6f19cf..e35ef21ccd 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -43,12 +43,14 @@ class HeadersExchange : public virtual Exchange {
static const std::string typeName;
HeadersExchange(const string& name);
+ HeadersExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
+ virtual std::string getType() const { return typeName; }
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 1d9ee86e48..1c02f94727 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -56,9 +56,21 @@ public:
virtual void destroy(const PersistableExchange& exchange) = 0;
/**
+ * Record a binding
+ */
+ virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args) = 0;
+
+ /**
+ * Forget a binding
+ */
+ virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args) = 0;
+
+ /**
* Request recovery of queue and message state from store
*/
- virtual void recover(RecoveryManager& queues) = 0;
+ virtual void recover(RecoveryManager& recoverer) = 0;
/**
* Stores a messages before it has been enqueued
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 17e5d3cca8..0457643b75 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -48,6 +48,18 @@ void MessageStoreModule::destroy(const PersistableExchange& exchange)
store->destroy(exchange);
}
+void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q,
+ const std::string& k, const framing::FieldTable& a)
+{
+ store->bind(e, q, k, a);
+}
+
+void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q,
+ const std::string& k, const framing::FieldTable& a)
+{
+ store->unbind(e, q, k, a);
+}
+
void MessageStoreModule::recover(RecoveryManager& registry)
{
store->recover(registry);
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 59c45f68f6..078d2c1fdf 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -50,6 +50,10 @@ public:
void destroy(const PersistableQueue& queue);
void create(const PersistableExchange& exchange);
void destroy(const PersistableExchange& exchange);
+ void bind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
+ void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
void recover(RecoveryManager& queues);
void stage(PersistableMessage& msg);
void destroy(PersistableMessage& msg);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 393bbb8f02..686c2238ff 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -39,13 +39,18 @@ void NullMessageStore::destroy(const PersistableQueue& queue)
if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::create(const PersistableExchange&)
+void NullMessageStore::create(const PersistableExchange& exchange)
{
+ if (warn) std::cout << "WARNING: Can't create durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::destroy(const PersistableExchange&)
+void NullMessageStore::destroy(const PersistableExchange& exchange)
{
+ if (warn) std::cout << "WARNING: Can't destroy durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl;
}
+void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
+
+void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){}
void NullMessageStore::recover(RecoveryManager&)
{
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index e0b215bb39..2835961048 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -48,6 +48,11 @@ public:
virtual void destroy(const PersistableQueue& queue);
virtual void create(const PersistableExchange& exchange);
virtual void destroy(const PersistableExchange& exchange);
+
+ virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
+ virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue,
+ const std::string& key, const framing::FieldTable& args);
virtual void recover(RecoveryManager& queues);
virtual void stage(PersistableMessage& msg);
virtual void destroy(PersistableMessage& msg);
diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h
index 9badf5f609..9ba883cec0 100644
--- a/cpp/src/qpid/broker/PersistableExchange.h
+++ b/cpp/src/qpid/broker/PersistableExchange.h
@@ -35,6 +35,7 @@ namespace broker {
class PersistableExchange : public Persistable
{
public:
+ virtual std::string getName() const = 0;
virtual ~PersistableExchange() {};
};
diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h
new file mode 100644
index 0000000000..0af4aea232
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoverableExchange.h
@@ -0,0 +1,49 @@
+#ifndef _broker_RecoverableExchange_h
+#define _broker_RecoverableExchange_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which bindings are recovered.
+ */
+class RecoverableExchange
+{
+public:
+ typedef boost::shared_ptr<RecoverableExchange> shared_ptr;
+
+ virtual void setPersistenceId(uint64_t id) = 0;
+ /**
+ * Recover binding. Nb: queue must have been recovered earlier.
+ */
+ virtual void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args) = 0;
+ virtual ~RecoverableExchange() {};
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h
index 27f3d9355c..aae2bbe3ac 100644
--- a/cpp/src/qpid/broker/RecoveryManager.h
+++ b/cpp/src/qpid/broker/RecoveryManager.h
@@ -21,6 +21,7 @@
#ifndef _RecoveryManager_
#define _RecoveryManager_
+#include "RecoverableExchange.h"
#include "RecoverableQueue.h"
#include "RecoverableMessage.h"
#include "qpid/framing/Buffer.h"
@@ -31,7 +32,7 @@ namespace broker {
class RecoveryManager{
public:
virtual ~RecoveryManager(){}
- virtual void recoverExchange(framing::Buffer& buffer) = 0;
+ virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0;
virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
virtual void recoveryComplete() = 0;
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index f82399f95c..355c8de926 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -61,9 +61,19 @@ public:
void recover(RecoverableMessage::shared_ptr msg);
};
-void RecoveryManagerImpl::recoverExchange(framing::Buffer&)
+class RecoverableExchangeImpl : public RecoverableExchange
{
- //TODO
+ Exchange::shared_ptr exchange;
+ QueueRegistry& queues;
+public:
+ RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
+ void setPersistenceId(uint64_t id);
+ void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args);
+};
+
+RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
+{
+ return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues));
}
RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
@@ -141,3 +151,14 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id)
{
queue->setPersistenceId(id);
}
+
+void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
+{
+ exchange->setPersistenceId(id);
+}
+
+void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)
+{
+ Queue::shared_ptr queue = queues.find(queueName);
+ exchange->bind(queue, key, &args);
+}
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h
index c40de7895f..7802eee711 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -37,7 +37,7 @@ namespace broker {
RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold);
~RecoveryManagerImpl();
- void recoverExchange(framing::Buffer& buffer);
+ RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
void recoveryComplete();
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index f29dfc38ba..4ad1607aa2 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -116,24 +116,39 @@ bool TopicPattern::match(const Tokens& target) const
}
TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
+TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
TopicPattern routingPattern(routingKey);
- bindings[routingPattern].push_back(queue);
+ if (isBound(queue, routingPattern)) {
+ return false;
+ } else {
+ bindings[routingPattern].push_back(queue);
+ return true;
+ }
}
-void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
Queue::vector& qv(bi->second);
- if (bi == bindings.end()) return;
+ if (bi == bindings.end()) return false;
Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
- if(q == qv.end()) return;
+ if(q == qv.end()) return false;
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
+ return true;
}
+bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
+{
+ BindingMap::iterator bi = bindings.find(pattern);
+ if (bi == bindings.end()) return false;
+ Queue::vector& qv(bi->second);
+ return find(qv.begin(), qv.end(), queue) != qv.end();
+}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
Monitor::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index e00731af3a..2220e0112b 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -76,16 +76,19 @@ class TopicExchange : public virtual Exchange{
BindingMap bindings;
qpid::sys::Mutex lock;
+ bool isBound(Queue::shared_ptr queue, TopicPattern& pattern);
public:
static const std::string typeName;
TopicExchange(const string& name);
+ TopicExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
- virtual std::string getType(){ return typeName; }
-
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual std::string getType() const { return typeName; }
+
+ virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);