summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
committerAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
commitcb566519d58ded6704507fa5530bf901e620edf6 (patch)
treeab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid/cluster/Cluster.cpp
parent3f900af77d5f781431dc25e307974e0fc27aa561 (diff)
downloadqpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz
* Summary:
- Connect cluster handlers into broker handler chains. - Progress on wiring replication. * src/tests/cluster.mk: Temporarily disabled Cluster test. * src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs. * src/qpidd.cpp: - Load optional libs (cluster) - Include plugin config in options.parse. * src/qpid/cluster/SessionManager.h: - Create sessions, update handler chains (as HandlerUpdater) - Handle frames from cluster. * src/qpid/cluster/ClusterPlugin.h, .cpp: - renamed from ClusterPluginProvider - Create and connect Cluster and SessionManager. - Register SessionManager as HandlerUpdater. * src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler. * src/qpid/broker/Connection.cpp: Apply HandlerUpdaters. * src/qpid/broker/Broker.h, .cpp: - Initialize plugins - Apply HandlerUpdaters * src/qpid/Plugin.h, .cpp: Simplified plugin framework. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp62
1 files changed, 19 insertions, 43 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f2d1b75f3f..256378ccd5 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,6 +32,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
}
@@ -46,38 +47,20 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-struct Cluster::IncomingHandler : public FrameHandler {
- IncomingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-
-struct Cluster::OutgoingHandler : public FrameHandler {
- OutgoingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-// TODO aconway 2007-06-28: Right now everything is backed up via
-// multicast. When we have point-to-point backups the
-// Incoming/Outgoing handlers must determine where each frame should
-// be sent: to multicast or only to specific backup(s) via AMQP.
-Cluster::Cluster(const std::string& name_, const std::string& url_) :
+Cluster::Cluster(
+ const std::string& name_, const std::string& url_,
+ const SessionFrameHandler::Chain& next
+) :
+ SessionFrameHandler(next),
cpg(new Cpg(*this)),
name(name_),
url(url_),
- self(cpg->getLocalNoideId(), getpid()),
- toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+ self(cpg->getLocalNoideId(), getpid())
{
- QPID_LOG(trace, *this << " Joining cluster.");
+ QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg->join(name);
notify();
dispatcher=Thread(*this);
@@ -102,7 +85,7 @@ Cluster::~Cluster() {
}
}
-void Cluster::mcast(SessionFrame& frame) {
+void Cluster::handle(SessionFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -114,7 +97,7 @@ void Cluster::mcast(SessionFrame& frame) {
void Cluster::notify() {
SessionFrame sf;
sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
- mcast(sf);
+ handle(sf);
}
size_t Cluster::size() const {
@@ -122,11 +105,6 @@ size_t Cluster::size() const {
return members.size();
}
-void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
- Mutex::ScopedLock l(lock);
- receivedChain = chain;
-}
-
Cluster::MemberList Cluster::getMembers() const {
// TODO aconway 2007-07-04: use read/write lock?
Mutex::ScopedLock l(lock);
@@ -152,7 +130,7 @@ void Cluster::deliver(
if (frame.uuid.isNull())
handleClusterFrame(from, frame.frame);
else
- receivedChain->handle(frame);
+ next->handle(frame);
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -166,24 +144,22 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
}
// Handle cluster control frame from the null session.
-bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
- if (notifyIn) {
+ assert(notifyIn);
MemberList list;
{
Mutex::ScopedLock l(lock);
- if (!members[from])
- members[from].reset(new Member(url));
+ shared_ptr<Member>& member=members[from];
+ if (!member)
+ member.reset(new Member(notifyIn->getUrl()));
else
- members[from]->url = notifyIn->getUrl();
- QPID_LOG(trace, *this << ": member update: " << members);
+ member->url = notifyIn->getUrl();
lock.notifyAll();
+ QPID_LOG(trace, *this << ": members joined: " << members);
}
- return true;
- }
- return false;
}
void Cluster::configChange(
@@ -207,7 +183,7 @@ void Cluster::configChange(
// We don't record members joining here, we record them when
// we get their ClusterNotify message.
}
- if (newMembers)
+ if (newMembers) // Notify new members of my presence.
notify();
}