diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterSettings.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 3 |
5 files changed, 13 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9756ad0a62..5e962e9767 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -208,7 +208,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(INIT), - initMap(self), + initMap(self, settings.size), lastSize(0), lastBroker(false), updateRetracted(false), @@ -403,8 +403,7 @@ void Cluster::flagError( << ": " << msg); leave(l); } - else if (settings.checkErrors) - error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); + error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); } // Handler for deliverFrameQueue. @@ -423,7 +422,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { deliverEventQueue.start(); } // Process each frame through the error checker. - if (settings.checkErrors && error.isUnresolved()) { + if (error.isUnresolved()) { error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); @@ -874,7 +873,7 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; - if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error"; + if (cluster.error.isUnresolved()) o << "/error"; return o << ")";; } diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 289ec672a8..4eec388866 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -71,9 +71,8 @@ struct ClusterOptions : public Options { #if HAVE_LIBCMAN_H ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif + ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.") ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") - // TODO aconway 2009-05-20: temporary, remove - ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.") ; } }; diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h index 8820e8a5ac..8e708aa139 100644 --- a/cpp/src/qpid/cluster/ClusterSettings.h +++ b/cpp/src/qpid/cluster/ClusterSettings.h @@ -34,10 +34,9 @@ struct ClusterSettings { bool quorum; size_t readMax; std::string username, password, mechanism; - bool checkErrors; + size_t size; - ClusterSettings() : quorum(false), readMax(10), - checkErrors(true) // TODO aconway 2009-05-20: remove this option. + ClusterSettings() : quorum(false), readMax(10), size(1) {} Url getUrl(uint16_t port) const { diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index 6d27b3ae72..f2251f4043 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -28,8 +28,8 @@ using namespace boost; namespace qpid { namespace cluster { -InitialStatusMap::InitialStatusMap(const MemberId& self_) - : self(self_), completed(), resendNeeded() +InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) + : self(self_), completed(), resendNeeded(), size(size_) {} void InitialStatusMap::configChange(const MemberSet& members) { @@ -83,7 +83,8 @@ bool InitialStatusMap::isActive(const Map::value_type& v) { } bool InitialStatusMap::isComplete() { - return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end(); + return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end() + && (map.size() >= size); } bool InitialStatusMap::transitionToComplete() { diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index 4605a4c1fe..9e9b71e363 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -37,7 +37,7 @@ class InitialStatusMap public: typedef framing::ClusterInitialStatusBody Status; - InitialStatusMap(const MemberId& self); + InitialStatusMap(const MemberId& self, size_t size); /** Process a config change. @return true if we need to re-send our status */ void configChange(const MemberSet& newConfig); /** @return true if we need to re-send status */ @@ -71,6 +71,7 @@ class InitialStatusMap MemberSet firstConfig; MemberId self; bool completed, resendNeeded; + size_t size; }; }} // namespace qpid::cluster |