diff options
author | Alan Conway <aconway@apache.org> | 2009-02-04 17:04:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-04 17:04:45 +0000 |
commit | 314eb1b65a752daaa80a2cb5174bac78c4643bcb (patch) | |
tree | a8fcbb5f9cc7d5af1cd5016f253c98296fa9f3bb /cpp/src | |
parent | 80c1c1da2855cc0c03d08a0fcb425c38b3344333 (diff) | |
download | qpid-python-314eb1b65a752daaa80a2cb5174bac78c4643bcb.tar.gz |
Cluster sets recovery flag on Broker for first member in cluster.
Disable recovery from local store if the recovery flag is not set.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740793 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 55 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 8 | ||||
-rwxr-xr-x | cpp/src/tests/run_acl_tests | 1 |
6 files changed, 69 insertions, 32 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index f692ff72f3..091f67ec58 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -149,6 +149,7 @@ Broker::Broker(const Broker::Options& conf) : *this), queueCleaner(queues, timer), queueEvents(poller), + recovery(true), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { @@ -209,11 +210,17 @@ Broker::Broker(const Broker::Options& conf) : setStore (new NullMessageStore()); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - + if (store.get() != 0) { - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, - conf.stagingThreshold); - store->recover(recoverer); + // The cluster plug-in will setRecovery(false) on all but the first + // broker to join a cluster. + if (getRecovery()) { + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, + conf.stagingThreshold); + store->recover(recoverer); + } + else + QPID_LOG(notice, "Recovering from cluster, no recovery from local journal"); } //ensure standard exchanges exist (done after recovery from store) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index c50ef46baa..71b69b51aa 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -139,6 +139,8 @@ class Broker : public sys::Runnable, public Plugin::Target, std::vector<Url> getKnownBrokersImpl(); std::string federationTag; + bool recovery; + public: @@ -223,6 +225,9 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::function<std::vector<Url> ()> getKnownBrokers; static const std::string TCP_TRANSPORT; + + void setRecovery(bool set) { recovery = set; } + bool getRecovery() const { return recovery; } }; }} diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index eaa4a720b1..41688b5c49 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -21,28 +21,30 @@ #include "UpdateClient.h" #include "FailoverExchange.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterUpdateRequestBody.h" -#include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterUpdateOfferBody.h" -#include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/log/Statement.h" +#include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterShutdownBody.h" +#include "qpid/framing/ClusterUpdateOfferBody.h" +#include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/log/Helpers.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/LatencyMetric.h" +#include "qpid/log/Statement.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qmf/org/apache/qpid/cluster/Package.h" -#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/Thread.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -101,11 +103,28 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + initialized(false), state(INIT), lastSize(0), lastBroker(false), sequence(0) { + failoverExchange.reset(new FailoverExchange(this)); + if (quorum_) quorum.init(); + cpg.join(name); + // pump the CPG dispatch manually till we get initialized. + while (!initialized) + cpg.dispatchOne(); +} + +Cluster::~Cluster() { + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. +} + +void Cluster::initialize() { + if (myUrl.empty()) + myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); + QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); @@ -114,18 +133,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); - failoverExchange.reset(new FailoverExchange(this)); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); - QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); - if (quorum_) quorum.init(); - cpg.join(name); - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety. -} - -Cluster::~Cluster() { - if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + // Add finalizer last for exception safety. + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -279,6 +291,11 @@ void Cluster::configChange ( cpg_address */*joined*/, int /*nJoined*/) { Mutex::ScopedLock l(lock); + if (state == INIT) { // First config change. + // Recover only if we are first in cluster. + broker.setRecovery(nCurrent == 1); + initialized = true; + } QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); std::string addresses; diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 1cfcd04c6f..f7955aa743 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -64,15 +64,17 @@ class Cluster : private Cpg::Handler, public management::Manageable { public: typedef boost::intrusive_ptr<Connection> ConnectionPtr; typedef std::vector<ConnectionPtr> Connections; - - /** - * Join a cluster. - */ + + /** Construct the cluster in plugin earlyInitialize */ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax, size_t writeEstimate); virtual ~Cluster(); + /** Join the cluster in plugin initialize. Requires transport + * plugins to be available.. */ + void initialize(); + // Connection map - called in connection threads. void addLocalConnection(const ConnectionPtr&); void addShadowConnection(const ConnectionPtr&); @@ -177,7 +179,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::shared_ptr<sys::Poller> poller; Cpg cpg; const std::string name; - const Url myUrl; + Url myUrl; const MemberId myId; const size_t readMax; const size_t writeEstimate; @@ -197,7 +199,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Called only from event delivery thread Decoder decoder; - + + // Used only during initialization + bool initialized; + // Remaining members are protected by lock mutable sys::Monitor lock; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 1c15747c77..7e0bdcbea8 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -136,13 +136,13 @@ struct ClusterPlugin : public Plugin { Options* getOptions() { return &options; } - void initialize(Plugin::Target& target) { + void earlyInitialize(Plugin::Target& target) { if (values.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; cluster = new Cluster( values.name, - values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), + values.url.empty() ? Url() : Url(values.url), *broker, values.quorum, values.readMax, values.writeEstimate*1024 @@ -158,7 +158,9 @@ struct ClusterPlugin : public Plugin { } } - void earlyInitialize(Plugin::Target&) {} + void initialize(Plugin::Target& ) { + cluster->initialize(); + } }; static ClusterPlugin instance; // Static initialization. diff --git a/cpp/src/tests/run_acl_tests b/cpp/src/tests/run_acl_tests index 1cd322b401..4270e6545e 100755 --- a/cpp/src/tests/run_acl_tests +++ b/cpp/src/tests/run_acl_tests @@ -20,6 +20,7 @@ # # Run the acl tests. $srcdir is set by the Makefile. +set -x PYTHON_DIR=$srcdir/../../../python DATA_DIR=`pwd`/data_dir |