diff options
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 46 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateDataExchange.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 184 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.cpp | 1 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_test_logs.py | 105 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 21 |
9 files changed, 284 insertions, 118 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 0013c370a7..5720f7fcc1 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), - updateDataExchange(new UpdateDataExchange(this)), + updateDataExchange(new UpdateDataExchange(*this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(info, *this << " new shadow connection " << c->getId()); + QPID_LOG(debug, *this << " new shadow connection " << c->getId()); // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. @@ -749,7 +749,7 @@ struct AppendQueue { std::string Cluster::debugSnapshot() { assertClusterSafe(); std::ostringstream msg; - msg << "queue snapshot at " << map.getFrameSeq() << ":"; + msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:"; AppendQueue append(msg); broker.getQueues().eachQueue(append); return msg.str(); @@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) checkUpdateIn(l); } else { - QPID_LOG(debug,*this << " unstall, ignore update " << updater + QPID_LOG(info, *this << " unstall, ignore update " << updater << " to " << updatee); deliverEventQueue.start(); // Not involved in update. } @@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) { // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); + discarding = false; // OK to set, we're stalled for update. + QPID_LOG(notice, *this << " update complete, starting catch-up."); + QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. if (mAgent) { // Update management agent now, after all update activity is complete. updateDataExchange->updateManagementAgent(mAgent); mAgent->suppress(false); // Enable management output. mAgent->clusterUpdate(); } - discarding = false; // OK to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up."); - QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } @@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } - QPID_LOG(debug, *this << " cluster-uuid = " << clusterId); + QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index c52caf6aa9..6b324be4c5 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -78,6 +78,10 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { + return o << "cluster(" << c.updaterId << " UPDATER)"; +} + struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -142,7 +146,7 @@ void UpdateClient::run() { } void UpdateClient::update() { - QPID_LOG(debug, updaterId << " updating state to " << updateeId + QPID_LOG(debug, *this << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; @@ -177,14 +181,14 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - // FIXME aconway 2010-03-15: This sleep avoids the race condition + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the // Connection object. Remove when the bug is fixed. // sys::usleep(10*1000); - QPID_LOG(debug, updaterId << " update completed to " << updateeId + QPID_LOG(debug, *this << " update completed to " << updateeId << " at " << updateeUrl << ": " << membership); } @@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState() management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - QPID_LOG(debug, updaterId << " updating management setup-state."); + QPID_LOG(debug, *this << " updating management setup-state."); std::string vendor, product, instance; agent->getName(vendor, product, instance); ClusterConnectionProxy(session).managementSetupState( @@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent() if (!agent) return; string data; - QPID_LOG(debug, updaterId << " updating management schemas. ") + QPID_LOG(debug, *this << " updating management schemas. ") agent->exportSchemas(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management agents. ") + QPID_LOG(debug, *this << " updating management agents. ") agent->exportAgents(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management deleted objects. ") + QPID_LOG(debug, *this << " updating management deleted objects. ") typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; DeletedObjectList deleted; agent->exportDeletedObjects(deleted); @@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent() } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { - QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); + QPID_LOG(debug, *this << " updating exchange " << ex->getName()); ClusterConnectionProxy(session).exchange(encode(*ex)); } @@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { - QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); updateQueue(shadowSession, q); } void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { if (!q->hasExclusiveOwner()) { - QPID_LOG(debug, updaterId << " updating queue " << q->getName()); + QPID_LOG(debug, *this << " updating queue " << q->getName()); updateQueue(session, q); }//else queue will be updated as part of session state of owning session } @@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + QPID_LOG(debug, *this << " updating output task " << ci->getName() << " channel=" << channel); } void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { - QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); + QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); @@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda updateConnection->getOutput().getSendMax() ); shadowConnection.close(); - QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); + QPID_LOG(debug, *this << " updated connection " << *updateConnection); } void UpdateClient::updateSession(broker::SessionHandler& sh) { broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. - QPID_LOG(debug, updaterId << " updating session " << ss->getId()); + QPID_LOG(debug, *this << " updating session " << ss->getId()); // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); @@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - QPID_LOG(debug, updaterId << " updating exclusive queues."); + QPID_LOG(debug, *this << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - QPID_LOG(debug, updaterId << " updating consumers."); + QPID_LOG(debug, *this << " updating consumers."); ss->getSemanticState().eachConsumer( boost::bind(&UpdateClient::updateConsumer, this, _1)); - QPID_LOG(debug, updaterId << " updating unacknowledged messages."); + QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); @@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { if (inProgress) { inProgress->getFrames().map(simpl->out); } - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); + QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId()); } void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; @@ -485,7 +489,7 @@ void UpdateClient::updateConsumer( ); consumerNumbering.add(ci); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } @@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { }; void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, updaterId << " updating TX transaction state."); + QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index be09af7e81..76621cd7ba 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -30,7 +30,7 @@ #include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" #include <boost/shared_ptr.hpp> - +#include <iosfwd> namespace qpid { @@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable { boost::function<void()> done; boost::function<void(const std::exception& e)> failed; client::ConnectionSettings connectionSettings; + + friend std::ostream& operator<<(std::ostream&, const UpdateClient&); }; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_UPDATECLIENT_H*/ diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp index 2f242b3024..2a079b8881 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -19,6 +19,7 @@ * */ #include "UpdateDataExchange.h" +#include "Cluster.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" @@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents") const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); -UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) : - Exchange(EXCHANGE_NAME, parent) +std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) { + return o << "cluster(" << c.clusterId << " UPDATER)"; +} + +UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : + Exchange(EXCHANGE_NAME, &cluster), + clusterId(cluster.getId()) {} void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, @@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size()); agent->importAgents(buf1); - QPID_LOG(debug, " Updated management agents."); + QPID_LOG(debug, *this << " updated management agents."); framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size()); agent->importSchemas(buf2); - QPID_LOG(debug, " Updated management schemas"); + QPID_LOG(debug, *this << " updated management schemas."); using amqp_0_10::ListCodec; using types::Variant; @@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen new management::ManagementAgent::DeletedObject(*i))); } agent->importDeletedObjects(objects); - QPID_LOG(debug, " Updated management deleted objects."); + QPID_LOG(debug, *this << " updated management deleted objects."); } diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h index 27a98548f3..8c493e400a 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -23,6 +23,8 @@ */ #include "qpid/broker/Exchange.h" +#include "types.h" +#include <iosfwd> namespace qpid { @@ -31,6 +33,7 @@ class ManagementAgent; } namespace cluster { +class Cluster; /** * An exchange used to send data that is to large for a control @@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange static const std::string MANAGEMENT_SCHEMAS_KEY; static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; - UpdateDataExchange(management::Manageable* parent); + UpdateDataExchange(Cluster& parent); void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange void updateManagementAgent(management::ManagementAgent* agent); private: - + MemberId clusterId; std::string managementAgents; std::string managementSchemas; std::string managementDeletedObjects; + friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 07751f57ef..7b60ea35c4 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -18,7 +18,12 @@ * under the License. * */ - + + +// NOTE on use of log levels: The criteria for using trace vs. debug +// is to use trace for log messages that are generated for each +// unbatched stats/props notification and debug for everything else. + #include "qpid/management/ManagementAgent.h" #include "qpid/management/ManagementObject.h" #include "qpid/broker/DeliverableMessage.h" @@ -89,7 +94,7 @@ static Variant::Map mapEncodeSchemaId(const string& pname, ManagementAgent::RemoteAgent::~RemoteAgent () { - QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); + QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); if (mgmtObject != 0) { mgmtObject->resourceDestroy(); agent.deleteObjectNowLH(mgmtObject->getObjectId()); @@ -169,7 +174,7 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, uuid.generate(); QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid); } else - QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); + QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid); // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; @@ -308,7 +313,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId } newManagementObjects[objId] = object; } - + QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); return objId; } @@ -330,7 +335,6 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, } object->setObjectId(objId); - { sys::Mutex::ScopedLock lock(addLock); ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); @@ -340,7 +344,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, } newManagementObjects[objId] = object; } - + QPID_LOG(debug, "Management object added: " << objId.getV2Key()); return objId; } @@ -370,7 +374,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi outBuffer.reset(); sendBufferLH(outBuffer, outLen, mExchange, "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); - QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); + QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); } if (qmf2Support) { @@ -408,9 +412,8 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi list_.push_back(map_); ListCodec::encode(list_, content); sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); - QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); + QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); } - } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -467,7 +470,7 @@ void ManagementAgent::clientAdded (const string& routingKey) outLen = outBuffer.getPosition(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); - QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << rkeys.front()); + QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); rkeys.pop_front(); } } @@ -476,8 +479,10 @@ void ManagementAgent::clusterUpdate() { // Called on all cluster memebers when a new member joins a cluster. // Set clientWasAdded so that on the next periodicProcessing we will do // a full update on all cluster members. + sys::Mutex::ScopedLock l(userLock); + moveNewObjectsLH(); // to be consistent with updater/updatee. clientWasAdded = true; - QPID_LOG(debug, "cluster update " << debugSnapshot()); + QPID_LOG(debug, "Cluster member joined, " << debugSnapshot()); } void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) @@ -509,7 +514,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf, string routingKey) { if (suppressed) { - QPID_LOG(trace, "Suppressing management message to " << routingKey); + QPID_LOG(debug, "Suppressing management message to " << routingKey); return; } if (exchange.get() == 0) return; @@ -564,7 +569,7 @@ void ManagementAgent::sendBufferLH(const string& data, Variant::Map::const_iterator i; if (suppressed) { - QPID_LOG(trace, "Suppressing management message to " << routingKey); + QPID_LOG(debug, "Suppressing management message to " << routingKey); return; } if (exchange.get() == 0) return; @@ -637,7 +642,7 @@ void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 #define HEADROOM 4096 - QPID_LOG(trace, "Management agent periodic processing"); + QPID_LOG(debug, "Management agent periodic processing"); sys::Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; @@ -776,17 +781,26 @@ void ManagementAgent::periodicProcessing (void) send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); if (send_props && qmf1Support) { + size_t pos = msgBuffer.getPosition(); encodeHeader(msgBuffer, 'c'); sBuf.clear(); object->writeProperties(sBuf); msgBuffer.putRawData(sBuf); + QPID_LOG(trace, "Changed V1 properties " + << object->getObjectId().getV2Key() + << " len=" << msgBuffer.getPosition()-pos); } if (send_stats && qmf1Support) { + size_t pos = msgBuffer.getPosition(); encodeHeader(msgBuffer, 'i'); sBuf.clear(); object->writeStatistics(sBuf); msgBuffer.putRawData(sBuf); + QPID_LOG(trace, "Changed V1 statistics " + << object->getObjectId().getV2Key() + << " len=" << msgBuffer.getPosition()-pos); + } if ((send_stats || send_props) && qmf2Support) { @@ -805,6 +819,10 @@ void ManagementAgent::periodicProcessing (void) map_["_values"] = values; list_.push_back(map_); v2Objs++; + QPID_LOG(trace, "Changed V2" + << (send_stats? " statistics":"") + << (send_props? " properties":"") + << " map=" << map_); } if (send_props) pcount++; @@ -826,7 +844,10 @@ void ManagementAgent::periodicProcessing (void) key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << contentSize); + QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() + << " props=" << pcount + << " stats=" << scount + << " len=" << contentSize); } } @@ -849,7 +870,10 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << content.length()); + QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() + << " props=" << pcount + << " stats=" << scount + << " len=" << content.length()); } } } @@ -877,15 +901,19 @@ void ManagementAgent::periodicProcessing (void) for (DeletedObjectList::iterator lIter = mIter->second.begin(); lIter != mIter->second.end(); lIter++) { - + std::string oid = (*lIter)->objectId; if (!(*lIter)->encodedV1Config.empty()) { encodeHeader(msgBuffer, 'c'); msgBuffer.putRawData((*lIter)->encodedV1Config); + QPID_LOG(trace, "Deleting V1 properties " << oid + << " len=" << (*lIter)->encodedV1Config.size()); v1Objs++; } if (!(*lIter)->encodedV1Inst.empty()) { encodeHeader(msgBuffer, 'i'); msgBuffer.putRawData((*lIter)->encodedV1Inst); + QPID_LOG(trace, "Deleting V1 statistics " << oid + << " len=" << (*lIter)->encodedV1Inst.size()); v1Objs++; } if (v1Objs && msgBuffer.available() < HEADROOM) { @@ -895,10 +923,12 @@ void ManagementAgent::periodicProcessing (void) key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" + << key.str() << " len=" << contentSize); } if (!(*lIter)->encodedV2.empty()) { + QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); list_.push_back((*lIter)->encodedV2); if (++v2Objs >= maxV2ReplyObjs) { v2Objs = 0; @@ -922,7 +952,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } } @@ -936,7 +966,7 @@ void ManagementAgent::periodicProcessing (void) key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } if (!list_.empty()) { @@ -959,7 +989,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } } // end map @@ -984,7 +1014,7 @@ void ManagementAgent::periodicProcessing (void) msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); - QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); + QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); } if (qmf2Support) { @@ -1013,7 +1043,7 @@ void ManagementAgent::periodicProcessing (void) // time to prevent stale heartbeats from getting to the consoles. sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); - QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); + QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); } QPID_LOG(debug, "periodic update " << debugSnapshot()); } @@ -1073,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) uint32_t contentSize = msgBuffer.getPosition(); msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v1key.str()); + QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); } if (qmf2Support) { @@ -1086,7 +1116,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) string content; ListCodec::encode(list_, content); sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v2key.str()); + QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); } } @@ -1102,7 +1132,7 @@ void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t s outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << + QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << replyToKey << " seq=" << sequence); } @@ -1127,7 +1157,7 @@ void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& ci MapCodec::encode(map, content); sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey); - QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); + QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); } bool ManagementAgent::dispatchCommand (Deliverable& deliverable, @@ -1221,7 +1251,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl inBuffer.getShortString(methodName); inBuffer.getRawData(inArgs, inBuffer.available()); - QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << methodName << " replyTo=" << replyToKey); encodeHeader(outBuffer, 'm', sequence); @@ -1232,7 +1262,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); return; } @@ -1243,7 +1273,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); return; } @@ -1259,7 +1289,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); return; } } @@ -1291,7 +1321,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } @@ -1374,7 +1404,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r // invoke the method - QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() + QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() << ":" << iter->second->getClassName() << " method=" << methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs); @@ -1402,7 +1432,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r MapCodec::encode(outMap, content); sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); - QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap); + QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap); } @@ -1411,7 +1441,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey); + QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); @@ -1419,12 +1449,12 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey); + QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); } void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence) { - QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey); + QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -1440,7 +1470,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, u if (outLen) { outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); } sendCommandCompleteLH(replyToKey, sequence); @@ -1452,7 +1482,7 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT inBuffer.getShortString(packageName); - QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); findOrAddPackageLH(packageName); } @@ -1463,7 +1493,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo inBuffer.getShortString(packageName); - QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) @@ -1489,7 +1519,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); classes.pop_front(); } @@ -1508,7 +1538,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); - QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey); PackageMap::iterator pIter = findOrAddPackageLH(packageName); @@ -1525,7 +1555,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), to=" << replyToKey << " seq=" << sequence); if (cIter != pIter->second.end()) @@ -1557,7 +1587,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl inBuffer.getShortString (packageName); key.decode(inBuffer); - QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); @@ -1575,7 +1605,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); } else sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available"); @@ -1598,7 +1628,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r key.decode(inBuffer); inBuffer.restore(); - QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { @@ -1622,7 +1652,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); - QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " to=schema.class"); } } @@ -1702,7 +1732,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep requestedBrokerBank = inBuffer.getLong(); requestedAgentBank = inBuffer.getLong(); - QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << + QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence); assignedBank = assignBankLH(requestedAgentBank); @@ -1722,7 +1752,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep addObject (agent->mgmtObject, 0); remoteAgents[connectionRef] = agent; - QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); + QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1734,7 +1764,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << + QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << " to=" << replyToKey << " seq=" << sequence); } @@ -1747,7 +1777,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe ft.decode(inBuffer); - QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); + QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo<string>()) { @@ -1776,7 +1806,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } sendCommandCompleteLH(replyToKey, sequence); @@ -1821,7 +1851,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock - QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. } encodeHeader(outBuffer, 'g', sequence); @@ -1837,7 +1867,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe if (outLen) { outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } sendCommandCompleteLH(replyToKey, sequence); @@ -1853,7 +1883,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo Variant::Map headers; MapCodec::decode(body, inMap); - QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); + QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; @@ -1935,7 +1965,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo ListCodec::encode(list_, content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo); return; } } else { @@ -1989,12 +2019,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo ListCodec::encode(_list.front().asList(), content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); _list.pop_front(); - QPID_LOG(trace, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length()); + QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length()); } headers.erase("partial"); ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length()); + QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length()); return; } @@ -2002,14 +2032,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo string content; ListCodec::encode(Variant::List(), content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (empty) to=" << replyTo); + QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo); } void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo, const string& cid) { - QPID_LOG(trace, "RCVD AgentLocateRequest"); + QPID_LOG(debug, "RCVD AgentLocateRequest"); Variant::Map map; Variant::Map headers; @@ -2028,7 +2058,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); clientWasAdded = true; - QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); + QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo); } @@ -2171,7 +2201,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) sendBufferLH(outBuffer, outLen, dExchange, replyToKey); } - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); } return false; @@ -2269,7 +2299,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); - QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package"); + QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); return result.first; } @@ -2639,12 +2669,13 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { } namespace { -bool isNotDeleted(const ManagementObjectMap::value_type& value) { - return !value.second->isDeleted(); +bool isDeleted(const ManagementObjectMap::value_type& value) { + return value.second->isDeleted(); } -size_t countNotDeleted(const ManagementObjectMap& map) { - return std::count_if(map.begin(), map.end(), isNotDeleted); +void summarizeMap(std::ostream& o, const char* name, const ManagementObjectMap& map) { + size_t deleted = std::count_if(map.begin(), map.end(), isDeleted); + o << map.size() << " " << name << " (" << deleted << " deleted), "; } void dumpMap(std::ostream& o, const ManagementObjectMap& map) { @@ -2657,13 +2688,18 @@ void dumpMap(std::ostream& o, const ManagementObjectMap& map) { string ManagementAgent::debugSnapshot() { ostringstream msg; - msg << " management snapshot:"; - for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); - i != remoteAgents.end(); ++i) - msg << " " << i->second->routingKey; - msg << " packages: " << packages.size(); - msg << " objects: " << countNotDeleted(managementObjects); - msg << " new objects: " << countNotDeleted(newManagementObjects); + msg << " management snapshot: "; + if (!remoteAgents.empty()) { + msg << remoteAgents.size() << " agents("; + for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); + i != remoteAgents.end(); ++i) + msg << " " << i->second->routingKey; + msg << "), "; + } + msg << packages.size() << " packages, "; + summarizeMap(msg, "objects", managementObjects); + summarizeMap(msg, "new objects ", newManagementObjects); + msg << pendingDeletedObjs.size() << " pending deletes" ; return msg.str(); } diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 670a242c02..cfdd58ed53 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -262,6 +262,7 @@ void ManagementObject::setUpdateTime() void ManagementObject::resourceDestroy() { + QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key()); destroyTime = sys::Duration(sys::EPOCH, sys::now()); deleted = true; } diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py new file mode 100755 index 0000000000..160e15e628 --- /dev/null +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Functions for comparing broker log files, used by cluster_tests.py. + +import os, os.path, re, glob +from itertools import izip + +def split_log(log): + """Split a broker log at checkpoints where a member joins. + Return the set of checkpoints discovered.""" + checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:") + outfile = None + checkpoints = [] + for l in open(log): + match = checkpoint_re.search(l) + if match: + checkpoint = match.groups()[0] + checkpoints.append(checkpoint) + if outfile: outfile.close() + outfile = open("%s.%s"%(log, checkpoint), 'w') + + if outfile: outfile.write(l) + if outfile: outfile.close() + return checkpoints + +def filter_log(log): + """Filter the contents of a log file to remove data that is expected + to differ between brokers in a cluster. Filtered log contents between + the same checkpoints should match across the cluster.""" + out = open("%s.filter"%(log), 'w') + for l in open(log): + # Lines to skip entirely + skip = "|".join([ + 'local connection', # Only on local broker + 'UPDATER|UPDATEE|OFFER', # Ignore update process + 'stall for update|unstall, ignore update|cancelled offer .* unstall', + 'caught up', + 'active for links|Passivating links|Activating links', + 'info Connection.* connected to', # UpdateClient connection + 'warning Broker closed connection: 200, OK', + 'task late', + 'task overran' + ]) + if re.compile(skip).search(l): continue + + # Regex to match a UUID + uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w' + + # Regular expression substitutions to remove expected differences + for pattern,subst in [ + (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d', ''), # Remove timestamp + (r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id + (r' local\)| shadow\)', ')'), # Remove local/shadow indication + (r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready. + # System UUID + (r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'), + + # FIXME aconway 2010-12-20: substitutions to mask known problems + #(r' len=\d+', ' len=NN'), # buffer lengths + #(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name + #(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs + ]: l = re.sub(pattern,subst,l) + out.write(l) + out.close() + +def verify_logs(logs): + """Compare log files from cluster brokers, verify that they correspond correctly.""" + for l in glob.glob("*.log"): filter_log(l) + checkpoints = set() + for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l))) + errors=[] + for c in checkpoints: + fragments = glob.glob("*.filter.%s"%(c)) + fragments.sort(reverse=True, key=os.path.getsize) + while len(fragments) >= 2: + a = fragments.pop(0) + b = fragments[0] + for ab in izip(open(a), open(b)): + if ab[0] != ab[1]: + errors.append("\n %s %s"%(a, b)) + break + if errors: + raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors)) + +# Can be run as a script. +if __name__ == "__main__": + verify_logs(glob.glob("*.log")) diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index daa47a6322..03913356ca 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess +import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs from qpid import datatypes, messaging from qpid.brokertest import * from qpid.harness import Skipped @@ -35,7 +35,7 @@ log = getLogger("qpid.cluster_tests") # a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK # and EXPECT_EXIT_FAIL in some of the tests below. -# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error +# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error # should give non-0 exit status. # Import scripts as modules @@ -299,7 +299,10 @@ class LongTests(BrokerTest): for i in range(i, len(cluster)): cluster[i].kill() def test_management(self, args=[]): - """Stress test: Run management clients and other clients concurrently.""" + """ + Stress test: Run management clients and other clients concurrently + while killing and restarting brokers. + """ class ClientLoop(StoppableThread): """Run a client executable in a loop.""" @@ -352,9 +355,9 @@ class LongTests(BrokerTest): finally: self.lock.release() StoppableThread.stop(self) - # def test_management - args += ["--mgmt-pub-interval", 1] # Publish management information every second. - # FIXME aconway 2010-12-15: extra debugging + # body of test_management() + + args += ["--mgmt-pub-interval", 1] args += ["--log-enable=trace+:management"] # Use store if present. if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] @@ -403,6 +406,10 @@ class LongTests(BrokerTest): start_mclients(cluster[alive]) for c in chain(mclients, *clients): c.stop() + # Verify that logs are consistent + # FIXME aconway 2010-12-21: this is currently expected to fail due to + # known bugs, see https://issues.apache.org/jira/browse/QPID-2982 + self.assertRaises(Exception, cluster_test_logs.verify_logs, glob.glob("*.log")) def test_management_qmf2(self): self.test_management(args=["--mgmt-qmf2=yes"]) @@ -506,7 +513,7 @@ class StoreTests(BrokerTest): self.assertEqual(a.wait(), 0) self.assertEqual(c.wait(), 0) # Mix members from both shutdown events, they should fail - # FIXME aconway 2010-03-11: can't predict the exit status of these + # TODO aconway 2010-03-11: can't predict the exit status of these # as it depends on the order of delivery of initial-status messages. # See comment at top of this file. a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) |