diff options
author | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
commit | cb566519d58ded6704507fa5530bf901e620edf6 (patch) | |
tree | ab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 3f900af77d5f781431dc25e307974e0fc27aa561 (diff) | |
download | qpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz |
* Summary:
- Connect cluster handlers into broker handler chains.
- Progress on wiring replication.
* src/tests/cluster.mk: Temporarily disabled Cluster test.
* src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs.
* src/qpidd.cpp:
- Load optional libs (cluster)
- Include plugin config in options.parse.
* src/qpid/cluster/SessionManager.h:
- Create sessions, update handler chains (as HandlerUpdater)
- Handle frames from cluster.
* src/qpid/cluster/ClusterPlugin.h, .cpp:
- renamed from ClusterPluginProvider
- Create and connect Cluster and SessionManager.
- Register SessionManager as HandlerUpdater.
* src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler.
* src/qpid/broker/Connection.cpp: Apply HandlerUpdaters.
* src/qpid/broker/Broker.h, .cpp:
- Initialize plugins
- Apply HandlerUpdaters
* src/qpid/Plugin.h, .cpp: Simplified plugin framework.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 62 |
1 files changed, 19 insertions, 43 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f2d1b75f3f..256378ccd5 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -32,6 +32,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; + ostream& operator <<(ostream& out, const Cluster& cluster) { return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; } @@ -46,38 +47,20 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -struct Cluster::IncomingHandler : public FrameHandler { - IncomingHandler(Cluster& c) : cluster(c) {} - void handle(AMQFrame& frame) { - SessionFrame sf(Uuid(true), frame, SessionFrame::IN); - cluster.mcast(sf); - } - Cluster& cluster; -}; - -struct Cluster::OutgoingHandler : public FrameHandler { - OutgoingHandler(Cluster& c) : cluster(c) {} - void handle(AMQFrame& frame) { - SessionFrame sf(Uuid(true), frame, SessionFrame::OUT); - cluster.mcast(sf); - } - Cluster& cluster; -}; -// TODO aconway 2007-06-28: Right now everything is backed up via -// multicast. When we have point-to-point backups the -// Incoming/Outgoing handlers must determine where each frame should -// be sent: to multicast or only to specific backup(s) via AMQP. -Cluster::Cluster(const std::string& name_, const std::string& url_) : +Cluster::Cluster( + const std::string& name_, const std::string& url_, + const SessionFrameHandler::Chain& next +) : + SessionFrameHandler(next), cpg(new Cpg(*this)), name(name_), url(url_), - self(cpg->getLocalNoideId(), getpid()), - toChains(new IncomingHandler(*this), new OutgoingHandler(*this)) + self(cpg->getLocalNoideId(), getpid()) { - QPID_LOG(trace, *this << " Joining cluster."); + QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg->join(name); notify(); dispatcher=Thread(*this); @@ -102,7 +85,7 @@ Cluster::~Cluster() { } } -void Cluster::mcast(SessionFrame& frame) { +void Cluster::handle(SessionFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -114,7 +97,7 @@ void Cluster::mcast(SessionFrame& frame) { void Cluster::notify() { SessionFrame sf; sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); - mcast(sf); + handle(sf); } size_t Cluster::size() const { @@ -122,11 +105,6 @@ size_t Cluster::size() const { return members.size(); } -void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) { - Mutex::ScopedLock l(lock); - receivedChain = chain; -} - Cluster::MemberList Cluster::getMembers() const { // TODO aconway 2007-07-04: use read/write lock? Mutex::ScopedLock l(lock); @@ -152,7 +130,7 @@ void Cluster::deliver( if (frame.uuid.isNull()) handleClusterFrame(from, frame.frame); else - receivedChain->handle(frame); + next->handle(frame); } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, @@ -166,24 +144,22 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, } // Handle cluster control frame from the null session. -bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { +void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); - if (notifyIn) { + assert(notifyIn); MemberList list; { Mutex::ScopedLock l(lock); - if (!members[from]) - members[from].reset(new Member(url)); + shared_ptr<Member>& member=members[from]; + if (!member) + member.reset(new Member(notifyIn->getUrl())); else - members[from]->url = notifyIn->getUrl(); - QPID_LOG(trace, *this << ": member update: " << members); + member->url = notifyIn->getUrl(); lock.notifyAll(); + QPID_LOG(trace, *this << ": members joined: " << members); } - return true; - } - return false; } void Cluster::configChange( @@ -207,7 +183,7 @@ void Cluster::configChange( // We don't record members joining here, we record them when // we get their ClusterNotify message. } - if (newMembers) + if (newMembers) // Notify new members of my presence. notify(); } |