diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 54 |
1 files changed, 20 insertions, 34 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e691ad357d..f2d1b75f3f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,6 +19,7 @@ #include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> @@ -45,24 +46,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -namespace { - -/** We mark the high bit of a frame's channel number to know if it's - * an incoming or outgoing frame when frames arrive via multicast. - */ -bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; } -bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); } -void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; } -void markIncoming(AMQFrame&) { /*noop*/ } -void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; } - -} - struct Cluster::IncomingHandler : public FrameHandler { IncomingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markIncoming(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::IN); + cluster.mcast(sf); } Cluster& cluster; }; @@ -70,18 +58,18 @@ struct Cluster::IncomingHandler : public FrameHandler { struct Cluster::OutgoingHandler : public FrameHandler { OutgoingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markOutgoing(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::OUT); + cluster.mcast(sf); } Cluster& cluster; }; - // TODO aconway 2007-06-28: Right now everything is backed up via // multicast. When we have point-to-point backups the // Incoming/Outgoing handlers must determine where each frame should // be sent: to multicast or only to specific backup(s) via AMQP. + Cluster::Cluster(const std::string& name_, const std::string& url_) : cpg(new Cpg(*this)), name(name_), @@ -114,7 +102,7 @@ Cluster::~Cluster() { } } -void Cluster::mcast(AMQFrame& frame) { +void Cluster::mcast(SessionFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -124,11 +112,9 @@ void Cluster::mcast(AMQFrame& frame) { } void Cluster::notify() { - // TODO aconway 2007-06-25: Use proxy here. - ProtocolVersion version; - AMQFrame frame(version, 0, - make_shared_ptr(new ClusterNotifyBody(version, url))); - mcast(frame); + SessionFrame sf; + sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); + mcast(sf); } size_t Cluster::size() const { @@ -136,12 +122,13 @@ size_t Cluster::size() const { return members.size(); } -void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) { +void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) { Mutex::ScopedLock l(lock); - fromChains = chains; + receivedChain = chain; } Cluster::MemberList Cluster::getMembers() const { + // TODO aconway 2007-07-04: use read/write lock? Mutex::ScopedLock l(lock); MemberList result(members.size()); std::transform(members.begin(), members.end(), result.begin(), @@ -159,15 +146,13 @@ void Cluster::deliver( { Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; + SessionFrame frame; frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (!handleClusterFrame(from, frame)) { - FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out; - unMark(frame); - if (chain) - chain->handle(frame); - } + if (frame.uuid.isNull()) + handleClusterFrame(from, frame.frame); + else + receivedChain->handle(frame); } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, @@ -179,7 +164,8 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, ; return (predicate(*this)); } - + +// Handle cluster control frame from the null session. bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= |