summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp9
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp3
-rw-r--r--cpp/src/qpid/cluster/ClusterSettings.h5
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.cpp7
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.h3
5 files changed, 13 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9756ad0a62..5e962e9767 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -208,7 +208,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
- initMap(self),
+ initMap(self, settings.size),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -403,8 +403,7 @@ void Cluster::flagError(
<< ": " << msg);
leave(l);
}
- else if (settings.checkErrors)
- error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
}
// Handler for deliverFrameQueue.
@@ -423,7 +422,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
deliverEventQueue.start();
}
// Process each frame through the error checker.
- if (settings.checkErrors && error.isUnresolved()) {
+ if (error.isUnresolved()) {
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
@@ -874,7 +873,7 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
- if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
+ if (cluster.error.isUnresolved()) o << "/error";
return o << ")";;
}
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 289ec672a8..4eec388866 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -71,9 +71,8 @@ struct ClusterOptions : public Options {
#if HAVE_LIBCMAN_H
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
+ ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
- // TODO aconway 2009-05-20: temporary, remove
- ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
;
}
};
diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h
index 8820e8a5ac..8e708aa139 100644
--- a/cpp/src/qpid/cluster/ClusterSettings.h
+++ b/cpp/src/qpid/cluster/ClusterSettings.h
@@ -34,10 +34,9 @@ struct ClusterSettings {
bool quorum;
size_t readMax;
std::string username, password, mechanism;
- bool checkErrors;
+ size_t size;
- ClusterSettings() : quorum(false), readMax(10),
- checkErrors(true) // TODO aconway 2009-05-20: remove this option.
+ ClusterSettings() : quorum(false), readMax(10), size(1)
{}
Url getUrl(uint16_t port) const {
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp
index 6d27b3ae72..f2251f4043 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -28,8 +28,8 @@ using namespace boost;
namespace qpid {
namespace cluster {
-InitialStatusMap::InitialStatusMap(const MemberId& self_)
- : self(self_), completed(), resendNeeded()
+InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
+ : self(self_), completed(), resendNeeded(), size(size_)
{}
void InitialStatusMap::configChange(const MemberSet& members) {
@@ -83,7 +83,8 @@ bool InitialStatusMap::isActive(const Map::value_type& v) {
}
bool InitialStatusMap::isComplete() {
- return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
+ return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
+ && (map.size() >= size);
}
bool InitialStatusMap::transitionToComplete() {
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h
index 4605a4c1fe..9e9b71e363 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -37,7 +37,7 @@ class InitialStatusMap
public:
typedef framing::ClusterInitialStatusBody Status;
- InitialStatusMap(const MemberId& self);
+ InitialStatusMap(const MemberId& self, size_t size);
/** Process a config change. @return true if we need to re-send our status */
void configChange(const MemberSet& newConfig);
/** @return true if we need to re-send status */
@@ -71,6 +71,7 @@ class InitialStatusMap
MemberSet firstConfig;
MemberId self;
bool completed, resendNeeded;
+ size_t size;
};
}} // namespace qpid::cluster