summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp197
1 files changed, 122 insertions, 75 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8d898eefa3..e691ad357d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,65 +32,89 @@ using namespace qpid::sys;
using namespace std;
ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << cluster.name.str() << "(" << cluster.self << ")";
+ return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
}
-namespace {
-Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1];
-struct StatusMapInit {
- StatusMapInit() {
- statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN;
- statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE;
- statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN;
- statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP;
- statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN;
- }
-} instance;
+ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
+ return out << m.first << "=" << m.second->url;
}
-Cluster::Member::Member(const cpg_address& addr)
- : status(statusMap[addr.reason]) {}
+ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
+ ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
+ copy(members.begin(), members.end(), o);
+ return out;
+}
+
+namespace {
+
+/** We mark the high bit of a frame's channel number to know if it's
+ * an incoming or outgoing frame when frames arrive via multicast.
+ */
+bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; }
+bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); }
+void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; }
+void markIncoming(AMQFrame&) { /*noop*/ }
+void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; }
-void Cluster::notify() {
- ProtocolVersion version;
- // TODO aconway 2007-06-25: Use proxy here.
- AMQFrame frame(version, 0,
- make_shared_ptr(new ClusterNotifyBody(version, url)));
- handle(frame);
}
+struct Cluster::IncomingHandler : public FrameHandler {
+ IncomingHandler(Cluster& c) : cluster(c) {}
+ void handle(AMQFrame& frame) {
+ markIncoming(frame);
+ cluster.mcast(frame);
+ }
+ Cluster& cluster;
+};
+
+struct Cluster::OutgoingHandler : public FrameHandler {
+ OutgoingHandler(Cluster& c) : cluster(c) {}
+ void handle(AMQFrame& frame) {
+ markOutgoing(frame);
+ cluster.mcast(frame);
+ }
+ Cluster& cluster;
+};
+
+
+// TODO aconway 2007-06-28: Right now everything is backed up via
+// multicast. When we have point-to-point backups the
+// Incoming/Outgoing handlers must determine where each frame should
+// be sent: to multicast or only to specific backup(s) via AMQP.
+
Cluster::Cluster(const std::string& name_, const std::string& url_) :
+ cpg(new Cpg(*this)),
name(name_),
url(url_),
- cpg(new Cpg(
- boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
- boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
- self(cpg->getLocalNoideId(), getpid())
-{}
-
-void Cluster::join(FrameHandler::Chain next) {
+ self(cpg->getLocalNoideId(), getpid()),
+ toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+{
QPID_LOG(trace, *this << " Joining cluster.");
- next = next;
- dispatcher=Thread(*this);
cpg->join(name);
notify();
+ dispatcher=Thread(*this);
+ // Wait till we show up in the cluster map.
+ {
+ Mutex::ScopedLock l(lock);
+ while (empty())
+ lock.wait();
+ }
}
Cluster::~Cluster() {
- if (cpg) {
- try {
- QPID_LOG(trace, *this << " Leaving cluster.");
- cpg->leave(name);
- cpg.reset();
- dispatcher.join();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << e.what());
- }
+ QPID_LOG(trace, *this << " Leaving cluster.");
+ try {
+ cpg->leave(name);
+ cpg.reset();
+ dispatcher.join();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Exception leaving cluster " << *this << ": "
+ << e.what());
}
}
-void Cluster::handle(AMQFrame& frame) {
- assert(cpg);
+void Cluster::mcast(AMQFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -99,11 +123,24 @@ void Cluster::handle(AMQFrame& frame) {
cpg->mcast(name, &iov, 1);
}
+void Cluster::notify() {
+ // TODO aconway 2007-06-25: Use proxy here.
+ ProtocolVersion version;
+ AMQFrame frame(version, 0,
+ make_shared_ptr(new ClusterNotifyBody(version, url)));
+ mcast(frame);
+}
+
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
return members.size();
}
+void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) {
+ Mutex::ScopedLock l(lock);
+ fromChains = chains;
+}
+
Cluster::MemberList Cluster::getMembers() const {
Mutex::ScopedLock l(lock);
MemberList result(members.size());
@@ -112,7 +149,7 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-void Cluster::cpgDeliver(
+void Cluster::deliver(
cpg_handle_t /*handle*/,
struct cpg_name* /* group */,
uint32_t nodeid,
@@ -124,61 +161,71 @@ void Cluster::cpgDeliver(
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame);
- // TODO aconway 2007-06-20: use visitor pattern.
+ QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+ if (!handleClusterFrame(from, frame)) {
+ FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out;
+ unMark(frame);
+ if (chain)
+ chain->handle(frame);
+ }
+}
+
+bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
+ Duration timeout) const
+{
+ AbsTime deadline(now(), timeout);
+ Mutex::ScopedLock l(lock);
+ while (!predicate(*this) && lock.wait(deadline))
+ ;
+ return (predicate(*this));
+}
+
+bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+ // TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
if (notifyIn) {
+ MemberList list;
{
Mutex::ScopedLock l(lock);
- assert(members[from]);
- members[from]->url = notifyIn->getUrl();
- members[from]->status = Member::BROKER;
+ if (!members[from])
+ members[from].reset(new Member(url));
+ else
+ members[from]->url = notifyIn->getUrl();
+ QPID_LOG(trace, *this << ": member update: " << members);
+ lock.notifyAll();
}
- if (callback)
- callback();
+ return true;
}
- else
- next->handle(frame);
+ return false;
}
-void Cluster::cpgConfigChange(
+void Cluster::configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
- struct cpg_address *current, int nCurrent,
+ struct cpg_address */*current*/, int /*nCurrent*/,
struct cpg_address *left, int nLeft,
- struct cpg_address *joined, int nJoined
-)
+ struct cpg_address *joined, int nJoined)
{
- QPID_LOG(trace,
- *this << " Configuration change. " << endl
- << " Joined: " << make_pair(joined, nJoined) << endl
- << " Left: " << make_pair(left, nLeft) << endl
- << " Current: " << make_pair(current, nCurrent));
-
- bool needNotify=false;
+ bool newMembers=false;
MemberList updated;
{
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nJoined; ++i) {
- Id id(current[i]);
- members[id].reset(new Member(current[i]));
- if (id != self)
- needNotify = true; // Notify new members other than myself.
+ if (nLeft) {
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(Id(left[i]));
+ QPID_LOG(trace, *this << ": members left: " << members);
+ lock.notifyAll();
}
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(current[i]));
- } // End of locked scope.
- if (needNotify)
+ newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
+ // We don't record members joining here, we record them when
+ // we get their ClusterNotify message.
+ }
+ if (newMembers)
notify();
- if (callback)
- callback();
}
-void Cluster::setCallback(boost::function<void()> f) { callback=f; }
-
void Cluster::run() {
- assert(cpg);
cpg->dispatchBlocking();
}