diff options
Diffstat (limited to 'cpp/src')
61 files changed, 393 insertions, 198 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index ee25393f25..9b641558f8 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1139,6 +1139,7 @@ set (qpidbroker_SOURCES qpid/broker/DtxManager.cpp qpid/broker/DtxTimeout.cpp qpid/broker/DtxWorkRecord.cpp + qpid/broker/EnqueueHandle.cpp qpid/broker/ExchangeRegistry.cpp qpid/broker/FanOutExchange.cpp qpid/broker/HeadersExchange.cpp @@ -1153,11 +1154,13 @@ set (qpidbroker_SOURCES # qpid/broker/MessageStoreModule.cpp qpid/broker/NameGenerator.cpp # qpid/broker/NullMessageStore.cpp + qpid/broker/QueueAsyncContext.cpp qpid/broker/QueueBindings.cpp qpid/broker/QueuedMessage.cpp qpid/broker/QueueCursor.cpp qpid/broker/QueueDepth.cpp qpid/broker/QueueFactory.cpp + qpid/broker/QueueHandle.cpp qpid/broker/QueueRegistry.cpp qpid/broker/QueueSettings.cpp qpid/broker/QueueFlowLimit.cpp @@ -1184,6 +1187,7 @@ set (qpidbroker_SOURCES qpid/broker/TopicExchange.cpp qpid/broker/TxAccept.cpp qpid/broker/TxBuffer.cpp + qpid/broker/TxnHandle.cpp qpid/broker/Vhost.cpp qpid/broker/amqp_0_10/MessageTransfer.cpp qpid/management/ManagementAgent.cpp @@ -1191,8 +1195,6 @@ set (qpidbroker_SOURCES qpid/management/ManagementTopicExchange.cpp qpid/sys/TCPIOPlugin.cpp # New async store objects and new versions of broker objects -# qpid/broker/AsyncResultHandle.cpp -# qpid/broker/AsyncResultHandleImpl.cpp # qpid/broker/IdHandle.cpp # qpid/broker/TxnAsyncContext.cpp # qpid/broker/TxnBuffer.cpp diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index e5019604d2..3acf9fe715 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -126,7 +126,7 @@ void SessionState::senderRecord(const AMQFrame& f) { sender.incomplete += sender.sendPoint.command; sender.sendPoint.advance(f); if (config.replayHardLimit && config.replayHardLimit < sender.replaySize) - throw ResourceLimitExceededException("Replay buffer exceeeded hard limit"); + throw ResourceLimitExceededException("Replay buffer exceeded hard limit"); } static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536; diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index d1706b5907..4604ac643f 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -86,7 +86,7 @@ Bridge::~Bridge() mgmtObject->resourceDestroy(); } -void Bridge::create(Connection& c) +void Bridge::create(Connection& c, AsyncStore* const store) { detached = false; // Reset detached in case we are recovering. connState = &c; @@ -153,7 +153,7 @@ void Bridge::create(Connection& c) Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); if (exchange.get() == 0) throw Exception("Exchange not found for dynamic route"); - exchange->registerDynamicBridge(this); + exchange->registerDynamicBridge(this, store); QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); } else { QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index ee298afd45..04ac585d80 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -141,7 +141,7 @@ class Bridge : public PersistableConfig, bool resetProxy(); // connection Management (called by owning Link) - void create(Connection& c); + void create(Connection& c, AsyncStore* const store); void cancel(Connection& c); void closed(); friend class Link; // to call create, cancel, closed() diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 2382205268..08606516d4 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -421,15 +421,14 @@ void Broker::setStore () { // static void Broker::recoverComplete(const AsyncResultHandle* const arh) { - std::cout << "@@@@ Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; + std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } // static void Broker::configureComplete(const AsyncResultHandle* const arh) { - std::cout << "@@@@ Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; + std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } - void Broker::run() { if (config.workerThreads > 0) { QPID_LOG(notice, "Broker running"); @@ -1124,7 +1123,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate); if (result.second) { //add default binding: - result.first->bind(exchanges.getDefault(), name); + result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable()); if (managementAgent.get()) { //TODO: debatable whether we should raise an event here for @@ -1206,6 +1205,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( if (durable) { // store->create(*result.first, arguments); ConfigHandle ch = asyncStore->createConfigHandle(); + result.first->setHandle(ch); boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); asyncStore->submitCreate(ch, result.first.get(), bc); } @@ -1249,8 +1249,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); // if (exchange->isDurable()) store->destroy(*exchange); if (exchange->isDurable()) { -// boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); -// asyncStore->submitDestroy(exchange.getHandle(), bc); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + asyncStore->submitDestroy(exchange->getHandle(), bc); + exchange->resetHandle(); } if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); exchanges.destroy(name); @@ -1326,11 +1327,11 @@ void Broker::unbind(const std::string& queueName, } else if (!exchange) { throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName)); } else { - if (exchange->unbind(queue, key, 0)) { - if (exchange->isDurable() && queue->isDurable()) { + if (exchange->unbind(queue, key, 0, asyncStore.get())) { +// Move this block into Exchange which keeps the broker context. +// if (exchange->isDurable() && queue->isDurable()) { // store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); - // TODO: kpvdr: Async config destroy here - } +// } getConfigurationObservers().unbind( exchange, queue, key, framing::FieldTable()); if (managementAgent.get()) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index e4d1d93423..698d446bca 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -222,7 +222,8 @@ class Broker : public sys::Runnable, public Plugin::Target, // QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store); void setStore(boost::shared_ptr<AsyncStore>& asyncStore); // MessageStore& getStore() { return *store; } - AsyncStore& getStore() { return *asyncStore; } +// AsyncStore& getStore() { return *asyncStore; } + AsyncStore* getStore() { return asyncStore.get(); } void setAcl (AclModule* _acl) {acl = _acl;} AclModule* getAcl() { return acl; } QueueRegistry& getQueues() { return queues; } @@ -231,6 +232,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DtxManager& getDtxManager() { return dtxManager; } DataDir& getDataDir() { return dataDir; } Options& getOptions() { return config; } + AsyncResultQueueImpl& getAsyncResultQueue() { return asyncResultQueue; } void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index c56a1da6cc..b1130c3ec0 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -53,7 +53,7 @@ DirectExchange::DirectExchange(const string& _name, bool _durable, mgmtExchange->set_type(typeName); } -bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) +bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const store) { string fedOp(fedOpBind); string fedTags; @@ -88,6 +88,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con bk.fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } + persistBind(b, store); + } else if (fedOp == fedOpUnbind) { Mutex::ScopedLock l(lock); BoundKey& bk = bindings[routingKey]; @@ -97,7 +99,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin); if (bk.fedBinding.countFedBindings(queue->getName()) == 0) - unbind(queue, routingKey, args); + unbind(queue, routingKey, args, store); } else if (fedOp == fedOpReorigin) { /** gather up all the keys that need rebinding in a local vector @@ -127,7 +129,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con return true; } -bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) +bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const /*store*/) { string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); bool propagate = false; @@ -156,6 +158,12 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c return true; } +//boost::shared_ptr<Exchange::Binding> DirectExchange::getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey) { +// Mutex::ScopedLock l(lock); +// BoundKey& bk = bindings[routingKey]; +// return bk.queues[routingKey]; +//} + void DirectExchange::route(Deliverable& msg) { const string& routingKey = msg.getMessage().getRoutingKey(); diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index a0e1477d0c..6f0d470c54 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -55,12 +55,17 @@ public: QPID_BROKER_EXTERN virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, - const qpid::framing::FieldTable* args); - virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + const qpid::framing::FieldTable* args, + AsyncStore* const store); + virtual bool unbind(boost::shared_ptr<Queue> queue, + const std::string& routingKey, + const qpid::framing::FieldTable* args, + AsyncStore* const store); QPID_BROKER_EXTERN virtual void route(Deliverable& msg); QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); +// boost::shared_ptr<Binding> getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey); QPID_BROKER_EXTERN virtual ~DirectExchange(); diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 7e2eb927a3..c55771c4e6 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -175,7 +175,7 @@ void DtxManager::DtxCleanup::fire() } //void DtxManager::setStore (TransactionalStore* _store) -void DtxManager::setStore (AsyncTransactionalStore* _ats) +void DtxManager::setStore (AsyncTransactionalStore* const _ats) { // store = _store; asyncTxnStore = _ats; diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index fe20a89c32..cbc66d6391 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -68,7 +68,7 @@ public: uint32_t getTimeout(const std::string& xid); void timedout(const std::string& xid); // void setStore(TransactionalStore* store); - void setStore(AsyncTransactionalStore* ats); + void setStore(AsyncTransactionalStore* const ats); void setTimer(sys::Timer& t) { timer = &t; } // Used by cluster for replication. diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index 9dd86bdcad..579579df2d 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -50,7 +50,7 @@ class DtxWorkRecord const std::string xid; // TransactionalStore* const store; - AsyncTransactionalStore* const asyncTxnStore; + AsyncTransactionalStore* asyncTxnStore; bool completed; bool rolledback; bool prepared; @@ -66,7 +66,7 @@ class DtxWorkRecord public: QPID_BROKER_EXTERN DtxWorkRecord(const std::string& xid, // TransactionalStore* const store); - AsyncTransactionalStore* const store); + AsyncTransactionalStore* const store); QPID_BROKER_EXTERN ~DtxWorkRecord(); QPID_BROKER_EXTERN bool prepare(); QPID_BROKER_EXTERN bool commit(bool onePhase); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index bb5dc2b807..2414981481 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -19,7 +19,9 @@ * */ +#include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/ConfigAsyncContext.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" @@ -299,7 +301,7 @@ ManagementObject* Exchange::GetManagementObject (void) const return (ManagementObject*) mgmtExchange; } -void Exchange::registerDynamicBridge(DynamicBridge* db) +void Exchange::registerDynamicBridge(DynamicBridge* db, AsyncStore* const store) { if (!supportsDynamicBinding()) throw Exception("Exchange type does not support dynamic binding"); @@ -315,7 +317,7 @@ void Exchange::registerDynamicBridge(DynamicBridge* db) FieldTable args; args.setString(qpidFedOp, fedOpReorigin); - bind(Queue::shared_ptr(), string(), &args); + bind(Queue::shared_ptr(), string(), &args, store); } void Exchange::removeDynamicBridge(DynamicBridge* db) @@ -344,8 +346,8 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons } Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, - FieldTable _args, const string& _origin) - : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) + FieldTable _args, const string& _origin, ConfigHandle _cfgHandle) + : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle), mgmtBinding(0) { } @@ -388,6 +390,30 @@ ManagementObject* Exchange::Binding::GetManagementObject () const return (ManagementObject*) mgmtBinding; } +uint64_t Exchange::Binding::getSize() { return 0; } // TODO: kpvdr: implement persistence +void Exchange::Binding::write(char* /*target*/) {} // TODO: kpvdr: implement persistence + +void Exchange::persistBind(Binding::shared_ptr b, AsyncStore* const s) { + if (s && broker != 0 && b->queue->isDurable() && isDurable()) { + b->cfgHandle = s->createConfigHandle(); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue())); + s->submitCreate(b->cfgHandle, b.get(), bc); + } +} + +void Exchange::persistUnbind(Binding::shared_ptr b, AsyncStore* const s) { + if (s && broker != 0 && b->queue->isDurable() && isDurable()) { + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &broker->getAsyncResultQueue())); + s->submitDestroy(b->cfgHandle, bc); + b->cfgHandle.reset(); + } +} + +// static +void Exchange::configureComplete(const AsyncResultHandle* const arh) { + std::cout << "@@@@ Exchange: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index b4c6b799a4..df6d5d05a4 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -23,7 +23,6 @@ */ #include <boost/shared_ptr.hpp> -#include <qpid/broker/AsyncStore.h> #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" @@ -36,7 +35,7 @@ #include "qmf/org/apache/qpid/broker/Binding.h" #include "qmf/org/apache/qpid/broker/Broker.h" -#include <set> +//#include <set> namespace qpid { namespace broker { @@ -44,9 +43,9 @@ namespace broker { class Broker; class ExchangeRegistry; -class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public DataSource, public management::Manageable { +class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable { public: - struct Binding : public management::Manageable { + struct Binding : public DataSource, public management::Manageable { typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; @@ -55,13 +54,19 @@ public: const std::string key; const framing::FieldTable args; std::string origin; + ConfigHandle cfgHandle; qmf::org::apache::qpid::broker::Binding* mgmtBinding; Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0, - framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); + framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string(), + ConfigHandle cfgHandle = ConfigHandle()); ~Binding(); void startManagement(); management::ManagementObject* GetManagementObject() const; + + // DataSource implementation - allows for persistence + uint64_t getSize(); + void write(char* target); }; private: @@ -71,6 +76,7 @@ private: boost::shared_ptr<Exchange> alternate; uint32_t alternateUsers; mutable uint64_t persistenceId; + static void configureComplete(const AsyncResultHandle* const); protected: mutable qpid::framing::FieldTable args; @@ -92,6 +98,8 @@ protected: typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; void doRoute(Deliverable& msg, ConstBindingList b); void routeIVE(); + void persistBind(Binding::shared_ptr b, AsyncStore* const s); + void persistUnbind(Binding::shared_ptr b, AsyncStore* const s); struct MatchQueue { @@ -197,9 +205,10 @@ public: * */ - virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store) = 0; + virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store) = 0; virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; +// virtual boost::shared_ptr<Binding> getBinding(boost::shared_ptr<Queue> queue, const std::string& routingKey) = 0; //QPID_BROKER_EXTERN virtual void setProperties(Message&); virtual void route(Deliverable& msg) = 0; @@ -224,7 +233,7 @@ public: virtual const std::string& getLocalTag() const = 0; }; - void registerDynamicBridge(DynamicBridge* db); + void registerDynamicBridge(DynamicBridge* db, AsyncStore* const store); void removeDynamicBridge(DynamicBridge* db); virtual bool supportsDynamicBinding() { return false; } Broker* getBroker() const { return broker; } diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 5b7e0c7324..941d909778 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -46,7 +46,7 @@ FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args) +bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args, AsyncStore* const store) { string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); string fedTags(args ? args->getAsString(qpidFedTags) : ""); @@ -69,7 +69,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const } else if (fedOp == fedOpUnbind) { propagate = fedBinding.delOrigin(queue->getName(), fedOrigin); if (fedBinding.countFedBindings(queue->getName()) == 0) - unbind(queue, "", args); + unbind(queue, "", args, store); } else if (fedOp == fedOpReorigin) { if (fedBinding.hasLocal()) { propagateFedOp(string(), string(), fedOpBind, string()); @@ -82,7 +82,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const return true; } -bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args) +bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args, AsyncStore* const /*store*/) { string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); bool propagate = false; diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index dc301a4266..e6a8f726d6 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -50,9 +50,13 @@ class FanOutExchange : public virtual Exchange { QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, - const qpid::framing::FieldTable* args); + const qpid::framing::FieldTable* args, + AsyncStore* const store); - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, + const std::string& routingKey, + const qpid::framing::FieldTable* args, + AsyncStore* const store); QPID_BROKER_EXTERN virtual void route(Deliverable& msg); diff --git a/cpp/src/qpid/broker/Handle.h b/cpp/src/qpid/broker/Handle.h index 397f58f2e7..31c029a512 100644 --- a/cpp/src/qpid/broker/Handle.h +++ b/cpp/src/qpid/broker/Handle.h @@ -65,6 +65,8 @@ template <class T> class Handle { void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; } + void reset() { impl = 0; } + protected: typedef T Impl; Handle() :impl() {} diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index ec19765387..76ffa7a922 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -167,7 +167,7 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) +bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args, AsyncStore* const store) { string fedOp(fedOpBind); string fedTags; @@ -221,7 +221,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co bindings.modify_if(MatchKey(queue, bindingKey), modifier); propagate = modifier.shouldPropagate; if (modifier.shouldUnbind) { - unbind(queue, bindingKey, args); + unbind(queue, bindingKey, args, store); } } else if (fedOp == fedOpReorigin) { @@ -246,7 +246,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co return true; } -bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args){ +bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable *args, AsyncStore* const /*store*/){ bool propagate = false; string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); { diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index ff0fec4212..10bf9c8c0b 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -91,9 +91,13 @@ class HeadersExchange : public virtual Exchange { QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, - const qpid::framing::FieldTable* args); + const qpid::framing::FieldTable* args, + AsyncStore* const store); - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, + const std::string& routingKey, + const qpid::framing::FieldTable* args, + AsyncStore* const store); QPID_BROKER_EXTERN virtual void route(Deliverable& msg); diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 416c3b7d34..9727040c9b 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -86,8 +86,8 @@ public: std::string getType() const { return Link::exchangeTypeName; } // Exchange methods - set up to prevent binding/unbinding etc from clients! - bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; } - bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; } + bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, AsyncStore* const) { return false; } + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, AsyncStore* const) { return false; } bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;} // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs @@ -480,7 +480,7 @@ void Link::ioThreadProcessing() if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { active.push_back(*i); - (*i)->create(*connection); + (*i)->create(*connection, broker->getStore()); } created.clear(); } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 43ed208eba..a79081b8ed 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -274,7 +274,7 @@ void LinkRegistry::destroyBridge(Bridge *bridge) } //void LinkRegistry::setStore (MessageStore* _store) -void LinkRegistry::setStore (AsyncStore* _asyncStore) { +void LinkRegistry::setStore (AsyncStore* const _asyncStore) { asyncStore = _asyncStore; } diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index a30f91e2a0..17f45a60c8 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -132,7 +132,7 @@ namespace broker { * Set the store to use. May only be called once. */ // QPID_BROKER_EXTERN void setStore (MessageStore*); - QPID_BROKER_EXTERN void setStore (AsyncStore*); + QPID_BROKER_EXTERN void setStore (AsyncStore* const); /** * Return the message store used. diff --git a/cpp/src/qpid/broker/Lvq.cpp b/cpp/src/qpid/broker/Lvq.cpp index 0bededb966..f5e66c8a74 100644 --- a/cpp/src/qpid/broker/Lvq.cpp +++ b/cpp/src/qpid/broker/Lvq.cpp @@ -27,7 +27,7 @@ namespace qpid { namespace broker { //Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) // : Queue(n, s, ms, p, b), messageMap(*m) -Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* const as, management::Manageable* p, Broker* b) +Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, AsyncStore* as, management::Manageable* p, Broker* b) : Queue(n, s, as, p, b), messageMap(*m) { messages = m; diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h index e1a0853247..ffea40493d 100644 --- a/cpp/src/qpid/broker/PersistableExchange.h +++ b/cpp/src/qpid/broker/PersistableExchange.h @@ -23,6 +23,8 @@ */ #include <string> +#include "qpid/broker/AsyncStore.h" +#include "qpid/broker/ConfigHandle.h" #include "qpid/broker/Persistable.h" namespace qpid { @@ -32,11 +34,17 @@ namespace broker { * The interface exchanges must expose to the MessageStore in order to be * persistable. */ -class PersistableExchange : public Persistable +class PersistableExchange : public Persistable, public DataSource { public: virtual const std::string& getName() const = 0; virtual ~PersistableExchange() {}; + ConfigHandle& getHandle() { return configHandle; } + void setHandle(ConfigHandle& ch) { configHandle = ch; } + void resetHandle() { configHandle.reset(); } + +protected: + ConfigHandle configHandle; }; }} diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 2ef9fbfcbb..4645e5526d 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/PersistableMessage.h" //#include "qpid/broker/MessageStore.h" //#include "qpid/broker/AsyncStore.h" +#include "qpid/broker/EnqueueHandle.h" #include <iostream> using namespace qpid::broker; @@ -83,6 +84,44 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, AsyncStore*) bool PersistableMessage::isDequeueComplete() { return false; } void PersistableMessage::dequeueComplete() {} +MessageHandle& PersistableMessage::createMessageHandle(AsyncStore* const store) { + assert (store != 0); + msgHandle = store->createMessageHandle(this); + return msgHandle; +} + +EnqueueHandle& PersistableMessage::createEnqueueHandle(QueueHandle& queueHandle, AsyncStore* const asyncStore) { + std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle); + if (ehi == enqueueHandles.end()) { + assert (asyncStore != 0); + ehi = enqueueHandles.insert(std::pair<QueueHandle, EnqueueHandle>(queueHandle, + asyncStore->createEnqueueHandle(msgHandle, queueHandle))).first; + } + return ehi->second; +} + +void PersistableMessage::removeEnqueueHandle(QueueHandle& queueHandle) { + std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle); + if (ehi != enqueueHandles.end()) { + enqueueHandles.erase(ehi); + } +} + +EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) { + std::map<QueueHandle, EnqueueHandle>::iterator ehi = enqueueHandles.find(queueHandle); + assert (ehi != enqueueHandles.end()); + return ehi->second; +} + +const EnqueueHandle& PersistableMessage::getEnqueueHandle(QueueHandle& queueHandle) const { + std::map<QueueHandle, EnqueueHandle>::const_iterator ehci = enqueueHandles.find(queueHandle); + assert (ehci != enqueueHandles.end()); + return ehci->second; +} + +uint64_t PersistableMessage::getSize() { return 0; } // TODO: kpvdr: implement +void PersistableMessage::write(char* /*target*/) {} // TODO: kpvdr: implement + }} diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 0fd3d169b4..a69d00ca71 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -41,14 +41,16 @@ class Variant; } namespace broker { +class EnqueueHandle; class MessageStore; class AsyncStore; class Queue; +class QueueHandle; /** * Base class for persistable messages. */ -class PersistableMessage : public Persistable +class PersistableMessage : public Persistable, public DataSource { /** * "Ingress" messages == messages sent _to_ the broker. @@ -63,6 +65,7 @@ class PersistableMessage : public Persistable boost::intrusive_ptr<AsyncCompletion> holder; mutable uint64_t persistenceId; MessageHandle msgHandle; + std::map<QueueHandle, EnqueueHandle> enqueueHandles; public: PersistableMessage(); @@ -96,9 +99,17 @@ class PersistableMessage : public Persistable uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } + MessageHandle& createMessageHandle(AsyncStore* const store); MessageHandle& getMessageHandle() { return msgHandle; } const MessageHandle& getMessagehandle() const { return msgHandle; } + EnqueueHandle& createEnqueueHandle(QueueHandle& queueHandle, AsyncStore* const asyncStore); + void removeEnqueueHandle(QueueHandle& queueHandle); + EnqueueHandle& getEnqueueHandle(QueueHandle& queueHandle); + const EnqueueHandle& getEnqueueHandle(QueueHandle& queueHandle) const; + + uint64_t getSize(); + void write(char* target); virtual void decodeHeader(framing::Buffer& buffer) = 0; virtual void decodeContent(framing::Buffer& buffer) = 0; diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h index 655d26bc74..ed8c193245 100644 --- a/cpp/src/qpid/broker/PersistableQueue.h +++ b/cpp/src/qpid/broker/PersistableQueue.h @@ -23,12 +23,15 @@ */ #include <string> +#include "qpid/broker/AsyncStore.h" #include "qpid/broker/Persistable.h" +#include "qpid/broker/QueueHandle.h" #include "qpid/management/Manageable.h" #include <boost/shared_ptr.hpp> namespace qpid { namespace broker { +class AsyncResultHandle; /** @@ -48,7 +51,7 @@ public: * The interface queues must expose to the MessageStore in order to be * persistable. */ -class PersistableQueue : public Persistable +class PersistableQueue : public Persistable, public DataSource { public: typedef boost::shared_ptr<PersistableQueue> shared_ptr; @@ -62,14 +65,14 @@ public: virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0; virtual void flush() = 0; - inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;}; + inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;} + inline QueueHandle& getQueueHandle() { return queueHandle; } - PersistableQueue():externalQueueStore(NULL){ - }; + PersistableQueue():externalQueueStore(NULL) {} protected: ExternalQueueStore* externalQueueStore; - + QueueHandle queueHandle; }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40574ded3b..f595b81724 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/MessageDistributor.h" #include "qpid/broker/FifoDistributor.h" //#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/QueueRegistry.h" //TODO: get rid of this @@ -818,9 +819,17 @@ bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg) boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext(); assert(pmsg); // pmsg->enqueueAsync(shared_from_this(), store); - pmsg->enqueueAsync(shared_from_this(), asyncStore); // store->enqueue(ctxt, pmsg, *this); - // TODO - kpvdr: async enqueue here + pmsg->enqueueAsync(shared_from_this(), asyncStore); + pmsg->createMessageHandle(asyncStore); + EnqueueHandle& eh = pmsg->createEnqueueHandle(queueHandle, asyncStore); + TxnHandle th; // TODO: kpvdr: Impement transactions + boost::shared_ptr<QueueAsyncContext> qac( + new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), + pmsg, + &enqueueComplete, + &broker->getAsyncResultQueue())); + asyncStore->submitEnqueue(eh, th, qac); } return true; } @@ -893,7 +902,15 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) // if (store && pmsg) { if (asyncStore && pmsg) { // store->dequeue(ctxt, pmsg, *this); - // TODO: kpvdr: async dequeue here + pmsg->dequeueAsync(shared_from_this(), asyncStore); + TxnHandle th; // TODO: kpvdr: Impement transactions + EnqueueHandle& eh = pmsg->getEnqueueHandle(queueHandle); + boost::shared_ptr<QueueAsyncContext> qac( + new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), + pmsg, + &dequeueComplete, + &broker->getAsyncResultQueue())); + asyncStore->submitDequeue(eh, th, qac); } } @@ -908,6 +925,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(contentSize); } + if (brokerMgmtObject) { _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); bStats->msgTxnDequeues += 1; @@ -997,7 +1015,9 @@ void Queue::create() // if (store) { if (asyncStore) { // store->create(*this, settings.storeSettings); - // TODO: kpvdr: async store create here + queueHandle = asyncStore->createQueueHandle(name, qpid::types::Variant::Map()); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &createComplete, &broker->getAsyncResultQueue())); + asyncStore->submitCreate(queueHandle, this, qac); } } @@ -1068,12 +1088,14 @@ void Queue::destroyed() if (asyncStore) { barrier.destroy(); // store->flush(*this); - // TODO: kpvdr: async flush here + boost::shared_ptr<QueueAsyncContext> flush_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &flushComplete, &broker->getAsyncResultQueue())); + asyncStore->submitFlush(queueHandle, flush_qac); // store->destroy(*this); - // TODO: kpvdr: async destroy here + boost::shared_ptr<QueueAsyncContext> destroy_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &destroyComplete, &broker->getAsyncResultQueue())); + asyncStore->submitDestroy(queueHandle, destroy_qac); // store = 0;//ensure we make no more calls to the store for this queue // TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which - // will cause store to be destroyed when all outstanding async ops are complete. + // will prevent new calls from succeeding and cause store to be destroyed when all outstanding async ops are complete. } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); @@ -1102,7 +1124,7 @@ void Queue::bound(const string& exchange, const string& key, void Queue::unbind(ExchangeRegistry& exchanges) { - bindings.unbind(exchanges, shared_from_this()); + bindings.unbind(exchanges, shared_from_this(), asyncStore); } uint64_t Queue::getPersistenceId() const @@ -1274,6 +1296,46 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +uint64_t Queue::getSize() { return 0; } // TODO: kpvdr: implement +void Queue::write(char* /*target*/) {} // TODO: kpvdr: implement + +// static +void Queue::createComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Create complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::dequeueComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<PersistableQueue> pq = qac->getQueue(); + boost::intrusive_ptr<PersistableMessage> pmsg = qac->getMessage(); + QueueHandle& qh = pq->getQueueHandle(); + pmsg->dequeueComplete(); + pmsg->removeEnqueueHandle(qh); +// std::cout << "@@@@ Queue \"" << pq->getName() << "\": Dequeue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::destroyComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + // TODO: kpvdr: set Queue::asyncStore = 0 from here. +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Destroy complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::enqueueComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + qac->getMessage()->enqueueComplete(); +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Enqueue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + +//static +void Queue::flushComplete(const AsyncResultHandle* const arh) { + boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); +// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Flush complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; +} + void Queue::countRejected() const { if (mgmtObject) { @@ -1464,19 +1526,23 @@ void Queue::flush() ScopedUse u(barrier); // if (u.acquired && store) store->flush(*this); // TODO: kpvdr: Async store flush here - if (u.acquired && asyncStore) { /*store->flush(*this);*/ } + if (u.acquired && asyncStore) { + //store->flush(*this); + std::cout << "&&&& Queue::flush(): Queue=\"" << name << "\"" << std::endl << std::flush; + } } bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments) { - if (exchange->bind(shared_from_this(), key, &arguments)) { + if (exchange->bind(shared_from_this(), key, &arguments, asyncStore)) { bound(exchange->getName(), key, arguments); - if (exchange->isDurable() && isDurable()) { +// Move this to Exchange::bind() which keeps the binding context +// if (exchange->isDurable() && isDurable()) { // store->bind(*exchange, *this, key, arguments); - // TODO: kpvdr: Store configuration here - } +// // TODO: kpvdr: Store configuration here +// } return true; } else { return false; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 28fa8b5ca9..1294f813aa 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -116,7 +116,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, const std::string name; // MessageStore* store; - AsyncStore* asyncStore; + AsyncStore* const asyncStore; const OwnershipToken* owner; uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not. uint32_t browserCount; // Count of non-acquiring subscriptions. @@ -194,6 +194,12 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType); virtual bool checkDepth(const QueueDepth& increment, const Message&); + static void createComplete(const AsyncResultHandle* const arh); + static void dequeueComplete(const AsyncResultHandle* const arh); + static void destroyComplete(const AsyncResultHandle* const arh); + static void enqueueComplete(const AsyncResultHandle* const arh); + static void flushComplete(const AsyncResultHandle* const arh); + public: typedef boost::shared_ptr<Queue> shared_ptr; @@ -233,7 +239,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, */ QPID_BROKER_EXTERN bool bind( boost::shared_ptr<Exchange> exchange, const std::string& key, - const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); + const qpid::framing::FieldTable& arguments/*=qpid::framing::FieldTable()*/); /** * Removes (and dequeues) a message by its sequence number (used @@ -336,6 +342,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, virtual void setExternalQueueStore(ExternalQueueStore* inst); + // Implement DataStore, allows Queue to persist its configuration + uint64_t getSize(); + void write(char* target); + // Increment the rejected-by-consumer counter. QPID_BROKER_EXTERN void countRejected() const; QPID_BROKER_EXTERN void countFlowedToDisk(uint64_t size) const; diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp index 1bad5387a3..24ecaf6b5d 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.cpp +++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp @@ -33,67 +33,67 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), + m_pmsg(0), + m_tb(0), m_rcb(rcb), m_arq(arq) -{} +{ + //assert(m_q.get() != 0); +} -/* QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), - m_msg(msg), + m_pmsg(msg), + m_tb(0), m_rcb(rcb), m_arq(arq) -{} -*/ +{ + //assert(m_q.get() != 0); + //assert(m_pmsg.get() != 0); +} QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), + m_pmsg(0), m_tb(tb), m_rcb(rcb), m_arq(arq) { - assert(m_q.get() != 0); + //assert(m_q.get() != 0); } -/* QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), - m_msg(msg), + m_pmsg(msg), m_tb(tb), m_rcb(rcb), m_arq(arq) { - assert(m_q.get() != 0); - assert(m_msg.get() != 0); + //assert(m_q.get() != 0); + //assert(m_pmsg.get() != 0); } -*/ -QueueAsyncContext::~QueueAsyncContext() -{} +QueueAsyncContext::~QueueAsyncContext() {} boost::shared_ptr<PersistableQueue> -QueueAsyncContext::getQueue() const -{ +QueueAsyncContext::getQueue() const { return m_q; } -/* boost::intrusive_ptr<PersistableMessage> -QueueAsyncContext::getMessage() const -{ - return m_msg; +QueueAsyncContext::getMessage() const { + return m_pmsg; } -*/ SimpleTxnBuffer* QueueAsyncContext::getTxnBuffer() const { @@ -101,28 +101,24 @@ QueueAsyncContext::getTxnBuffer() const { } AsyncResultQueue* -QueueAsyncContext::getAsyncResultQueue() const -{ +QueueAsyncContext::getAsyncResultQueue() const { return m_arq; } AsyncResultCallback -QueueAsyncContext::getAsyncResultCallback() const -{ +QueueAsyncContext::getAsyncResultCallback() const { return m_rcb; } void -QueueAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const -{ +QueueAsyncContext::invokeCallback(const AsyncResultHandle* const arh) const { if (m_rcb) { m_rcb(arh); } } void -QueueAsyncContext::destroy() -{ +QueueAsyncContext::destroy() { delete this; } diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h index 4988f2af39..2ce77232b9 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.h +++ b/cpp/src/qpid/broker/QueueAsyncContext.h @@ -36,7 +36,7 @@ namespace qpid { namespace broker { -//class PersistableMessage; +class PersistableMessage; class PersistableQueue; typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); @@ -47,22 +47,22 @@ public: QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, AsyncResultCallback rcb, AsyncResultQueue* const arq); -/* QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, + QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, AsyncResultCallback rcb, - AsyncResultQueue* const arq);*/ + AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); -/* QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, + QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, SimpleTxnBuffer* tb, AsyncResultCallback rcb, - AsyncResultQueue* const arq);*/ + AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); boost::shared_ptr<PersistableQueue> getQueue() const; -// boost::intrusive_ptr<PersistableMessage> getMessage() const; + boost::intrusive_ptr<PersistableMessage> getMessage() const; SimpleTxnBuffer* getTxnBuffer() const; AsyncResultQueue* getAsyncResultQueue() const; AsyncResultCallback getAsyncResultCallback() const; @@ -71,7 +71,7 @@ public: private: boost::shared_ptr<PersistableQueue> m_q; -// boost::intrusive_ptr<PersistableMessage> m_msg; + boost::intrusive_ptr<PersistableMessage> m_pmsg; SimpleTxnBuffer* m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp index 1cc3486d9a..3d04b3123c 100644 --- a/cpp/src/qpid/broker/QueueBindings.cpp +++ b/cpp/src/qpid/broker/QueueBindings.cpp @@ -34,7 +34,7 @@ void QueueBindings::add(const string& exchange, const string& key, const FieldTa bindings.push_back(QueueBinding(exchange, key, args)); } -void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue) +void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue, AsyncStore* const store) { Bindings local; { @@ -44,7 +44,7 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue) for (Bindings::iterator i = local.begin(); i != local.end(); i++) { Exchange::shared_ptr ex = exchanges.find(i->exchange); - if (ex) ex->unbind(queue, i->key, &(i->args)); + if (ex) ex->unbind(queue, i->key, &(i->args), store); } } diff --git a/cpp/src/qpid/broker/QueueBindings.h b/cpp/src/qpid/broker/QueueBindings.h index f9b07e7431..7e49c783e5 100644 --- a/cpp/src/qpid/broker/QueueBindings.h +++ b/cpp/src/qpid/broker/QueueBindings.h @@ -55,7 +55,7 @@ class QueueBindings } void add(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); - void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue); + void unbind(ExchangeRegistry& exchanges, boost::shared_ptr<Queue> queue, AsyncStore* const store); private: mutable sys::Mutex lock; diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp index 6ff3f832e4..f2d7fb8d35 100644 --- a/cpp/src/qpid/broker/QueueFactory.cpp +++ b/cpp/src/qpid/broker/QueueFactory.cpp @@ -42,7 +42,7 @@ namespace broker { //QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} -QueueFactory::QueueFactory() : broker(0), asyncStore(0), parent(0) {} +QueueFactory::QueueFactory() : broker(0), asyncStore(), parent(0) {} boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings) { @@ -103,7 +103,7 @@ Broker* QueueFactory::getBroker() return broker; } //void QueueFactory::setStore (MessageStore* s) -void QueueFactory::setStore (AsyncStore* as) +void QueueFactory::setStore (AsyncStore* const as) { asyncStore = as; } diff --git a/cpp/src/qpid/broker/QueueFactory.h b/cpp/src/qpid/broker/QueueFactory.h index 9d0048e139..cc28356982 100644 --- a/cpp/src/qpid/broker/QueueFactory.h +++ b/cpp/src/qpid/broker/QueueFactory.h @@ -54,7 +54,7 @@ class QueueFactory * Set the store to use. May only be called once. */ // void setStore (MessageStore*); - void setStore (AsyncStore*); + void setStore (AsyncStore* const); /** * Return the message store used. diff --git a/cpp/src/qpid/broker/QueueHandle.cpp b/cpp/src/qpid/broker/QueueHandle.cpp index 9c8d7eba67..3c647cb66a 100644 --- a/cpp/src/qpid/broker/QueueHandle.cpp +++ b/cpp/src/qpid/broker/QueueHandle.cpp @@ -54,12 +54,4 @@ QueueHandle::operator=(const QueueHandle& r) return PrivateImpl::assign(*this, r); } -// --- QueueHandleImpl methods --- - -const std::string& -QueueHandle::getName() const -{ - return impl->getName(); -} - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h index 1110367418..b25064d229 100644 --- a/cpp/src/qpid/broker/QueueHandle.h +++ b/cpp/src/qpid/broker/QueueHandle.h @@ -45,9 +45,6 @@ public: ~QueueHandle(); QueueHandle& operator=(const QueueHandle& r); - // --- QueueHandleImpl methods --- - const std::string& getName() const; - private: friend class PrivateImplRef<QueueHandle>; }; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 420a9caa28..eb525b6727 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -101,7 +101,7 @@ Queue::shared_ptr QueueRegistry::get(const string& name) { } //void QueueRegistry::setStore (MessageStore* _store) -void QueueRegistry::setStore (AsyncStore* _store) +void QueueRegistry::setStore (AsyncStore* const _store) { QueueFactory::setStore(_store); } diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index b274493f8d..ada76f9cca 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -98,7 +98,7 @@ class QueueRegistry : QueueFactory { * Set the store to use. May only be called once. */ // void setStore (MessageStore*); - void setStore (AsyncStore*); + void setStore (AsyncStore* const); /** * Return the message store used. diff --git a/cpp/src/qpid/broker/RecoverableExchange.h b/cpp/src/qpid/broker/RecoverableExchange.h index ca6cc1541e..6bda1e2617 100644 --- a/cpp/src/qpid/broker/RecoverableExchange.h +++ b/cpp/src/qpid/broker/RecoverableExchange.h @@ -27,6 +27,7 @@ namespace qpid { namespace broker { +class AsyncStore; /** * The interface through which bindings are recovered. @@ -42,7 +43,8 @@ public: */ virtual void bind(const std::string& queue, const std::string& routingKey, - qpid::framing::FieldTable& args) = 0; + qpid::framing::FieldTable&, + AsyncStore* const store) = 0; virtual ~RecoverableExchange() {}; }; diff --git a/cpp/src/qpid/broker/RecoveryManager.h b/cpp/src/qpid/broker/RecoveryManager.h index 0cb7c544cd..f2d28c0328 100644 --- a/cpp/src/qpid/broker/RecoveryManager.h +++ b/cpp/src/qpid/broker/RecoveryManager.h @@ -31,12 +31,13 @@ namespace qpid { namespace broker { +class AsyncStore; class RecoveryManager{ public: virtual ~RecoveryManager(){} virtual RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer) = 0; - virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0; + virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer, AsyncStore* const store) = 0; virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0; virtual RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) = 0; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 7deeba5e65..f3e1639ca5 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -81,7 +81,7 @@ class RecoverableExchangeImpl : public RecoverableExchange public: RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} void setPersistenceId(uint64_t id); - void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); + void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args, AsyncStore* const store); }; class RecoverableConfigImpl : public RecoverableConfig @@ -113,13 +113,13 @@ RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Bu } } -RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer) +RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer, AsyncStore* const store) { Queue::shared_ptr queue = Queue::restore(queues, buffer); try { Exchange::shared_ptr exchange = exchanges.getDefault(); if (exchange) { - exchange->bind(queue, queue->getName(), 0); + exchange->bind(queue, queue->getName(), 0, store); queue->bound(exchange->getName(), queue->getName(), framing::FieldTable()); } } catch (const framing::NotFoundException& /*e*/) { @@ -238,10 +238,11 @@ void RecoverableConfigImpl::setPersistenceId(uint64_t id) void RecoverableExchangeImpl::bind(const string& queueName, const string& key, - framing::FieldTable& args) + framing::FieldTable& args, + AsyncStore* const store) { Queue::shared_ptr queue = queues.find(queueName); - exchange->bind(queue, key, &args); + exchange->bind(queue, key, &args, store); queue->bound(exchange->getName(), key, args); } diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h index 1ad7892b13..7fca0be194 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -42,7 +42,7 @@ namespace broker { ~RecoveryManagerImpl(); RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); - RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer); + RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer, AsyncStore* const store); RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer); RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 97d6dc07b0..5fc9a1a932 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -156,7 +156,7 @@ void SemanticState::startTx() } //void SemanticState::commit(MessageStore* const store) -void SemanticState::commit(AsyncStore* const store) +void SemanticState::commit(AsyncTransactionalStore* const store) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index a30c7e15b7..9add663e24 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -235,7 +235,7 @@ class SemanticState : private boost::noncopyable { void startTx(); // void commit(MessageStore* const store); - void commit(AsyncStore* const store); + void commit(AsyncTransactionalStore* const store); void rollback(); void selectDtx(); bool getDtxSelected() const { return dtxSelected; } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index a05934ed8e..cb2fe15b58 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -563,7 +563,7 @@ void SessionAdapter::TxHandlerImpl::select() void SessionAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore()); + state.commit(getBroker().getStore()); } void SessionAdapter::TxHandlerImpl::rollback() diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 88cdf7e03a..944cbad0aa 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -401,7 +401,7 @@ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); } else { // this path runs directly from the ac->end() call in handleContent() above, - // so *session is definately valid. + // so *session is definitely valid. if (session->isAttached()) { QPID_LOG(debug, ": receive completed for msg seq=" << id); session->completeRcvMsg(id, requiresAccept, requiresSync); diff --git a/cpp/src/qpid/broker/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h index c2f21076cd..da5c2d9ad6 100644 --- a/cpp/src/qpid/broker/SimpleQueue.h +++ b/cpp/src/qpid/broker/SimpleQueue.h @@ -51,8 +51,7 @@ class SimpleMessage; class SimpleTxnBuffer; class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, - public PersistableQueue, - public DataSource + public PersistableQueue { public: SimpleQueue(const std::string& name, diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index d9871a430b..38d8f255ac 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -156,7 +156,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) +bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const /*store*/) { ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); @@ -226,7 +226,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons return true; } -bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args) +bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args, AsyncStore* const /*store*/) { string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName() diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index c50ecf1830..329c2f408f 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -96,9 +96,13 @@ public: QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, - const qpid::framing::FieldTable* args); + const qpid::framing::FieldTable* args, + AsyncStore* const store); - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, + const std::string& routingKey, + const qpid::framing::FieldTable* args, + AsyncStore* const store); QPID_BROKER_EXTERN virtual void route(Deliverable& msg); diff --git a/cpp/src/qpid/broker/TransactionalStore.h b/cpp/src/qpid/broker/TransactionalStore.h index 2a2bac0c51..9c844c1ee1 100644 --- a/cpp/src/qpid/broker/TransactionalStore.h +++ b/cpp/src/qpid/broker/TransactionalStore.h @@ -18,6 +18,7 @@ * under the License. * */ + #ifndef _TransactionalStore_ #define _TransactionalStore_ diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 0a66527e98..3a3c9c2954 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -398,7 +398,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->bind(queue, key, &args); + exchange->bind(queue, key, &args, 0); } } @@ -418,7 +418,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->unbind(queue, key, &args); + exchange->unbind(queue, key, &args, 0); } } @@ -514,7 +514,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { << " key:" << key); framing::FieldTable args; qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); - exchange->bind(queue, key, &args); + exchange->bind(queue, key, &args, 0); } } @@ -616,8 +616,8 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange( else return boost::shared_ptr<Exchange>(); } -bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } -bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } +bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; } +bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } // DataSource interface - used to write persistence data to async store diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 109d8c638e..f6983e8719 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -35,6 +35,7 @@ namespace qpid { namespace broker { +class AsyncStore; class Broker; class Link; class Bridge; @@ -71,8 +72,8 @@ class BrokerReplicator : public broker::Exchange, // Exchange methods std::string getType() const; - bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); - bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store); + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store); void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 8aba7555d4..cac1fdac29 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -190,8 +190,8 @@ void QueueReplicator::route(Deliverable& msg) } // Unused Exchange methods. -bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } -bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } +bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*, qpid::broker::AsyncStore* const) { return false; } +bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*, qpid::broker::AsyncStore* const) { return false; } bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } std::string QueueReplicator::getType() const { return TYPE_NAME; } diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index a2a158539e..8d8a41a5ba 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -71,9 +71,8 @@ class QueueReplicator : public broker::Exchange, void deactivate(); // Call before dtor std::string getType() const; - bool bind(boost::shared_ptr<broker::Queue - >, const std::string&, const framing::FieldTable*); - bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const); + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const); void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp index c8bfef3785..d1c2b1c34f 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -53,11 +53,12 @@ void ManagementTopicExchange::route(Deliverable& msg) bool ManagementTopicExchange::bind(Queue::shared_ptr queue, const std::string& routingKey, - const qpid::framing::FieldTable* args) + const qpid::framing::FieldTable* args, + AsyncStore* const store) { if (qmfVersion == 1) managementAgent->clientAdded(routingKey); - return TopicExchange::bind(queue, routingKey, args); + return TopicExchange::bind(queue, routingKey, args, store); } void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv) diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h index 0d6b6ad50c..92894855fd 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.h +++ b/cpp/src/qpid/management/ManagementTopicExchange.h @@ -47,7 +47,8 @@ class ManagementTopicExchange : public virtual TopicExchange virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, - const qpid::framing::FieldTable* args); + const qpid::framing::FieldTable* args, + AsyncStore* const store); void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index 22eeff41c5..55800ac464 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -116,7 +116,7 @@ XmlExchange::XmlExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args) +bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const store) { // Federation uses bind for unbind and reorigin comands as well as for binds. @@ -136,7 +136,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, c } if (fedOp == fedOpUnbind) { - return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args); + return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args, store); } else if (fedOp == fedOpReorigin) { fedReorigin(); @@ -176,7 +176,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const std::string& bindingKey, c return true; } -bool XmlExchange::unbind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args) +bool XmlExchange::unbind(Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const /*store*/) { /* * When called directly, no qpidFedOrigin argument will be @@ -383,11 +383,11 @@ void XmlExchange::propagateFedOp(const std::string& bindingKey, const std::strin Exchange::propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, propArgs); } -bool XmlExchange::fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args) +bool XmlExchange::fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const FieldTable* args, AsyncStore* const store) { RWlock::ScopedRlock l(lock); - if (unbind(queue, bindingKey, args)) { + if (unbind(queue, bindingKey, args, store)) { propagateFedOp(bindingKey, fedTags, fedOpUnbind, fedOrigin); return true; } diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h index a80588c7ab..1fb9439b9b 100644 --- a/cpp/src/qpid/xml/XmlExchange.h +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -76,9 +76,9 @@ class XmlExchange : public virtual Exchange { virtual std::string getType() const { return typeName; } - virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store); - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args, AsyncStore* const store); virtual void route(Deliverable& msg); @@ -86,7 +86,7 @@ class XmlExchange : public virtual Exchange { virtual void propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args=0); - virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const qpid::framing::FieldTable* args); + virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const qpid::framing::FieldTable* args, AsyncStore* const store); virtual void fedReorigin(); diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 4f18b91b5a..259860e6cd 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -51,12 +51,12 @@ QPID_AUTO_TEST_CASE(testMe) Queue::shared_ptr queue2(new Queue("queue2", true)); TopicExchange topic("topic"); - topic.bind(queue, "abc", 0); - topic.bind(queue2, "abc", 0); + topic.bind(queue, "abc", 0, 0); + topic.bind(queue2, "abc", 0, 0); DirectExchange direct("direct"); - direct.bind(queue, "abc", 0); - direct.bind(queue2, "abc", 0); + direct.bind(queue, "abc", 0, 0); + direct.bind(queue2, "abc", 0, 0); queue.reset(); queue2.reset(); @@ -78,9 +78,9 @@ QPID_AUTO_TEST_CASE(testIsBound) string k3("xyz"); FanOutExchange fanout("fanout"); - BOOST_CHECK(fanout.bind(a, "", 0)); - BOOST_CHECK(fanout.bind(b, "", 0)); - BOOST_CHECK(fanout.bind(c, "", 0)); + BOOST_CHECK(fanout.bind(a, "", 0, 0)); + BOOST_CHECK(fanout.bind(b, "", 0, 0)); + BOOST_CHECK(fanout.bind(c, "", 0, 0)); BOOST_CHECK(fanout.isBound(a, 0, 0)); BOOST_CHECK(fanout.isBound(b, 0, 0)); @@ -88,10 +88,10 @@ QPID_AUTO_TEST_CASE(testIsBound) BOOST_CHECK(!fanout.isBound(d, 0, 0)); DirectExchange direct("direct"); - BOOST_CHECK(direct.bind(a, k1, 0)); - BOOST_CHECK(direct.bind(a, k3, 0)); - BOOST_CHECK(direct.bind(b, k2, 0)); - BOOST_CHECK(direct.bind(c, k1, 0)); + BOOST_CHECK(direct.bind(a, k1, 0, 0)); + BOOST_CHECK(direct.bind(a, k3, 0, 0)); + BOOST_CHECK(direct.bind(b, k2, 0, 0)); + BOOST_CHECK(direct.bind(c, k1, 0, 0)); BOOST_CHECK(direct.isBound(a, 0, 0)); BOOST_CHECK(direct.isBound(a, &k1, 0)); @@ -106,10 +106,10 @@ QPID_AUTO_TEST_CASE(testIsBound) BOOST_CHECK(!direct.isBound(d, &k3, 0)); TopicExchange topic("topic"); - BOOST_CHECK(topic.bind(a, k1, 0)); - BOOST_CHECK(topic.bind(a, k3, 0)); - BOOST_CHECK(topic.bind(b, k2, 0)); - BOOST_CHECK(topic.bind(c, k1, 0)); + BOOST_CHECK(topic.bind(a, k1, 0, 0)); + BOOST_CHECK(topic.bind(a, k3, 0, 0)); + BOOST_CHECK(topic.bind(b, k2, 0, 0)); + BOOST_CHECK(topic.bind(c, k1, 0, 0)); BOOST_CHECK(topic.isBound(a, 0, 0)); BOOST_CHECK(topic.isBound(a, &k1, 0)); @@ -137,10 +137,10 @@ QPID_AUTO_TEST_CASE(testIsBound) args3.setString("c", "C"); args3.setInt("b", 6); - headers.bind(a, "", &args1); - headers.bind(a, "", &args3); - headers.bind(b, "", &args2); - headers.bind(c, "", &args1); + headers.bind(a, "", &args1, 0); + headers.bind(a, "", &args3, 0); + headers.bind(b, "", &args2, 0); + headers.bind(c, "", &args1, 0); BOOST_CHECK(headers.isBound(a, 0, 0)); BOOST_CHECK(headers.isBound(a, 0, &args1)); @@ -250,10 +250,10 @@ QPID_AUTO_TEST_CASE(testIVEOption) Queue::shared_ptr queue2(new Queue("queue2", true)); Queue::shared_ptr queue3(new Queue("queue3", true)); - BOOST_CHECK(direct.bind(queue, "abc", 0)); - BOOST_CHECK(fanout.bind(queue1, "abc", 0)); - BOOST_CHECK(header.bind(queue2, "", &args2)); - BOOST_CHECK(topic.bind(queue3, "abc", 0)); + BOOST_CHECK(direct.bind(queue, "abc", 0, 0)); + BOOST_CHECK(fanout.bind(queue1, "abc", 0, 0)); + BOOST_CHECK(header.bind(queue2, "", &args2, 0)); + BOOST_CHECK(topic.bind(queue3, "abc", 0, 0)); BOOST_CHECK_EQUAL(1u,queue->getMessageCount()); BOOST_CHECK_EQUAL(1u,queue1->getMessageCount()); diff --git a/cpp/src/tests/HeadersExchangeTest.cpp b/cpp/src/tests/HeadersExchangeTest.cpp index 40deb59c86..7c8ee4a2d9 100644 --- a/cpp/src/tests/HeadersExchangeTest.cpp +++ b/cpp/src/tests/HeadersExchangeTest.cpp @@ -109,7 +109,7 @@ QPID_AUTO_TEST_CASE(testBindNoXMatch) FieldTable args; try { //just checking this doesn't cause assertion etc - exchange.bind(queue, key, &args); + exchange.bind(queue, key, &args, 0); } catch(qpid::Exception&) { //expected } diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 7b7c653029..d86c18c38d 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -100,15 +100,15 @@ QPID_AUTO_TEST_CASE(testBound){ ExchangeRegistry exchanges; //establish bindings from exchange->queue and notify the queue as it is bound: Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first; - exchange1->bind(queue, key, &args); + exchange1->bind(queue, key, &args, 0); queue->bound(exchange1->getName(), key, args); Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first; - exchange2->bind(queue, key, &args); + exchange2->bind(queue, key, &args, 0); queue->bound(exchange2->getName(), key, args); Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first; - exchange3->bind(queue, key, &args); + exchange3->bind(queue, key, &args, 0); queue->bound(exchange3->getName(), key, args); //delete one of the exchanges: |