summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp16
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp46
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h5
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.cpp16
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.h8
5 files changed, 54 insertions, 37 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0013c370a7..5720f7fcc1 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
- updateDataExchange(new UpdateDataExchange(this)),
+ updateDataExchange(new UpdateDataExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(info, *this << " new shadow connection " << c->getId());
+ QPID_LOG(debug, *this << " new shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
@@ -749,7 +749,7 @@ struct AppendQueue {
std::string Cluster::debugSnapshot() {
assertClusterSafe();
std::ostringstream msg;
- msg << "queue snapshot at " << map.getFrameSeq() << ":";
+ msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
return msg.str();
@@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
checkUpdateIn(l);
}
else {
- QPID_LOG(debug,*this << " unstall, ignore update " << updater
+ QPID_LOG(info, *this << " unstall, ignore update " << updater
<< " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
}
@@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
+ discarding = false; // OK to set, we're stalled for update.
+ QPID_LOG(notice, *this << " update complete, starting catch-up.");
+ QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
if (mAgent) {
// Update management agent now, after all update activity is complete.
updateDataExchange->updateManagementAgent(mAgent);
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
- discarding = false; // OK to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up.");
- QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
- QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
+ QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index c52caf6aa9..6b324be4c5 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -78,6 +78,10 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
+ return o << "cluster(" << c.updaterId << " UPDATER)";
+}
+
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -142,7 +146,7 @@ void UpdateClient::run() {
}
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updating state to " << updateeId
+ QPID_LOG(debug, *this << " updating state to " << updateeId
<< " at " << updateeUrl);
Broker& b = updaterBroker;
@@ -177,14 +181,14 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
- // FIXME aconway 2010-03-15: This sleep avoids the race condition
+ // TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
// Connection object. Remove when the bug is fixed.
//
sys::usleep(10*1000);
- QPID_LOG(debug, updaterId << " update completed to " << updateeId
+ QPID_LOG(debug, *this << " update completed to " << updateeId
<< " at " << updateeUrl << ": " << membership);
}
@@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState()
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- QPID_LOG(debug, updaterId << " updating management setup-state.");
+ QPID_LOG(debug, *this << " updating management setup-state.");
std::string vendor, product, instance;
agent->getName(vendor, product, instance);
ClusterConnectionProxy(session).managementSetupState(
@@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent()
if (!agent) return;
string data;
- QPID_LOG(debug, updaterId << " updating management schemas. ")
+ QPID_LOG(debug, *this << " updating management schemas. ")
agent->exportSchemas(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management agents. ")
+ QPID_LOG(debug, *this << " updating management agents. ")
agent->exportAgents(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ QPID_LOG(debug, *this << " updating management deleted objects. ")
typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
DeletedObjectList deleted;
agent->exportDeletedObjects(deleted);
@@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent()
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
+ QPID_LOG(debug, *this << " updating exchange " << ex->getName());
ClusterConnectionProxy(session).exchange(encode(*ex));
}
@@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
- QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
updateQueue(shadowSession, q);
}
void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
if (!q->hasExclusiveOwner()) {
- QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+ QPID_LOG(debug, *this << " updating queue " << q->getName());
updateQueue(session, q);
}//else queue will be updated as part of session state of owning session
}
@@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ QPID_LOG(debug, *this << " updating output task " << ci->getName()
<< " channel=" << channel);
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
- QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
@@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
updateConnection->getOutput().getSendMax()
);
shadowConnection.close();
- QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updated connection " << *updateConnection);
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
- QPID_LOG(debug, updaterId << " updating session " << ss->getId());
+ QPID_LOG(debug, *this << " updating session " << ss->getId());
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
@@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
- QPID_LOG(debug, updaterId << " updating exclusive queues.");
+ QPID_LOG(debug, *this << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- QPID_LOG(debug, updaterId << " updating consumers.");
+ QPID_LOG(debug, *this << " updating consumers.");
ss->getSemanticState().eachConsumer(
boost::bind(&UpdateClient::updateConsumer, this, _1));
- QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
+ QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
- QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
+ QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId());
}
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
using namespace message;
@@ -485,7 +489,7 @@ void UpdateClient::updateConsumer(
);
consumerNumbering.add(ci);
- QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
@@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, updaterId << " updating TX transaction state.");
+ QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index be09af7e81..76621cd7ba 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -30,7 +30,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
-
+#include <iosfwd>
namespace qpid {
@@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable {
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
client::ConnectionSettings connectionSettings;
+
+ friend std::ostream& operator<<(std::ostream&, const UpdateClient&);
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index 2f242b3024..2a079b8881 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -19,6 +19,7 @@
*
*/
#include "UpdateDataExchange.h"
+#include "Cluster.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
@@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")
const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
-UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) :
- Exchange(EXCHANGE_NAME, parent)
+std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
+ return o << "cluster(" << c.clusterId << " UPDATER)";
+}
+
+UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
+ Exchange(EXCHANGE_NAME, &cluster),
+ clusterId(cluster.getId())
{}
void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
agent->importAgents(buf1);
- QPID_LOG(debug, " Updated management agents.");
+ QPID_LOG(debug, *this << " updated management agents.");
framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
agent->importSchemas(buf2);
- QPID_LOG(debug, " Updated management schemas");
+ QPID_LOG(debug, *this << " updated management schemas.");
using amqp_0_10::ListCodec;
using types::Variant;
@@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
new management::ManagementAgent::DeletedObject(*i)));
}
agent->importDeletedObjects(objects);
- QPID_LOG(debug, " Updated management deleted objects.");
+ QPID_LOG(debug, *this << " updated management deleted objects.");
}
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h
index 27a98548f3..8c493e400a 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -23,6 +23,8 @@
*/
#include "qpid/broker/Exchange.h"
+#include "types.h"
+#include <iosfwd>
namespace qpid {
@@ -31,6 +33,7 @@ class ManagementAgent;
}
namespace cluster {
+class Cluster;
/**
* An exchange used to send data that is to large for a control
@@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange
static const std::string MANAGEMENT_SCHEMAS_KEY;
static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
- UpdateDataExchange(management::Manageable* parent);
+ UpdateDataExchange(Cluster& parent);
void route(broker::Deliverable& msg, const std::string& routingKey,
const framing::FieldTable* args);
@@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange
void updateManagementAgent(management::ManagementAgent* agent);
private:
-
+ MemberId clusterId;
std::string managementAgents;
std::string managementSchemas;
std::string managementDeletedObjects;
+ friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
};
}} // namespace qpid::cluster