diff options
author | Alan Conway <aconway@apache.org> | 2007-07-05 16:08:29 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-05 16:08:29 +0000 |
commit | 0865408d7fa16f913391ed9391fb13268a74d8a1 (patch) | |
tree | 3107946aae1031a960bc68921a33c9708c8bea53 /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 4e60e87b604f8959907a329a36264f98debc9536 (diff) | |
download | qpid-python-0865408d7fa16f913391ed9391fb13268a74d8a1.tar.gz |
* src/qpid/cluster/SessionFrame.cpp, .h: Wrap AMQFrame with
session UUID and direction.
* src/qpid/cluster/Cluster.cpp, .h: Use SessionFrame.
* src/qpid/framing/AMQFrame.h, .cpp: Added setBody(), inline getBody()
* src/qpid/framing/Uuid.h, .cpp: Clean up constructors, inline.
* src/qpid/framing/Buffer.h: Put/get byte*, size_T.
* src/qpid/cluster/SessionManager.cpp, .h:
- Maintain the session map.
- Handle frames from cluster, dispatch to proper channels.
- Implement HandlerUpdater for new channels and maintains
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553543 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 54 |
1 files changed, 20 insertions, 34 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e691ad357d..f2d1b75f3f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,6 +19,7 @@ #include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> @@ -45,24 +46,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -namespace { - -/** We mark the high bit of a frame's channel number to know if it's - * an incoming or outgoing frame when frames arrive via multicast. - */ -bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; } -bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); } -void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; } -void markIncoming(AMQFrame&) { /*noop*/ } -void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; } - -} - struct Cluster::IncomingHandler : public FrameHandler { IncomingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markIncoming(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::IN); + cluster.mcast(sf); } Cluster& cluster; }; @@ -70,18 +58,18 @@ struct Cluster::IncomingHandler : public FrameHandler { struct Cluster::OutgoingHandler : public FrameHandler { OutgoingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markOutgoing(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::OUT); + cluster.mcast(sf); } Cluster& cluster; }; - // TODO aconway 2007-06-28: Right now everything is backed up via // multicast. When we have point-to-point backups the // Incoming/Outgoing handlers must determine where each frame should // be sent: to multicast or only to specific backup(s) via AMQP. + Cluster::Cluster(const std::string& name_, const std::string& url_) : cpg(new Cpg(*this)), name(name_), @@ -114,7 +102,7 @@ Cluster::~Cluster() { } } -void Cluster::mcast(AMQFrame& frame) { +void Cluster::mcast(SessionFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -124,11 +112,9 @@ void Cluster::mcast(AMQFrame& frame) { } void Cluster::notify() { - // TODO aconway 2007-06-25: Use proxy here. - ProtocolVersion version; - AMQFrame frame(version, 0, - make_shared_ptr(new ClusterNotifyBody(version, url))); - mcast(frame); + SessionFrame sf; + sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); + mcast(sf); } size_t Cluster::size() const { @@ -136,12 +122,13 @@ size_t Cluster::size() const { return members.size(); } -void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) { +void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) { Mutex::ScopedLock l(lock); - fromChains = chains; + receivedChain = chain; } Cluster::MemberList Cluster::getMembers() const { + // TODO aconway 2007-07-04: use read/write lock? Mutex::ScopedLock l(lock); MemberList result(members.size()); std::transform(members.begin(), members.end(), result.begin(), @@ -159,15 +146,13 @@ void Cluster::deliver( { Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; + SessionFrame frame; frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (!handleClusterFrame(from, frame)) { - FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out; - unMark(frame); - if (chain) - chain->handle(frame); - } + if (frame.uuid.isNull()) + handleClusterFrame(from, frame.frame); + else + receivedChain->handle(frame); } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, @@ -179,7 +164,8 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, ; return (predicate(*this)); } - + +// Handle cluster control frame from the null session. bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= |