summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-25 12:30:14 +0000
committerAlan Conway <aconway@apache.org>2008-09-25 12:30:14 +0000
commitad00cfe7646e4fa519b90e6874d68e3ae012db99 (patch)
tree3100e4e171425fde69308d1e22adc035c776a730 /qpid/cpp/src
parenta22a4d1c8b113a21c5010cbf40b38bb56724fea9 (diff)
downloadqpid-python-ad00cfe7646e4fa519b90e6874d68e3ae012db99.tar.gz
Enabled management, add cluster shutdown command.
Remove dead Handler methods in Cluster. Fixed SessionException handling in broker, was throwing some SessionExceptions as "unknown exception" git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@698945 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/SessionState.cpp2
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp9
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp34
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp119
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterHandler.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterHandler.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/JoiningHandler.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/MemberHandler.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/MemberHandler.h2
11 files changed, 93 insertions, 122 deletions
diff --git a/qpid/cpp/src/qpid/SessionState.cpp b/qpid/cpp/src/qpid/SessionState.cpp
index 85f86c85c9..61707002fb 100644
--- a/qpid/cpp/src/qpid/SessionState.cpp
+++ b/qpid/cpp/src/qpid/SessionState.cpp
@@ -147,7 +147,7 @@ void SessionState::senderRecordKnownCompleted() {
void SessionState::senderConfirmed(const SessionPoint& confirmed) {
if (confirmed > sender.sendPoint)
- throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
+ throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed commands not yet sent."));
QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed);
ReplayList::iterator i = sender.replayList.begin();
while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index c9bb57a13e..02d72ffd11 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -81,6 +81,15 @@ void SessionHandler::handleIn(AMQFrame& f) {
getInHandler()->handle(f);
}
}
+ catch(const SessionException& e) {
+ QPID_LOG(error, "Execution exception: " << e.what());
+ framing::AMQP_AllProxy::Execution execution(channel);
+ AMQMethodBody* m = f.getMethod();
+ SequenceNumber commandId;
+ if (getState()) commandId = getState()->receiverGetCurrent();
+ execution.exception(e.code, commandId, m ? m->amqpClassId() : 0, m ? m->amqpMethodId() : 0, 0, e.what(), FieldTable());
+ sendDetach();
+ }
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
peer.detached(name, e.code);
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
index 684258bbae..53ce12b47f 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -43,8 +43,6 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
public framing::FrameHandler::InOutHandler
{
public:
- typedef framing::AMQP_AllProxy::Session Peer;
-
SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0);
~SessionHandler();
@@ -103,7 +101,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
void checkName(const std::string& name);
framing::ChannelHandler channel;
- Peer peer;
+ framing::AMQP_AllProxy::Session peer;
bool ignoring;
bool sendReady, receiveReady;
std::string name;
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 6358bd2145..18eb1b8466 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -212,31 +212,15 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
void SessionState::handleIn(AMQFrame& frame) {
SequenceNumber commandId = receiverGetCurrent();
- try {
- //TODO: make command handling more uniform, regardless of whether
- //commands carry content.
- AMQMethodBody* m = frame.getMethod();
- if (m == 0 || m->isContentBearing()) {
- handleContent(frame, commandId);
- } else if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
- } else {
- throw InternalErrorException("Cannot handle multi-frame command segments yet");
- }
- } catch(const SessionException& e) {
- //TODO: better implementation of new exception handling mechanism
-
- //0-10 final changes the types of exceptions, 'model layer'
- //exceptions will all be session exceptions regardless of
- //current channel/connection classification
-
- AMQMethodBody* m = frame.getMethod();
- if (m) {
- getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
- } else {
- getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
- }
- handler->sendDetach();
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content.
+ AMQMethodBody* m = frame.getMethod();
+ if (m == 0 || m->isContentBearing()) {
+ handleContent(frame, commandId);
+ } else if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 93625af948..7edf9f9392 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -26,6 +26,7 @@
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -51,7 +52,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-namespace _qmf = qmf::org::apache::qpid::cluster;
+namespace qmf = qmf::org::apache::qpid::cluster;
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
@@ -66,6 +67,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+ mgmtObject(0),
handler(&joiningHandler),
joiningHandler(*this),
memberHandler(*this),
@@ -73,30 +75,25 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
- _qmf::Package packageInit(agent);
- mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
+ qmf::Package packageInit(agent);
+ mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str());
agent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
-
+
+ // FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
QPID_LOG(notice, self << " joining cluster " << name.str());
- broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
}
Cluster::~Cluster() {}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- handler->insert(c);
-}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); }
-void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- handler->catchUpClosed(c);
-}
+void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); }
void Cluster::erase(ConnectionId id) {
Mutex::ScopedLock l(lock);
@@ -239,10 +236,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
}
void Cluster::disconnect(sys::DispatchHandle& ) {
- // FIXME aconway 2008-09-11: this should be logged as critical,
- // when we provide admin option to shut down cluster and let
- // members leave cleanly.
- stopClusterNode();
+ QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down");
+ broker.shutdown();
}
void Cluster::configChange(
@@ -265,27 +260,8 @@ void Cluster::configChange(
map.left(left, nLeft);
handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
-
- // FIXME aconway 2008-09-17: management update.
- //update mgnt stats
- updateMemberStats();
}
-void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
- Mutex::ScopedLock l(lock);
- handler->update(id, members, dumper);
-}
-
-void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
- Mutex::ScopedLock l(lock);
- handler->dumpRequest(dumpee, urlStr);
-}
-
-void Cluster::ready(const MemberId& member, const std::string& url) {
- Mutex::ScopedLock l(lock);
- handler->ready(member, url);
- // FIXME aconway 2008-09-17: management update.
-}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -295,12 +271,11 @@ void Cluster::stall() {
// Stop processing connection events. We still process config changes
// and cluster controls in deliver()
connectionEventQueue.stop();
+ if (mgmtObject!=0) mgmtObject->set_status("STALLED");
// FIXME aconway 2008-09-11: Flow control, we should slow down or
// stop reading from local connections while stalled to avoid an
// unbounded queue.
- // if (mgmtObject!=0)
- // mgmtObject->set_status("STALLED");
}
void Cluster::ready() {
@@ -314,8 +289,7 @@ void Cluster::unstall() {
QPID_LOG(debug, self << " un-stalling");
handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
- // if (mgmtObject!=0)
- // mgmtObject->set_status("ACTIVE");
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -323,61 +297,46 @@ void Cluster::unstall() {
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
//
-void Cluster::shutdown() {
+void Cluster::brokerShutdown() {
QPID_LOG(notice, self << " shutting down.");
try { cpg.shutdown(); }
catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const {
- return (ManagementObject*) mgmtObject;
-}
+ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; }
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
- stopClusterNode();
- break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
- stopFullCluster();
- break;
+ switch (methodId) {
+ case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
+ case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+ default: return Manageable::STATUS_UNKNOWN_METHOD;
}
-
- return status;
+ return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(void)
-{
- // FIXME aconway 2008-09-18: mgmt
- QPID_LOG(notice, self << " disconnected from cluster " << name.str());
- broker.shutdown();
+void Cluster::stopClusterNode(void) {
+ QPID_LOG(notice, self << " stopped by admin");
+ leave();
}
-void Cluster::stopFullCluster(void)
-{
- // FIXME aconway 2008-09-17: TODO
+void Cluster::stopFullCluster(void) {
+ QPID_LOG(notice, self << " sending shutdown to cluster.");
+ mcastControl(ClusterShutdownBody(), 0);
}
-void Cluster::updateMemberStats(void)
-{
- //update mgnt stats
- // FIXME aconway 2008-09-18:
-// if (mgmtObject!=0){
-// mgmtObject->set_clusterSize(size());
-// std::vector<Url> vectUrl = getUrls();
-// string urlstr;
-// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
-// if (iter != vectUrl.begin()) urlstr += ";";
-// urlstr += iter->str();
-// }
-// mgmtObject->set_members(urlstr);
-// }
-
+void Cluster::updateMemberStats(void) {
+ if (mgmtObject) {
+ mgmtObject->set_clusterSize(size());
+ std::vector<Url> vectUrl = getUrls();
+ string urlstr;
+ for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+ if (iter != vectUrl.begin()) urlstr += "\n";
+ urlstr += iter->str();
+ }
+ mgmtObject->set_members(urlstr);
+ }
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 55358e25db..ec43c56c69 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -83,11 +83,6 @@ class Cluster : private Cpg::Handler, public management::Manageable
/** Leave the cluster */
void leave();
- // Cluster controls.
- void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
- void dumpRequest(const MemberId&, const std::string& url);
- void ready(const MemberId&, const std::string& url);
-
MemberId getSelf() const { return self; }
MemberId getId() const { return self; }
@@ -95,7 +90,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
void stall();
void unstall();
- void shutdown();
+ void brokerShutdown();
broker::Broker& getBroker();
@@ -172,6 +167,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
size_t mcastId;
+ friend class ClusterHandler;
friend class JoiningHandler;
friend class MemberHandler;
};
diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp b/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
index 648d40470c..7413f27192 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
@@ -19,11 +19,13 @@
*
*/
-#include "qpid/framing/AllInvoker.h"
-
+#include "Cluster.h"
#include "ClusterHandler.h"
+
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
@@ -38,6 +40,7 @@ struct Operations : public framing::AMQP_AllOperations::ClusterHandler {
void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); }
void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); }
void ready(const std::string& url) { handler.ready(member, url); }
+ void shutdown() { handler.shutdown(member); }
};
ClusterHandler::~ClusterHandler() {}
@@ -49,5 +52,11 @@ bool ClusterHandler::invoke(const MemberId& id, framing::AMQFrame& frame) {
return framing::invoke(ops, *frame.getBody()).wasHandled();
}
+void ClusterHandler::shutdown(const MemberId& id) {
+ QPID_LOG(notice, cluster.self << " received shutdown from " << id);
+ cluster.leave();
+}
+
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.h b/qpid/cpp/src/qpid/cluster/ClusterHandler.h
index ee25b8522b..37b0ed4d79 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterHandler.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.h
@@ -51,6 +51,7 @@ class ClusterHandler
virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
virtual void ready(const MemberId&, const std::string& url) = 0;
+ virtual void shutdown(const MemberId&);
virtual void deliver(Event& e) = 0; // Deliver a connection event.
diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
index 75f6651b0a..664a8b38cd 100644
--- a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -37,6 +37,7 @@ void JoiningHandler::configChange(
cpg_address */*left*/, int nLeft,
cpg_address */*joined*/, int /*nJoined*/)
{
+ // FIXME aconway 2008-09-24: Called with lock held - volatile
if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
QPID_LOG(notice, cluster.self << " first in cluster.");
cluster.map.ready(cluster.self, cluster.url);
@@ -53,9 +54,11 @@ void JoiningHandler::deliver(Event& e) {
}
void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
+ Mutex::ScopedLock l(cluster.lock);
cluster.map.update(members, dumper);
QPID_LOG(debug, "Cluster update: " << cluster.map);
checkDumpRequest();
+ cluster.updateMemberStats();
}
void JoiningHandler::checkDumpRequest() {
@@ -67,6 +70,7 @@ void JoiningHandler::checkDumpRequest() {
}
void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
+ Mutex::ScopedLock l(cluster.lock);
if (cluster.map.dumper) { // Already a dump in progress.
if (dumpee == cluster.self && state == DUMP_REQUESTED)
state = START; // Need to make another request.
@@ -96,11 +100,13 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
}
void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+ Mutex::ScopedLock l(cluster.lock);
cluster.map.ready(id, Url(url));
checkDumpRequest();
}
void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
if (c->isCatchUp()) {
++catchUpConnections;
QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
@@ -109,6 +115,7 @@ void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
}
void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
if (c->isShadow())
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
@@ -118,7 +125,7 @@ void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
void JoiningHandler::dumpComplete() {
// FIXME aconway 2008-09-18: need to detect incomplete dump.
- //
+ // Called with lock - volatile?
if (state == STALLED) {
QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
cluster.ready();
@@ -130,4 +137,5 @@ void JoiningHandler::dumpComplete() {
}
}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
index 0f600a4995..53a1d13a4d 100644
--- a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -43,6 +43,7 @@ void MemberHandler::configChange(
cpg_address */*left*/, int /*nLeft*/,
cpg_address */*joined*/, int nJoined)
{
+ // FIXME aconway 2008-09-24: Called with lock held - volatile
if (nJoined && cluster.map.sendUpdate(cluster.self)) // New members need update
cluster.mcastControl(cluster.map.toControl(), 0);
}
@@ -51,9 +52,13 @@ void MemberHandler::deliver(Event& e) {
cluster.connectionEventQueue.push(e);
}
-void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {
+ Mutex::ScopedLock l(cluster.lock);
+ cluster.updateMemberStats();
+}
void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
+ Mutex::ScopedLock l(cluster.lock);
if (cluster.map.dumper) return; // dump in progress, ignore request.
cluster.map.dumper = cluster.map.first();
@@ -76,17 +81,18 @@ void MemberHandler::ready(const MemberId& id, const std::string& url) {
void MemberHandler::dumpSent() {
- QPID_LOG(debug, "Finished sending state dump.");
Mutex::ScopedLock l(cluster.lock);
+ QPID_LOG(debug, "Finished sending state dump.");
cluster.ready();
}
void MemberHandler::dumpError(const std::exception& e) {
- QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " << e.what());
+ QPID_LOG(error, cluster.self << " error sending state dump: " << e.what());
dumpSent();
}
void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
if (c->isCatchUp()) // Not allowed in member mode
c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
else
@@ -94,6 +100,7 @@ void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
}
void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
assert(0);
}
diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.h b/qpid/cpp/src/qpid/cluster/MemberHandler.h
index 9a07507a17..7655034ed8 100644
--- a/qpid/cpp/src/qpid/cluster/MemberHandler.h
+++ b/qpid/cpp/src/qpid/cluster/MemberHandler.h
@@ -48,7 +48,7 @@ class MemberHandler : public ClusterHandler
void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
-
+
void dumpSent();
void dumpError(const std::exception&);