summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-01-07 16:32:34 +0000
committerAlan Conway <aconway@apache.org>2011-01-07 16:32:34 +0000
commit97ec99f115c5190be04963e2853d0315d9a75a52 (patch)
tree23eca9f137946af8e857c44a435126dc687322cd /cpp/src
parentbda33c5b69189bf645ff818d8315bb8fc3288b7a (diff)
downloadqpid-python-97ec99f115c5190be04963e2853d0315d9a75a52.tar.gz
QPID-2982: Improved cluster/management logging and automated test for log consistency.
The cluster_tests.py test_management test is augmented to compare broker logs after the test completes. Any inconsistency in the logs causes the test to fail. This check is currently disabled as it is failing due to known issues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1056378 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp16
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp46
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h5
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.cpp16
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.h8
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp184
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp1
-rwxr-xr-xcpp/src/tests/cluster_test_logs.py105
-rwxr-xr-xcpp/src/tests/cluster_tests.py21
9 files changed, 284 insertions, 118 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0013c370a7..5720f7fcc1 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
- updateDataExchange(new UpdateDataExchange(this)),
+ updateDataExchange(new UpdateDataExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(info, *this << " new shadow connection " << c->getId());
+ QPID_LOG(debug, *this << " new shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
@@ -749,7 +749,7 @@ struct AppendQueue {
std::string Cluster::debugSnapshot() {
assertClusterSafe();
std::ostringstream msg;
- msg << "queue snapshot at " << map.getFrameSeq() << ":";
+ msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
return msg.str();
@@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
checkUpdateIn(l);
}
else {
- QPID_LOG(debug,*this << " unstall, ignore update " << updater
+ QPID_LOG(info, *this << " unstall, ignore update " << updater
<< " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
}
@@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
+ discarding = false; // OK to set, we're stalled for update.
+ QPID_LOG(notice, *this << " update complete, starting catch-up.");
+ QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
if (mAgent) {
// Update management agent now, after all update activity is complete.
updateDataExchange->updateManagementAgent(mAgent);
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
- discarding = false; // OK to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up.");
- QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
- QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
+ QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index c52caf6aa9..6b324be4c5 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -78,6 +78,10 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
+ return o << "cluster(" << c.updaterId << " UPDATER)";
+}
+
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -142,7 +146,7 @@ void UpdateClient::run() {
}
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updating state to " << updateeId
+ QPID_LOG(debug, *this << " updating state to " << updateeId
<< " at " << updateeUrl);
Broker& b = updaterBroker;
@@ -177,14 +181,14 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
- // FIXME aconway 2010-03-15: This sleep avoids the race condition
+ // TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
// Connection object. Remove when the bug is fixed.
//
sys::usleep(10*1000);
- QPID_LOG(debug, updaterId << " update completed to " << updateeId
+ QPID_LOG(debug, *this << " update completed to " << updateeId
<< " at " << updateeUrl << ": " << membership);
}
@@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState()
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- QPID_LOG(debug, updaterId << " updating management setup-state.");
+ QPID_LOG(debug, *this << " updating management setup-state.");
std::string vendor, product, instance;
agent->getName(vendor, product, instance);
ClusterConnectionProxy(session).managementSetupState(
@@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent()
if (!agent) return;
string data;
- QPID_LOG(debug, updaterId << " updating management schemas. ")
+ QPID_LOG(debug, *this << " updating management schemas. ")
agent->exportSchemas(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management agents. ")
+ QPID_LOG(debug, *this << " updating management agents. ")
agent->exportAgents(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ QPID_LOG(debug, *this << " updating management deleted objects. ")
typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
DeletedObjectList deleted;
agent->exportDeletedObjects(deleted);
@@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent()
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
+ QPID_LOG(debug, *this << " updating exchange " << ex->getName());
ClusterConnectionProxy(session).exchange(encode(*ex));
}
@@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
- QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
updateQueue(shadowSession, q);
}
void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
if (!q->hasExclusiveOwner()) {
- QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+ QPID_LOG(debug, *this << " updating queue " << q->getName());
updateQueue(session, q);
}//else queue will be updated as part of session state of owning session
}
@@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ QPID_LOG(debug, *this << " updating output task " << ci->getName()
<< " channel=" << channel);
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
- QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
@@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
updateConnection->getOutput().getSendMax()
);
shadowConnection.close();
- QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updated connection " << *updateConnection);
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
- QPID_LOG(debug, updaterId << " updating session " << ss->getId());
+ QPID_LOG(debug, *this << " updating session " << ss->getId());
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
@@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
- QPID_LOG(debug, updaterId << " updating exclusive queues.");
+ QPID_LOG(debug, *this << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- QPID_LOG(debug, updaterId << " updating consumers.");
+ QPID_LOG(debug, *this << " updating consumers.");
ss->getSemanticState().eachConsumer(
boost::bind(&UpdateClient::updateConsumer, this, _1));
- QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
+ QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
- QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
+ QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId());
}
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
using namespace message;
@@ -485,7 +489,7 @@ void UpdateClient::updateConsumer(
);
consumerNumbering.add(ci);
- QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
@@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, updaterId << " updating TX transaction state.");
+ QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index be09af7e81..76621cd7ba 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -30,7 +30,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
-
+#include <iosfwd>
namespace qpid {
@@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable {
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
client::ConnectionSettings connectionSettings;
+
+ friend std::ostream& operator<<(std::ostream&, const UpdateClient&);
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index 2f242b3024..2a079b8881 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -19,6 +19,7 @@
*
*/
#include "UpdateDataExchange.h"
+#include "Cluster.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
@@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")
const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
-UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) :
- Exchange(EXCHANGE_NAME, parent)
+std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
+ return o << "cluster(" << c.clusterId << " UPDATER)";
+}
+
+UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
+ Exchange(EXCHANGE_NAME, &cluster),
+ clusterId(cluster.getId())
{}
void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
agent->importAgents(buf1);
- QPID_LOG(debug, " Updated management agents.");
+ QPID_LOG(debug, *this << " updated management agents.");
framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
agent->importSchemas(buf2);
- QPID_LOG(debug, " Updated management schemas");
+ QPID_LOG(debug, *this << " updated management schemas.");
using amqp_0_10::ListCodec;
using types::Variant;
@@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
new management::ManagementAgent::DeletedObject(*i)));
}
agent->importDeletedObjects(objects);
- QPID_LOG(debug, " Updated management deleted objects.");
+ QPID_LOG(debug, *this << " updated management deleted objects.");
}
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h
index 27a98548f3..8c493e400a 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -23,6 +23,8 @@
*/
#include "qpid/broker/Exchange.h"
+#include "types.h"
+#include <iosfwd>
namespace qpid {
@@ -31,6 +33,7 @@ class ManagementAgent;
}
namespace cluster {
+class Cluster;
/**
* An exchange used to send data that is to large for a control
@@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange
static const std::string MANAGEMENT_SCHEMAS_KEY;
static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
- UpdateDataExchange(management::Manageable* parent);
+ UpdateDataExchange(Cluster& parent);
void route(broker::Deliverable& msg, const std::string& routingKey,
const framing::FieldTable* args);
@@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange
void updateManagementAgent(management::ManagementAgent* agent);
private:
-
+ MemberId clusterId;
std::string managementAgents;
std::string managementSchemas;
std::string managementDeletedObjects;
+ friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 07751f57ef..7b60ea35c4 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -18,7 +18,12 @@
* under the License.
*
*/
-
+
+
+// NOTE on use of log levels: The criteria for using trace vs. debug
+// is to use trace for log messages that are generated for each
+// unbatched stats/props notification and debug for everything else.
+
#include "qpid/management/ManagementAgent.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -89,7 +94,7 @@ static Variant::Map mapEncodeSchemaId(const string& pname,
ManagementAgent::RemoteAgent::~RemoteAgent ()
{
- QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
+ QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
agent.deleteObjectNowLH(mgmtObject->getObjectId());
@@ -169,7 +174,7 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
uuid.generate();
QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid);
} else
- QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
+ QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid);
// if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
bootSequence++;
@@ -308,7 +313,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
}
newManagementObjects[objId] = object;
}
-
+ QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
return objId;
}
@@ -330,7 +335,6 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
}
object->setObjectId(objId);
-
{
sys::Mutex::ScopedLock lock(addLock);
ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
@@ -340,7 +344,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
}
newManagementObjects[objId] = object;
}
-
+ QPID_LOG(debug, "Management object added: " << objId.getV2Key());
return objId;
}
@@ -370,7 +374,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
outBuffer.reset();
sendBufferLH(outBuffer, outLen, mExchange,
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
- QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+ QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
}
if (qmf2Support) {
@@ -408,9 +412,8 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
list_.push_back(map_);
ListCodec::encode(list_, content);
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
- QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+ QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
}
-
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -467,7 +470,7 @@ void ManagementAgent::clientAdded (const string& routingKey)
outLen = outBuffer.getPosition();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, rkeys.front());
- QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << rkeys.front());
+ QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());
rkeys.pop_front();
}
}
@@ -476,8 +479,10 @@ void ManagementAgent::clusterUpdate() {
// Called on all cluster memebers when a new member joins a cluster.
// Set clientWasAdded so that on the next periodicProcessing we will do
// a full update on all cluster members.
+ sys::Mutex::ScopedLock l(userLock);
+ moveNewObjectsLH(); // to be consistent with updater/updatee.
clientWasAdded = true;
- QPID_LOG(debug, "cluster update " << debugSnapshot());
+ QPID_LOG(debug, "Cluster member joined, " << debugSnapshot());
}
void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -509,7 +514,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
string routingKey)
{
if (suppressed) {
- QPID_LOG(trace, "Suppressing management message to " << routingKey);
+ QPID_LOG(debug, "Suppressing management message to " << routingKey);
return;
}
if (exchange.get() == 0) return;
@@ -564,7 +569,7 @@ void ManagementAgent::sendBufferLH(const string& data,
Variant::Map::const_iterator i;
if (suppressed) {
- QPID_LOG(trace, "Suppressing management message to " << routingKey);
+ QPID_LOG(debug, "Suppressing management message to " << routingKey);
return;
}
if (exchange.get() == 0) return;
@@ -637,7 +642,7 @@ void ManagementAgent::periodicProcessing (void)
{
#define BUFSIZE 65536
#define HEADROOM 4096
- QPID_LOG(trace, "Management agent periodic processing");
+ QPID_LOG(debug, "Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
@@ -776,17 +781,26 @@ void ManagementAgent::periodicProcessing (void)
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
if (send_props && qmf1Support) {
+ size_t pos = msgBuffer.getPosition();
encodeHeader(msgBuffer, 'c');
sBuf.clear();
object->writeProperties(sBuf);
msgBuffer.putRawData(sBuf);
+ QPID_LOG(trace, "Changed V1 properties "
+ << object->getObjectId().getV2Key()
+ << " len=" << msgBuffer.getPosition()-pos);
}
if (send_stats && qmf1Support) {
+ size_t pos = msgBuffer.getPosition();
encodeHeader(msgBuffer, 'i');
sBuf.clear();
object->writeStatistics(sBuf);
msgBuffer.putRawData(sBuf);
+ QPID_LOG(trace, "Changed V1 statistics "
+ << object->getObjectId().getV2Key()
+ << " len=" << msgBuffer.getPosition()-pos);
+
}
if ((send_stats || send_props) && qmf2Support) {
@@ -805,6 +819,10 @@ void ManagementAgent::periodicProcessing (void)
map_["_values"] = values;
list_.push_back(map_);
v2Objs++;
+ QPID_LOG(trace, "Changed V2"
+ << (send_stats? " statistics":"")
+ << (send_props? " properties":"")
+ << " map=" << map_);
}
if (send_props) pcount++;
@@ -826,7 +844,10 @@ void ManagementAgent::periodicProcessing (void)
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << contentSize);
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
+ << " props=" << pcount
+ << " stats=" << scount
+ << " len=" << contentSize);
}
}
@@ -849,7 +870,10 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << content.length());
+ QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
+ << " props=" << pcount
+ << " stats=" << scount
+ << " len=" << content.length());
}
}
}
@@ -877,15 +901,19 @@ void ManagementAgent::periodicProcessing (void)
for (DeletedObjectList::iterator lIter = mIter->second.begin();
lIter != mIter->second.end(); lIter++) {
-
+ std::string oid = (*lIter)->objectId;
if (!(*lIter)->encodedV1Config.empty()) {
encodeHeader(msgBuffer, 'c');
msgBuffer.putRawData((*lIter)->encodedV1Config);
+ QPID_LOG(trace, "Deleting V1 properties " << oid
+ << " len=" << (*lIter)->encodedV1Config.size());
v1Objs++;
}
if (!(*lIter)->encodedV1Inst.empty()) {
encodeHeader(msgBuffer, 'i');
msgBuffer.putRawData((*lIter)->encodedV1Inst);
+ QPID_LOG(trace, "Deleting V1 statistics " << oid
+ << " len=" << (*lIter)->encodedV1Inst.size());
v1Objs++;
}
if (v1Objs && msgBuffer.available() < HEADROOM) {
@@ -895,10 +923,12 @@ void ManagementAgent::periodicProcessing (void)
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
+ << key.str() << " len=" << contentSize);
}
if (!(*lIter)->encodedV2.empty()) {
+ QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
list_.push_back((*lIter)->encodedV2);
if (++v2Objs >= maxV2ReplyObjs) {
v2Objs = 0;
@@ -922,7 +952,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
}
@@ -936,7 +966,7 @@ void ManagementAgent::periodicProcessing (void)
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
}
if (!list_.empty()) {
@@ -959,7 +989,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
} // end map
@@ -984,7 +1014,7 @@ void ManagementAgent::periodicProcessing (void)
msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
- QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
+ QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
}
if (qmf2Support) {
@@ -1013,7 +1043,7 @@ void ManagementAgent::periodicProcessing (void)
// time to prevent stale heartbeats from getting to the consoles.
sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
- QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+ QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
}
QPID_LOG(debug, "periodic update " << debugSnapshot());
}
@@ -1073,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
uint32_t contentSize = msgBuffer.getPosition();
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
- QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v1key.str());
+ QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
}
if (qmf2Support) {
@@ -1086,7 +1116,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
string content;
ListCodec::encode(list_, content);
sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str());
- QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v2key.str());
+ QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());
}
}
@@ -1102,7 +1132,7 @@ void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t s
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
+ QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
replyToKey << " seq=" << sequence);
}
@@ -1127,7 +1157,7 @@ void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& ci
MapCodec::encode(map, content);
sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
- QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
+ QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
}
bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
@@ -1221,7 +1251,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
inBuffer.getShortString(methodName);
inBuffer.getRawData(inArgs, inBuffer.available());
- QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
methodName << " replyTo=" << replyToKey);
encodeHeader(outBuffer, 'm', sequence);
@@ -1232,7 +1262,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
return;
}
@@ -1243,7 +1273,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
return;
}
@@ -1259,7 +1289,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
return;
}
}
@@ -1291,7 +1321,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
@@ -1374,7 +1404,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
// invoke the method
- QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
+ QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
<< ":" << iter->second->getClassName() << " method=" <<
methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs);
@@ -1402,7 +1432,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
MapCodec::encode(outMap, content);
sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
- QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
+ QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
}
@@ -1411,7 +1441,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey,
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey);
+ QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
@@ -1419,12 +1449,12 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey,
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
+ QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
}
void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence)
{
- QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey);
+ QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -1440,7 +1470,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, u
if (outLen) {
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
}
sendCommandCompleteLH(replyToKey, sequence);
@@ -1452,7 +1482,7 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT
inBuffer.getShortString(packageName);
- QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
findOrAddPackageLH(packageName);
}
@@ -1463,7 +1493,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo
inBuffer.getShortString(packageName);
- QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end())
@@ -1489,7 +1519,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
"(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
classes.pop_front();
}
@@ -1508,7 +1538,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
- QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -1525,7 +1555,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), to=" << replyToKey << " seq=" << sequence);
if (cIter != pIter->second.end())
@@ -1557,7 +1587,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl
inBuffer.getShortString (packageName);
key.decode(inBuffer);
- QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
@@ -1575,7 +1605,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
}
else
sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
@@ -1598,7 +1628,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
key.decode(inBuffer);
inBuffer.restore();
- QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+ QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
@@ -1622,7 +1652,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
- QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
" to=schema.class");
}
}
@@ -1702,7 +1732,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
requestedBrokerBank = inBuffer.getLong();
requestedAgentBank = inBuffer.getLong();
- QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
+ QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
" reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
assignedBank = assignBankLH(requestedAgentBank);
@@ -1722,7 +1752,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
addObject (agent->mgmtObject, 0);
remoteAgents[connectionRef] = agent;
- QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
+ QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
// Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1734,7 +1764,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
+ QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
" to=" << replyToKey << " seq=" << sequence);
}
@@ -1747,7 +1777,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
ft.decode(inBuffer);
- QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
+ QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
value = ft.get("_class");
if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -1776,7 +1806,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
sendCommandCompleteLH(replyToKey, sequence);
@@ -1821,7 +1851,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock
- QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
}
encodeHeader(outBuffer, 'g', sequence);
@@ -1837,7 +1867,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
if (outLen) {
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
sendCommandCompleteLH(replyToKey, sequence);
@@ -1853,7 +1883,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
Variant::Map headers;
MapCodec::decode(body, inMap);
- QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
+ QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
@@ -1935,7 +1965,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
ListCodec::encode(list_, content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);
return;
}
} else {
@@ -1989,12 +2019,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
ListCodec::encode(_list.front().asList(), content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
_list.pop_front();
- QPID_LOG(trace, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
+ QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
}
headers.erase("partial");
ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
+ QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
return;
}
@@ -2002,14 +2032,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
string content;
ListCodec::encode(Variant::List(), content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(trace, "SENT QueryResponse (empty) to=" << replyTo);
+ QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);
}
void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
const string& cid)
{
- QPID_LOG(trace, "RCVD AgentLocateRequest");
+ QPID_LOG(debug, "RCVD AgentLocateRequest");
Variant::Map map;
Variant::Map headers;
@@ -2028,7 +2058,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo
sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
clientWasAdded = true;
- QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+ QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);
}
@@ -2171,7 +2201,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
}
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
}
return false;
@@ -2269,7 +2299,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
- QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package");
+ QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");
return result.first;
}
@@ -2639,12 +2669,13 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
}
namespace {
-bool isNotDeleted(const ManagementObjectMap::value_type& value) {
- return !value.second->isDeleted();
+bool isDeleted(const ManagementObjectMap::value_type& value) {
+ return value.second->isDeleted();
}
-size_t countNotDeleted(const ManagementObjectMap& map) {
- return std::count_if(map.begin(), map.end(), isNotDeleted);
+void summarizeMap(std::ostream& o, const char* name, const ManagementObjectMap& map) {
+ size_t deleted = std::count_if(map.begin(), map.end(), isDeleted);
+ o << map.size() << " " << name << " (" << deleted << " deleted), ";
}
void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
@@ -2657,13 +2688,18 @@ void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
string ManagementAgent::debugSnapshot() {
ostringstream msg;
- msg << " management snapshot:";
- for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
- i != remoteAgents.end(); ++i)
- msg << " " << i->second->routingKey;
- msg << " packages: " << packages.size();
- msg << " objects: " << countNotDeleted(managementObjects);
- msg << " new objects: " << countNotDeleted(newManagementObjects);
+ msg << " management snapshot: ";
+ if (!remoteAgents.empty()) {
+ msg << remoteAgents.size() << " agents(";
+ for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
+ i != remoteAgents.end(); ++i)
+ msg << " " << i->second->routingKey;
+ msg << "), ";
+ }
+ msg << packages.size() << " packages, ";
+ summarizeMap(msg, "objects", managementObjects);
+ summarizeMap(msg, "new objects ", newManagementObjects);
+ msg << pendingDeletedObjs.size() << " pending deletes" ;
return msg.str();
}
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index 670a242c02..cfdd58ed53 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -262,6 +262,7 @@ void ManagementObject::setUpdateTime()
void ManagementObject::resourceDestroy()
{
+ QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());
destroyTime = sys::Duration(sys::EPOCH, sys::now());
deleted = true;
}
diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py
new file mode 100755
index 0000000000..160e15e628
--- /dev/null
+++ b/cpp/src/tests/cluster_test_logs.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Functions for comparing broker log files, used by cluster_tests.py.
+
+import os, os.path, re, glob
+from itertools import izip
+
+def split_log(log):
+ """Split a broker log at checkpoints where a member joins.
+ Return the set of checkpoints discovered."""
+ checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:")
+ outfile = None
+ checkpoints = []
+ for l in open(log):
+ match = checkpoint_re.search(l)
+ if match:
+ checkpoint = match.groups()[0]
+ checkpoints.append(checkpoint)
+ if outfile: outfile.close()
+ outfile = open("%s.%s"%(log, checkpoint), 'w')
+
+ if outfile: outfile.write(l)
+ if outfile: outfile.close()
+ return checkpoints
+
+def filter_log(log):
+ """Filter the contents of a log file to remove data that is expected
+ to differ between brokers in a cluster. Filtered log contents between
+ the same checkpoints should match across the cluster."""
+ out = open("%s.filter"%(log), 'w')
+ for l in open(log):
+ # Lines to skip entirely
+ skip = "|".join([
+ 'local connection', # Only on local broker
+ 'UPDATER|UPDATEE|OFFER', # Ignore update process
+ 'stall for update|unstall, ignore update|cancelled offer .* unstall',
+ 'caught up',
+ 'active for links|Passivating links|Activating links',
+ 'info Connection.* connected to', # UpdateClient connection
+ 'warning Broker closed connection: 200, OK',
+ 'task late',
+ 'task overran'
+ ])
+ if re.compile(skip).search(l): continue
+
+ # Regex to match a UUID
+ uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'
+
+ # Regular expression substitutions to remove expected differences
+ for pattern,subst in [
+ (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d', ''), # Remove timestamp
+ (r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id
+ (r' local\)| shadow\)', ')'), # Remove local/shadow indication
+ (r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready.
+ # System UUID
+ (r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'),
+
+ # FIXME aconway 2010-12-20: substitutions to mask known problems
+ #(r' len=\d+', ' len=NN'), # buffer lengths
+ #(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name
+ #(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs
+ ]: l = re.sub(pattern,subst,l)
+ out.write(l)
+ out.close()
+
+def verify_logs(logs):
+ """Compare log files from cluster brokers, verify that they correspond correctly."""
+ for l in glob.glob("*.log"): filter_log(l)
+ checkpoints = set()
+ for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l)))
+ errors=[]
+ for c in checkpoints:
+ fragments = glob.glob("*.filter.%s"%(c))
+ fragments.sort(reverse=True, key=os.path.getsize)
+ while len(fragments) >= 2:
+ a = fragments.pop(0)
+ b = fragments[0]
+ for ab in izip(open(a), open(b)):
+ if ab[0] != ab[1]:
+ errors.append("\n %s %s"%(a, b))
+ break
+ if errors:
+ raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors))
+
+# Can be run as a script.
+if __name__ == "__main__":
+ verify_logs(glob.glob("*.log"))
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index daa47a6322..03913356ca 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -18,7 +18,7 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess
+import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
@@ -35,7 +35,7 @@ log = getLogger("qpid.cluster_tests")
# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
# and EXPECT_EXIT_FAIL in some of the tests below.
-# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error
+# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error
# should give non-0 exit status.
# Import scripts as modules
@@ -299,7 +299,10 @@ class LongTests(BrokerTest):
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self, args=[]):
- """Stress test: Run management clients and other clients concurrently."""
+ """
+ Stress test: Run management clients and other clients concurrently
+ while killing and restarting brokers.
+ """
class ClientLoop(StoppableThread):
"""Run a client executable in a loop."""
@@ -352,9 +355,9 @@ class LongTests(BrokerTest):
finally: self.lock.release()
StoppableThread.stop(self)
- # def test_management
- args += ["--mgmt-pub-interval", 1] # Publish management information every second.
- # FIXME aconway 2010-12-15: extra debugging
+ # body of test_management()
+
+ args += ["--mgmt-pub-interval", 1]
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
@@ -403,6 +406,10 @@ class LongTests(BrokerTest):
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
+ # Verify that logs are consistent
+ # FIXME aconway 2010-12-21: this is currently expected to fail due to
+ # known bugs, see https://issues.apache.org/jira/browse/QPID-2982
+ self.assertRaises(Exception, cluster_test_logs.verify_logs, glob.glob("*.log"))
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
@@ -506,7 +513,7 @@ class StoreTests(BrokerTest):
self.assertEqual(a.wait(), 0)
self.assertEqual(c.wait(), 0)
# Mix members from both shutdown events, they should fail
- # FIXME aconway 2010-03-11: can't predict the exit status of these
+ # TODO aconway 2010-03-11: can't predict the exit status of these
# as it depends on the order of delivery of initial-status messages.
# See comment at top of this file.
a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)