summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp38
1 files changed, 16 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 6623d1cde0..d18fd452e4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -56,8 +56,8 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
}
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- cpg(*this),
broker(&b),
+ cpg(*this),
name(name_),
url(url_),
self(cpg.self())
@@ -75,10 +75,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
}
}
-Cluster::~Cluster() {
- cpg.shutdown();
- dispatcher.join();
-}
+Cluster::~Cluster() {}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
@@ -88,16 +85,16 @@ void Cluster::initialize(broker::Connection& c) {
}
void Cluster::leave() {
- if (!broker.get()) return; // Already left
- QPID_LOG(info, QPID_MSG("Leaving cluster " << *this));
- // Must not be called in the dispatch thread.
- assert(Thread::current().id() != dispatcher.id());
+ Mutex::ScopedLock l(lock);
+ if (!broker) return; // Already left.
+ assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread.
+ QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
- // Wait till final config-change is delivered and broker is released.
- {
- Mutex::ScopedLock l(lock);
- while(broker.get()) lock.wait();
- }
+ // The dispatch thread sets broker=0 when the final config-change
+ // is delivered.
+ while(broker) lock.wait();
+ cpg.shutdown();
+ dispatcher.join();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -115,7 +112,6 @@ void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
// FIXME aconway 2008-07-03: More efficient buffer management.
// Cache coded form of decoded frames for re-encoding?
Buffer buf(buffer);
- assert(frame.size() + 64 < sizeof(buffer));
frame.encode(buf);
encodePtr(buf, connection);
iovec iov = { buffer, buf.getPosition() };
@@ -145,6 +141,7 @@ ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void*
if (i == shadowConnectionMap.end()) { // A new shadow connection.
std::ostringstream os;
os << name << ":" << member << ":" << remotePtr;
+ assert(broker);
broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
i = shadowConnectionMap.insert(value).first;
@@ -169,8 +166,8 @@ void Cluster::deliver(
decodePtr(buf, connection);
QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
- if (!broker.get()) {
- QPID_LOG(warning, "Ignoring late DLVR, already left the cluster.");
+ if (!broker) {
+ QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
return;
}
@@ -232,11 +229,8 @@ void Cluster::configChange(
QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
<< members);
assert(members.size() == size_t(nCurrent));
- if (members.find(self) == members.end()) {
- QPID_LOG(debug, "Left cluster " << *this);
- broker = 0; // Release broker reference.
- }
-
+ if (members.find(self) == members.end())
+ broker = 0; // We have left the group, this is the final config change.
lock.notifyAll(); // Threads waiting for membership changes.
}