diff options
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStore.h | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.h | 9 |
6 files changed, 83 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index a5379634b6..c43eca6e5b 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -114,11 +114,11 @@ Broker::Options::Options(const std::string& name) : ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") - ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), + ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") - ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") + ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") @@ -176,7 +176,7 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_dataDir(dataDir.getPath()); else mgmtObject->clr_dataDir(); - + managementAgent->addObject(mgmtObject, 0x1000000000000002LL); // Since there is currently no support for virtual hosts, a placeholder object @@ -218,12 +218,14 @@ Broker::Broker(const Broker::Options& conf) : // The cluster plug-in will setRecovery(false) on all but the first // broker to join a cluster. if (getRecovery()) { - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, conf.stagingThreshold); store->recover(recoverer); } - else - QPID_LOG(notice, "Recovering from cluster, no recovery from local journal"); + else { + QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down"); + store->discardInit(true); + } } //ensure standard exchanges exist (done after recovery from store) @@ -266,11 +268,11 @@ Broker::Broker(const Broker::Options& conf) : //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)): if (conf.knownHosts.empty()) { boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); - if (factory) { + if (factory) { knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) ); } } else if (conf.knownHosts != knownHostsNone) { - knownBrokers.push_back(Url(conf.knownHosts)); + knownBrokers.push_back(Url(conf.knownHosts)); } } @@ -284,14 +286,14 @@ void Broker::declareStandardExchange(const std::string& name, const std::string& } -boost::intrusive_ptr<Broker> Broker::create(int16_t port) +boost::intrusive_ptr<Broker> Broker::create(int16_t port) { Options config; config.port=port; return create(config); } -boost::intrusive_ptr<Broker> Broker::create(const Options& opts) +boost::intrusive_ptr<Broker> Broker::create(const Options& opts) { return boost::intrusive_ptr<Broker>(new Broker(opts)); } @@ -398,7 +400,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, } boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const { - ProtocolFactoryMap::const_iterator i + ProtocolFactoryMap::const_iterator i = name.empty() ? protocolFactories.begin() : protocolFactories.find(name); if (i == protocolFactories.end()) return boost::shared_ptr<ProtocolFactory>(); else return i->second; @@ -406,7 +408,7 @@ boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& uint16_t Broker::getPort(const std::string& name) const { boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name); - if (factory) { + if (factory) { return factory->getPort(); } else { throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'")); @@ -443,8 +445,8 @@ void Broker::connect( connect(addr->host, addr->port, TCP_TRANSPORT, failed, f); } -uint32_t Broker::queueMoveMessages( - const std::string& srcQueue, +uint32_t Broker::queueMoveMessages( + const std::string& srcQueue, const std::string& destQueue, uint32_t qty) { @@ -461,7 +463,7 @@ uint32_t Broker::queueMoveMessages( boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; } -std::vector<Url> +std::vector<Url> Broker::getKnownBrokersImpl() { return knownBrokers; diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h index 4c4c21dfba..3d8bbbb02c 100644 --- a/qpid/cpp/src/qpid/broker/MessageStore.h +++ b/qpid/cpp/src/qpid/broker/MessageStore.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,14 +46,26 @@ class MessageStore : public TransactionalStore, public Recoverable { public: /** - * init the store, call before any other call. If not called, store + * init the store, call before any other call. If not called, store * is free to pick any defaults - * + * * @param options Options object provided by concrete store plug in. */ virtual bool init(const Options* options) = 0; /** + * If called after init() but before recovery, will discard the database + * and reinitialize using an empty store dir. If the parameter pushDownStoreFiles + * is true, the content of the store dir will be moved to a backup dir inside the + * store dir. This is used when cluster nodes recover and must get thier content + * from a cluster sync rather than directly fromt the store. + * + * @param pushDownStoreFiles If true, will move content of the store dir into a + * subdir, leaving the store dir otherwise empty. + */ + virtual void discardInit(const bool pushDownStoreFiles = false) = 0; + + /** * Record the existence of a durable queue */ virtual void create(PersistableQueue& queue, @@ -62,7 +74,7 @@ class MessageStore : public TransactionalStore, public Recoverable { * Destroy a durable queue */ virtual void destroy(PersistableQueue& queue) = 0; - + /** * Record the existence of a durable exchange */ @@ -72,17 +84,17 @@ class MessageStore : public TransactionalStore, public Recoverable { * Destroy a durable exchange */ virtual void destroy(const PersistableExchange& exchange) = 0; - + /** * Record a binding */ - virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, + virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args) = 0; /** * Forget a binding */ - virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, + virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args) = 0; /** @@ -102,10 +114,10 @@ class MessageStore : public TransactionalStore, public Recoverable { * point). If the message has not yet been stored it will * store the headers as well as any content passed in. A * persistence id will be set on the message which can be - * used to load the content or to append to it. + * used to load the content or to append to it. */ virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 0; - + /** * Destroys a previously staged message. This only needs * to be called if the message is never enqueued. (Once @@ -119,7 +131,7 @@ class MessageStore : public TransactionalStore, public Recoverable { */ virtual void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data) = 0; - + /** * Loads (a section) of content data for the specified * message (previously stored through a call to stage or @@ -128,18 +140,18 @@ class MessageStore : public TransactionalStore, public Recoverable { * content should be loaded, not the headers or related * meta-data). */ - virtual void loadContent(const qpid::broker::PersistableQueue& queue, + virtual void loadContent(const qpid::broker::PersistableQueue& queue, const boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, uint64_t offset, uint32_t length) = 0; - + /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given - * message is on the given queue. + * message is on the given queue. * * Note: that this is async so the return of the function does * not mean the opperation is complete. - * + * * @param msg the message to enqueue * @param queue the name of the queue onto which it is to be enqueued * @param xid (a pointer to) an identifier of the @@ -149,7 +161,7 @@ class MessageStore : public TransactionalStore, public Recoverable { virtual void enqueue(TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) = 0; - + /** * Dequeues a message, recording that the given message is * no longer on the given queue and deleting the message @@ -157,7 +169,7 @@ class MessageStore : public TransactionalStore, public Recoverable { * * Note: that this is async so the return of the function does * not mean the opperation is complete. - * + * * @param msg the message to dequeue * @param queue the name of the queue from which it is to be dequeued * @param xid (a pointer to) an identifier of the @@ -173,22 +185,22 @@ class MessageStore : public TransactionalStore, public Recoverable { * * Note: that this is async so the return of the function does * not mean the opperation is complete. - * + * * @param queue the name of the queue from which it is to be dequeued */ virtual void flush(const qpid::broker::PersistableQueue& queue)=0; /** * Returns the number of outstanding AIO's for a given queue - * - * If 0, than all the enqueue / dequeues have been stored + * + * If 0, than all the enqueue / dequeues have been stored * to disk * * @param queue the name of the queue to check for outstanding AIO */ virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0; - + virtual ~MessageStore(){} }; diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index 96186d508b..66ce73e8a0 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -41,6 +41,11 @@ MessageStoreModule::~MessageStoreModule() bool MessageStoreModule::init(const Options*) { return true; } +void MessageStoreModule::discardInit(const bool pushDownStoreFiles) +{ + TRANSFER_EXCEPTION(store->discardInit(pushDownStoreFiles)); +} + void MessageStoreModule::create(PersistableQueue& queue, const FieldTable& args) { TRANSFER_EXCEPTION(store->create(queue, args)); @@ -61,13 +66,13 @@ void MessageStoreModule::destroy(const PersistableExchange& exchange) TRANSFER_EXCEPTION(store->destroy(exchange)); } -void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q, +void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q, const std::string& k, const framing::FieldTable& a) { TRANSFER_EXCEPTION(store->bind(e, q, k, a)); } -void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q, +void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q, const std::string& k, const framing::FieldTable& a) { TRANSFER_EXCEPTION(store->unbind(e, q, k, a)); @@ -105,7 +110,7 @@ void MessageStoreModule::appendContent(const intrusive_ptr<const PersistableMess } void MessageStoreModule::loadContent( - const qpid::broker::PersistableQueue& queue, + const qpid::broker::PersistableQueue& queue, const intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length) { diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index 0b51610a46..774bc8f8eb 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,6 +40,7 @@ class MessageStoreModule : public MessageStore MessageStoreModule(MessageStore* store); bool init(const Options* options); + void discardInit(const bool pushDownStoreFiles = false); std::auto_ptr<TransactionContext> begin(); std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); void prepare(TPCTransactionContext& txn); @@ -51,9 +52,9 @@ class MessageStoreModule : public MessageStore void destroy(PersistableQueue& queue); void create(const PersistableExchange& exchange, const framing::FieldTable& args); void destroy(const PersistableExchange& exchange); - void bind(const PersistableExchange& exchange, const PersistableQueue& queue, + void bind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); - void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, + void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); void create(const PersistableConfig& config); void destroy(const PersistableConfig& config); @@ -61,7 +62,7 @@ class MessageStoreModule : public MessageStore void stage(const boost::intrusive_ptr<PersistableMessage>& msg); void destroy(PersistableMessage& msg); void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, const std::string& data); - void loadContent(const qpid::broker::PersistableQueue& queue, + void loadContent(const qpid::broker::PersistableQueue& queue, const boost::intrusive_ptr<const PersistableMessage>& msg, std::string& data, uint64_t offset, uint32_t length); diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp index ad0143ce43..62b546b3eb 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,12 +36,12 @@ const std::string nullxid = ""; class SimpleDummyCtxt : public TransactionContext {}; -class DummyCtxt : public TPCTransactionContext +class DummyCtxt : public TPCTransactionContext { const std::string xid; public: DummyCtxt(const std::string& _xid) : xid(_xid) {} - static std::string getXid(TransactionContext& ctxt) + static std::string getXid(TransactionContext& ctxt) { DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); return c ? c->xid : nullxid; @@ -54,22 +54,21 @@ NullMessageStore::NullMessageStore() : nextPersistenceId(1) { bool NullMessageStore::init(const Options* /*options*/) {return true;} +void NullMessageStore::discardInit(const bool /*pushDownStoreFiles*/) {} + void NullMessageStore::create(PersistableQueue& queue, const framing::FieldTable& /*args*/) { queue.setPersistenceId(nextPersistenceId++); } -void NullMessageStore::destroy(PersistableQueue&) -{ -} +void NullMessageStore::destroy(PersistableQueue&) {} void NullMessageStore::create(const PersistableExchange& exchange, const framing::FieldTable& /*args*/) { exchange.setPersistenceId(nextPersistenceId++); } -void NullMessageStore::destroy(const PersistableExchange& ) -{} +void NullMessageStore::destroy(const PersistableExchange& ) {} void NullMessageStore::bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&){} @@ -92,7 +91,7 @@ void NullMessageStore::appendContent(const intrusive_ptr<const PersistableMessag void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, const intrusive_ptr<const PersistableMessage>&, - string&, uint64_t, uint32_t) + string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } @@ -101,7 +100,7 @@ void NullMessageStore::enqueue(TransactionContext*, const intrusive_ptr<PersistableMessage>& msg, const PersistableQueue&) { - msg->enqueueComplete(); + msg->enqueueComplete(); } void NullMessageStore::dequeue(TransactionContext*, diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index a44f8d2804..552ea05272 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -42,6 +42,7 @@ class NullMessageStore : public MessageStore QPID_BROKER_EXTERN NullMessageStore(); QPID_BROKER_EXTERN virtual bool init(const Options* options); + QPID_BROKER_EXTERN virtual void discardInit(const bool pushDownStoreFiles = false); QPID_BROKER_EXTERN virtual std::auto_ptr<TransactionContext> begin(); QPID_BROKER_EXTERN virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); QPID_BROKER_EXTERN virtual void prepare(TPCTransactionContext& txn); @@ -57,11 +58,11 @@ class NullMessageStore : public MessageStore QPID_BROKER_EXTERN virtual void destroy(const PersistableExchange& exchange); QPID_BROKER_EXTERN virtual void bind(const PersistableExchange& exchange, - const PersistableQueue& queue, + const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); QPID_BROKER_EXTERN virtual void unbind(const PersistableExchange& exchange, - const PersistableQueue& queue, + const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); QPID_BROKER_EXTERN virtual void create(const PersistableConfig& config); |