From 97ec99f115c5190be04963e2853d0315d9a75a52 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 7 Jan 2011 16:32:34 +0000 Subject: QPID-2982: Improved cluster/management logging and automated test for log consistency. The cluster_tests.py test_management test is augmented to compare broker logs after the test completes. Any inconsistency in the logs causes the test to fail. This check is currently disabled as it is failing due to known issues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1056378 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/UpdateClient.cpp | 46 +++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 21 deletions(-) (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp') diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index c52caf6aa9..6b324be4c5 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -78,6 +78,10 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { + return o << "cluster(" << c.updaterId << " UPDATER)"; +} + struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr connection; @@ -142,7 +146,7 @@ void UpdateClient::run() { } void UpdateClient::update() { - QPID_LOG(debug, updaterId << " updating state to " << updateeId + QPID_LOG(debug, *this << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; @@ -177,14 +181,14 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - // FIXME aconway 2010-03-15: This sleep avoids the race condition + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the // Connection object. Remove when the bug is fixed. // sys::usleep(10*1000); - QPID_LOG(debug, updaterId << " update completed to " << updateeId + QPID_LOG(debug, *this << " update completed to " << updateeId << " at " << updateeUrl << ": " << membership); } @@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState() management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - QPID_LOG(debug, updaterId << " updating management setup-state."); + QPID_LOG(debug, *this << " updating management setup-state."); std::string vendor, product, instance; agent->getName(vendor, product, instance); ClusterConnectionProxy(session).managementSetupState( @@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent() if (!agent) return; string data; - QPID_LOG(debug, updaterId << " updating management schemas. ") + QPID_LOG(debug, *this << " updating management schemas. ") agent->exportSchemas(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management agents. ") + QPID_LOG(debug, *this << " updating management agents. ") agent->exportAgents(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management deleted objects. ") + QPID_LOG(debug, *this << " updating management deleted objects. ") typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; DeletedObjectList deleted; agent->exportDeletedObjects(deleted); @@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent() } void UpdateClient::updateExchange(const boost::shared_ptr& ex) { - QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); + QPID_LOG(debug, *this << " updating exchange " << ex->getName()); ClusterConnectionProxy(session).exchange(encode(*ex)); } @@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr& q) { - QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); updateQueue(shadowSession, q); } void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr& q) { if (!q->hasExclusiveOwner()) { - QPID_LOG(debug, updaterId << " updating queue " << q->getName()); + QPID_LOG(debug, *this << " updating queue " << q->getName()); updateQueue(session, q); }//else queue will be updated as part of session state of owning session } @@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { SemanticState::ConsumerImpl* ci = const_cast(cci); uint16_t channel = ci->getParent().getSession().getChannel(); ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + QPID_LOG(debug, *this << " updating output task " << ci->getName() << " channel=" << channel); } void UpdateClient::updateConnection(const boost::intrusive_ptr& updateConnection) { - QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); + QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); @@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr& upda updateConnection->getOutput().getSendMax() ); shadowConnection.close(); - QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); + QPID_LOG(debug, *this << " updated connection " << *updateConnection); } void UpdateClient::updateSession(broker::SessionHandler& sh) { broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. - QPID_LOG(debug, updaterId << " updating session " << ss->getId()); + QPID_LOG(debug, *this << " updating session " << ss->getId()); // Create a client session to update session state. boost::shared_ptr cimpl = client::ConnectionAccess::getImpl(shadowConnection); @@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - QPID_LOG(debug, updaterId << " updating exclusive queues."); + QPID_LOG(debug, *this << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - QPID_LOG(debug, updaterId << " updating consumers."); + QPID_LOG(debug, *this << " updating consumers."); ss->getSemanticState().eachConsumer( boost::bind(&UpdateClient::updateConsumer, this, _1)); - QPID_LOG(debug, updaterId << " updating unacknowledged messages."); + QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); @@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { if (inProgress) { inProgress->getFrames().map(simpl->out); } - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); + QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId()); } void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; @@ -485,7 +489,7 @@ void UpdateClient::updateConsumer( ); consumerNumbering.add(ci); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } @@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { }; void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, updaterId << " updating TX transaction state."); + QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); -- cgit v1.2.1