summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-01-07 16:32:34 +0000
committerAlan Conway <aconway@apache.org>2011-01-07 16:32:34 +0000
commit739b0523c632f961d3c008646c1c4db8f596cd3d (patch)
tree3700cd08912976f2f7cf005c3246a60234268207 /qpid/cpp/src/qpid/cluster
parent39bf3509656dd13a41fa40a0e0831000c73fc526 (diff)
downloadqpid-python-739b0523c632f961d3c008646c1c4db8f596cd3d.tar.gz
QPID-2982: Improved cluster/management logging and automated test for log consistency.
The cluster_tests.py test_management test is augmented to compare broker logs after the test completes. Any inconsistency in the logs causes the test to fail. This check is currently disabled as it is failing due to known issues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1056378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp46
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.h8
5 files changed, 54 insertions, 37 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 0013c370a7..5720f7fcc1 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
- updateDataExchange(new UpdateDataExchange(this)),
+ updateDataExchange(new UpdateDataExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(info, *this << " new shadow connection " << c->getId());
+ QPID_LOG(debug, *this << " new shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
@@ -749,7 +749,7 @@ struct AppendQueue {
std::string Cluster::debugSnapshot() {
assertClusterSafe();
std::ostringstream msg;
- msg << "queue snapshot at " << map.getFrameSeq() << ":";
+ msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
return msg.str();
@@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
checkUpdateIn(l);
}
else {
- QPID_LOG(debug,*this << " unstall, ignore update " << updater
+ QPID_LOG(info, *this << " unstall, ignore update " << updater
<< " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
}
@@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
+ discarding = false; // OK to set, we're stalled for update.
+ QPID_LOG(notice, *this << " update complete, starting catch-up.");
+ QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
if (mAgent) {
// Update management agent now, after all update activity is complete.
updateDataExchange->updateManagementAgent(mAgent);
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
- discarding = false; // OK to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up.");
- QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
- QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
+ QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index c52caf6aa9..6b324be4c5 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -78,6 +78,10 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
+ return o << "cluster(" << c.updaterId << " UPDATER)";
+}
+
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -142,7 +146,7 @@ void UpdateClient::run() {
}
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updating state to " << updateeId
+ QPID_LOG(debug, *this << " updating state to " << updateeId
<< " at " << updateeUrl);
Broker& b = updaterBroker;
@@ -177,14 +181,14 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
- // FIXME aconway 2010-03-15: This sleep avoids the race condition
+ // TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
// Connection object. Remove when the bug is fixed.
//
sys::usleep(10*1000);
- QPID_LOG(debug, updaterId << " update completed to " << updateeId
+ QPID_LOG(debug, *this << " update completed to " << updateeId
<< " at " << updateeUrl << ": " << membership);
}
@@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState()
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- QPID_LOG(debug, updaterId << " updating management setup-state.");
+ QPID_LOG(debug, *this << " updating management setup-state.");
std::string vendor, product, instance;
agent->getName(vendor, product, instance);
ClusterConnectionProxy(session).managementSetupState(
@@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent()
if (!agent) return;
string data;
- QPID_LOG(debug, updaterId << " updating management schemas. ")
+ QPID_LOG(debug, *this << " updating management schemas. ")
agent->exportSchemas(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management agents. ")
+ QPID_LOG(debug, *this << " updating management agents. ")
agent->exportAgents(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ QPID_LOG(debug, *this << " updating management deleted objects. ")
typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
DeletedObjectList deleted;
agent->exportDeletedObjects(deleted);
@@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent()
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
+ QPID_LOG(debug, *this << " updating exchange " << ex->getName());
ClusterConnectionProxy(session).exchange(encode(*ex));
}
@@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
- QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
updateQueue(shadowSession, q);
}
void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
if (!q->hasExclusiveOwner()) {
- QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+ QPID_LOG(debug, *this << " updating queue " << q->getName());
updateQueue(session, q);
}//else queue will be updated as part of session state of owning session
}
@@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ QPID_LOG(debug, *this << " updating output task " << ci->getName()
<< " channel=" << channel);
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
- QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
@@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
updateConnection->getOutput().getSendMax()
);
shadowConnection.close();
- QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updated connection " << *updateConnection);
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
- QPID_LOG(debug, updaterId << " updating session " << ss->getId());
+ QPID_LOG(debug, *this << " updating session " << ss->getId());
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
@@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
- QPID_LOG(debug, updaterId << " updating exclusive queues.");
+ QPID_LOG(debug, *this << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- QPID_LOG(debug, updaterId << " updating consumers.");
+ QPID_LOG(debug, *this << " updating consumers.");
ss->getSemanticState().eachConsumer(
boost::bind(&UpdateClient::updateConsumer, this, _1));
- QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
+ QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
- QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
+ QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId());
}
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
using namespace message;
@@ -485,7 +489,7 @@ void UpdateClient::updateConsumer(
);
consumerNumbering.add(ci);
- QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
@@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, updaterId << " updating TX transaction state.");
+ QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index be09af7e81..76621cd7ba 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -30,7 +30,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
-
+#include <iosfwd>
namespace qpid {
@@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable {
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
client::ConnectionSettings connectionSettings;
+
+ friend std::ostream& operator<<(std::ostream&, const UpdateClient&);
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index 2f242b3024..2a079b8881 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -19,6 +19,7 @@
*
*/
#include "UpdateDataExchange.h"
+#include "Cluster.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
@@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")
const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
-UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) :
- Exchange(EXCHANGE_NAME, parent)
+std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
+ return o << "cluster(" << c.clusterId << " UPDATER)";
+}
+
+UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
+ Exchange(EXCHANGE_NAME, &cluster),
+ clusterId(cluster.getId())
{}
void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
agent->importAgents(buf1);
- QPID_LOG(debug, " Updated management agents.");
+ QPID_LOG(debug, *this << " updated management agents.");
framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
agent->importSchemas(buf2);
- QPID_LOG(debug, " Updated management schemas");
+ QPID_LOG(debug, *this << " updated management schemas.");
using amqp_0_10::ListCodec;
using types::Variant;
@@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
new management::ManagementAgent::DeletedObject(*i)));
}
agent->importDeletedObjects(objects);
- QPID_LOG(debug, " Updated management deleted objects.");
+ QPID_LOG(debug, *this << " updated management deleted objects.");
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
index 27a98548f3..8c493e400a 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -23,6 +23,8 @@
*/
#include "qpid/broker/Exchange.h"
+#include "types.h"
+#include <iosfwd>
namespace qpid {
@@ -31,6 +33,7 @@ class ManagementAgent;
}
namespace cluster {
+class Cluster;
/**
* An exchange used to send data that is to large for a control
@@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange
static const std::string MANAGEMENT_SCHEMAS_KEY;
static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
- UpdateDataExchange(management::Manageable* parent);
+ UpdateDataExchange(Cluster& parent);
void route(broker::Deliverable& msg, const std::string& routingKey,
const framing::FieldTable* args);
@@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange
void updateManagementAgent(management::ManagementAgent* agent);
private:
-
+ MemberId clusterId;
std::string managementAgents;
std::string managementSchemas;
std::string managementDeletedObjects;
+ friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
};
}} // namespace qpid::cluster