diff options
author | Alan Conway <aconway@apache.org> | 2008-09-26 19:22:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-26 19:22:00 +0000 |
commit | 44366590caa13db09e55e2c853bd66b363558fa7 (patch) | |
tree | e6ecd10c956fee5f5e865670911ea9ef46e1ed76 | |
parent | b22dd47558cc11572d080ac25808012092dda597 (diff) | |
download | qpid-python-44366590caa13db09e55e2c853bd66b363558fa7.tar.gz |
cluster:]
- call updateMemberStats() exactly once for each change in cluster membership.
- fix spurious replication of catch-up connection close events.
Removed unused client/MessageQueue.h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@699456 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/MessageQueue.h | 50 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 49 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/MemberHandler.cpp | 11 |
8 files changed, 70 insertions, 104 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 52087cbfaa..037a284822 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -507,7 +507,6 @@ nobase_include_HEADERS = \ qpid/client/LocalQueue.h \ qpid/client/Message.h \ qpid/client/MessageListener.h \ - qpid/client/MessageQueue.h \ qpid/client/Results.h \ qpid/client/SessionBase_0_10.h \ qpid/client/Session.h \ diff --git a/cpp/src/qpid/client/MessageQueue.h b/cpp/src/qpid/client/MessageQueue.h deleted file mode 100644 index ab6d351ba7..0000000000 --- a/cpp/src/qpid/client/MessageQueue.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _MessageQueue_ -#define _MessageQueue_ -#include <iostream> -#include "qpid/sys/BlockingQueue.h" -#include "MessageListener.h" - -namespace qpid { -namespace client { - -/** - * A MessageListener implementation that queues up - * messages. - * - * \ingroup clientapi - */ -class MessageQueue : public sys::BlockingQueue<Message>, public MessageListener -{ - public: - void received(Message& msg) - { - push(msg); - } -}; - -} -} - - -#endif diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index c1775616a8..9cd8d1842c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -248,7 +248,7 @@ void Cluster::configChange( cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent) + QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); if (find(left, left+nLeft, self) != left+nLeft) { @@ -258,7 +258,7 @@ void Cluster::configChange( return; } - map.left(left, nLeft); + if (map.left(left, nLeft)) updateMemberStats(); handler->configChange(current, nCurrent, left, nLeft, joined, nJoined); } @@ -326,7 +326,7 @@ void Cluster::stopFullCluster(void) { mcastControl(ClusterShutdownBody(), 0); } -void Cluster::updateMemberStats(void) { +void Cluster::updateMemberStats() { if (mgmtObject) { mgmtObject->set_clusterSize(size()); std::vector<Url> vectUrl = getUrls(); diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index 20d6fbb21e..c721749aba 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -39,11 +39,14 @@ MemberId ClusterMap::first() const { return (members.empty()) ? MemberId() : members.begin()->first; } -void ClusterMap::left(const cpg_address* addrs, size_t size) { - size_t (Members::*erase)(const MemberId&) = &Members::erase; - std::for_each(addrs, addrs+size, boost::bind(erase, &members, _1)); +bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) { + bool changed=false; + for (const cpg_address* a = addrs; a < addrs+nLeft; ++a) + changed = members.erase(*a) || changed; if (dumper && !isMember(dumper)) dumper = MemberId(); + QPID_LOG_IF(debug, changed, *this); + return changed; } framing::ClusterUpdateBody ClusterMap::toControl() const { @@ -54,11 +57,17 @@ framing::ClusterUpdateBody ClusterMap::toControl() const { return b; } -void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) { - framing:: FieldTable::ValueMap::const_iterator i; - for (i = ftMembers.begin(); i != ftMembers.end(); ++i) - members[i->first] = Url(i->second->get<std::string>()); +bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) { dumper = MemberId(dumper_); + bool changed = false; + framing:: FieldTable::ValueMap::const_iterator i; + for (i = ftMembers.begin(); i != ftMembers.end(); ++i) { + MemberId id(i->first); + Url url(i->second->get<std::string>()); + changed = members.insert(Members::value_type(id, url)).second || changed; + } + QPID_LOG_IF(debug, changed, *this); + return changed; } std::vector<Url> ClusterMap::memberUrls() const { @@ -68,14 +77,12 @@ std::vector<Url> ClusterMap::memberUrls() const { return result; } -std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv) { - return o << mv.first << "=" << mv.second; -} - std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { - std::ostream_iterator<ClusterMap::Members::value_type> im(o, "\n "); - o << "dumper=" << m.dumper << ", members:\n "; - std::copy(m.members.begin(), m.members.end(), im); + o << "Broker members:"; + for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) { + o << " " << i->first; + if (i->first == m.dumper) o << "(dumping)"; + } return o; } @@ -83,11 +90,17 @@ bool ClusterMap::sendUpdate(const MemberId& id) const { return dumper==id || (!dumper && first() == id); } -void ClusterMap::ready(const MemberId& id, const Url& url) { - members[id] = url; - if (id == dumper) +bool ClusterMap::ready(const MemberId& id, const Url& url) { + bool changed = members.insert(Members::value_type(id,url)).second; + if (id == dumper) { dumper = MemberId(); - QPID_LOG(info, id << " joined cluster: " << *this); + QPID_LOG(info, id << " finished dump."); + } + else { + QPID_LOG(info, id << " joined cluster, url=" << url); + } + QPID_LOG_IF(debug, changed, *this); + return changed; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index b00f818f88..60fef75f0e 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -50,17 +50,21 @@ class ClusterMap { /** First member of the cluster in ID order, gets to perform one-off tasks. */ MemberId first() const; - /** Update for members leaving. */ - void left(const cpg_address* addrs, size_t size); + /** Update for members leaving. + *@return true if the cluster membership changed. + */ + bool left(const cpg_address* addrs, size_t size); /** Convert map contents to a cluster update body. */ framing::ClusterUpdateBody toControl() const; /** Add a new member or dump complete if id == dumper. */ - void ready(const MemberId& id, const Url& url); + bool ready(const MemberId& id, const Url& url); - /** Apply update delivered from clsuter. */ - void update(const framing::FieldTable& members, uint64_t dumper); + /** Apply update delivered from cluster. + *@return true if cluster membership changed. + **/ + bool update(const framing::FieldTable& members, uint64_t dumper); bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } @@ -72,7 +76,6 @@ class ClusterMap { private: friend std::ostream& operator<<(std::ostream&, const ClusterMap&); - friend std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 4aa66cce1f..0db4a01f3f 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -106,23 +106,26 @@ void Connection::closed() { QPID_LOG(debug, "Connection closed " << *this); if (catchUp) { - catchUp = false; cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); - if (!isShadow()) connection.closed(); + if (isShadow()) + catchUp = false; + else + connection.closed(); } - - // Local network connection has closed. We need to keep the - // connection around but replace the output handler with a - // no-op handler as the network output handler will be - // deleted. - output.setOutputHandler(discardHandler); - - if (isLocal()) { - // This was a local replicated connection. Multicast a deliver - // closed and process any outstanding frames from the cluster - // until self-delivery of deliver-close. - cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); - ++mcastSeq; + else { + // Local network connection has closed. We need to keep the + // connection around but replace the output handler with a + // no-op handler as the network output handler will be + // deleted. + output.setOutputHandler(discardHandler); + + if (isLocal()) { + // This was a local replicated connection. Multicast a deliver + // closed and process any outstanding frames from the cluster + // until self-delivery of deliver-close. + cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); + ++mcastSeq; + } } } catch (const std::exception& e) { diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index 664a8b38cd..8a05068796 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -55,10 +55,8 @@ void JoiningHandler::deliver(Event& e) { void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) { Mutex::ScopedLock l(cluster.lock); - cluster.map.update(members, dumper); - QPID_LOG(debug, "Cluster update: " << cluster.map); + if (cluster.map.update(members, dumper)) cluster.updateMemberStats(); checkDumpRequest(); - cluster.updateMemberStats(); } void JoiningHandler::checkDumpRequest() { @@ -99,9 +97,10 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { } } -void JoiningHandler::ready(const MemberId& id, const std::string& url) { +void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) { Mutex::ScopedLock l(cluster.lock); - cluster.map.ready(id, Url(url)); + if (cluster.map.ready(id, Url(urlStr))) + cluster.updateMemberStats(); checkDumpRequest(); } diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp index 53a1d13a4d..5f02754443 100644 --- a/cpp/src/qpid/cluster/MemberHandler.cpp +++ b/cpp/src/qpid/cluster/MemberHandler.cpp @@ -52,10 +52,8 @@ void MemberHandler::deliver(Event& e) { cluster.connectionEventQueue.push(e); } -void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) { - Mutex::ScopedLock l(cluster.lock); - cluster.updateMemberStats(); -} +// Updates are for new joiners. +void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {} void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) { Mutex::ScopedLock l(cluster.lock); @@ -75,8 +73,9 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt boost::bind(&MemberHandler::dumpError, this, _1))); } -void MemberHandler::ready(const MemberId& id, const std::string& url) { - cluster.map.ready(id, Url(url)); +void MemberHandler::ready(const MemberId& id, const std::string& urlStr) { + if (cluster.map.ready(id, Url(urlStr))) + cluster.updateMemberStats(); } |