summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-06-29 17:59:00 +0000
committerAlan Conway <aconway@apache.org>2007-06-29 17:59:00 +0000
commitfda6dadde945a9c73c97b73dc79e93368b743348 (patch)
treed7755539ae485efdfbc46298cd1ef6632515159e /cpp/src/qpid/cluster/Cluster.cpp
parent79cd6c772da003ddc917eff362f9adaa99e28b49 (diff)
downloadqpid-python-fda6dadde945a9c73c97b73dc79e93368b743348.tar.gz
* Summary:
- Improved plugin framework and HandlerUpdater interface. - Cluster handlers for traffic to/from cluster. - Cluster HandlerUpdater configures channel chains for cluster. - Cluster PluginProvider registers cluster objects with broker. * src/qpid/framing/AMQFrame.h: Made data members public. Handlers need to be able to modify frame data, getters/setters are just a nuisance here. * src/tests/Cluster.cpp: Updated for cluster changes, using handlers instead of friendship to hook test into Cluster code. * src/qpid/framing/amqp_types.h: Added CHANNEL_MAX and CHANNEL_HIGH_BIT constants. * src/qpid/framing/HandlerUpdater.h: Renamed ChannelInitializer, broke dependency on broker channel types. * src/qpid/framing/Handler.h: Added constructors and nextHandler() * src/qpid/framing/AMQFrame.h (class AMQFrame): Inlined getChannel() * src/qpid/cluster/ClusterPluginProvider.cpp: Provider for cluster plugins. * src/qpid/cluster/Cluster.cpp: Use ChannelManager. Factor out plugin details to ClusterPluginProvider. * src/qpid/cluster/ChannelManager.h: Insert cluster handlers into channel chains, route frames between cluster and channels. * src/qpid/broker/BrokerAdapter.cpp (startOk): use CHANNEL_MAX constant. * src/qpid/broker/Broker.cpp: - Refactored for new plugin framework. - Added getUrl(). * src/qpid/Url.h: Added constructor from Address. * src/qpid/Plugin.h: Generalized plugin framework, broke dependency on Broker interfaces. We may want to use plug-ins for clients also at some point. * src/tests/run_test: Fix bug when VALGRIND is not set. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp121
1 files changed, 75 insertions, 46 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 30073c4551..8d898eefa3 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,13 +17,13 @@
*/
#include "Cluster.h"
-#include "Cpg.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
#include <iterator>
+#include <map>
namespace qpid {
namespace cluster {
@@ -35,40 +35,62 @@ ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "(" << cluster.self << ")";
}
+namespace {
+Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1];
+struct StatusMapInit {
+ StatusMapInit() {
+ statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN;
+ statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE;
+ statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN;
+ statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP;
+ statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN;
+ }
+} instance;
+}
+
+Cluster::Member::Member(const cpg_address& addr)
+ : status(statusMap[addr.reason]) {}
+
void Cluster::notify() {
+ ProtocolVersion version;
// TODO aconway 2007-06-25: Use proxy here.
AMQFrame frame(version, 0,
make_shared_ptr(new ClusterNotifyBody(version, url)));
handle(frame);
}
-Cluster::Cluster(
- const std::string& name_, const std::string& url_, FrameHandler& next_,
- ProtocolVersion ver)
- : name(name_), url(url_), version(ver),
- cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
- boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
- next(next_)
-{
- self=Id(cpg->getLocalNoideId(), getpid());
+Cluster::Cluster(const std::string& name_, const std::string& url_) :
+ name(name_),
+ url(url_),
+ cpg(new Cpg(
+ boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
+ boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
+ self(cpg->getLocalNoideId(), getpid())
+{}
+
+void Cluster::join(FrameHandler::Chain next) {
QPID_LOG(trace, *this << " Joining cluster.");
+ next = next;
+ dispatcher=Thread(*this);
cpg->join(name);
notify();
- dispatcher=Thread(*this);
}
Cluster::~Cluster() {
- try {
- QPID_LOG(trace, *this << " Leaving cluster.");
- cpg->leave(name);
- cpg.reset();
- dispatcher.join();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << e.what());
+ if (cpg) {
+ try {
+ QPID_LOG(trace, *this << " Leaving cluster.");
+ cpg->leave(name);
+ cpg.reset();
+ dispatcher.join();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Exception leaving cluster " << e.what());
+ }
}
}
void Cluster::handle(AMQFrame& frame) {
+ assert(cpg);
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -104,52 +126,59 @@ void Cluster::cpgDeliver(
frame.decode(buf);
QPID_LOG(trace, *this << " RECV: " << frame);
// TODO aconway 2007-06-20: use visitor pattern.
- ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
+ ClusterNotifyBody* notifyIn=
+ dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
if (notifyIn) {
- Mutex::ScopedLock l(lock);
- members[from].reset(new Member(notifyIn->getUrl()));
- lock.notifyAll();
+ {
+ Mutex::ScopedLock l(lock);
+ assert(members[from]);
+ members[from]->url = notifyIn->getUrl();
+ members[from]->status = Member::BROKER;
+ }
+ if (callback)
+ callback();
}
else
- next.handle(frame);
+ next->handle(frame);
}
void Cluster::cpgConfigChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
- struct cpg_address *ccMembers, int nMembers,
+ struct cpg_address *current, int nCurrent,
struct cpg_address *left, int nLeft,
struct cpg_address *joined, int nJoined
)
{
- QPID_LOG(
- trace,
- *this << " Configuration change. " << endl
- << " Joined: " << make_pair(joined, nJoined) << endl
- << " Left: " << make_pair(left, nLeft) << endl
- << " Current: " << make_pair(ccMembers, nMembers));
-
+ QPID_LOG(trace,
+ *this << " Configuration change. " << endl
+ << " Joined: " << make_pair(joined, nJoined) << endl
+ << " Left: " << make_pair(left, nLeft) << endl
+ << " Current: " << make_pair(current, nCurrent));
+
+ bool needNotify=false;
+ MemberList updated;
{
Mutex::ScopedLock l(lock);
- // Erase members that left.
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(left[i]));
- lock.notifyAll();
- }
-
- // If there are new members (other than myself) then notify.
- for (int i=0; i< nJoined; ++i) {
- if (Id(joined[i]) != self) {
- notify();
- break;
+ for (int i = 0; i < nJoined; ++i) {
+ Id id(current[i]);
+ members[id].reset(new Member(current[i]));
+ if (id != self)
+ needNotify = true; // Notify new members other than myself.
}
- }
-
- // Note: New members are be added to my map when cpgDeliver
- // gets a cluster.notify frame.
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(Id(current[i]));
+ } // End of locked scope.
+ if (needNotify)
+ notify();
+ if (callback)
+ callback();
}
+void Cluster::setCallback(boost::function<void()> f) { callback=f; }
+
void Cluster::run() {
+ assert(cpg);
cpg->dispatchBlocking();
}