diff options
author | Gordon Sim <gsim@apache.org> | 2007-05-17 11:03:55 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-05-17 11:03:55 +0000 |
commit | 9a6c0d41b19744c8e4dc4711d13a5a0afa2f7ed2 (patch) | |
tree | 539a8102197fa119c7efb77056841932e2eb5c1a /cpp/src | |
parent | decfd77364e211bc8f8784e15f54e06a79e16675 (diff) | |
download | qpid-python-9a6c0d41b19744c8e4dc4711d13a5a0afa2f7ed2.tar.gz |
Changes to support durable exchanges.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@538872 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
25 files changed, 333 insertions, 60 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index f75b1c8ac9..f1dd6b9dd8 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -124,6 +124,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/BrokerAdapter.cpp \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/BrokerChannel.cpp \ + qpid/broker/BrokerExchange.cpp \ qpid/broker/BrokerMessage.cpp \ qpid/broker/BrokerMessageMessage.cpp \ qpid/broker/BrokerQueue.cpp \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 36232339e5..3c742b8d2d 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -118,8 +118,8 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ + bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& args){ if(passive){ if(!broker.getExchanges().get(exchange)) { @@ -127,8 +127,10 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u } }else{ try{ - std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type); - if(!response.second && response.first->getType() != type){ + std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args); + if (response.second) { + if (durable) broker.getStore().create(*response.first); + } else if (response.first->getType() != type) { throw ConnectionException( 530, "Exchange already declared to be of type " @@ -145,10 +147,12 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u } void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ + const string& name, bool /*ifUnused*/, bool nowait){ //TODO: implement unused - broker.getExchanges().destroy(exchange); + Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + if (exchange->isDurable()) broker.getStore().destroy(*exchange); + broker.getExchanges().destroy(name); if(!nowait) client.deleteOk(context.getRequestId()); } @@ -174,6 +178,8 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint //add default binding: broker.getExchanges().getDefault()->bind(queue, name, 0); + + //handle automatic cleanup: if (exclusive) { connection.exclusiveQueues.push_back(queue); } else if(autoDelete){ @@ -202,7 +208,9 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_ Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - exchange->bind(queue, exchangeRoutingKey, &arguments); + if (exchange->bind(queue, exchangeRoutingKey, &arguments) && exchange->isDurable() && queue->isDurable()) { + broker.getStore().bind(*exchange, *queue, routingKey, arguments); + } if(!nowait) client.bindOk(context.getRequestId()); }else{ throw ChannelException( @@ -225,7 +233,9 @@ BrokerAdapter::QueueHandlerImpl::unbind( Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); - exchange->unbind(queue, routingKey, &arguments); + if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) { + broker.getStore().unbind(*exchange, *queue, routingKey, arguments); + } client.unbindOk(context.getRequestId()); } diff --git a/cpp/src/qpid/broker/BrokerExchange.cpp b/cpp/src/qpid/broker/BrokerExchange.cpp new file mode 100644 index 0000000000..4eaf40dbc8 --- /dev/null +++ b/cpp/src/qpid/broker/BrokerExchange.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerExchange.h" +#include "ExchangeRegistry.h" + +using namespace qpid::broker; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; + +Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) +{ + string name; + string type; + FieldTable args; + + buffer.getShortString(name); + bool durable(buffer.getOctet()); + buffer.getShortString(type); + buffer.getFieldTable(args); + + return exchanges.declare(name, type, durable, args).first; +} + +void Exchange::encode(Buffer& buffer) const +{ + buffer.putShortString(name); + buffer.putOctet(durable); + buffer.putShortString(getType()); + buffer.putFieldTable(args); +} + +uint32_t Exchange::encodedSize() const +{ + return name.size() + 1/*short string size*/ + + 1 /*durable*/ + + getType().size() + 1/*short string size*/ + + args.size(); +} + + + diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h index d4877a5110..62c82aa935 100644 --- a/cpp/src/qpid/broker/BrokerExchange.h +++ b/cpp/src/qpid/broker/BrokerExchange.h @@ -25,24 +25,47 @@ #include <boost/shared_ptr.hpp> #include "Deliverable.h" #include "BrokerQueue.h" +#include "MessageStore.h" +#include "PersistableExchange.h" #include "qpid/framing/FieldTable.h" namespace qpid { namespace broker { using std::string; + class ExchangeRegistry; - class Exchange{ + class Exchange : public PersistableExchange{ + private: const string name; + const bool durable; + qpid::framing::FieldTable args; + mutable uint64_t persistenceId; + public: typedef boost::shared_ptr<Exchange> shared_ptr; - explicit Exchange(const string& _name) : name(_name){} + explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){} + Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) + : name(_name), durable(_durable), args(_args), persistenceId(0){} virtual ~Exchange(){} - string getName() { return name; } - virtual string getType() = 0; - virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + + string getName() const { return name; } + bool isDurable() { return durable; } + qpid::framing::FieldTable& getArgs() { return args; } + + virtual string getType() const = 0; + virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + + //PersistableExchange: + void setPersistenceId(uint64_t id) const { persistenceId = id; } + uint64_t getPersistenceId() const { return persistenceId; } + uint32_t encodedSize() const; + void encode(framing::Buffer& buffer) const; + + static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); + }; } } diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 28f6cfce8f..c45b35566e 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -122,6 +122,7 @@ namespace qpid { inline const string& getName() const { return name; } inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } inline bool hasExclusiveConsumer() const { return exclusive; } + inline bool isDurable() const { return store != 0; } bool canAutoDelete() const; diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 7d15410374..ec77efa0f3 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -25,29 +25,34 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { +DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {} +DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} -} - -void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ +bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){ Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i == queues.end()){ + if (i == queues.end()) { bindings[routingKey].push_back(queue); + return true; + } else{ + return false; } } -void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ +bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i < queues.end()){ + if (i < queues.end()) { queues.erase(i); if(queues.empty()){ bindings.erase(routingKey); } + return true; + } else { + return false; } } diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 886b59be30..a06da10f6f 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -39,12 +39,14 @@ namespace broker { static const std::string typeName; DirectExchange(const std::string& name); + DirectExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); - virtual std::string getType(){ return typeName; } + virtual std::string getType() const { return typeName; } - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 03863673df..3bf211b960 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -27,21 +27,30 @@ using namespace qpid::broker; using namespace qpid::sys; using std::pair; +using qpid::framing::FieldTable; -pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){ +pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) + throw(UnknownExchangeTypeException){ + + return declare(name, type, false, FieldTable()); +} + +pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, + bool durable, const FieldTable& args) + throw(UnknownExchangeTypeException){ Mutex::ScopedLock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) { Exchange::shared_ptr exchange; if(type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name)); + exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args)); }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name)); + exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args)); }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name)); + exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args)); }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name)); + exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args)); }else{ throw UnknownExchangeTypeException(); } @@ -54,7 +63,10 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c void ExchangeRegistry::destroy(const string& name){ Mutex::ScopedLock locker(lock); - exchanges.erase(name); + ExchangeMap::iterator i = exchanges.find(name); + if (i != exchanges.end()) { + exchanges.erase(i); + } } Exchange::shared_ptr ExchangeRegistry::get(const string& name){ diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index ff7399ba22..59fe51691b 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -24,6 +24,8 @@ #include <map> #include "BrokerExchange.h" +#include "MessageStore.h" +#include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" namespace qpid { @@ -34,8 +36,12 @@ namespace broker { typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; ExchangeMap exchanges; qpid::sys::Mutex lock; - public: - std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException); + public: + std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) + throw(UnknownExchangeTypeException); + std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, + bool durable, const qpid::framing::FieldTable& args) + throw(UnknownExchangeTypeException); void destroy(const std::string& name); Exchange::shared_ptr get(const std::string& name); Exchange::shared_ptr getDefault(); diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 1ac92c89e2..5f3a66d115 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -26,21 +26,28 @@ using namespace qpid::framing; using namespace qpid::sys; FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} +FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} -void FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ +bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ Mutex::ScopedLock locker(lock); // Add if not already present. Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i == bindings.end()) { bindings.push_back(queue); + return true; + } else { + return false; } } -void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ +bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ Mutex::ScopedLock locker(lock); Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i != bindings.end()) { bindings.erase(i); + return true; + } else { + return false; } } diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index b6a803673f..cfab710a35 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -40,12 +40,14 @@ class FanOutExchange : public virtual Exchange { static const std::string typeName; FanOutExchange(const std::string& name); + FanOutExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); - virtual std::string getType(){ return typeName; } + virtual std::string getType() const { return typeName; } - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index fef1f3fa4b..c33d638fce 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -41,21 +41,35 @@ namespace { } HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } +HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} -void HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ +bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ Mutex::ScopedLock locker(lock); std::string what = args->getString("x-match"); if (what != all && what != any) { THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); } - bindings.push_back(Binding(*args, queue)); + Binding binding(*args, queue); + Bindings::iterator i = + std::find(bindings.begin(),bindings.end(), binding); + if (i == bindings.end()) { + bindings.push_back(binding); + return true; + } else { + return false; + } } -void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ +bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ Mutex::ScopedLock locker(lock); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); - if (i != bindings.end()) bindings.erase(i); + if (i != bindings.end()) { + bindings.erase(i); + return true; + } else { + return false; + } } diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index e92b6f19cf..e35ef21ccd 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -43,12 +43,14 @@ class HeadersExchange : public virtual Exchange { static const std::string typeName; HeadersExchange(const string& name); + HeadersExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); - virtual std::string getType(){ return typeName; } + virtual std::string getType() const { return typeName; } - virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 1d9ee86e48..1c02f94727 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -56,9 +56,21 @@ public: virtual void destroy(const PersistableExchange& exchange) = 0; /** + * Record a binding + */ + virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, + const std::string& key, const framing::FieldTable& args) = 0; + + /** + * Forget a binding + */ + virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, + const std::string& key, const framing::FieldTable& args) = 0; + + /** * Request recovery of queue and message state from store */ - virtual void recover(RecoveryManager& queues) = 0; + virtual void recover(RecoveryManager& recoverer) = 0; /** * Stores a messages before it has been enqueued diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 17e5d3cca8..0457643b75 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -48,6 +48,18 @@ void MessageStoreModule::destroy(const PersistableExchange& exchange) store->destroy(exchange); } +void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q, + const std::string& k, const framing::FieldTable& a) +{ + store->bind(e, q, k, a); +} + +void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q, + const std::string& k, const framing::FieldTable& a) +{ + store->unbind(e, q, k, a); +} + void MessageStoreModule::recover(RecoveryManager& registry) { store->recover(registry); diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 59c45f68f6..078d2c1fdf 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -50,6 +50,10 @@ public: void destroy(const PersistableQueue& queue); void create(const PersistableExchange& exchange); void destroy(const PersistableExchange& exchange); + void bind(const PersistableExchange& exchange, const PersistableQueue& queue, + const std::string& key, const framing::FieldTable& args); + void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, + const std::string& key, const framing::FieldTable& args); void recover(RecoveryManager& queues); void stage(PersistableMessage& msg); void destroy(PersistableMessage& msg); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 393bbb8f02..686c2238ff 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -39,13 +39,18 @@ void NullMessageStore::destroy(const PersistableQueue& queue) if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::create(const PersistableExchange&) +void NullMessageStore::create(const PersistableExchange& exchange) { + if (warn) std::cout << "WARNING: Can't create durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::destroy(const PersistableExchange&) +void NullMessageStore::destroy(const PersistableExchange& exchange) { + if (warn) std::cout << "WARNING: Can't destroy durable exchange '" << exchange.getName() << "'. Persistence not enabled." << std::endl; } +void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){} + +void NullMessageStore::unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){} void NullMessageStore::recover(RecoveryManager&) { diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index e0b215bb39..2835961048 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -48,6 +48,11 @@ public: virtual void destroy(const PersistableQueue& queue); virtual void create(const PersistableExchange& exchange); virtual void destroy(const PersistableExchange& exchange); + + virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, + const std::string& key, const framing::FieldTable& args); + virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, + const std::string& key, const framing::FieldTable& args); virtual void recover(RecoveryManager& queues); virtual void stage(PersistableMessage& msg); virtual void destroy(PersistableMessage& msg); diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h index 9badf5f609..9ba883cec0 100644 --- a/cpp/src/qpid/broker/PersistableExchange.h +++ b/cpp/src/qpid/broker/PersistableExchange.h @@ -35,6 +35,7 @@ namespace broker { class PersistableExchange : public Persistable { public: + virtual std::string getName() const = 0; virtual ~PersistableExchange() {}; }; diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h new file mode 100644 index 0000000000..0af4aea232 --- /dev/null +++ b/cpp/src/qpid/broker/RecoverableExchange.h @@ -0,0 +1,49 @@ +#ifndef _broker_RecoverableExchange_h +#define _broker_RecoverableExchange_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +/** + * The interface through which bindings are recovered. + */ +class RecoverableExchange +{ +public: + typedef boost::shared_ptr<RecoverableExchange> shared_ptr; + + virtual void setPersistenceId(uint64_t id) = 0; + /** + * Recover binding. Nb: queue must have been recovered earlier. + */ + virtual void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args) = 0; + virtual ~RecoverableExchange() {}; +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h index 27f3d9355c..aae2bbe3ac 100644 --- a/cpp/src/qpid/broker/RecoveryManager.h +++ b/cpp/src/qpid/broker/RecoveryManager.h @@ -21,6 +21,7 @@ #ifndef _RecoveryManager_ #define _RecoveryManager_ +#include "RecoverableExchange.h" #include "RecoverableQueue.h" #include "RecoverableMessage.h" #include "qpid/framing/Buffer.h" @@ -31,7 +32,7 @@ namespace broker { class RecoveryManager{ public: virtual ~RecoveryManager(){} - virtual void recoverExchange(framing::Buffer& buffer) = 0; + virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0; virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0; virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0; virtual void recoveryComplete() = 0; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index f82399f95c..355c8de926 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -61,9 +61,19 @@ public: void recover(RecoverableMessage::shared_ptr msg); }; -void RecoveryManagerImpl::recoverExchange(framing::Buffer&) +class RecoverableExchangeImpl : public RecoverableExchange { - //TODO + Exchange::shared_ptr exchange; + QueueRegistry& queues; +public: + RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} + void setPersistenceId(uint64_t id); + void bind(std::string& queue, std::string& routingKey, qpid::framing::FieldTable& args); +}; + +RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer) +{ + return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues)); } RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer) @@ -141,3 +151,14 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id) { queue->setPersistenceId(id); } + +void RecoverableExchangeImpl::setPersistenceId(uint64_t id) +{ + exchange->setPersistenceId(id); +} + +void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args) +{ + Queue::shared_ptr queue = queues.find(queueName); + exchange->bind(queue, key, &args); +} diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h index c40de7895f..7802eee711 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -37,7 +37,7 @@ namespace broker { RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold); ~RecoveryManagerImpl(); - void recoverExchange(framing::Buffer& buffer); + RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer); RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer); void recoveryComplete(); diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index f29dfc38ba..4ad1607aa2 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -116,24 +116,39 @@ bool TopicPattern::match(const Tokens& target) const } TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } +TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {} -void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ + +bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ Monitor::ScopedLock l(lock); TopicPattern routingPattern(routingKey); - bindings[routingPattern].push_back(queue); + if (isBound(queue, routingPattern)) { + return false; + } else { + bindings[routingPattern].push_back(queue); + return true; + } } -void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ +bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ Monitor::ScopedLock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); Queue::vector& qv(bi->second); - if (bi == bindings.end()) return; + if (bi == bindings.end()) return false; Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); - if(q == qv.end()) return; + if(q == qv.end()) return false; qv.erase(q); if(qv.empty()) bindings.erase(bi); + return true; } +bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) +{ + BindingMap::iterator bi = bindings.find(pattern); + if (bi == bindings.end()) return false; + Queue::vector& qv(bi->second); + return find(qv.begin(), qv.end(), queue) != qv.end(); +} void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ Monitor::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index e00731af3a..2220e0112b 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -76,16 +76,19 @@ class TopicExchange : public virtual Exchange{ BindingMap bindings; qpid::sys::Mutex lock; + bool isBound(Queue::shared_ptr queue, TopicPattern& pattern); public: static const std::string typeName; TopicExchange(const string& name); + TopicExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); - virtual std::string getType(){ return typeName; } - - virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + virtual std::string getType() const { return typeName; } + + virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); |