summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-26 19:22:00 +0000
committerAlan Conway <aconway@apache.org>2008-09-26 19:22:00 +0000
commit44366590caa13db09e55e2c853bd66b363558fa7 (patch)
treee6ecd10c956fee5f5e865670911ea9ef46e1ed76 /cpp/src
parentb22dd47558cc11572d080ac25808012092dda597 (diff)
downloadqpid-python-44366590caa13db09e55e2c853bd66b363558fa7.tar.gz
cluster:]
- call updateMemberStats() exactly once for each change in cluster membership. - fix spurious replication of catch-up connection close events. Removed unused client/MessageQueue.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@699456 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/client/MessageQueue.h50
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp6
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp49
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h15
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp33
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp9
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.cpp11
8 files changed, 70 insertions, 104 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 52087cbfaa..037a284822 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -507,7 +507,6 @@ nobase_include_HEADERS = \
qpid/client/LocalQueue.h \
qpid/client/Message.h \
qpid/client/MessageListener.h \
- qpid/client/MessageQueue.h \
qpid/client/Results.h \
qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
diff --git a/cpp/src/qpid/client/MessageQueue.h b/cpp/src/qpid/client/MessageQueue.h
deleted file mode 100644
index ab6d351ba7..0000000000
--- a/cpp/src/qpid/client/MessageQueue.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef _MessageQueue_
-#define _MessageQueue_
-#include <iostream>
-#include "qpid/sys/BlockingQueue.h"
-#include "MessageListener.h"
-
-namespace qpid {
-namespace client {
-
-/**
- * A MessageListener implementation that queues up
- * messages.
- *
- * \ingroup clientapi
- */
-class MessageQueue : public sys::BlockingQueue<Message>, public MessageListener
-{
- public:
- void received(Message& msg)
- {
- push(msg);
- }
-};
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index c1775616a8..9cd8d1842c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -248,7 +248,7 @@ void Cluster::configChange(
cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent)
+ QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
if (find(left, left+nLeft, self) != left+nLeft) {
@@ -258,7 +258,7 @@ void Cluster::configChange(
return;
}
- map.left(left, nLeft);
+ if (map.left(left, nLeft)) updateMemberStats();
handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
}
@@ -326,7 +326,7 @@ void Cluster::stopFullCluster(void) {
mcastControl(ClusterShutdownBody(), 0);
}
-void Cluster::updateMemberStats(void) {
+void Cluster::updateMemberStats() {
if (mgmtObject) {
mgmtObject->set_clusterSize(size());
std::vector<Url> vectUrl = getUrls();
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index 20d6fbb21e..c721749aba 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -39,11 +39,14 @@ MemberId ClusterMap::first() const {
return (members.empty()) ? MemberId() : members.begin()->first;
}
-void ClusterMap::left(const cpg_address* addrs, size_t size) {
- size_t (Members::*erase)(const MemberId&) = &Members::erase;
- std::for_each(addrs, addrs+size, boost::bind(erase, &members, _1));
+bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) {
+ bool changed=false;
+ for (const cpg_address* a = addrs; a < addrs+nLeft; ++a)
+ changed = members.erase(*a) || changed;
if (dumper && !isMember(dumper))
dumper = MemberId();
+ QPID_LOG_IF(debug, changed, *this);
+ return changed;
}
framing::ClusterUpdateBody ClusterMap::toControl() const {
@@ -54,11 +57,17 @@ framing::ClusterUpdateBody ClusterMap::toControl() const {
return b;
}
-void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
- framing:: FieldTable::ValueMap::const_iterator i;
- for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
- members[i->first] = Url(i->second->get<std::string>());
+bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
dumper = MemberId(dumper_);
+ bool changed = false;
+ framing:: FieldTable::ValueMap::const_iterator i;
+ for (i = ftMembers.begin(); i != ftMembers.end(); ++i) {
+ MemberId id(i->first);
+ Url url(i->second->get<std::string>());
+ changed = members.insert(Members::value_type(id, url)).second || changed;
+ }
+ QPID_LOG_IF(debug, changed, *this);
+ return changed;
}
std::vector<Url> ClusterMap::memberUrls() const {
@@ -68,14 +77,12 @@ std::vector<Url> ClusterMap::memberUrls() const {
return result;
}
-std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv) {
- return o << mv.first << "=" << mv.second;
-}
-
std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
- std::ostream_iterator<ClusterMap::Members::value_type> im(o, "\n ");
- o << "dumper=" << m.dumper << ", members:\n ";
- std::copy(m.members.begin(), m.members.end(), im);
+ o << "Broker members:";
+ for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) {
+ o << " " << i->first;
+ if (i->first == m.dumper) o << "(dumping)";
+ }
return o;
}
@@ -83,11 +90,17 @@ bool ClusterMap::sendUpdate(const MemberId& id) const {
return dumper==id || (!dumper && first() == id);
}
-void ClusterMap::ready(const MemberId& id, const Url& url) {
- members[id] = url;
- if (id == dumper)
+bool ClusterMap::ready(const MemberId& id, const Url& url) {
+ bool changed = members.insert(Members::value_type(id,url)).second;
+ if (id == dumper) {
dumper = MemberId();
- QPID_LOG(info, id << " joined cluster: " << *this);
+ QPID_LOG(info, id << " finished dump.");
+ }
+ else {
+ QPID_LOG(info, id << " joined cluster, url=" << url);
+ }
+ QPID_LOG_IF(debug, changed, *this);
+ return changed;
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index b00f818f88..60fef75f0e 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -50,17 +50,21 @@ class ClusterMap {
/** First member of the cluster in ID order, gets to perform one-off tasks. */
MemberId first() const;
- /** Update for members leaving. */
- void left(const cpg_address* addrs, size_t size);
+ /** Update for members leaving.
+ *@return true if the cluster membership changed.
+ */
+ bool left(const cpg_address* addrs, size_t size);
/** Convert map contents to a cluster update body. */
framing::ClusterUpdateBody toControl() const;
/** Add a new member or dump complete if id == dumper. */
- void ready(const MemberId& id, const Url& url);
+ bool ready(const MemberId& id, const Url& url);
- /** Apply update delivered from clsuter. */
- void update(const framing::FieldTable& members, uint64_t dumper);
+ /** Apply update delivered from cluster.
+ *@return true if cluster membership changed.
+ **/
+ bool update(const framing::FieldTable& members, uint64_t dumper);
bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
@@ -72,7 +76,6 @@ class ClusterMap {
private:
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
- friend std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 4aa66cce1f..0db4a01f3f 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -106,23 +106,26 @@ void Connection::closed() {
QPID_LOG(debug, "Connection closed " << *this);
if (catchUp) {
- catchUp = false;
cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
- if (!isShadow()) connection.closed();
+ if (isShadow())
+ catchUp = false;
+ else
+ connection.closed();
}
-
- // Local network connection has closed. We need to keep the
- // connection around but replace the output handler with a
- // no-op handler as the network output handler will be
- // deleted.
- output.setOutputHandler(discardHandler);
-
- if (isLocal()) {
- // This was a local replicated connection. Multicast a deliver
- // closed and process any outstanding frames from the cluster
- // until self-delivery of deliver-close.
- cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
- ++mcastSeq;
+ else {
+ // Local network connection has closed. We need to keep the
+ // connection around but replace the output handler with a
+ // no-op handler as the network output handler will be
+ // deleted.
+ output.setOutputHandler(discardHandler);
+
+ if (isLocal()) {
+ // This was a local replicated connection. Multicast a deliver
+ // closed and process any outstanding frames from the cluster
+ // until self-delivery of deliver-close.
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
+ ++mcastSeq;
+ }
}
}
catch (const std::exception& e) {
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
index 664a8b38cd..8a05068796 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -55,10 +55,8 @@ void JoiningHandler::deliver(Event& e) {
void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
Mutex::ScopedLock l(cluster.lock);
- cluster.map.update(members, dumper);
- QPID_LOG(debug, "Cluster update: " << cluster.map);
+ if (cluster.map.update(members, dumper)) cluster.updateMemberStats();
checkDumpRequest();
- cluster.updateMemberStats();
}
void JoiningHandler::checkDumpRequest() {
@@ -99,9 +97,10 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
}
}
-void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) {
Mutex::ScopedLock l(cluster.lock);
- cluster.map.ready(id, Url(url));
+ if (cluster.map.ready(id, Url(urlStr)))
+ cluster.updateMemberStats();
checkDumpRequest();
}
diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp
index 53a1d13a4d..5f02754443 100644
--- a/cpp/src/qpid/cluster/MemberHandler.cpp
+++ b/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -52,10 +52,8 @@ void MemberHandler::deliver(Event& e) {
cluster.connectionEventQueue.push(e);
}
-void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {
- Mutex::ScopedLock l(cluster.lock);
- cluster.updateMemberStats();
-}
+// Updates are for new joiners.
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
Mutex::ScopedLock l(cluster.lock);
@@ -75,8 +73,9 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt
boost::bind(&MemberHandler::dumpError, this, _1)));
}
-void MemberHandler::ready(const MemberId& id, const std::string& url) {
- cluster.map.ready(id, Url(url));
+void MemberHandler::ready(const MemberId& id, const std::string& urlStr) {
+ if (cluster.map.ready(id, Url(urlStr)))
+ cluster.updateMemberStats();
}