summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-05 16:08:29 +0000
committerAlan Conway <aconway@apache.org>2007-07-05 16:08:29 +0000
commit0865408d7fa16f913391ed9391fb13268a74d8a1 (patch)
tree3107946aae1031a960bc68921a33c9708c8bea53 /cpp/src/qpid/cluster/Cluster.cpp
parent4e60e87b604f8959907a329a36264f98debc9536 (diff)
downloadqpid-python-0865408d7fa16f913391ed9391fb13268a74d8a1.tar.gz
* src/qpid/cluster/SessionFrame.cpp, .h: Wrap AMQFrame with
session UUID and direction. * src/qpid/cluster/Cluster.cpp, .h: Use SessionFrame. * src/qpid/framing/AMQFrame.h, .cpp: Added setBody(), inline getBody() * src/qpid/framing/Uuid.h, .cpp: Clean up constructors, inline. * src/qpid/framing/Buffer.h: Put/get byte*, size_T. * src/qpid/cluster/SessionManager.cpp, .h: - Maintain the session map. - Handle frames from cluster, dispatch to proper channels. - Implement HandlerUpdater for new channels and maintains git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553543 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp54
1 files changed, 20 insertions, 34 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e691ad357d..f2d1b75f3f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -19,6 +19,7 @@
#include "Cluster.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
@@ -45,24 +46,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-namespace {
-
-/** We mark the high bit of a frame's channel number to know if it's
- * an incoming or outgoing frame when frames arrive via multicast.
- */
-bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; }
-bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); }
-void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; }
-void markIncoming(AMQFrame&) { /*noop*/ }
-void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; }
-
-}
-
struct Cluster::IncomingHandler : public FrameHandler {
IncomingHandler(Cluster& c) : cluster(c) {}
void handle(AMQFrame& frame) {
- markIncoming(frame);
- cluster.mcast(frame);
+ SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
+ cluster.mcast(sf);
}
Cluster& cluster;
};
@@ -70,18 +58,18 @@ struct Cluster::IncomingHandler : public FrameHandler {
struct Cluster::OutgoingHandler : public FrameHandler {
OutgoingHandler(Cluster& c) : cluster(c) {}
void handle(AMQFrame& frame) {
- markOutgoing(frame);
- cluster.mcast(frame);
+ SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
+ cluster.mcast(sf);
}
Cluster& cluster;
};
-
// TODO aconway 2007-06-28: Right now everything is backed up via
// multicast. When we have point-to-point backups the
// Incoming/Outgoing handlers must determine where each frame should
// be sent: to multicast or only to specific backup(s) via AMQP.
+
Cluster::Cluster(const std::string& name_, const std::string& url_) :
cpg(new Cpg(*this)),
name(name_),
@@ -114,7 +102,7 @@ Cluster::~Cluster() {
}
}
-void Cluster::mcast(AMQFrame& frame) {
+void Cluster::mcast(SessionFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -124,11 +112,9 @@ void Cluster::mcast(AMQFrame& frame) {
}
void Cluster::notify() {
- // TODO aconway 2007-06-25: Use proxy here.
- ProtocolVersion version;
- AMQFrame frame(version, 0,
- make_shared_ptr(new ClusterNotifyBody(version, url)));
- mcast(frame);
+ SessionFrame sf;
+ sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
+ mcast(sf);
}
size_t Cluster::size() const {
@@ -136,12 +122,13 @@ size_t Cluster::size() const {
return members.size();
}
-void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) {
+void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
Mutex::ScopedLock l(lock);
- fromChains = chains;
+ receivedChain = chain;
}
Cluster::MemberList Cluster::getMembers() const {
+ // TODO aconway 2007-07-04: use read/write lock?
Mutex::ScopedLock l(lock);
MemberList result(members.size());
std::transform(members.begin(), members.end(), result.begin(),
@@ -159,15 +146,13 @@ void Cluster::deliver(
{
Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
+ SessionFrame frame;
frame.decode(buf);
QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (!handleClusterFrame(from, frame)) {
- FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out;
- unMark(frame);
- if (chain)
- chain->handle(frame);
- }
+ if (frame.uuid.isNull())
+ handleClusterFrame(from, frame.frame);
+ else
+ receivedChain->handle(frame);
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -179,7 +164,8 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
;
return (predicate(*this));
}
-
+
+// Handle cluster control frame from the null session.
bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=