diff options
author | Alan Conway <aconway@apache.org> | 2011-01-07 16:32:34 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-01-07 16:32:34 +0000 |
commit | 97ec99f115c5190be04963e2853d0315d9a75a52 (patch) | |
tree | 23eca9f137946af8e857c44a435126dc687322cd /cpp/src | |
parent | bda33c5b69189bf645ff818d8315bb8fc3288b7a (diff) | |
download | qpid-python-97ec99f115c5190be04963e2853d0315d9a75a52.tar.gz |
QPID-2982: Improved cluster/management logging and automated test for log consistency.
The cluster_tests.py test_management test is augmented to compare
broker logs after the test completes. Any inconsistency in the logs
causes the test to fail. This check is currently disabled as it is
failing due to known issues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1056378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 46 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 184 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 1 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_test_logs.py | 105 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 21 |
9 files changed, 284 insertions, 118 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0013c370a7..5720f7fcc1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), - updateDataExchange(new UpdateDataExchange(this)), + updateDataExchange(new UpdateDataExchange(*this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(info, *this << " new shadow connection " << c->getId()); + QPID_LOG(debug, *this << " new shadow connection " << c->getId()); // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. @@ -749,7 +749,7 @@ struct AppendQueue { std::string Cluster::debugSnapshot() { assertClusterSafe(); std::ostringstream msg; - msg << "queue snapshot at " << map.getFrameSeq() << ":"; + msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:"; AppendQueue append(msg); broker.getQueues().eachQueue(append); return msg.str(); @@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) checkUpdateIn(l); } else { - QPID_LOG(debug,*this << " unstall, ignore update " << updater + QPID_LOG(info, *this << " unstall, ignore update " << updater << " to " << updatee); deliverEventQueue.start(); // Not involved in update. } @@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) { // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); + discarding = false; // OK to set, we're stalled for update. + QPID_LOG(notice, *this << " update complete, starting catch-up."); + QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. if (mAgent) { // Update management agent now, after all update activity is complete. updateDataExchange->updateManagementAgent(mAgent); mAgent->suppress(false); // Enable management output. mAgent->clusterUpdate(); } - discarding = false; // OK to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up."); - QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } @@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } - QPID_LOG(debug, *this << " cluster-uuid = " << clusterId); + QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index c52caf6aa9..6b324be4c5 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -78,6 +78,10 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { + return o << "cluster(" << c.updaterId << " UPDATER)"; +} + struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -142,7 +146,7 @@ void UpdateClient::run() { } void UpdateClient::update() { - QPID_LOG(debug, updaterId << " updating state to " << updateeId + QPID_LOG(debug, *this << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; @@ -177,14 +181,14 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - // FIXME aconway 2010-03-15: This sleep avoids the race condition + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the // Connection object. Remove when the bug is fixed. // sys::usleep(10*1000); - QPID_LOG(debug, updaterId << " update completed to " << updateeId + QPID_LOG(debug, *this << " update completed to " << updateeId << " at " << updateeUrl << ": " << membership); } @@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState() management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - QPID_LOG(debug, updaterId << " updating management setup-state."); + QPID_LOG(debug, *this << " updating management setup-state."); std::string vendor, product, instance; agent->getName(vendor, product, instance); ClusterConnectionProxy(session).managementSetupState( @@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent() if (!agent) return; string data; - QPID_LOG(debug, updaterId << " updating management schemas. ") + QPID_LOG(debug, *this << " updating management schemas. ") agent->exportSchemas(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management agents. ") + QPID_LOG(debug, *this << " updating management agents. ") agent->exportAgents(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management deleted objects. ") + QPID_LOG(debug, *this << " updating management deleted objects. ") typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; DeletedObjectList deleted; agent->exportDeletedObjects(deleted); @@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent() } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { - QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); + QPID_LOG(debug, *this << " updating exchange " << ex->getName()); ClusterConnectionProxy(session).exchange(encode(*ex)); } @@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { - QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); updateQueue(shadowSession, q); } void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { if (!q->hasExclusiveOwner()) { - QPID_LOG(debug, updaterId << " updating queue " << q->getName()); + QPID_LOG(debug, *this << " updating queue " << q->getName()); updateQueue(session, q); }//else queue will be updated as part of session state of owning session } @@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + QPID_LOG(debug, *this << " updating output task " << ci->getName() << " channel=" << channel); } void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { - QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); + QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); @@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda updateConnection->getOutput().getSendMax() ); shadowConnection.close(); - QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); + QPID_LOG(debug, *this << " updated connection " << *updateConnection); } void UpdateClient::updateSession(broker::SessionHandler& sh) { broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. - QPID_LOG(debug, updaterId << " updating session " << ss->getId()); + QPID_LOG(debug, *this << " updating session " << ss->getId()); // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); @@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - QPID_LOG(debug, updaterId << " updating exclusive queues."); + QPID_LOG(debug, *this << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - QPID_LOG(debug, updaterId << " updating consumers."); + QPID_LOG(debug, *this << " updating consumers."); ss->getSemanticState().eachConsumer( boost::bind(&UpdateClient::updateConsumer, this, _1)); - QPID_LOG(debug, updaterId << " updating unacknowledged messages."); + QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); @@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { if (inProgress) { inProgress->getFrames().map(simpl->out); } - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); + QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId()); } void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; @@ -485,7 +489,7 @@ void UpdateClient::updateConsumer( ); consumerNumbering.add(ci); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } @@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { }; void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, updaterId << " updating TX transaction state."); + QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index be09af7e81..76621cd7ba 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -30,7 +30,7 @@ #include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" #include <boost/shared_ptr.hpp> - +#include <iosfwd> namespace qpid { @@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable { boost::function<void()> done; boost::function<void(const std::exception& e)> failed; client::ConnectionSettings connectionSettings; + + friend std::ostream& operator<<(std::ostream&, const UpdateClient&); }; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_UPDATECLIENT_H*/ diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp index 2f242b3024..2a079b8881 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -19,6 +19,7 @@ * */ #include "UpdateDataExchange.h" +#include "Cluster.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" @@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents") const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); -UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) : - Exchange(EXCHANGE_NAME, parent) +std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) { + return o << "cluster(" << c.clusterId << " UPDATER)"; +} + +UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : + Exchange(EXCHANGE_NAME, &cluster), + clusterId(cluster.getId()) {} void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, @@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size()); agent->importAgents(buf1); - QPID_LOG(debug, " Updated management agents."); + QPID_LOG(debug, *this << " updated management agents."); framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size()); agent->importSchemas(buf2); - QPID_LOG(debug, " Updated management schemas"); + QPID_LOG(debug, *this << " updated management schemas."); using amqp_0_10::ListCodec; using types::Variant; @@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen new management::ManagementAgent::DeletedObject(*i))); } agent->importDeletedObjects(objects); - QPID_LOG(debug, " Updated management deleted objects."); + QPID_LOG(debug, *this << " updated management deleted objects."); } diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h index 27a98548f3..8c493e400a 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -23,6 +23,8 @@ */ #include "qpid/broker/Exchange.h" +#include "types.h" +#include <iosfwd> namespace qpid { @@ -31,6 +33,7 @@ class ManagementAgent; } namespace cluster { +class Cluster; /** * An exchange used to send data that is to large for a control @@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange static const std::string MANAGEMENT_SCHEMAS_KEY; static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; - UpdateDataExchange(management::Manageable* parent); + UpdateDataExchange(Cluster& parent); void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange void updateManagementAgent(management::ManagementAgent* agent); private: - + MemberId clusterId; std::string managementAgents; std::string managementSchemas; std::string managementDeletedObjects; + friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 07751f57ef..7b60ea35c4 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -18,7 +18,12 @@ * under the License. * */ - + + +// NOTE on use of log levels: The criteria for using trace vs. debug +// is to use trace for log messages that are generated for each +// unbatched stats/props notification and debug for everything else. + #include "qpid/management/ManagementAgent.h" #include "qpid/management/ManagementObject.h" #include "qpid/broker/DeliverableMessage.h" @@ -89,7 +94,7 @@ static Variant::Map mapEncodeSchemaId(const string& pname, ManagementAgent::RemoteAgent::~RemoteAgent () { - QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); + QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); if (mgmtObject != 0) { mgmtObject->resourceDestroy(); agent.deleteObjectNowLH(mgmtObject->getObjectId()); @@ -169,7 +174,7 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, uuid.generate(); QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid); } else - QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); + QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid); // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; @@ -308,7 +313,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId } newManagementObjects[objId] = object; } - + QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); return objId; } @@ -330,7 +335,6 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, } object->setObjectId(objId); - { sys::Mutex::ScopedLock lock(addLock); ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); @@ -340,7 +344,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, } newManagementObjects[objId] = object; } - + QPID_LOG(debug, "Management object added: " << objId.getV2Key()); return objId; } @@ -370,7 +374,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi outBuffer.reset(); sendBufferLH(outBuffer, outLen, mExchange, "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); - QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); + QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); } if (qmf2Support) { @@ -408,9 +412,8 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi list_.push_back(map_); ListCodec::encode(list_, content); sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); - QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); + QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); } - } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -467,7 +470,7 @@ void ManagementAgent::clientAdded (const string& routingKey) outLen = outBuffer.getPosition(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); - QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << rkeys.front()); + QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); rkeys.pop_front(); } } @@ -476,8 +479,10 @@ void ManagementAgent::clusterUpdate() { // Called on all cluster memebers when a new member joins a cluster. // Set clientWasAdded so that on the next periodicProcessing we will do // a full update on all cluster members. + sys::Mutex::ScopedLock l(userLock); + moveNewObjectsLH(); // to be consistent with updater/updatee. clientWasAdded = true; - QPID_LOG(debug, "cluster update " << debugSnapshot()); + QPID_LOG(debug, "Cluster member joined, " << debugSnapshot()); } void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) @@ -509,7 +514,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf, string routingKey) { if (suppressed) { - QPID_LOG(trace, "Suppressing management message to " << routingKey); + QPID_LOG(debug, "Suppressing management message to " << routingKey); return; } if (exchange.get() == 0) return; @@ -564,7 +569,7 @@ void ManagementAgent::sendBufferLH(const string& data, Variant::Map::const_iterator i; if (suppressed) { - QPID_LOG(trace, "Suppressing management message to " << routingKey); + QPID_LOG(debug, "Suppressing management message to " << routingKey); return; } if (exchange.get() == 0) return; @@ -637,7 +642,7 @@ void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 #define HEADROOM 4096 - QPID_LOG(trace, "Management agent periodic processing"); + QPID_LOG(debug, "Management agent periodic processing"); sys::Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; @@ -776,17 +781,26 @@ void ManagementAgent::periodicProcessing (void) send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); if (send_props && qmf1Support) { + size_t pos = msgBuffer.getPosition(); encodeHeader(msgBuffer, 'c'); sBuf.clear(); object->writeProperties(sBuf); msgBuffer.putRawData(sBuf); + QPID_LOG(trace, "Changed V1 properties " + << object->getObjectId().getV2Key() + << " len=" << msgBuffer.getPosition()-pos); } if (send_stats && qmf1Support) { + size_t pos = msgBuffer.getPosition(); encodeHeader(msgBuffer, 'i'); sBuf.clear(); object->writeStatistics(sBuf); msgBuffer.putRawData(sBuf); + QPID_LOG(trace, "Changed V1 statistics " + << object->getObjectId().getV2Key() + << " len=" << msgBuffer.getPosition()-pos); + } if ((send_stats || send_props) && qmf2Support) { @@ -805,6 +819,10 @@ void ManagementAgent::periodicProcessing (void) map_["_values"] = values; list_.push_back(map_); v2Objs++; + QPID_LOG(trace, "Changed V2" + << (send_stats? " statistics":"") + << (send_props? " properties":"") + << " map=" << map_); } if (send_props) pcount++; @@ -826,7 +844,10 @@ void ManagementAgent::periodicProcessing (void) key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << contentSize); + QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() + << " props=" << pcount + << " stats=" << scount + << " len=" << contentSize); } } @@ -849,7 +870,10 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << content.length()); + QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() + << " props=" << pcount + << " stats=" << scount + << " len=" << content.length()); } } } @@ -877,15 +901,19 @@ void ManagementAgent::periodicProcessing (void) for (DeletedObjectList::iterator lIter = mIter->second.begin(); lIter != mIter->second.end(); lIter++) { - + std::string oid = (*lIter)->objectId; if (!(*lIter)->encodedV1Config.empty()) { encodeHeader(msgBuffer, 'c'); msgBuffer.putRawData((*lIter)->encodedV1Config); + QPID_LOG(trace, "Deleting V1 properties " << oid + << " len=" << (*lIter)->encodedV1Config.size()); v1Objs++; } if (!(*lIter)->encodedV1Inst.empty()) { encodeHeader(msgBuffer, 'i'); msgBuffer.putRawData((*lIter)->encodedV1Inst); + QPID_LOG(trace, "Deleting V1 statistics " << oid + << " len=" << (*lIter)->encodedV1Inst.size()); v1Objs++; } if (v1Objs && msgBuffer.available() < HEADROOM) { @@ -895,10 +923,12 @@ void ManagementAgent::periodicProcessing (void) key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" + << key.str() << " len=" << contentSize); } if (!(*lIter)->encodedV2.empty()) { + QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); list_.push_back((*lIter)->encodedV2); if (++v2Objs >= maxV2ReplyObjs) { v2Objs = 0; @@ -922,7 +952,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } } @@ -936,7 +966,7 @@ void ManagementAgent::periodicProcessing (void) key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } if (!list_.empty()) { @@ -959,7 +989,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } } // end map @@ -984,7 +1014,7 @@ void ManagementAgent::periodicProcessing (void) msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); - QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); + QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); } if (qmf2Support) { @@ -1013,7 +1043,7 @@ void ManagementAgent::periodicProcessing (void) // time to prevent stale heartbeats from getting to the consoles. sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); - QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); + QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); } QPID_LOG(debug, "periodic update " << debugSnapshot()); } @@ -1073,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) uint32_t contentSize = msgBuffer.getPosition(); msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v1key.str()); + QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); } if (qmf2Support) { @@ -1086,7 +1116,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) string content; ListCodec::encode(list_, content); sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v2key.str()); + QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); } } @@ -1102,7 +1132,7 @@ void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t s outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << + QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << replyToKey << " seq=" << sequence); } @@ -1127,7 +1157,7 @@ void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& ci MapCodec::encode(map, content); sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey); - QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); + QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); } bool ManagementAgent::dispatchCommand (Deliverable& deliverable, @@ -1221,7 +1251,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl inBuffer.getShortString(methodName); inBuffer.getRawData(inArgs, inBuffer.available()); - QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << methodName << " replyTo=" << replyToKey); encodeHeader(outBuffer, 'm', sequence); @@ -1232,7 +1262,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); return; } @@ -1243,7 +1273,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); return; } @@ -1259,7 +1289,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); return; } } @@ -1291,7 +1321,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } @@ -1374,7 +1404,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r // invoke the method - QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() + QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() << ":" << iter->second->getClassName() << " method=" << methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs); @@ -1402,7 +1432,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r MapCodec::encode(outMap, content); sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); - QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap); + QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap); } @@ -1411,7 +1441,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey); + QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); @@ -1419,12 +1449,12 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey); + QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); } void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence) { - QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey); + QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -1440,7 +1470,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, u if (outLen) { outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); } sendCommandCompleteLH(replyToKey, sequence); @@ -1452,7 +1482,7 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT inBuffer.getShortString(packageName); - QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); findOrAddPackageLH(packageName); } @@ -1463,7 +1493,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo inBuffer.getShortString(packageName); - QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) @@ -1489,7 +1519,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); classes.pop_front(); } @@ -1508,7 +1538,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); - QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey); PackageMap::iterator pIter = findOrAddPackageLH(packageName); @@ -1525,7 +1555,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), to=" << replyToKey << " seq=" << sequence); if (cIter != pIter->second.end()) @@ -1557,7 +1587,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl inBuffer.getShortString (packageName); key.decode(inBuffer); - QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); @@ -1575,7 +1605,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); } else sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available"); @@ -1598,7 +1628,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r key.decode(inBuffer); inBuffer.restore(); - QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { @@ -1622,7 +1652,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); - QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " to=schema.class"); } } @@ -1702,7 +1732,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep requestedBrokerBank = inBuffer.getLong(); requestedAgentBank = inBuffer.getLong(); - QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << + QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence); assignedBank = assignBankLH(requestedAgentBank); @@ -1722,7 +1752,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep addObject (agent->mgmtObject, 0); remoteAgents[connectionRef] = agent; - QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); + QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1734,7 +1764,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << + QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << " to=" << replyToKey << " seq=" << sequence); } @@ -1747,7 +1777,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe ft.decode(inBuffer); - QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); + QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo<string>()) { @@ -1776,7 +1806,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } sendCommandCompleteLH(replyToKey, sequence); @@ -1821,7 +1851,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock - QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. } encodeHeader(outBuffer, 'g', sequence); @@ -1837,7 +1867,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe if (outLen) { outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } sendCommandCompleteLH(replyToKey, sequence); @@ -1853,7 +1883,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo Variant::Map headers; MapCodec::decode(body, inMap); - QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); + QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; @@ -1935,7 +1965,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo ListCodec::encode(list_, content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo); return; } } else { @@ -1989,12 +2019,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo ListCodec::encode(_list.front().asList(), content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); _list.pop_front(); - QPID_LOG(trace, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length()); + QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length()); } headers.erase("partial"); ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length()); + QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length()); return; } @@ -2002,14 +2032,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo string content; ListCodec::encode(Variant::List(), content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (empty) to=" << replyTo); + QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo); } void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo, const string& cid) { - QPID_LOG(trace, "RCVD AgentLocateRequest"); + QPID_LOG(debug, "RCVD AgentLocateRequest"); Variant::Map map; Variant::Map headers; @@ -2028,7 +2058,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); clientWasAdded = true; - QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); + QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo); } @@ -2171,7 +2201,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) sendBufferLH(outBuffer, outLen, dExchange, replyToKey); } - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); } return false; @@ -2269,7 +2299,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); - QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package"); + QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); return result.first; } @@ -2639,12 +2669,13 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { } namespace { -bool isNotDeleted(const ManagementObjectMap::value_type& value) { - return !value.second->isDeleted(); +bool isDeleted(const ManagementObjectMap::value_type& value) { + return value.second->isDeleted(); } -size_t countNotDeleted(const ManagementObjectMap& map) { - return std::count_if(map.begin(), map.end(), isNotDeleted); +void summarizeMap(std::ostream& o, const char* name, const ManagementObjectMap& map) { + size_t deleted = std::count_if(map.begin(), map.end(), isDeleted); + o << map.size() << " " << name << " (" << deleted << " deleted), "; } void dumpMap(std::ostream& o, const ManagementObjectMap& map) { @@ -2657,13 +2688,18 @@ void dumpMap(std::ostream& o, const ManagementObjectMap& map) { string ManagementAgent::debugSnapshot() { ostringstream msg; - msg << " management snapshot:"; - for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); - i != remoteAgents.end(); ++i) - msg << " " << i->second->routingKey; - msg << " packages: " << packages.size(); - msg << " objects: " << countNotDeleted(managementObjects); - msg << " new objects: " << countNotDeleted(newManagementObjects); + msg << " management snapshot: "; + if (!remoteAgents.empty()) { + msg << remoteAgents.size() << " agents("; + for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); + i != remoteAgents.end(); ++i) + msg << " " << i->second->routingKey; + msg << "), "; + } + msg << packages.size() << " packages, "; + summarizeMap(msg, "objects", managementObjects); + summarizeMap(msg, "new objects ", newManagementObjects); + msg << pendingDeletedObjs.size() << " pending deletes" ; return msg.str(); } diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 670a242c02..cfdd58ed53 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -262,6 +262,7 @@ void ManagementObject::setUpdateTime() void ManagementObject::resourceDestroy() { + QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key()); destroyTime = sys::Duration(sys::EPOCH, sys::now()); deleted = true; } diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py new file mode 100755 index 0000000000..160e15e628 --- /dev/null +++ b/cpp/src/tests/cluster_test_logs.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Functions for comparing broker log files, used by cluster_tests.py. + +import os, os.path, re, glob +from itertools import izip + +def split_log(log): + """Split a broker log at checkpoints where a member joins. + Return the set of checkpoints discovered.""" + checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:") + outfile = None + checkpoints = [] + for l in open(log): + match = checkpoint_re.search(l) + if match: + checkpoint = match.groups()[0] + checkpoints.append(checkpoint) + if outfile: outfile.close() + outfile = open("%s.%s"%(log, checkpoint), 'w') + + if outfile: outfile.write(l) + if outfile: outfile.close() + return checkpoints + +def filter_log(log): + """Filter the contents of a log file to remove data that is expected + to differ between brokers in a cluster. Filtered log contents between + the same checkpoints should match across the cluster.""" + out = open("%s.filter"%(log), 'w') + for l in open(log): + # Lines to skip entirely + skip = "|".join([ + 'local connection', # Only on local broker + 'UPDATER|UPDATEE|OFFER', # Ignore update process + 'stall for update|unstall, ignore update|cancelled offer .* unstall', + 'caught up', + 'active for links|Passivating links|Activating links', + 'info Connection.* connected to', # UpdateClient connection + 'warning Broker closed connection: 200, OK', + 'task late', + 'task overran' + ]) + if re.compile(skip).search(l): continue + + # Regex to match a UUID + uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w' + + # Regular expression substitutions to remove expected differences + for pattern,subst in [ + (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d', ''), # Remove timestamp + (r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id + (r' local\)| shadow\)', ')'), # Remove local/shadow indication + (r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready. + # System UUID + (r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'), + + # FIXME aconway 2010-12-20: substitutions to mask known problems + #(r' len=\d+', ' len=NN'), # buffer lengths + #(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name + #(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs + ]: l = re.sub(pattern,subst,l) + out.write(l) + out.close() + +def verify_logs(logs): + """Compare log files from cluster brokers, verify that they correspond correctly.""" + for l in glob.glob("*.log"): filter_log(l) + checkpoints = set() + for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l))) + errors=[] + for c in checkpoints: + fragments = glob.glob("*.filter.%s"%(c)) + fragments.sort(reverse=True, key=os.path.getsize) + while len(fragments) >= 2: + a = fragments.pop(0) + b = fragments[0] + for ab in izip(open(a), open(b)): + if ab[0] != ab[1]: + errors.append("\n %s %s"%(a, b)) + break + if errors: + raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors)) + +# Can be run as a script. +if __name__ == "__main__": + verify_logs(glob.glob("*.log")) diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index daa47a6322..03913356ca 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess +import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs from qpid import datatypes, messaging from qpid.brokertest import * from qpid.harness import Skipped @@ -35,7 +35,7 @@ log = getLogger("qpid.cluster_tests") # a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK # and EXPECT_EXIT_FAIL in some of the tests below. -# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error +# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error # should give non-0 exit status. # Import scripts as modules @@ -299,7 +299,10 @@ class LongTests(BrokerTest): for i in range(i, len(cluster)): cluster[i].kill() def test_management(self, args=[]): - """Stress test: Run management clients and other clients concurrently.""" + """ + Stress test: Run management clients and other clients concurrently + while killing and restarting brokers. + """ class ClientLoop(StoppableThread): """Run a client executable in a loop.""" @@ -352,9 +355,9 @@ class LongTests(BrokerTest): finally: self.lock.release() StoppableThread.stop(self) - # def test_management - args += ["--mgmt-pub-interval", 1] # Publish management information every second. - # FIXME aconway 2010-12-15: extra debugging + # body of test_management() + + args += ["--mgmt-pub-interval", 1] args += ["--log-enable=trace+:management"] # Use store if present. if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] @@ -403,6 +406,10 @@ class LongTests(BrokerTest): start_mclients(cluster[alive]) for c in chain(mclients, *clients): c.stop() + # Verify that logs are consistent + # FIXME aconway 2010-12-21: this is currently expected to fail due to + # known bugs, see https://issues.apache.org/jira/browse/QPID-2982 + self.assertRaises(Exception, cluster_test_logs.verify_logs, glob.glob("*.log")) def test_management_qmf2(self): self.test_management(args=["--mgmt-qmf2=yes"]) @@ -506,7 +513,7 @@ class StoreTests(BrokerTest): self.assertEqual(a.wait(), 0) self.assertEqual(c.wait(), 0) # Mix members from both shutdown events, they should fail - # FIXME aconway 2010-03-11: can't predict the exit status of these + # TODO aconway 2010-03-11: can't predict the exit status of these # as it depends on the order of delivery of initial-status messages. # See comment at top of this file. a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) |