summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-18 13:55:30 +0000
committerAlan Conway <aconway@apache.org>2008-09-18 13:55:30 +0000
commitb208766dcaf114eac162d6f230fb05370b01e04b (patch)
tree3cc58c975cab4072bc7bbf97ac1057c1219c17da /qpid/cpp
parentf5714202a325b2c17c348b336846c2832e54f8ee (diff)
downloadqpid-python-b208766dcaf114eac162d6f230fb05370b01e04b.tar.gz
Refactor Cluster logic into separate handlers for Joining & Member modes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@696657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/cluster.mk8
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp187
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h15
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterHandler.cpp53
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterHandler.h65
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/DumpClient.cpp21
-rw-r--r--qpid/cpp/src/qpid/cluster/DumpClient.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/JoiningHandler.cpp106
-rw-r--r--qpid/cpp/src/qpid/cluster/JoiningHandler.h56
-rw-r--r--qpid/cpp/src/qpid/cluster/MemberHandler.cpp83
-rw-r--r--qpid/cpp/src/qpid/cluster/MemberHandler.h60
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp1
-rw-r--r--qpid/cpp/src/qpid/sys/PollableQueue.h10
-rw-r--r--qpid/cpp/src/tests/DumpClientTest.cpp2
20 files changed, 546 insertions, 168 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 443db3fb15..8060e49b97 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -28,7 +28,13 @@ cluster_la_SOURCES = \
qpid/cluster/DumpClient.h \
qpid/cluster/DumpClient.cpp \
qpid/cluster/ClusterMap.h \
- qpid/cluster/ClusterMap.cpp
+ qpid/cluster/ClusterMap.cpp \
+ qpid/cluster/ClusterHandler.h \
+ qpid/cluster/ClusterHandler.cpp \
+ qpid/cluster/JoiningHandler.h \
+ qpid/cluster/JoiningHandler.cpp \
+ qpid/cluster/MemberHandler.h \
+ qpid/cluster/MemberHandler.cpp
cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index e64d80e214..53f0ccc08c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -23,8 +23,6 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
@@ -55,17 +53,6 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::cluster;
-struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
- Cluster& cluster;
- MemberId member;
- ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
-
- void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); }
- void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); }
- void ready(const std::string& url) { cluster.ready(member, url); }
-};
-
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
poller(b.getPoller()),
@@ -79,16 +66,18 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
- state(START)
+ handler(&joiningHandler),
+ joiningHandler(*this),
+ memberHandler(*this)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
_qmf::Package packageInit(agent);
mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
agent->addObject (mgmtObject);
- mgmtObject->set_status("JOINING");
+ mgmtObject->set_status("JOINING");
- // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
+ // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
@@ -108,9 +97,6 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-// FIXME aconway 2008-09-10: call leave from cluster admin command.
-// Any other type of exit is caught in disconnect().
-//
void Cluster::leave() {
QPID_LOG(notice, self << " leaving cluster " << name.str());
cpg.leave(name);
@@ -147,6 +133,7 @@ std::vector<Url> Cluster::getUrls() const {
}
// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
+// Check locking from Handler functions.
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
Mutex::ScopedLock l(lock);
if (id.getMember() == self)
@@ -179,24 +166,16 @@ void Cluster::deliver(
AMQFrame frame;
while (frame.decode(buf)) {
QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
- if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
+ if (!handler->invoke(e.getConnectionId().getMember(), frame))
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
- else {
- // Process connection controls & data via the connectionEventQueue
- // unless we are in the DISCARD state, in which case ignore.
- if (state != DISCARD) {
- e.setConnection(getConnection(e.getConnectionId()));
- connectionEventQueue.push(e);
- }
- }
+ else
+ handler->deliver(e);
}
catch (const std::exception& e) {
- // FIXME aconway 2008-01-30: exception handling.
QPID_LOG(critical, "Error in cluster deliver: " << e.what());
- assert(0);
- throw;
+ leave();
}
}
@@ -208,17 +187,19 @@ void Cluster::connectionEvent(const Event& e) {
else { // control
AMQFrame frame;
while (frame.decode(buf))
- e.getConnection()->deliver(frame);
+ e.getConnection()->received(frame);
}
}
struct AddrList {
const cpg_address* addrs;
int count;
- AddrList(const cpg_address* a, int n) : addrs(a), count(n) {}
+ const char* prefix;
+ AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {}
};
ostream& operator<<(ostream& o, const AddrList& a) {
+ if (a.count && a.prefix) o << a.prefix;
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
@@ -252,82 +233,41 @@ void Cluster::configChange(
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int nJoined)
+ cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
- QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent));
- QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft));
+ QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". "
+ << AddrList(left, nLeft, "Left: "));
- map.left(left, nLeft);
if (find(left, left+nLeft, self) != left+nLeft) {
// I have left the group, this is the final config change.
QPID_LOG(notice, self << " left cluster " << name.str());
broker.shutdown();
return;
}
+
+ map.left(left, nLeft);
+ handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
- if (state == START) {
- if (nCurrent == 1 && *current == self) { // First in cluster.
- // First in cluster
- QPID_LOG(notice, self << " first in cluster.");
- map.add(self, url);
- ready();
- }
- updateMemberStats();
- return;
- }
-
- if (state == DISCARD && !map.dumper) // try another dump request.
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
-
- if (nJoined && map.sendUpdate(self)) // New members need update
- mcastControl(map.toControl(), 0);
-
+ // FIXME aconway 2008-09-17: management update.
//update mgnt stats
- updateMemberStats();
+ updateMemberStats();
}
-void Cluster::update(const FieldTable& members, uint64_t dumper) {
+void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
Mutex::ScopedLock l(lock);
- map.update(members, dumper);
- QPID_LOG(debug, "Cluster update: " << map);
- if (state == START) state = DISCARD; // Got first update.
- if (state == DISCARD && !map.dumper)
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
+ handler->update(id, members, dumper);
}
void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
Mutex::ScopedLock l(lock);
- if (map.dumper) return; // Dump already in progress, ignore.
- map.dumper = map.first();
- if (dumpee == self && state == DISCARD) { // My turn to receive a dump.
- QPID_LOG(info, self << " receiving state dump from " << map.dumper);
- // FIXME aconway 2008-09-15: RECEIVE DUMP
- // state = CATCHUP;
- // stall();
- // When received
- mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
- ready();
- }
- else if (map.dumper == self && state == READY) { // My turn to send the dump
- QPID_LOG(info, self << " sending state dump to " << dumpee);
- // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
- // state = DUMPING;
- // stall();
- (void)urlStr;
- // When dump complete:
- assert(map.dumper == self);
- ClusterUpdateBody b = map.toControl();
- b.setDumper(0);
- mcastControl(b, 0);
- // NB: Don't modify my own map till self-delivery.
- }
+ handler->dumpRequest(dumpee, urlStr);
}
void Cluster::ready(const MemberId& member, const std::string& url) {
Mutex::ScopedLock l(lock);
- map.add(member, Url(url));
+ handler->ready(member, url);
+ // FIXME aconway 2008-09-17: management update.
}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -341,18 +281,18 @@ void Cluster::stall() {
// FIXME aconway 2008-09-11: Flow control, we should slow down or
// stop reading from local connections while stalled to avoid an
// unbounded queue.
- if (mgmtObject!=0)
- mgmtObject->set_status("STALLED");
+ // if (mgmtObject!=0)
+ // mgmtObject->set_status("STALLED");
}
void Cluster::ready() {
// Called with lock held
- QPID_LOG(info, self << " ready with URL " << url);
- state = READY;
+ QPID_LOG(info, self << " ready at URL " << url);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
- // FIXME aconway 2008-09-15: stall/unstall map?
- if (mgmtObject!=0)
- mgmtObject->set_status("ACTIVE");
+ // if (mgmtObject!=0)
+ // mgmtObject->set_status("ACTIVE");
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -367,52 +307,53 @@ void Cluster::shutdown() {
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const
-{
- return (ManagementObject*) mgmtObject;
+ManagementObject* Cluster::GetManagementObject(void) const {
+ return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&)
-{
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
- QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
- stopClusterNode();
- break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
- stopFullCluster();
- break;
- }
-
- return status;
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
+ stopClusterNode();
+ break;
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
+ stopFullCluster();
+ break;
+ }
+
+ return status;
}
void Cluster::stopClusterNode(void)
{
+ // FIXME aconway 2008-09-18:
QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}
void Cluster::stopFullCluster(void)
{
+ // FIXME aconway 2008-09-17: TODO
}
void Cluster::updateMemberStats(void)
{
//update mgnt stats
- if (mgmtObject!=0){
- mgmtObject->set_clusterSize(size());
- std::vector<Url> vectUrl = getUrls();
- string urlstr;
- for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
- if (iter != vectUrl.begin()) urlstr += ";";
- urlstr += iter->str();
- }
- mgmtObject->set_members(urlstr);
- }
+ // FIXME aconway 2008-09-18:
+// if (mgmtObject!=0){
+// mgmtObject->set_clusterSize(size());
+// std::vector<Url> vectUrl = getUrls();
+// string urlstr;
+// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+// if (iter != vectUrl.begin()) urlstr += ";";
+// urlstr += iter->str();
+// }
+// mgmtObject->set_members(urlstr);
+// }
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 847f179cc0..7c4e121a9b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -23,11 +23,12 @@
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
+#include "JoiningHandler.h"
+#include "MemberHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/Url.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"
@@ -78,7 +79,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
void leave();
// Cluster controls.
- void update(const framing::FieldTable& members, uint64_t dumping);
+ void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
@@ -127,8 +128,6 @@ class Cluster : private Cpg::Handler, public management::Manageable
/** Callback if CPG fd is disconnected. */
void disconnect(sys::DispatchHandle&);
- void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method);
-
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
virtual qpid::management::ManagementObject* GetManagementObject(void) const;
@@ -151,6 +150,14 @@ class Cluster : private Cpg::Handler, public management::Manageable
EventQueue connectionEventQueue;
State state;
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+
+ // Handlers for different states.
+ ClusterHandler* handler;
+ JoiningHandler joiningHandler;
+ MemberHandler memberHandler;
+
+ friend class JoiningHandler;
+ friend class MemberHandler;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp b/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
new file mode 100644
index 0000000000..648d40470c
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/AllInvoker.h"
+
+#include "ClusterHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+
+
+
+namespace qpid {
+namespace cluster {
+
+struct Operations : public framing::AMQP_AllOperations::ClusterHandler {
+ qpid::cluster::ClusterHandler& handler;
+ MemberId member;
+ Operations(qpid::cluster::ClusterHandler& c, const MemberId& id) : handler(c), member(id) {}
+
+ void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); }
+ void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); }
+ void ready(const std::string& url) { handler.ready(member, url); }
+};
+
+ClusterHandler::~ClusterHandler() {}
+
+ClusterHandler::ClusterHandler(Cluster& c) : cluster (c) {}
+
+bool ClusterHandler::invoke(const MemberId& id, framing::AMQFrame& frame) {
+ Operations ops(*this, id);
+ return framing::invoke(ops, *frame.getBody()).wasHandled();
+}
+
+}} // namespace qpid::cluster
+
diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.h b/qpid/cpp/src/qpid/cluster/ClusterHandler.h
new file mode 100644
index 0000000000..5da5cf5b75
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.h
@@ -0,0 +1,65 @@
+#ifndef QPID_CLUSTER_CLUSTERHANDLER_H
+#define QPID_CLUSTER_CLUSTERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cpg.h"
+#include "types.h"
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+class Cluster;
+class Event;
+
+/**
+ * Interface for handing cluster events.
+ * Implementations provide different behavior for different states of a member..
+ */
+class ClusterHandler
+{
+ public:
+ ClusterHandler(Cluster& c);
+ virtual ~ClusterHandler();
+
+ virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
+ virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
+ virtual void ready(const MemberId&, const std::string& url) = 0;
+
+ virtual void deliver(Event& e) = 0; // Deliver a connection event.
+
+ virtual void configChange(cpg_address *current, int nCurrent,
+ cpg_address *left, int nLeft,
+ cpg_address *joined, int nJoined) = 0;
+
+ bool invoke(const MemberId&, framing::AMQFrame& f);
+
+ protected:
+ Cluster& cluster;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CLUSTERHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index 51e360ad73..e14e35998f 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -53,8 +53,8 @@ framing::ClusterUpdateBody ClusterMap::toControl() const {
return b;
}
-void ClusterMap::update(const FieldTable& ftMembers, uint64_t dumper_) {
- FieldTable::ValueMap::const_iterator i;
+void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
+ framing:: FieldTable::ValueMap::const_iterator i;
for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
members[i->first] = Url(i->second->get<std::string>());
dumper = MemberId(dumper_);
@@ -82,8 +82,10 @@ bool ClusterMap::sendUpdate(const MemberId& id) const {
return dumper==id || (!dumper && first() == id);
}
-void ClusterMap::add(const MemberId& id, const Url& url) {
+void ClusterMap::ready(const MemberId& id, const Url& url) {
members[id] = url;
+ if (id == dumper)
+ dumper = MemberId();
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index c626c7688d..b00f818f88 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -34,8 +34,6 @@
namespace qpid {
namespace cluster {
-// FIXME aconway 2008-09-15: rename cluster status?
-
/**
* Map of established cluster members and brain-dumps in progress.
* A dumper is an established member that is sending catch-up data.
@@ -58,8 +56,8 @@ class ClusterMap {
/** Convert map contents to a cluster update body. */
framing::ClusterUpdateBody toControl() const;
- /** Add a new member. */
- void add(const MemberId& id, const Url& url);
+ /** Add a new member or dump complete if id == dumper. */
+ void ready(const MemberId& id, const Url& url);
/** Apply update delivered from clsuter. */
void update(const framing::FieldTable& members, uint64_t dumper);
@@ -70,6 +68,7 @@ class ClusterMap {
std::vector<Url> memberUrls() const;
size_t size() const { return members.size(); }
+ bool empty() const { return members.empty(); }
private:
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 6cc21633d3..51da5bef25 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -21,11 +21,9 @@
#include "Connection.h"
#include "Cluster.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/Invoker.h"
-#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/log/Statement.h"
-
+#include "qpid/framing/AllInvoker.h"
#include <boost/current_function.hpp>
namespace qpid {
@@ -47,11 +45,6 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
Connection::~Connection() {}
-void Connection::received(framing::AMQFrame& ) {
- // FIXME aconway 2008-09-02: not called, codec sends straight to deliver
- assert(0);
-}
-
bool Connection::doOutput() { return output.doOutput(); }
// Delivery of doOutput allows us to run the real connection doOutput()
@@ -62,7 +55,7 @@ void Connection::deliverDoOutput(uint32_t requested) {
}
// Handle frames delivered from cluster.
-void Connection::deliver(framing::AMQFrame& f) {
+void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, "DLVR [" << self << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -95,14 +88,13 @@ void Connection::deliverClose () {
size_t Connection::decode(const char* buffer, size_t size) {
++mcastSeq;
cluster.mcastBuffer(buffer, size, self);
- // FIXME aconway 2008-09-01: deserialize?
return size;
}
void Connection::deliverBuffer(Buffer& buf) {
++deliverSeq;
while (decoder.decode(buf))
- deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
+ received(decoder.frame);
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 37ff2ac6b4..d17dc704ed 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -63,7 +63,6 @@ class Connection :
Cluster& getCluster() { return cluster; }
// self-delivery of multicast data.
- void deliver(framing::AMQFrame& f);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void deliverBuffer(framing::Buffer&);
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
index ed046f2ede..3dfd8ecc38 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -36,9 +36,7 @@ ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl&
return 0;
}
-// FIXME aconway 2008-08-27: outbound connections need to be made
-// with proper qpid::client code for failover, get rid of this
-// broker-side hack.
+// Used for outgoing Link connections, we don't care.
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
return next->create(out, id);
@@ -60,7 +58,6 @@ size_t ConnectionCodec::decode(const char* buffer, size_t size) {
return interceptor->decode(buffer, size);
}
-// FIXME aconway 2008-09-02: delegate to interceptor?
size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
bool ConnectionCodec::canEncode() { return codec.canEncode(); }
void ConnectionCodec::closed() { codec.closed(); }
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 2a77fa437a..9b71e4235d 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -101,9 +101,7 @@ bool Cpg::isFlowControlEnabled() {
return flowState == CPG_FLOW_CONTROL_ENABLED;
}
-// TODO aconway 2008-08-07: better handling of flow control.
-// Wait for flow control to be disabled.
-// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove...
+// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping.
void Cpg::waitForFlowControl() {
int delayNs=1000; // one millisecond
int tries=8; // double the delay on each try.
diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
index f20ceb2ab6..f76a55c0d3 100644
--- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
@@ -44,36 +44,39 @@ using namespace framing::message;
using namespace client;
-DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void(const char*)>& f)
- : donor(b), failed(f)
+DumpClient::DumpClient(const Url& url, Broker& b,
+ const boost::function<void()>& ok,
+ const boost::function<void(const std::exception&)>& fail)
+ : donor(b), done(ok), failed(fail)
{
+ // FIXME aconway 2008-09-16: Identify as DumpClient connection.
connection.open(url);
session = connection.newSession();
}
-DumpClient::~DumpClient() {
- session.close();
- connection.close();
-}
+DumpClient::~DumpClient() {}
// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
- // FIXME aconway 2008-09-08: send cluster map frame first.
donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
session.sync();
+ session.close();
+ // FIXME aconway 2008-09-17: send dump complete indication.
+ connection.close();
}
void DumpClient::run() {
try {
dump();
- } catch (const Exception& e) {
- failed(e.what());
+ done();
+ } catch (const std::exception& e) {
+ failed(e);
}
delete this;
}
diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.h b/qpid/cpp/src/qpid/cluster/DumpClient.h
index 1c49b417d7..83c9ac4076 100644
--- a/qpid/cpp/src/qpid/cluster/DumpClient.h
+++ b/qpid/cpp/src/qpid/cluster/DumpClient.h
@@ -54,7 +54,10 @@ namespace cluster {
*/
class DumpClient : public sys::Runnable {
public:
- DumpClient(const Url& url, broker::Broker& donor, const boost::function<void(const char*)>& onFail);
+ DumpClient(const Url& url, broker::Broker& donor,
+ const boost::function<void()>& done,
+ const boost::function<void(const std::exception&)>& fail);
+
~DumpClient();
void dump();
void run(); // Will delete this when finished.
@@ -69,7 +72,8 @@ class DumpClient : public sys::Runnable {
client::Connection connection;
client::AsyncSession session;
broker::Broker& donor;
- boost::function<void(const char*)> failed;
+ boost::function<void()> done;
+ boost::function<void(const std::exception& e)> failed;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
new file mode 100644
index 0000000000..3358e3404b
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "JoiningHandler.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace sys;
+using namespace framing;
+
+JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {}
+
+void JoiningHandler::configChange(
+ cpg_address *current, int nCurrent,
+ cpg_address */*left*/, int nLeft,
+ cpg_address */*joined*/, int /*nJoined*/)
+{
+ if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
+ QPID_LOG(notice, cluster.self << " first in cluster.");
+ cluster.map.ready(cluster.self, cluster.url);
+ cluster.ready();
+ }
+}
+
+void JoiningHandler::deliver(Event& e) {
+ // Discard connection events unless we are stalled and getting a dump.
+ if (state == STALLED) {
+ e.setConnection(cluster.getConnection(e.getConnectionId()));
+ cluster.connectionEventQueue.push(e);
+ }
+}
+
+void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
+ cluster.map.update(members, dumper);
+ QPID_LOG(debug, "Cluster update: " << cluster.map);
+ checkDumpRequest();
+}
+
+void JoiningHandler::checkDumpRequest() {
+ if (state == START && !cluster.map.dumper) {
+ cluster.broker.getPort(); // ensure the broker is listening.
+ state = DUMP_REQUESTED;
+ cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0);
+ }
+}
+
+void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
+ if (cluster.map.dumper) { // Already a dump in progress.
+ if (dumpee == cluster.self && state == DUMP_REQUESTED)
+ state = START; // Need to make another request.
+ }
+ else { // Start a new dump
+ cluster.map.dumper = cluster.map.first();
+ if (dumpee == cluster.self) { // My turn
+
+ state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump
+
+ QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper);
+ switch (state) {
+ case START:
+ case STALLED:
+ assert(0); break;
+
+ case DUMP_REQUESTED:
+ state = STALLED;
+ cluster.stall();
+ break;
+
+ // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state.
+ case DUMP_COMPLETE:
+ cluster.ready();
+ break;
+ }
+ }
+ }
+}
+
+void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+ cluster.map.ready(id, Url(url));
+ checkDumpRequest();
+}
+
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.h b/qpid/cpp/src/qpid/cluster/JoiningHandler.h
new file mode 100644
index 0000000000..07a48b8281
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.h
@@ -0,0 +1,56 @@
+#ifndef QPID_CLUSTER_JOININGHANDLER_H
+#define QPID_CLUSTER_JOININGHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Cluster handler for the "joining" phase, before the process is a
+ * full cluster member.
+ */
+class JoiningHandler : public ClusterHandler
+{
+ public:
+ JoiningHandler(Cluster& c);
+
+ void configChange(struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ void deliver(Event& e);
+
+ void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
+ void dumpRequest(const MemberId&, const std::string& url);
+ void ready(const MemberId&, const std::string& url);
+
+ private:
+ enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
+ void checkDumpRequest();
+
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_JOININGHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
new file mode 100644
index 0000000000..e82eaec458
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MemberHandler.h"
+#include "Cluster.h"
+#include "DumpClient.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterUpdateBody.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace sys;
+using namespace framing;
+
+MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
+
+void MemberHandler::configChange(
+ cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address */*left*/, int /*nLeft*/,
+ cpg_address */*joined*/, int nJoined)
+{
+ if (nJoined && cluster.map.sendUpdate(cluster.self)) // New members need update
+ cluster.mcastControl(cluster.map.toControl(), 0);
+}
+
+void MemberHandler::deliver(Event& e) {
+ e.setConnection(cluster.getConnection(e.getConnectionId()));
+ cluster.connectionEventQueue.push(e);
+}
+
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
+
+void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
+ if (cluster.map.dumper) return; // dump in progress, ignore request.
+
+ cluster.map.dumper = cluster.map.first();
+ if (cluster.map.dumper != cluster.self) return;
+
+ QPID_LOG(info, cluster.self << " sending state dump to " << dumpee);
+ assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled.
+ cluster.stall();
+
+ cluster.ready(); // FIXME aconway 2008-09-18: Bypass dump
+ (void)urlStr;
+// dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+// boost::bind(&MemberHandler::dumpDone, this),
+// boost::bind(&MemberHandler::dumpError, this, _1)));
+}
+
+void MemberHandler::ready(const MemberId& id, const std::string& url) {
+ cluster.map.ready(id, Url(url));
+}
+
+
+void MemberHandler::dumpDone() {
+ dumpThread.join(); // Clean up.
+ cluster.ready();
+}
+
+void MemberHandler::dumpError(const std::exception& e) {
+ QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what());
+ dumpDone();
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.h b/qpid/cpp/src/qpid/cluster/MemberHandler.h
new file mode 100644
index 0000000000..630500a740
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/MemberHandler.h
@@ -0,0 +1,60 @@
+#ifndef QPID_CLUSTER_MEMBERHANDLER_H
+#define QPID_CLUSTER_MEMBERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterHandler.h"
+#include "qpid/sys/Thread.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Cluster handler for the "member" phase, before the process is a
+ * full cluster member.
+ */
+class MemberHandler : public ClusterHandler
+{
+ public:
+ MemberHandler(Cluster& c);
+
+ void configChange(
+ struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ void deliver(Event& e);
+
+ void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
+ void dumpRequest(const MemberId&, const std::string& url);
+ void ready(const MemberId&, const std::string& url);
+
+ void dumpDone();
+ void dumpError(const std::exception&);
+
+ public:
+ sys::Thread dumpThread;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MEMBERHANDLER_H*/
+
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 3212d34775..4ff0a88b11 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -88,7 +88,6 @@ void OutputInterceptor::startDoOutput() {
// Send a doOutput request if one is not already in flight.
void OutputInterceptor::sendDoOutput() {
// Call with lock held.
- // FIXME aconway 2008-08-28: used to have || parent.getClosed())
if (!parent.isLocal()) return;
doingOutput = true;
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h
index ca97c0d8c9..2c326b998f 100644
--- a/qpid/cpp/src/qpid/sys/PollableQueue.h
+++ b/qpid/cpp/src/qpid/sys/PollableQueue.h
@@ -71,6 +71,9 @@ class PollableQueue {
/** Stop polling and wait for the current callback, if any, to complete. */
void stop();
+
+ /** Are we currently stopped?*/
+ bool isStopped() const;
private:
typedef sys::Monitor::ScopedLock ScopedLock;
@@ -78,7 +81,7 @@ class PollableQueue {
void dispatch(sys::DispatchHandle&);
- sys::Monitor lock;
+ mutable sys::Monitor lock;
Callback callback;
PollableCondition condition;
sys::DispatchHandle handle;
@@ -130,6 +133,11 @@ template <class T> void PollableQueue<T>::stop() {
while (dispatching) lock.wait();
}
+template <class T> bool PollableQueue<T>::isStopped() const {
+ ScopedLock l(lock);
+ return stopped;
+}
+
}} // namespace qpid::sys
#endif /*!QPID_SYS_POLLABLEQUEUE_H*/
diff --git a/qpid/cpp/src/tests/DumpClientTest.cpp b/qpid/cpp/src/tests/DumpClientTest.cpp
index 27c4174ffe..03cf12aec6 100644
--- a/qpid/cpp/src/tests/DumpClientTest.cpp
+++ b/qpid/cpp/src/tests/DumpClientTest.cpp
@@ -61,7 +61,7 @@ QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
c.connection.close();
}
Url url(Url::getIpAddressesUrl(receiver.getPort()));
- qpid::cluster::DumpClient dump(url, *donor.broker, 0);
+ qpid::cluster::DumpClient dump(url, *donor.broker, 0, 0);
dump.dump();
{
Client r(receiver.getPort());