summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp187
1 files changed, 64 insertions, 123 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e64d80e214..53f0ccc08c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -23,8 +23,6 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
@@ -55,17 +53,6 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::cluster;
-struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
- Cluster& cluster;
- MemberId member;
- ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
-
- void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); }
- void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); }
- void ready(const std::string& url) { cluster.ready(member, url); }
-};
-
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
poller(b.getPoller()),
@@ -79,16 +66,18 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
- state(START)
+ handler(&joiningHandler),
+ joiningHandler(*this),
+ memberHandler(*this)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
_qmf::Package packageInit(agent);
mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
agent->addObject (mgmtObject);
- mgmtObject->set_status("JOINING");
+ mgmtObject->set_status("JOINING");
- // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
+ // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
@@ -108,9 +97,6 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-// FIXME aconway 2008-09-10: call leave from cluster admin command.
-// Any other type of exit is caught in disconnect().
-//
void Cluster::leave() {
QPID_LOG(notice, self << " leaving cluster " << name.str());
cpg.leave(name);
@@ -147,6 +133,7 @@ std::vector<Url> Cluster::getUrls() const {
}
// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
+// Check locking from Handler functions.
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
Mutex::ScopedLock l(lock);
if (id.getMember() == self)
@@ -179,24 +166,16 @@ void Cluster::deliver(
AMQFrame frame;
while (frame.decode(buf)) {
QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
- if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
+ if (!handler->invoke(e.getConnectionId().getMember(), frame))
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
- else {
- // Process connection controls & data via the connectionEventQueue
- // unless we are in the DISCARD state, in which case ignore.
- if (state != DISCARD) {
- e.setConnection(getConnection(e.getConnectionId()));
- connectionEventQueue.push(e);
- }
- }
+ else
+ handler->deliver(e);
}
catch (const std::exception& e) {
- // FIXME aconway 2008-01-30: exception handling.
QPID_LOG(critical, "Error in cluster deliver: " << e.what());
- assert(0);
- throw;
+ leave();
}
}
@@ -208,17 +187,19 @@ void Cluster::connectionEvent(const Event& e) {
else { // control
AMQFrame frame;
while (frame.decode(buf))
- e.getConnection()->deliver(frame);
+ e.getConnection()->received(frame);
}
}
struct AddrList {
const cpg_address* addrs;
int count;
- AddrList(const cpg_address* a, int n) : addrs(a), count(n) {}
+ const char* prefix;
+ AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {}
};
ostream& operator<<(ostream& o, const AddrList& a) {
+ if (a.count && a.prefix) o << a.prefix;
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
@@ -252,82 +233,41 @@ void Cluster::configChange(
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int nJoined)
+ cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
- QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent));
- QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft));
+ QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". "
+ << AddrList(left, nLeft, "Left: "));
- map.left(left, nLeft);
if (find(left, left+nLeft, self) != left+nLeft) {
// I have left the group, this is the final config change.
QPID_LOG(notice, self << " left cluster " << name.str());
broker.shutdown();
return;
}
+
+ map.left(left, nLeft);
+ handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
- if (state == START) {
- if (nCurrent == 1 && *current == self) { // First in cluster.
- // First in cluster
- QPID_LOG(notice, self << " first in cluster.");
- map.add(self, url);
- ready();
- }
- updateMemberStats();
- return;
- }
-
- if (state == DISCARD && !map.dumper) // try another dump request.
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
-
- if (nJoined && map.sendUpdate(self)) // New members need update
- mcastControl(map.toControl(), 0);
-
+ // FIXME aconway 2008-09-17: management update.
//update mgnt stats
- updateMemberStats();
+ updateMemberStats();
}
-void Cluster::update(const FieldTable& members, uint64_t dumper) {
+void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
Mutex::ScopedLock l(lock);
- map.update(members, dumper);
- QPID_LOG(debug, "Cluster update: " << map);
- if (state == START) state = DISCARD; // Got first update.
- if (state == DISCARD && !map.dumper)
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
+ handler->update(id, members, dumper);
}
void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
Mutex::ScopedLock l(lock);
- if (map.dumper) return; // Dump already in progress, ignore.
- map.dumper = map.first();
- if (dumpee == self && state == DISCARD) { // My turn to receive a dump.
- QPID_LOG(info, self << " receiving state dump from " << map.dumper);
- // FIXME aconway 2008-09-15: RECEIVE DUMP
- // state = CATCHUP;
- // stall();
- // When received
- mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
- ready();
- }
- else if (map.dumper == self && state == READY) { // My turn to send the dump
- QPID_LOG(info, self << " sending state dump to " << dumpee);
- // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
- // state = DUMPING;
- // stall();
- (void)urlStr;
- // When dump complete:
- assert(map.dumper == self);
- ClusterUpdateBody b = map.toControl();
- b.setDumper(0);
- mcastControl(b, 0);
- // NB: Don't modify my own map till self-delivery.
- }
+ handler->dumpRequest(dumpee, urlStr);
}
void Cluster::ready(const MemberId& member, const std::string& url) {
Mutex::ScopedLock l(lock);
- map.add(member, Url(url));
+ handler->ready(member, url);
+ // FIXME aconway 2008-09-17: management update.
}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -341,18 +281,18 @@ void Cluster::stall() {
// FIXME aconway 2008-09-11: Flow control, we should slow down or
// stop reading from local connections while stalled to avoid an
// unbounded queue.
- if (mgmtObject!=0)
- mgmtObject->set_status("STALLED");
+ // if (mgmtObject!=0)
+ // mgmtObject->set_status("STALLED");
}
void Cluster::ready() {
// Called with lock held
- QPID_LOG(info, self << " ready with URL " << url);
- state = READY;
+ QPID_LOG(info, self << " ready at URL " << url);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
- // FIXME aconway 2008-09-15: stall/unstall map?
- if (mgmtObject!=0)
- mgmtObject->set_status("ACTIVE");
+ // if (mgmtObject!=0)
+ // mgmtObject->set_status("ACTIVE");
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -367,52 +307,53 @@ void Cluster::shutdown() {
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const
-{
- return (ManagementObject*) mgmtObject;
+ManagementObject* Cluster::GetManagementObject(void) const {
+ return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&)
-{
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
- QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
- stopClusterNode();
- break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
- stopFullCluster();
- break;
- }
-
- return status;
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
+ stopClusterNode();
+ break;
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
+ stopFullCluster();
+ break;
+ }
+
+ return status;
}
void Cluster::stopClusterNode(void)
{
+ // FIXME aconway 2008-09-18:
QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}
void Cluster::stopFullCluster(void)
{
+ // FIXME aconway 2008-09-17: TODO
}
void Cluster::updateMemberStats(void)
{
//update mgnt stats
- if (mgmtObject!=0){
- mgmtObject->set_clusterSize(size());
- std::vector<Url> vectUrl = getUrls();
- string urlstr;
- for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
- if (iter != vectUrl.begin()) urlstr += ";";
- urlstr += iter->str();
- }
- mgmtObject->set_members(urlstr);
- }
+ // FIXME aconway 2008-09-18:
+// if (mgmtObject!=0){
+// mgmtObject->set_clusterSize(size());
+// std::vector<Url> vectUrl = getUrls();
+// string urlstr;
+// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+// if (iter != vectUrl.begin()) urlstr += ";";
+// urlstr += iter->str();
+// }
+// mgmtObject->set_members(urlstr);
+// }
}