diff options
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 46 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 184 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 1 | ||||
| -rwxr-xr-x | cpp/src/tests/cluster_test_logs.py | 105 | ||||
| -rwxr-xr-x | cpp/src/tests/cluster_tests.py | 21 | 
9 files changed, 284 insertions, 118 deletions
| diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0013c370a7..5720f7fcc1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :                        "Error delivering frames",                        poller),      failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), -    updateDataExchange(new UpdateDataExchange(this)), +    updateDataExchange(new UpdateDataExchange(*this)),      quorum(boost::bind(&Cluster::leave, this)),      decoder(boost::bind(&Cluster::deliverFrame, this, _1)),      discarding(true), @@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {  // Called in connection thread to insert an updated shadow connection.  void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { -    QPID_LOG(info, *this << " new shadow connection " << c->getId()); +    QPID_LOG(debug, *this << " new shadow connection " << c->getId());      // Safe to use connections here because we're pre-catchup, stalled      // and discarding, so deliveredFrame is not processing any      // connection events. @@ -749,7 +749,7 @@ struct AppendQueue {  std::string Cluster::debugSnapshot() {      assertClusterSafe();      std::ostringstream msg; -    msg << "queue snapshot at " << map.getFrameSeq() << ":"; +    msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";      AppendQueue append(msg);      broker.getQueues().eachQueue(append);      return msg.str(); @@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)          checkUpdateIn(l);      }      else { -        QPID_LOG(debug,*this << " unstall, ignore update " << updater +        QPID_LOG(info, *this << " unstall, ignore update " << updater                   << " to " << updatee);          deliverEventQueue.start(); // Not involved in update.      } @@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {          // NB: don't updateMgmtMembership() here as we are not in the deliver          // thread. It will be updated on delivery of the "ready" we just mcast.          broker.setClusterUpdatee(false); +        discarding = false;     // OK to set, we're stalled for update. +        QPID_LOG(notice, *this << " update complete, starting catch-up."); +        QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.          if (mAgent) {              // Update management agent now, after all update activity is complete.              updateDataExchange->updateManagementAgent(mAgent);              mAgent->suppress(false); // Enable management output.              mAgent->clusterUpdate();          } -        discarding = false;     // OK to set, we're stalled for update. -        QPID_LOG(notice, *this << " update complete, starting catch-up."); -        QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.          enableClusterSafe();    // Enable cluster-safe assertions          deliverEventQueue.start();      } @@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {          mgmtObject->set_clusterID(clusterId.str());          mgmtObject->set_memberID(stream.str());      } -    QPID_LOG(debug, *this << " cluster-uuid = " << clusterId); +    QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);  }  void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index c52caf6aa9..6b324be4c5 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -78,6 +78,10 @@ using namespace framing;  namespace arg=client::arg;  using client::SessionBase_0_10Access; +std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { +    return o << "cluster(" << c.updaterId << " UPDATER)"; +} +  struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler   {      boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -142,7 +146,7 @@ void UpdateClient::run() {  }  void UpdateClient::update() { -    QPID_LOG(debug, updaterId << " updating state to " << updateeId +    QPID_LOG(debug, *this << " updating state to " << updateeId               << " at " << updateeUrl);      Broker& b = updaterBroker; @@ -177,14 +181,14 @@ void UpdateClient::update() {      // NOTE: connection will be closed from the other end, don't close      // it here as that causes a race. -    // FIXME aconway 2010-03-15: This sleep avoids the race condition +    // TODO aconway 2010-03-15: This sleep avoids the race condition      // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.      // It allows the connection to fully close before destroying the      // Connection object. Remove when the bug is fixed.      //      sys::usleep(10*1000); -    QPID_LOG(debug,  updaterId << " update completed to " << updateeId +    QPID_LOG(debug,  *this << " update completed to " << updateeId               << " at " << updateeUrl << ": " << membership);  } @@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState()      management::ManagementAgent* agent = updaterBroker.getManagementAgent();      if (!agent) return; -    QPID_LOG(debug, updaterId << " updating management setup-state."); +    QPID_LOG(debug, *this << " updating management setup-state.");      std::string vendor, product, instance;      agent->getName(vendor, product, instance);      ClusterConnectionProxy(session).managementSetupState( @@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent()      if (!agent) return;      string data; -    QPID_LOG(debug, updaterId << " updating management schemas. ") +    QPID_LOG(debug, *this << " updating management schemas. ")      agent->exportSchemas(data);      session.messageTransfer(          arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),          arg::destination=UpdateDataExchange::EXCHANGE_NAME); -    QPID_LOG(debug, updaterId << " updating management agents. ") +    QPID_LOG(debug, *this << " updating management agents. ")      agent->exportAgents(data);      session.messageTransfer(          arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),          arg::destination=UpdateDataExchange::EXCHANGE_NAME); -    QPID_LOG(debug, updaterId << " updating management deleted objects. ") +    QPID_LOG(debug, *this << " updating management deleted objects. ")      typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;      DeletedObjectList deleted;      agent->exportDeletedObjects(deleted); @@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent()  }  void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { -    QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); +    QPID_LOG(debug, *this << " updating exchange " << ex->getName());      ClusterConnectionProxy(session).exchange(encode(*ex));  } @@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<  }  void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { -    QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); +    QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());      updateQueue(shadowSession, q);  }  void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {      if (!q->hasExclusiveOwner()) { -        QPID_LOG(debug, updaterId << " updating queue " << q->getName()); +        QPID_LOG(debug, *this << " updating queue " << q->getName());          updateQueue(session, q);      }//else queue will be updated as part of session state of owning session  } @@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {      SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);      uint16_t channel =  ci->getParent().getSession().getChannel();      ClusterConnectionProxy(shadowConnection).outputTask(channel,  ci->getName()); -    QPID_LOG(debug, updaterId << " updating output task " << ci->getName() +    QPID_LOG(debug, *this << " updating output task " << ci->getName()               << " channel=" << channel);  }  void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { -    QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); +    QPID_LOG(debug, *this << " updating connection " << *updateConnection);      assert(updateConnection->getBrokerConnection());      broker::Connection& bc = *updateConnection->getBrokerConnection(); @@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda          updateConnection->getOutput().getSendMax()      );      shadowConnection.close(); -    QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); +    QPID_LOG(debug, *this << " updated connection " << *updateConnection);  }  void UpdateClient::updateSession(broker::SessionHandler& sh) {      broker::SessionState* ss = sh.getSession();      if (!ss) return;            // no session. -    QPID_LOG(debug, updaterId << " updating session " << ss->getId()); +    QPID_LOG(debug, *this << " updating session " << ss->getId());      // Create a client session to update session state.       boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); @@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {      // Re-create session state on remote connection. -    QPID_LOG(debug, updaterId << " updating exclusive queues."); +    QPID_LOG(debug, *this << " updating exclusive queues.");      ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); -    QPID_LOG(debug, updaterId << " updating consumers."); +    QPID_LOG(debug, *this << " updating consumers.");      ss->getSemanticState().eachConsumer(          boost::bind(&UpdateClient::updateConsumer, this, _1)); -    QPID_LOG(debug, updaterId << " updating unacknowledged messages."); +    QPID_LOG(debug, *this << " updating unacknowledged messages.");      broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();      std::for_each(drs.begin(), drs.end(),                    boost::bind(&UpdateClient::updateUnacked, this, _1)); @@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {      if (inProgress) {          inProgress->getFrames().map(simpl->out);      } -    QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); +    QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId());  }  void UpdateClient::updateConsumer(      const broker::SemanticState::ConsumerImpl::shared_ptr& ci)  { -    QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " +    QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "               << shadowSession.getId());      using namespace message; @@ -485,7 +489,7 @@ void UpdateClient::updateConsumer(      );      consumerNumbering.add(ci); -    QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() +    QPID_LOG(debug, *this << " updated consumer " << ci->getName()               << " on " << shadowSession.getId());  } @@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {  };  void UpdateClient::updateTxState(broker::SemanticState& s) { -    QPID_LOG(debug, updaterId << " updating TX transaction state."); +    QPID_LOG(debug, *this << " updating TX transaction state.");      ClusterConnectionProxy proxy(shadowSession);      proxy.accumulatedAck(s.getAccumulatedAck());      broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index be09af7e81..76621cd7ba 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -30,7 +30,7 @@  #include "qpid/broker/SemanticState.h"  #include "qpid/sys/Runnable.h"  #include <boost/shared_ptr.hpp> - +#include <iosfwd>  namespace qpid { @@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable {      boost::function<void()> done;      boost::function<void(const std::exception& e)> failed;      client::ConnectionSettings connectionSettings; + +  friend std::ostream& operator<<(std::ostream&, const UpdateClient&);  }; +  }} // namespace qpid::cluster  #endif  /*!QPID_CLUSTER_UPDATECLIENT_H*/ diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp index 2f242b3024..2a079b8881 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -19,6 +19,7 @@   *   */  #include "UpdateDataExchange.h" +#include "Cluster.h"  #include "qpid/amqp_0_10/Codecs.h"  #include "qpid/broker/Deliverable.h"  #include "qpid/broker/Message.h" @@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")  const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");  const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); -UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) : -    Exchange(EXCHANGE_NAME, parent) +std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) { +    return o << "cluster(" << c.clusterId << " UPDATER)"; +} + +UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : +    Exchange(EXCHANGE_NAME, &cluster), +    clusterId(cluster.getId())  {}  void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, @@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen      framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());      agent->importAgents(buf1); -    QPID_LOG(debug, " Updated management agents."); +    QPID_LOG(debug, *this << " updated management agents.");      framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());      agent->importSchemas(buf2); -    QPID_LOG(debug, " Updated management schemas"); +    QPID_LOG(debug, *this << " updated management schemas.");      using amqp_0_10::ListCodec;      using types::Variant; @@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen                                new management::ManagementAgent::DeletedObject(*i)));      }      agent->importDeletedObjects(objects); -    QPID_LOG(debug, " Updated management deleted objects."); +    QPID_LOG(debug, *this << " updated management deleted objects.");  } diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h index 27a98548f3..8c493e400a 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -23,6 +23,8 @@   */  #include "qpid/broker/Exchange.h" +#include "types.h" +#include <iosfwd>  namespace qpid { @@ -31,6 +33,7 @@ class ManagementAgent;  }  namespace cluster { +class Cluster;  /**   * An exchange used to send data that is to large for a control @@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange      static const std::string MANAGEMENT_SCHEMAS_KEY;      static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; -    UpdateDataExchange(management::Manageable* parent); +    UpdateDataExchange(Cluster& parent);      void route(broker::Deliverable& msg, const std::string& routingKey,                 const framing::FieldTable* args); @@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange      void updateManagementAgent(management::ManagementAgent* agent);    private: - +    MemberId clusterId;      std::string managementAgents;      std::string managementSchemas;      std::string managementDeletedObjects; +  friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);  };  }} // namespace qpid::cluster diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 07751f57ef..7b60ea35c4 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -18,7 +18,12 @@   * under the License.   *   */ -  + + +// NOTE on use of log levels: The criteria for using trace vs. debug +// is to use trace for log messages that are generated for each +// unbatched stats/props notification and debug for everything else. +  #include "qpid/management/ManagementAgent.h"  #include "qpid/management/ManagementObject.h"  #include "qpid/broker/DeliverableMessage.h" @@ -89,7 +94,7 @@ static Variant::Map mapEncodeSchemaId(const string& pname,  ManagementAgent::RemoteAgent::~RemoteAgent ()  { -    QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); +    QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");      if (mgmtObject != 0) {          mgmtObject->resourceDestroy();          agent.deleteObjectNowLH(mgmtObject->getObjectId()); @@ -169,7 +174,7 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,                  uuid.generate();                  QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid);              } else -                QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); +                QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid);              // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.              bootSequence++; @@ -308,7 +313,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId          }          newManagementObjects[objId] = object;      } - +    QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());      return objId;  } @@ -330,7 +335,6 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,      }      object->setObjectId(objId); -      {          sys::Mutex::ScopedLock lock(addLock);          ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); @@ -340,7 +344,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,          }          newManagementObjects[objId] = object;      } - +    QPID_LOG(debug, "Management object added: " << objId.getV2Key());      return objId;  } @@ -370,7 +374,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi          outBuffer.reset();          sendBufferLH(outBuffer, outLen, mExchange,                     "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); -        QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); +        QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());      }      if (qmf2Support) { @@ -408,9 +412,8 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi          list_.push_back(map_);          ListCodec::encode(list_, content);          sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); -        QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); +        QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());      } -  }  ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -467,7 +470,7 @@ void ManagementAgent::clientAdded (const string& routingKey)          outLen = outBuffer.getPosition();          outBuffer.reset();          sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); -        QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << rkeys.front()); +        QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());          rkeys.pop_front();      }  } @@ -476,8 +479,10 @@ void ManagementAgent::clusterUpdate() {      // Called on all cluster memebers when a new member joins a cluster.      // Set clientWasAdded so that on the next periodicProcessing we will do       // a full update on all cluster members. +    sys::Mutex::ScopedLock l(userLock); +    moveNewObjectsLH();         // to be consistent with updater/updatee.      clientWasAdded = true; -    QPID_LOG(debug, "cluster update " << debugSnapshot()); +    QPID_LOG(debug, "Cluster member joined, " << debugSnapshot());  }  void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) @@ -509,7 +514,7 @@ void ManagementAgent::sendBufferLH(Buffer&  buf,                                     string   routingKey)  {      if (suppressed) { -        QPID_LOG(trace, "Suppressing management message to " << routingKey); +        QPID_LOG(debug, "Suppressing management message to " << routingKey);          return;      }      if (exchange.get() == 0) return; @@ -564,7 +569,7 @@ void ManagementAgent::sendBufferLH(const string& data,      Variant::Map::const_iterator i;      if (suppressed) { -        QPID_LOG(trace, "Suppressing management message to " << routingKey); +        QPID_LOG(debug, "Suppressing management message to " << routingKey);          return;      }      if (exchange.get() == 0) return; @@ -637,7 +642,7 @@ void ManagementAgent::periodicProcessing (void)  {  #define BUFSIZE   65536  #define HEADROOM  4096 -    QPID_LOG(trace, "Management agent periodic processing"); +    QPID_LOG(debug, "Management agent periodic processing");      sys::Mutex::ScopedLock lock (userLock);      char                msgChars[BUFSIZE];      uint32_t            contentSize; @@ -776,17 +781,26 @@ void ManagementAgent::periodicProcessing (void)                  send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));                  if (send_props && qmf1Support) { +                    size_t pos = msgBuffer.getPosition();                      encodeHeader(msgBuffer, 'c');                      sBuf.clear();                      object->writeProperties(sBuf);                      msgBuffer.putRawData(sBuf); +                    QPID_LOG(trace, "Changed V1 properties " +                             << object->getObjectId().getV2Key() +                             << " len=" << msgBuffer.getPosition()-pos);                  }                  if (send_stats && qmf1Support) { +                    size_t pos = msgBuffer.getPosition();                      encodeHeader(msgBuffer, 'i');                      sBuf.clear();                      object->writeStatistics(sBuf);                      msgBuffer.putRawData(sBuf); +                    QPID_LOG(trace, "Changed V1 statistics " +                             << object->getObjectId().getV2Key() +                             << " len=" << msgBuffer.getPosition()-pos); +                  }                  if ((send_stats || send_props) && qmf2Support) { @@ -805,6 +819,10 @@ void ManagementAgent::periodicProcessing (void)                      map_["_values"] = values;                      list_.push_back(map_);                      v2Objs++; +                    QPID_LOG(trace, "Changed V2" +                             << (send_stats? " statistics":"") +                             << (send_props? " properties":"") +                             << " map=" << map_);                  }                  if (send_props) pcount++; @@ -826,7 +844,10 @@ void ManagementAgent::periodicProcessing (void)                      key << "console.obj.1.0." << packageName << "." << className;                      msgBuffer.reset();                      sendBufferLH(msgBuffer, contentSize, mExchange, key.str());   // UNLOCKS USERLOCK -                    QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << contentSize); +                    QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() +                             << " props=" << pcount +                             << " stats=" << scount +                             << " len=" << contentSize);                  }              } @@ -849,7 +870,10 @@ void ManagementAgent::periodicProcessing (void)                      headers["qmf.agent"] = name_address;                      sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());  // UNLOCKS USERLOCK -                    QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << content.length()); +                    QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() +                             << " props=" << pcount +                             << " stats=" << scount +                             << " len=" << content.length());                  }              }          } @@ -877,15 +901,19 @@ void ManagementAgent::periodicProcessing (void)              for (DeletedObjectList::iterator lIter = mIter->second.begin();                   lIter != mIter->second.end(); lIter++) { - +                std::string oid = (*lIter)->objectId;                  if (!(*lIter)->encodedV1Config.empty()) {                      encodeHeader(msgBuffer, 'c');                      msgBuffer.putRawData((*lIter)->encodedV1Config); +                    QPID_LOG(trace, "Deleting V1 properties " << oid +                             << " len=" << (*lIter)->encodedV1Config.size());                      v1Objs++;                  }                  if (!(*lIter)->encodedV1Inst.empty()) {                      encodeHeader(msgBuffer, 'i');                      msgBuffer.putRawData((*lIter)->encodedV1Inst); +                    QPID_LOG(trace, "Deleting V1 statistics " << oid +                             << " len=" <<  (*lIter)->encodedV1Inst.size());                      v1Objs++;                  }                  if (v1Objs && msgBuffer.available() < HEADROOM) { @@ -895,10 +923,12 @@ void ManagementAgent::periodicProcessing (void)                      key << "console.obj.1.0." << packageName << "." << className;                      msgBuffer.reset();                      sendBufferLH(msgBuffer, contentSize, mExchange, key.str());   // UNLOCKS USERLOCK -                    QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); +                    QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" +                             << key.str() << " len=" << contentSize);                  }                  if (!(*lIter)->encodedV2.empty()) { +                    QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);                      list_.push_back((*lIter)->encodedV2);                      if (++v2Objs >= maxV2ReplyObjs) {                          v2Objs = 0; @@ -922,7 +952,7 @@ void ManagementAgent::periodicProcessing (void)                              headers["qmf.agent"] = name_address;                              sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());  // UNLOCKS USERLOCK -                            QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); +                            QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());                          }                      }                  } @@ -936,7 +966,7 @@ void ManagementAgent::periodicProcessing (void)                  key << "console.obj.1.0." << packageName << "." << className;                  msgBuffer.reset();                  sendBufferLH(msgBuffer, contentSize, mExchange, key.str());   // UNLOCKS USERLOCK -                QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); +                QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);              }              if (!list_.empty()) { @@ -959,7 +989,7 @@ void ManagementAgent::periodicProcessing (void)                      headers["qmf.agent"] = name_address;                      sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());  // UNLOCKS USERLOCK -                    QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); +                    QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());                  }              }          }  // end map @@ -984,7 +1014,7 @@ void ManagementAgent::periodicProcessing (void)          msgBuffer.reset ();          routingKey = "console.heartbeat.1.0";          sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); -        QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); +        QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);      }      if (qmf2Support) { @@ -1013,7 +1043,7 @@ void ManagementAgent::periodicProcessing (void)          // time to prevent stale heartbeats from getting to the consoles.          sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); -        QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); +        QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);      }      QPID_LOG(debug, "periodic update " << debugSnapshot());  } @@ -1073,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)          uint32_t contentSize = msgBuffer.getPosition();          msgBuffer.reset();          sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); -        QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v1key.str()); +        QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());      }      if (qmf2Support) { @@ -1086,7 +1116,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)          string content;          ListCodec::encode(list_, content);          sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); -        QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v2key.str()); +        QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());      }  } @@ -1102,7 +1132,7 @@ void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t s      outLen = MA_BUFFER_SIZE - outBuffer.available ();      outBuffer.reset ();      sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -    QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << +    QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<               replyToKey << " seq=" << sequence);  } @@ -1127,7 +1157,7 @@ void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& ci      MapCodec::encode(map, content);      sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey); -    QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); +    QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);  }  bool ManagementAgent::dispatchCommand (Deliverable&      deliverable, @@ -1221,7 +1251,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl      inBuffer.getShortString(methodName);      inBuffer.getRawData(inArgs, inBuffer.available()); -    QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << +    QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<               methodName << " replyTo=" << replyToKey);      encodeHeader(outBuffer, 'm', sequence); @@ -1232,7 +1262,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl          outLen = MA_BUFFER_SIZE - outBuffer.available();          outBuffer.reset();          sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); +        QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);          return;      } @@ -1243,7 +1273,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl          outLen = MA_BUFFER_SIZE - outBuffer.available();          outBuffer.reset();          sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); +        QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);          return;      } @@ -1259,7 +1289,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl              outLen = MA_BUFFER_SIZE - outBuffer.available();              outBuffer.reset();              sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); +            QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);              return;          }      } @@ -1291,7 +1321,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl      outLen = MA_BUFFER_SIZE - outBuffer.available();      outBuffer.reset();      sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -    QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); +    QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);  } @@ -1374,7 +1404,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r      // invoke the method -    QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() +    QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()               << ":" << iter->second->getClassName() << " method=" <<               methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs); @@ -1402,7 +1432,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r      MapCodec::encode(outMap, content);      sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); -    QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap); +    QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);  } @@ -1411,7 +1441,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey,      Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);      uint32_t outLen; -    QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey); +    QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);      encodeHeader (outBuffer, 'b', sequence);      uuid.encode  (outBuffer); @@ -1419,12 +1449,12 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey,      outLen = MA_BUFFER_SIZE - outBuffer.available ();      outBuffer.reset ();      sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -    QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey); +    QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);  }  void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence)  { -    QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey); +    QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);      Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);      uint32_t outLen; @@ -1440,7 +1470,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, u      if (outLen) {          outBuffer.reset ();          sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -        QPID_LOG(trace, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); +        QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);      }      sendCommandCompleteLH(replyToKey, sequence); @@ -1452,7 +1482,7 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT      inBuffer.getShortString(packageName); -    QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); +    QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);      findOrAddPackageLH(packageName);  } @@ -1463,7 +1493,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo      inBuffer.getShortString(packageName); -    QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); +    QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);      PackageMap::iterator pIter = packages.find(packageName);      if (pIter != packages.end()) @@ -1489,7 +1519,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo              outLen = MA_BUFFER_SIZE - outBuffer.available();              outBuffer.reset();              sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -            QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << +            QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<                       "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);              classes.pop_front();          } @@ -1508,7 +1538,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK      inBuffer.getShortString(key.name);      inBuffer.getBin128(key.hash); -    QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << +    QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<               "), replyTo=" << replyToKey);      PackageMap::iterator pIter = findOrAddPackageLH(packageName); @@ -1525,7 +1555,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK          outLen = MA_BUFFER_SIZE - outBuffer.available ();          outBuffer.reset ();          sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -        QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << +        QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<                   "), to=" << replyToKey << " seq=" << sequence);          if (cIter != pIter->second.end()) @@ -1557,7 +1587,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl      inBuffer.getShortString (packageName);      key.decode(inBuffer); -    QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << +    QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<               "), replyTo=" << replyToKey << " seq=" << sequence);      PackageMap::iterator pIter = packages.find(packageName); @@ -1575,7 +1605,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl                  outLen = MA_BUFFER_SIZE - outBuffer.available();                  outBuffer.reset();                  sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -                QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); +                QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);              }              else                  sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available"); @@ -1598,7 +1628,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r      key.decode(inBuffer);      inBuffer.restore(); -    QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); +    QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);      PackageMap::iterator pIter = packages.find(packageName);      if (pIter != packages.end()) { @@ -1622,7 +1652,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r                  outLen = MA_BUFFER_SIZE - outBuffer.available();                  outBuffer.reset();                  sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); -                QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << +                QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<                           " to=schema.class");              }          } @@ -1702,7 +1732,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep      requestedBrokerBank = inBuffer.getLong();      requestedAgentBank  = inBuffer.getLong(); -    QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << +    QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<               " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);      assignedBank = assignBankLH(requestedAgentBank); @@ -1722,7 +1752,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep      addObject (agent->mgmtObject, 0);      remoteAgents[connectionRef] = agent; -    QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); +    QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);      // Send an Attach Response      Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1734,7 +1764,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep      outLen = MA_BUFFER_SIZE - outBuffer.available ();      outBuffer.reset ();      sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -    QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << +    QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<               " to=" << replyToKey << " seq=" << sequence);  } @@ -1747,7 +1777,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe      ft.decode(inBuffer); -    QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); +    QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);      value = ft.get("_class");      if (value.get() == 0 || !value->convertsTo<string>()) { @@ -1776,7 +1806,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe                  outLen = MA_BUFFER_SIZE - outBuffer.available ();                  outBuffer.reset ();                  sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -                QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); +                QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);              }          }          sendCommandCompleteLH(replyToKey, sequence); @@ -1821,7 +1851,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe                          outLen = MA_BUFFER_SIZE - outBuffer.available ();                          outBuffer.reset ();                          sendBufferLH(outBuffer, outLen, dExchange, replyToKey);   // drops lock -                        QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); +                        QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);                          continue;  // lock dropped, need to re-find _SAME_ objid as it may have been deleted.                      }                      encodeHeader(outBuffer, 'g', sequence); @@ -1837,7 +1867,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe      if (outLen) {          outBuffer.reset ();          sendBufferLH(outBuffer, outLen, dExchange, replyToKey); -        QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); +        QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);      }      sendCommandCompleteLH(replyToKey, sequence); @@ -1853,7 +1883,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo      Variant::Map headers;      MapCodec::decode(body, inMap); -    QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); +    QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);      headers["method"] = "response";      headers["qmf.opcode"] = "_query_response"; @@ -1935,7 +1965,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo              ListCodec::encode(list_, content);              sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); -            QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); +            QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);              return;          }      } else { @@ -1989,12 +2019,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo              ListCodec::encode(_list.front().asList(), content);              sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);              _list.pop_front(); -            QPID_LOG(trace, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length()); +            QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());          }          headers.erase("partial");          ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);          sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); -        QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length()); +        QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());          return;      } @@ -2002,14 +2032,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo      string content;      ListCodec::encode(Variant::List(), content);      sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); -    QPID_LOG(trace, "SENT QueryResponse (empty) to=" << replyTo); +    QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);  }  void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,                                              const string& cid)  { -    QPID_LOG(trace, "RCVD AgentLocateRequest"); +    QPID_LOG(debug, "RCVD AgentLocateRequest");      Variant::Map map;      Variant::Map headers; @@ -2028,7 +2058,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo      sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);      clientWasAdded = true; -    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); +    QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);  } @@ -2171,7 +2201,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)                  sendBufferLH(outBuffer, outLen, dExchange, replyToKey);              } -            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); +            QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);          }          return false; @@ -2269,7 +2299,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string      outLen = MA_BUFFER_SIZE - outBuffer.available ();      outBuffer.reset ();      sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); -    QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package"); +    QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");      return result.first;  } @@ -2639,12 +2669,13 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {  }  namespace { -bool isNotDeleted(const ManagementObjectMap::value_type& value) { -    return !value.second->isDeleted(); +bool isDeleted(const ManagementObjectMap::value_type& value) { +    return value.second->isDeleted();  } -size_t countNotDeleted(const ManagementObjectMap& map) { -    return std::count_if(map.begin(), map.end(), isNotDeleted); +void summarizeMap(std::ostream& o, const char* name, const ManagementObjectMap& map) { +    size_t deleted = std::count_if(map.begin(), map.end(), isDeleted); +    o << map.size() << " " << name << " (" << deleted << " deleted), ";  }  void dumpMap(std::ostream& o, const ManagementObjectMap& map) { @@ -2657,13 +2688,18 @@ void dumpMap(std::ostream& o, const ManagementObjectMap& map) {  string ManagementAgent::debugSnapshot() {      ostringstream msg; -    msg << " management snapshot:"; -    for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); -         i != remoteAgents.end(); ++i) -        msg << " " << i->second->routingKey; -    msg << " packages: " << packages.size(); -    msg << " objects: " << countNotDeleted(managementObjects); -    msg << " new objects: " << countNotDeleted(newManagementObjects); +    msg << " management snapshot: "; +    if (!remoteAgents.empty()) { +        msg <<  remoteAgents.size() << " agents("; +        for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); +             i != remoteAgents.end(); ++i) +            msg << " " << i->second->routingKey; +        msg << "), "; +    } +    msg  << packages.size() << " packages, "; +    summarizeMap(msg, "objects", managementObjects); +    summarizeMap(msg, "new objects ", newManagementObjects); +    msg << pendingDeletedObjs.size() << " pending deletes" ;      return msg.str();  } diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 670a242c02..cfdd58ed53 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -262,6 +262,7 @@ void ManagementObject::setUpdateTime()  void ManagementObject::resourceDestroy()  { +    QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());      destroyTime = sys::Duration(sys::EPOCH, sys::now());      deleted     = true;  } diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py new file mode 100755 index 0000000000..160e15e628 --- /dev/null +++ b/cpp/src/tests/cluster_test_logs.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements.  See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.  The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.  You may obtain a copy of the License at +# +#   http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied.  See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Functions for comparing broker log files, used by cluster_tests.py. + +import os, os.path, re, glob +from itertools import izip + +def split_log(log): +    """Split a broker log at checkpoints where a member joins. +    Return the set of checkpoints discovered.""" +    checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:") +    outfile = None +    checkpoints = [] +    for l in open(log): +        match = checkpoint_re.search(l) +        if match: +            checkpoint = match.groups()[0] +            checkpoints.append(checkpoint) +            if outfile: outfile.close() +            outfile = open("%s.%s"%(log, checkpoint), 'w') + +        if outfile: outfile.write(l) +    if outfile: outfile.close() +    return checkpoints + +def filter_log(log): +    """Filter the contents of a log file to remove data that is expected +    to differ between brokers in a cluster. Filtered log contents between +    the same checkpoints should match across the cluster.""" +    out = open("%s.filter"%(log), 'w') +    for l in open(log): +        # Lines to skip entirely +        skip = "|".join([ +            'local connection',         # Only on local broker +            'UPDATER|UPDATEE|OFFER',    # Ignore update process +            'stall for update|unstall, ignore update|cancelled offer .* unstall', +            'caught up', +            'active for links|Passivating links|Activating links', +            'info Connection.* connected to', # UpdateClient connection +            'warning Broker closed connection: 200, OK', +            'task late', +            'task overran' +            ]) +        if re.compile(skip).search(l): continue + +        # Regex to match a UUID +        uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w' + +        # Regular expression substitutions to remove expected differences +        for pattern,subst in [ +            (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d', ''), # Remove timestamp +            (r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id +            (r' local\)| shadow\)', ')'), # Remove local/shadow indication +            (r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready. +            # System UUID +            (r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'), + +            # FIXME aconway 2010-12-20: substitutions to mask known problems +            #(r' len=\d+', ' len=NN'),   # buffer lengths +            #(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name +            #(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs +            ]: l = re.sub(pattern,subst,l) +        out.write(l) +    out.close() + +def verify_logs(logs): +    """Compare log files from cluster brokers, verify that they correspond correctly.""" +    for l in glob.glob("*.log"): filter_log(l) +    checkpoints = set() +    for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l))) +    errors=[] +    for c in checkpoints: +        fragments = glob.glob("*.filter.%s"%(c)) +        fragments.sort(reverse=True, key=os.path.getsize) +        while len(fragments) >= 2: +            a = fragments.pop(0) +            b = fragments[0] +            for ab in izip(open(a), open(b)): +                if ab[0] != ab[1]: +                    errors.append("\n    %s %s"%(a, b)) +                    break +    if errors: +        raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors)) + +# Can be run as a script. +if __name__ == "__main__": +    verify_logs(glob.glob("*.log")) diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index daa47a6322..03913356ca 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -18,7 +18,7 @@  # under the License.  # -import os, signal, sys, time, imp, re, subprocess +import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs  from qpid import datatypes, messaging  from qpid.brokertest import *  from qpid.harness import Skipped @@ -35,7 +35,7 @@ log = getLogger("qpid.cluster_tests")  # a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK  # and EXPECT_EXIT_FAIL in some of the tests below. -# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error +# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error  # should give non-0 exit status.  # Import scripts as modules @@ -299,7 +299,10 @@ class LongTests(BrokerTest):          for i in range(i, len(cluster)): cluster[i].kill()      def test_management(self, args=[]): -        """Stress test: Run management clients and other clients concurrently.""" +        """ +        Stress test: Run management clients and other clients concurrently +        while killing and restarting brokers. +        """          class ClientLoop(StoppableThread):              """Run a client executable in a loop.""" @@ -352,9 +355,9 @@ class LongTests(BrokerTest):                  finally: self.lock.release()                  StoppableThread.stop(self) -        # def test_management -        args += ["--mgmt-pub-interval", 1] # Publish management information every second. -        # FIXME aconway 2010-12-15: extra debugging +        # body of test_management() + +        args += ["--mgmt-pub-interval", 1]          args += ["--log-enable=trace+:management"]          # Use store if present.          if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] @@ -403,6 +406,10 @@ class LongTests(BrokerTest):              start_mclients(cluster[alive])          for c in chain(mclients, *clients):              c.stop() +        # Verify that logs are consistent +        # FIXME aconway 2010-12-21: this is currently expected to fail due to +        # known bugs, see https://issues.apache.org/jira/browse/QPID-2982 +        self.assertRaises(Exception, cluster_test_logs.verify_logs, glob.glob("*.log"))      def test_management_qmf2(self):          self.test_management(args=["--mgmt-qmf2=yes"]) @@ -506,7 +513,7 @@ class StoreTests(BrokerTest):          self.assertEqual(a.wait(), 0)          self.assertEqual(c.wait(), 0)          # Mix members from both shutdown events, they should fail -        # FIXME aconway 2010-03-11: can't predict the exit status of these +        # TODO aconway 2010-03-11: can't predict the exit status of these          # as it depends on the order of delivery of initial-status messages.          # See comment at top of this file.          a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) | 
