summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp5
-rw-r--r--cpp/src/qpid/client/Connection.h3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp9
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--cpp/src/qpid/cluster/ClusterHandler.h6
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp58
-rw-r--r--cpp/src/qpid/cluster/Connection.h32
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp13
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h2
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp18
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp37
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.h7
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.cpp30
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.h5
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp11
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.h8
-rw-r--r--cpp/src/tests/cluster_test.cpp167
-rw-r--r--cpp/xml/cluster.xml4
18 files changed, 270 insertions, 147 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index dd62c8e6e8..ac4ec81cb9 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -26,6 +26,7 @@
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/agent/ManagementAgent.h"
+#include "qpid/framing/enum.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
@@ -195,14 +196,14 @@ bool Connection::doOutput() {
ioCallback = 0;
if (mgmtClosing)
- close(403, "Closed by Management Request", 0, 0);
+ close(execution::ERROR_CODE_UNAUTHORIZED_ACCESS, "Closed by Management Request", 0, 0);
else
//then do other output as needed:
return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.getMessage(), 0, 0);
}catch(std::exception& e){
- close(541/*internal error*/, e.what(), 0, 0);
+ close(execution::ERROR_CODE_INTERNAL_ERROR, e.what(), 0, 0);
}
return false;
}
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index e8ff2d8660..cf5b09b255 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -166,6 +166,9 @@ class Connection
void resume(Session& session);
bool isOpen() const;
+
+
+ friend class ConnectionAccess; ///<@internal
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 53f0ccc08c..9549527416 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -89,7 +89,7 @@ Cluster::~Cluster() {}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
- connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), c));
+ handler->insert(c);
}
void Cluster::erase(ConnectionId id) {
@@ -186,8 +186,10 @@ void Cluster::connectionEvent(const Event& e) {
e.getConnection()->deliverBuffer(buf);
else { // control
AMQFrame frame;
- while (frame.decode(buf))
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, "DLVR [" << self << "]: " << frame);
e.getConnection()->received(frame);
+ }
}
}
@@ -274,6 +276,7 @@ broker::Broker& Cluster::getBroker(){ return broker; }
void Cluster::stall() {
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, self << " stalling.");
// Stop processing connection events. We still process config changes
// and cluster controls in deliver()
connectionEventQueue.stop();
@@ -357,6 +360,4 @@ void Cluster::updateMemberStats(void)
}
-
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 7c4e121a9b..aa077ef63c 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -91,6 +91,8 @@ class Cluster : private Cpg::Handler, public management::Manageable
void shutdown();
broker::Broker& getBroker();
+
+ void setDumpComplete();
private:
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
diff --git a/cpp/src/qpid/cluster/ClusterHandler.h b/cpp/src/qpid/cluster/ClusterHandler.h
index 5da5cf5b75..95106de016 100644
--- a/cpp/src/qpid/cluster/ClusterHandler.h
+++ b/cpp/src/qpid/cluster/ClusterHandler.h
@@ -24,6 +24,7 @@
#include "Cpg.h"
#include "types.h"
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -31,6 +32,7 @@ namespace framing { class AMQFrame; }
namespace cluster {
+class Connection;
class Cluster;
class Event;
@@ -44,6 +46,8 @@ class ClusterHandler
ClusterHandler(Cluster& c);
virtual ~ClusterHandler();
+ bool invoke(const MemberId&, framing::AMQFrame& f);
+
virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
virtual void ready(const MemberId&, const std::string& url) = 0;
@@ -54,7 +58,7 @@ class ClusterHandler
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined) = 0;
- bool invoke(const MemberId&, framing::AMQFrame& f);
+ virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0;
protected:
Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 51da5bef25..b225ba3568 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -34,29 +34,31 @@ using namespace framing;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId), catchUp(), exCatchUp()
{}
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId)
+ const std::string& wrappedId, MemberId myId, bool isCatchUp)
: cluster(c), self(myId, this), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId),
+ catchUp(isCatchUp), exCatchUp()
{}
Connection::~Connection() {}
-bool Connection::doOutput() { return output.doOutput(); }
+bool Connection::doOutput() {
+ return output.doOutput();
+}
// Delivery of doOutput allows us to run the real connection doOutput()
// which stocks up the write buffers with data.
//
void Connection::deliverDoOutput(uint32_t requested) {
+ assert(!catchUp);
output.deliverDoOutput(requested);
}
-// Handle frames delivered from cluster.
void Connection::received(framing::AMQFrame& f) {
- QPID_LOG(trace, "DLVR [" << self << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
@@ -64,16 +66,28 @@ void Connection::received(framing::AMQFrame& f) {
void Connection::closed() {
try {
- // Called when the local network connection is closed. We still
- // need to process any outstanding cluster frames for this
- // connection to ensure our sessions are up-to-date. We defer
- // closing the Connection object till deliverClosed(), but replace
- // its output handler with a null handler since the network output
- // handler will be deleted.
- //
- connection.setOutputHandler(&discardHandler);
- cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
- ++mcastSeq;
+ // Local network connection has closed. We need to keep the
+ // connection around but replace the output handler with a
+ // no-op handler as the network output handler will be
+ // deleted.
+
+ // FIXME aconway 2008-09-18: output handler reset in right place?
+ // connection.setOutputHandler(&discardHandler);
+ output.setOutputHandler(discardHandler);
+ if (catchUp) {
+ // This was a catch-up connection, may be promoted to a
+ // shadow connection.
+ catchUp = false;
+ exCatchUp = true;
+ cluster.insert(boost::intrusive_ptr<Connection>(this));
+ }
+ else {
+ // This was a local replicated connection. Multicast a deliver closed
+ // and process any outstanding frames from the cluster until
+ // self-delivery of deliver-closed.
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
+ ++mcastSeq;
+ }
}
catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
@@ -81,17 +95,20 @@ void Connection::closed() {
}
void Connection::deliverClose () {
+ assert(!catchUp);
connection.closed();
cluster.erase(self);
}
size_t Connection::decode(const char* buffer, size_t size) {
+ assert(!catchUp);
++mcastSeq;
cluster.mcastBuffer(buffer, size, self);
return size;
}
void Connection::deliverBuffer(Buffer& buf) {
+ assert(!catchUp);
++deliverSeq;
while (decoder.decode(buf))
received(decoder.frame);
@@ -108,10 +125,15 @@ void Connection::sessionState(const SequenceNumber& /*replayStart*/,
// FIXME aconway 2008-09-10: TODO
}
-void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/)
-{
+void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) {
// FIXME aconway 2008-09-10: TODO
}
+void Connection::dumpComplete() {
+ // FIXME aconway 2008-09-18: use or remove.
+}
+
+bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index d17dc704ed..c664427ea1 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -51,21 +51,21 @@ class Connection :
{
public:
/** Local connection, use this in ConnectionId */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId);
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp);
/** Shadow connection */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
~Connection();
ConnectionId getId() const { return self; }
broker::Connection& getBrokerConnection() { return connection; }
- bool isLocal() const { return self.second == this; }
+ bool isLocal() const;
- Cluster& getCluster() { return cluster; }
+ /** True if the connection is in "catch-up" mode: building initial state */
+ bool isCatchUp() const { return catchUp; }
+ bool isExCatchUp() const { return exCatchUp; }
- // self-delivery of multicast data.
- void deliverClose();
- void deliverDoOutput(uint32_t requested);
- void deliverBuffer(framing::Buffer&);
+
+ Cluster& getCluster() { return cluster; }
// ConnectionOutputHandler methods
void close() {}
@@ -84,19 +84,27 @@ class Connection :
// ConnectionCodec methods
size_t decode(const char* buffer, size_t size);
- // ConnectionInputHandlerFactory
- sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
+ // Called by cluster to deliver a buffer from CPG.
+ void deliverBuffer(framing::Buffer&);
+
+ // ==== Used in catch-up mode to build initial state.
+ //
// State dump methods.
- virtual void sessionState(const SequenceNumber& replayStart,
+ void sessionState(const SequenceNumber& replayStart,
const SequenceSet& sentIncomplete,
const SequenceNumber& expected,
const SequenceNumber& received,
const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- virtual void shadowReady(uint64_t memberId, uint64_t connectionId);
+ void shadowReady(uint64_t memberId, uint64_t connectionId);
+
+ void dumpComplete();
private:
+
+ void deliverClose();
+ void deliverDoOutput(uint32_t requested);
void sendDoOutput();
Cluster& cluster;
@@ -108,6 +116,8 @@ class Connection :
broker::Connection connection;
framing::SequenceNumber mcastSeq;
framing::SequenceNumber deliverSeq;
+ bool catchUp;
+ bool exCatchUp;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 3dfd8ecc38..d95a321adf 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -32,7 +32,9 @@ namespace cluster {
sys::ConnectionCodec*
ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
if (v == framing::ProtocolVersion(0, 10))
- return new ConnectionCodec(out, id, cluster);
+ return new ConnectionCodec(out, id, cluster, false);
+ else if (v == framing::ProtocolVersion(0x80 + 0, 0x80 + 10))
+ return new ConnectionCodec(out, id, cluster, true); // Catch-up connection
return 0;
}
@@ -42,9 +44,9 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id)
return next->create(out, id);
}
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster)
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp)
: codec(out, id, false),
- interceptor(new Connection(cluster, codec, id, cluster.getSelf()))
+ interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
@@ -55,7 +57,10 @@ ConnectionCodec::~ConnectionCodec() {}
// ConnectionCodec functions delegate to the codecOutput
size_t ConnectionCodec::decode(const char* buffer, size_t size) {
- return interceptor->decode(buffer, size);
+ if (interceptor->isCatchUp())
+ return codec.decode(buffer, size);
+ else
+ return interceptor->decode(buffer, size);
}
size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index 22d752d174..a82569decd 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -56,7 +56,7 @@ class ConnectionCodec : public sys::ConnectionCodec {
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
- ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c);
+ ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp);
~ConnectionCodec();
// ConnectionCodec functions.
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index f76a55c0d3..43c30d3b07 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -27,12 +27,21 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/ClusterConnectionDumpCompleteBody.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
namespace qpid {
+
+namespace client {
+struct ConnectionAccess {
+ static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; }
+};
+} // namespace client
+
namespace cluster {
using broker::Broker;
@@ -40,16 +49,18 @@ using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
+using namespace framing;
using namespace framing::message;
-
using namespace client;
+
DumpClient::DumpClient(const Url& url, Broker& b,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
: donor(b), done(ok), failed(fail)
{
- // FIXME aconway 2008-09-16: Identify as DumpClient connection.
+ // Special version identifies this as a catch-up connectionn.
+ client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 0x80 + 10));
connection.open(url);
session = connection.newSession();
}
@@ -65,9 +76,10 @@ void DumpClient::dump() {
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
+ SessionBase_0_10Access sb(session);
+ // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
session.sync();
session.close();
- // FIXME aconway 2008-09-17: send dump complete indication.
connection.close();
}
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
index 3358e3404b..c188fe438e 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -30,7 +30,7 @@ namespace cluster {
using namespace sys;
using namespace framing;
-JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {}
+JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {}
void JoiningHandler::configChange(
cpg_address *current, int nCurrent,
@@ -74,21 +74,17 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
else { // Start a new dump
cluster.map.dumper = cluster.map.first();
if (dumpee == cluster.self) { // My turn
-
- state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump
-
- QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper);
switch (state) {
case START:
case STALLED:
assert(0); break;
case DUMP_REQUESTED:
+ QPID_LOG(info, cluster.self << " stalling for dump from " << cluster.map.dumper);
state = STALLED;
cluster.stall();
break;
- // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state.
case DUMP_COMPLETE:
cluster.ready();
break;
@@ -102,5 +98,34 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) {
checkDumpRequest();
}
+void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ if (c->isCatchUp()) {
+ ++catchUpConnections;
+ QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections.");
+ }
+ else if (c->isExCatchUp()) {
+ if (c->getId().getConnectionPtr() != c.get()) // become shadow connection
+ cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+ QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining");
+ if (--catchUpConnections == 0)
+ dumpComplete();
+ }
+ else // Local connection, will be stalled till dump complete.
+ cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
+
+void JoiningHandler::dumpComplete() {
+ // FIXME aconway 2008-09-18: need to detect incomplete dump.
+ //
+ if (state == STALLED) {
+ QPID_LOG(debug, "Dump complete, unstalling.");
+ cluster.ready();
+ }
+ else {
+ QPID_LOG(debug, "Dump complete, waiting for stall point.");
+ assert(state == DUMP_REQUESTED);
+ state = DUMP_COMPLETE;
+ }
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/JoiningHandler.h b/cpp/src/qpid/cluster/JoiningHandler.h
index 07a48b8281..c2cdb2c504 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.h
+++ b/cpp/src/qpid/cluster/JoiningHandler.h
@@ -46,9 +46,14 @@ class JoiningHandler : public ClusterHandler
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
+ void insert(const boost::intrusive_ptr<Connection>& c);
+
private:
- enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
void checkDumpRequest();
+ void dumpComplete();
+
+ enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
+ size_t catchUpConnections;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp
index e82eaec458..1997ced9b0 100644
--- a/cpp/src/qpid/cluster/MemberHandler.cpp
+++ b/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -23,6 +23,7 @@
#include "DumpClient.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/ClusterUpdateBody.h"
+#include "qpid/framing/enum.h"
namespace qpid {
namespace cluster {
@@ -32,6 +33,10 @@ using namespace framing;
MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
+MemberHandler::~MemberHandler() {
+ if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+}
+
void MemberHandler::configChange(
cpg_address */*current*/, int /*nCurrent*/,
cpg_address */*left*/, int /*nLeft*/,
@@ -58,11 +63,10 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt
assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled.
cluster.stall();
- cluster.ready(); // FIXME aconway 2008-09-18: Bypass dump
- (void)urlStr;
-// dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
-// boost::bind(&MemberHandler::dumpDone, this),
-// boost::bind(&MemberHandler::dumpError, this, _1)));
+ if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+ dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+ boost::bind(&MemberHandler::dumpSent, this),
+ boost::bind(&MemberHandler::dumpError, this, _1)));
}
void MemberHandler::ready(const MemberId& id, const std::string& url) {
@@ -70,14 +74,22 @@ void MemberHandler::ready(const MemberId& id, const std::string& url) {
}
-void MemberHandler::dumpDone() {
- dumpThread.join(); // Clean up.
+void MemberHandler::dumpSent() {
+ QPID_LOG(debug, "Finished sending state dump.");
+ Mutex::ScopedLock l(cluster.lock);
cluster.ready();
}
void MemberHandler::dumpError(const std::exception& e) {
- QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what());
- dumpDone();
+ QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " << e.what());
+ dumpSent();
+}
+
+void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ if (c->isCatchUp()) // Not allowed in member mode
+ c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
+ else
+ cluster.connections[c->getId()] = c;
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MemberHandler.h b/cpp/src/qpid/cluster/MemberHandler.h
index 630500a740..6657ea4f53 100644
--- a/cpp/src/qpid/cluster/MemberHandler.h
+++ b/cpp/src/qpid/cluster/MemberHandler.h
@@ -35,6 +35,7 @@ class MemberHandler : public ClusterHandler
{
public:
MemberHandler(Cluster& c);
+ ~MemberHandler();
void configChange(
struct cpg_address */*members*/, int /*nMembers*/,
@@ -48,9 +49,11 @@ class MemberHandler : public ClusterHandler
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
- void dumpDone();
+ void dumpSent();
void dumpError(const std::exception&);
+ void insert(const boost::intrusive_ptr<Connection>& c);
+
public:
sys::Thread dumpThread;
};
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4ff0a88b11..8718154d3e 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -33,12 +33,12 @@ namespace cluster {
using namespace framing;
OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h)
- : parent(p), next(h), sent(), moreOutput(), doingOutput()
+ : parent(p), next(&h), sent(), moreOutput(), doingOutput()
{}
void OutputInterceptor::send(framing::AMQFrame& f) {
Locker l(lock);
- next.send(f);
+ next->send(f);
sent += f.size();
}
@@ -60,7 +60,7 @@ bool OutputInterceptor::doOutput() {
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
Locker l(lock);
- size_t buf = next.getBuffered();
+ size_t buf = next->getBuffered();
if (parent.isLocal())
writeEstimate.delivered(sent, buf); // Update the estimate.
@@ -101,4 +101,9 @@ void OutputInterceptor::sendDoOutput() {
QPID_LOG(trace, &parent << "Send doOutput request for " << request);
}
+void OutputInterceptor::setOutputHandler(sys::ConnectionOutputHandler& h) {
+ Locker l(lock);
+ next = &h;
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
index 548ec32b5b..ad9d9952bf 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -43,14 +43,16 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
// sys::ConnectionOutputHandler functions
void send(framing::AMQFrame& f);
void activateOutput();
- void close() { Locker l(lock); next.close(); }
- size_t getBuffered() const { Locker l(lock); return next.getBuffered(); }
+ void close() { Locker l(lock); next->close(); }
+ size_t getBuffered() const { Locker l(lock); return next->getBuffered(); }
// Delivery point for doOutput requests.
void deliverDoOutput(size_t requested);
// Intercept doOutput requests on Connection.
bool doOutput();
+ void setOutputHandler(sys::ConnectionOutputHandler& h);
+
cluster::Connection& parent;
private:
@@ -60,7 +62,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
void sendDoOutput();
mutable sys::Mutex lock;
- sys::ConnectionOutputHandler& next;
+ sys::ConnectionOutputHandler* next;
size_t sent;
WriteEstimate writeEstimate;
bool moreOutput;
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 8dec23a09b..1b44902054 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -74,10 +74,12 @@ struct ClusterFixture : public vector<uint16_t> {
string name;
std::auto_ptr<BrokerFixture> broker0;
boost::ptr_vector<ForkedBroker> forkedBrokers;
+ bool init0;
- ClusterFixture(size_t n);
+ ClusterFixture(size_t n, bool init0=true);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add();
+ void add0(bool force);
void setup();
void kill(size_t n) {
if (n) forkedBrokers[n-1].kill();
@@ -85,8 +87,9 @@ struct ClusterFixture : public vector<uint16_t> {
}
};
-ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
+ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
add(n);
+ if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case.
// Wait for all n members to join the cluster
int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
while (retry && getGlobalCluster().size() != n) {
@@ -101,24 +104,42 @@ void ClusterFixture::add() {
os << "fork" << size();
std::string prefix = os.str();
+ if (size()) { // Not the first broker, fork.
+
+ const char* argv[] = {
+ "qpidd " __FILE__ ,
+ "--load-module=../.libs/cluster.so",
+ "--cluster-name", name.c_str(),
+ "--auth=no", "--no-data-dir",
+ "--log-prefix", prefix.c_str(),
+ };
+ size_t argc = sizeof(argv)/sizeof(argv[0]);
+
+
+ forkedBrokers.push_back(new ForkedBroker(argc, argv));
+ push_back(forkedBrokers.back().getPort());
+ }
+ else {
+ add0(init0); // First broker, run in this process.
+ }
+}
+
+void ClusterFixture::add0(bool init) {
+ if (!init) {
+ push_back(0);
+ return;
+ }
const char* argv[] = {
"qpidd " __FILE__ ,
"--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
- "--auth=no", "--no-data-dir",
- "--log-prefix", prefix.c_str(),
+ "--auth=no", "--no-data-dir"
};
size_t argc = sizeof(argv)/sizeof(argv[0]);
- if (size()) { // Not the first broker, fork.
- forkedBrokers.push_back(new ForkedBroker(argc, argv));
- push_back(forkedBrokers.back().getPort());
- }
- else { // First broker, run in this process.
- qpid::log::Logger::instance().setPrefix("main");
- broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
- push_back(broker0->getPort());
- }
+ qpid::log::Logger::instance().setPrefix("main");
+ broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
+ push_back(broker0->getPort());
}
// For debugging: op << for CPG types.
@@ -140,60 +161,6 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) {
- ClusterFixture cluster(1);
- Client c0(cluster[0], "c0");
- // Create some shared state.
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo","q"));
- while (c0.session.queueQuery("q").getMessageCount() != 1)
- ::usleep(1000); // Wait for message to show up on broker 0.
-
- // Now join new broker, should catch up.
- cluster.add();
- c0.session.messageTransfer(arg::content=Message("bar","q"));
- c0.session.queueDeclare("p");
- c0.session.messageTransfer(arg::content=Message("poo","p"));
-
- // Verify new broker has all state.
- Message m;
- Client c1(cluster[1], "c1");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "foo");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "bar");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0);
- BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "poo");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), (unsigned)0);
-}
-
-QPID_AUTO_TEST_CASE(testStall) {
- ClusterFixture cluster(2);
- Client c0(cluster[0], "c0");
- Client c1(cluster[1], "c1");
-
- // Declare on all to avoid race condition.
- c0.session.queueDeclare("q");
- c1.session.queueDeclare("q");
-
- // Stall 0, verify it does not process deliverys while stalled.
- getGlobalCluster().stall();
- c1.session.messageTransfer(arg::content=Message("foo","q"));
- while (c1.session.queueQuery("q").getMessageCount() != 1)
- ::usleep(1000); // Wait for message to show up on broker 1.
- sleep(2); // FIXME aconway 2008-09-11: remove.
- // But it should not be on broker 0.
- boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
- BOOST_REQUIRE(q0);
- BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
- // Now unstall and we should get the message.
- getGlobalCluster().ready();
- Message m;
- BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "foo");
-}
-
#if 0 // FIXME aconway 2008-09-10: finish & enable
QPID_AUTO_TEST_CASE(testDumpConsumers) {
ClusterFixture cluster(1);
@@ -226,20 +193,36 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) {
#endif
-QPID_AUTO_TEST_CASE(testForkedBroker) {
- // Verify the ForkedBroker works as expected.
- const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
- ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv);
- Client c(broker.getPort());
- BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType());
-}
-QPID_AUTO_TEST_CASE(testSingletonCluster) {
- // Test against a singleton cluster, verify basic operation.
+QPID_AUTO_TEST_CASE(testCatchupSharedState) {
ClusterFixture cluster(1);
- Client c(cluster[0]);
- BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
- BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
+
+ Client c0(cluster[0], "c0");
+ // Create some shared state.
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo","q"));
+ c0.session.messageTransfer(arg::content=Message("bar","q"));
+ while (c0.session.queueQuery("q").getMessageCount() != 2)
+ ::usleep(1000); // Wait for message to show up on broker 0.
+
+ // FIXME aconway 2008-09-18: close session until we catchup session state also.
+ c0.session.close();
+ c0.connection.close();
+
+ // Now join new broker, should catch up.
+ cluster.add();
+
+ // FIXME aconway 2008-09-18: when we do session state try adding
+ // further stuff from broker 0, and leaving a subscription active.
+
+ // Verify new broker has all state.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0);
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -326,4 +309,30 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
+QPID_AUTO_TEST_CASE(testStall) {
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ // Declare on all to avoid race condition.
+ c0.session.queueDeclare("q");
+ c1.session.queueDeclare("q");
+
+ // Stall 0, verify it does not process deliverys while stalled.
+ getGlobalCluster().stall();
+ c1.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c1.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 1.
+ sleep(2); // FIXME aconway 2008-09-11: remove.
+ // But it should not be on broker 0.
+ boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
+ BOOST_REQUIRE(q0);
+ BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
+ // Now unstall and we should get the message.
+ getGlobalCluster().ready();
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+}
+
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index ba4e50d21e..2acf0aea82 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -60,7 +60,8 @@ o<?xml version="1.0"?>
- attach sessions, create consumers, set flow with normal AMQP cokmmands.
- reset session state by sending session-state for each session.
- frames following session-state are replay frames.
- - send shadow-ready to mark end of dump.
+ - send shadow-ready to mark end of shadow dump.
+ - send dump-complete when entire dump is complete.
-->
<control name="session-state" code="0x4" label="Set session state during a brain dump.">
<!-- Target session deduced from channel number. -->
@@ -79,5 +80,6 @@ o<?xml version="1.0"?>
<field name="connection-id" type="uint64"/>
</control>
+ <control name="dump-complete" code="0x6" label="End of brain dump."/>
</class>
</amqp>