summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp46
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp16
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.h8
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp184
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp1
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py105
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py21
9 files changed, 284 insertions, 118 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 0013c370a7..5720f7fcc1 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
- updateDataExchange(new UpdateDataExchange(this)),
+ updateDataExchange(new UpdateDataExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(info, *this << " new shadow connection " << c->getId());
+ QPID_LOG(debug, *this << " new shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
@@ -749,7 +749,7 @@ struct AppendQueue {
std::string Cluster::debugSnapshot() {
assertClusterSafe();
std::ostringstream msg;
- msg << "queue snapshot at " << map.getFrameSeq() << ":";
+ msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
return msg.str();
@@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
checkUpdateIn(l);
}
else {
- QPID_LOG(debug,*this << " unstall, ignore update " << updater
+ QPID_LOG(info, *this << " unstall, ignore update " << updater
<< " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
}
@@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
+ discarding = false; // OK to set, we're stalled for update.
+ QPID_LOG(notice, *this << " update complete, starting catch-up.");
+ QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
if (mAgent) {
// Update management agent now, after all update activity is complete.
updateDataExchange->updateManagementAgent(mAgent);
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
- discarding = false; // OK to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up.");
- QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
- QPID_LOG(debug, *this << " cluster-uuid = " << clusterId);
+ QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index c52caf6aa9..6b324be4c5 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -78,6 +78,10 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
+ return o << "cluster(" << c.updaterId << " UPDATER)";
+}
+
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -142,7 +146,7 @@ void UpdateClient::run() {
}
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updating state to " << updateeId
+ QPID_LOG(debug, *this << " updating state to " << updateeId
<< " at " << updateeUrl);
Broker& b = updaterBroker;
@@ -177,14 +181,14 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
- // FIXME aconway 2010-03-15: This sleep avoids the race condition
+ // TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
// Connection object. Remove when the bug is fixed.
//
sys::usleep(10*1000);
- QPID_LOG(debug, updaterId << " update completed to " << updateeId
+ QPID_LOG(debug, *this << " update completed to " << updateeId
<< " at " << updateeUrl << ": " << membership);
}
@@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState()
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- QPID_LOG(debug, updaterId << " updating management setup-state.");
+ QPID_LOG(debug, *this << " updating management setup-state.");
std::string vendor, product, instance;
agent->getName(vendor, product, instance);
ClusterConnectionProxy(session).managementSetupState(
@@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent()
if (!agent) return;
string data;
- QPID_LOG(debug, updaterId << " updating management schemas. ")
+ QPID_LOG(debug, *this << " updating management schemas. ")
agent->exportSchemas(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management agents. ")
+ QPID_LOG(debug, *this << " updating management agents. ")
agent->exportAgents(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ QPID_LOG(debug, *this << " updating management deleted objects. ")
typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
DeletedObjectList deleted;
agent->exportDeletedObjects(deleted);
@@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent()
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
+ QPID_LOG(debug, *this << " updating exchange " << ex->getName());
ClusterConnectionProxy(session).exchange(encode(*ex));
}
@@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
- QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
updateQueue(shadowSession, q);
}
void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
if (!q->hasExclusiveOwner()) {
- QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+ QPID_LOG(debug, *this << " updating queue " << q->getName());
updateQueue(session, q);
}//else queue will be updated as part of session state of owning session
}
@@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ QPID_LOG(debug, *this << " updating output task " << ci->getName()
<< " channel=" << channel);
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
- QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
@@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
updateConnection->getOutput().getSendMax()
);
shadowConnection.close();
- QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updated connection " << *updateConnection);
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
- QPID_LOG(debug, updaterId << " updating session " << ss->getId());
+ QPID_LOG(debug, *this << " updating session " << ss->getId());
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
@@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
- QPID_LOG(debug, updaterId << " updating exclusive queues.");
+ QPID_LOG(debug, *this << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- QPID_LOG(debug, updaterId << " updating consumers.");
+ QPID_LOG(debug, *this << " updating consumers.");
ss->getSemanticState().eachConsumer(
boost::bind(&UpdateClient::updateConsumer, this, _1));
- QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
+ QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
- QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
+ QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId());
}
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
using namespace message;
@@ -485,7 +489,7 @@ void UpdateClient::updateConsumer(
);
consumerNumbering.add(ci);
- QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
@@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, updaterId << " updating TX transaction state.");
+ QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index be09af7e81..76621cd7ba 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -30,7 +30,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
-
+#include <iosfwd>
namespace qpid {
@@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable {
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
client::ConnectionSettings connectionSettings;
+
+ friend std::ostream& operator<<(std::ostream&, const UpdateClient&);
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index 2f242b3024..2a079b8881 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -19,6 +19,7 @@
*
*/
#include "UpdateDataExchange.h"
+#include "Cluster.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
@@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")
const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
-UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) :
- Exchange(EXCHANGE_NAME, parent)
+std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
+ return o << "cluster(" << c.clusterId << " UPDATER)";
+}
+
+UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
+ Exchange(EXCHANGE_NAME, &cluster),
+ clusterId(cluster.getId())
{}
void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
agent->importAgents(buf1);
- QPID_LOG(debug, " Updated management agents.");
+ QPID_LOG(debug, *this << " updated management agents.");
framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
agent->importSchemas(buf2);
- QPID_LOG(debug, " Updated management schemas");
+ QPID_LOG(debug, *this << " updated management schemas.");
using amqp_0_10::ListCodec;
using types::Variant;
@@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
new management::ManagementAgent::DeletedObject(*i)));
}
agent->importDeletedObjects(objects);
- QPID_LOG(debug, " Updated management deleted objects.");
+ QPID_LOG(debug, *this << " updated management deleted objects.");
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
index 27a98548f3..8c493e400a 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -23,6 +23,8 @@
*/
#include "qpid/broker/Exchange.h"
+#include "types.h"
+#include <iosfwd>
namespace qpid {
@@ -31,6 +33,7 @@ class ManagementAgent;
}
namespace cluster {
+class Cluster;
/**
* An exchange used to send data that is to large for a control
@@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange
static const std::string MANAGEMENT_SCHEMAS_KEY;
static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
- UpdateDataExchange(management::Manageable* parent);
+ UpdateDataExchange(Cluster& parent);
void route(broker::Deliverable& msg, const std::string& routingKey,
const framing::FieldTable* args);
@@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange
void updateManagementAgent(management::ManagementAgent* agent);
private:
-
+ MemberId clusterId;
std::string managementAgents;
std::string managementSchemas;
std::string managementDeletedObjects;
+ friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 07751f57ef..7b60ea35c4 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -18,7 +18,12 @@
* under the License.
*
*/
-
+
+
+// NOTE on use of log levels: The criteria for using trace vs. debug
+// is to use trace for log messages that are generated for each
+// unbatched stats/props notification and debug for everything else.
+
#include "qpid/management/ManagementAgent.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -89,7 +94,7 @@ static Variant::Map mapEncodeSchemaId(const string& pname,
ManagementAgent::RemoteAgent::~RemoteAgent ()
{
- QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
+ QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
agent.deleteObjectNowLH(mgmtObject->getObjectId());
@@ -169,7 +174,7 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
uuid.generate();
QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid);
} else
- QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
+ QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid);
// if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
bootSequence++;
@@ -308,7 +313,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
}
newManagementObjects[objId] = object;
}
-
+ QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
return objId;
}
@@ -330,7 +335,6 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
}
object->setObjectId(objId);
-
{
sys::Mutex::ScopedLock lock(addLock);
ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
@@ -340,7 +344,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
}
newManagementObjects[objId] = object;
}
-
+ QPID_LOG(debug, "Management object added: " << objId.getV2Key());
return objId;
}
@@ -370,7 +374,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
outBuffer.reset();
sendBufferLH(outBuffer, outLen, mExchange,
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
- QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+ QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
}
if (qmf2Support) {
@@ -408,9 +412,8 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
list_.push_back(map_);
ListCodec::encode(list_, content);
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
- QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+ QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
}
-
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -467,7 +470,7 @@ void ManagementAgent::clientAdded (const string& routingKey)
outLen = outBuffer.getPosition();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, rkeys.front());
- QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << rkeys.front());
+ QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());
rkeys.pop_front();
}
}
@@ -476,8 +479,10 @@ void ManagementAgent::clusterUpdate() {
// Called on all cluster memebers when a new member joins a cluster.
// Set clientWasAdded so that on the next periodicProcessing we will do
// a full update on all cluster members.
+ sys::Mutex::ScopedLock l(userLock);
+ moveNewObjectsLH(); // to be consistent with updater/updatee.
clientWasAdded = true;
- QPID_LOG(debug, "cluster update " << debugSnapshot());
+ QPID_LOG(debug, "Cluster member joined, " << debugSnapshot());
}
void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -509,7 +514,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
string routingKey)
{
if (suppressed) {
- QPID_LOG(trace, "Suppressing management message to " << routingKey);
+ QPID_LOG(debug, "Suppressing management message to " << routingKey);
return;
}
if (exchange.get() == 0) return;
@@ -564,7 +569,7 @@ void ManagementAgent::sendBufferLH(const string& data,
Variant::Map::const_iterator i;
if (suppressed) {
- QPID_LOG(trace, "Suppressing management message to " << routingKey);
+ QPID_LOG(debug, "Suppressing management message to " << routingKey);
return;
}
if (exchange.get() == 0) return;
@@ -637,7 +642,7 @@ void ManagementAgent::periodicProcessing (void)
{
#define BUFSIZE 65536
#define HEADROOM 4096
- QPID_LOG(trace, "Management agent periodic processing");
+ QPID_LOG(debug, "Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
@@ -776,17 +781,26 @@ void ManagementAgent::periodicProcessing (void)
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
if (send_props && qmf1Support) {
+ size_t pos = msgBuffer.getPosition();
encodeHeader(msgBuffer, 'c');
sBuf.clear();
object->writeProperties(sBuf);
msgBuffer.putRawData(sBuf);
+ QPID_LOG(trace, "Changed V1 properties "
+ << object->getObjectId().getV2Key()
+ << " len=" << msgBuffer.getPosition()-pos);
}
if (send_stats && qmf1Support) {
+ size_t pos = msgBuffer.getPosition();
encodeHeader(msgBuffer, 'i');
sBuf.clear();
object->writeStatistics(sBuf);
msgBuffer.putRawData(sBuf);
+ QPID_LOG(trace, "Changed V1 statistics "
+ << object->getObjectId().getV2Key()
+ << " len=" << msgBuffer.getPosition()-pos);
+
}
if ((send_stats || send_props) && qmf2Support) {
@@ -805,6 +819,10 @@ void ManagementAgent::periodicProcessing (void)
map_["_values"] = values;
list_.push_back(map_);
v2Objs++;
+ QPID_LOG(trace, "Changed V2"
+ << (send_stats? " statistics":"")
+ << (send_props? " properties":"")
+ << " map=" << map_);
}
if (send_props) pcount++;
@@ -826,7 +844,10 @@ void ManagementAgent::periodicProcessing (void)
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << contentSize);
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
+ << " props=" << pcount
+ << " stats=" << scount
+ << " len=" << contentSize);
}
}
@@ -849,7 +870,10 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << content.length());
+ QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
+ << " props=" << pcount
+ << " stats=" << scount
+ << " len=" << content.length());
}
}
}
@@ -877,15 +901,19 @@ void ManagementAgent::periodicProcessing (void)
for (DeletedObjectList::iterator lIter = mIter->second.begin();
lIter != mIter->second.end(); lIter++) {
-
+ std::string oid = (*lIter)->objectId;
if (!(*lIter)->encodedV1Config.empty()) {
encodeHeader(msgBuffer, 'c');
msgBuffer.putRawData((*lIter)->encodedV1Config);
+ QPID_LOG(trace, "Deleting V1 properties " << oid
+ << " len=" << (*lIter)->encodedV1Config.size());
v1Objs++;
}
if (!(*lIter)->encodedV1Inst.empty()) {
encodeHeader(msgBuffer, 'i');
msgBuffer.putRawData((*lIter)->encodedV1Inst);
+ QPID_LOG(trace, "Deleting V1 statistics " << oid
+ << " len=" << (*lIter)->encodedV1Inst.size());
v1Objs++;
}
if (v1Objs && msgBuffer.available() < HEADROOM) {
@@ -895,10 +923,12 @@ void ManagementAgent::periodicProcessing (void)
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
+ << key.str() << " len=" << contentSize);
}
if (!(*lIter)->encodedV2.empty()) {
+ QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
list_.push_back((*lIter)->encodedV2);
if (++v2Objs >= maxV2ReplyObjs) {
v2Objs = 0;
@@ -922,7 +952,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
}
@@ -936,7 +966,7 @@ void ManagementAgent::periodicProcessing (void)
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
}
if (!list_.empty()) {
@@ -959,7 +989,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
} // end map
@@ -984,7 +1014,7 @@ void ManagementAgent::periodicProcessing (void)
msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
- QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
+ QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
}
if (qmf2Support) {
@@ -1013,7 +1043,7 @@ void ManagementAgent::periodicProcessing (void)
// time to prevent stale heartbeats from getting to the consoles.
sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
- QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+ QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
}
QPID_LOG(debug, "periodic update " << debugSnapshot());
}
@@ -1073,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
uint32_t contentSize = msgBuffer.getPosition();
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
- QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v1key.str());
+ QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
}
if (qmf2Support) {
@@ -1086,7 +1116,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
string content;
ListCodec::encode(list_, content);
sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str());
- QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v2key.str());
+ QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());
}
}
@@ -1102,7 +1132,7 @@ void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t s
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
+ QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
replyToKey << " seq=" << sequence);
}
@@ -1127,7 +1157,7 @@ void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& ci
MapCodec::encode(map, content);
sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
- QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
+ QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
}
bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
@@ -1221,7 +1251,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
inBuffer.getShortString(methodName);
inBuffer.getRawData(inArgs, inBuffer.available());
- QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
methodName << " replyTo=" << replyToKey);
encodeHeader(outBuffer, 'm', sequence);
@@ -1232,7 +1262,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
return;
}
@@ -1243,7 +1273,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
return;
}
@@ -1259,7 +1289,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
return;
}
}
@@ -1291,7 +1321,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
@@ -1374,7 +1404,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
// invoke the method
- QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
+ QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
<< ":" << iter->second->getClassName() << " method=" <<
methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs);
@@ -1402,7 +1432,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
MapCodec::encode(outMap, content);
sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
- QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
+ QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
}
@@ -1411,7 +1441,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey,
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey);
+ QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
@@ -1419,12 +1449,12 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey,
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
+ QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
}
void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence)
{
- QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey);
+ QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -1440,7 +1470,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, u
if (outLen) {
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
}
sendCommandCompleteLH(replyToKey, sequence);
@@ -1452,7 +1482,7 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT
inBuffer.getShortString(packageName);
- QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
findOrAddPackageLH(packageName);
}
@@ -1463,7 +1493,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo
inBuffer.getShortString(packageName);
- QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end())
@@ -1489,7 +1519,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
"(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
classes.pop_front();
}
@@ -1508,7 +1538,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
- QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -1525,7 +1555,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), to=" << replyToKey << " seq=" << sequence);
if (cIter != pIter->second.end())
@@ -1557,7 +1587,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl
inBuffer.getShortString (packageName);
key.decode(inBuffer);
- QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
@@ -1575,7 +1605,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
}
else
sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
@@ -1598,7 +1628,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
key.decode(inBuffer);
inBuffer.restore();
- QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+ QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
@@ -1622,7 +1652,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
- QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
" to=schema.class");
}
}
@@ -1702,7 +1732,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
requestedBrokerBank = inBuffer.getLong();
requestedAgentBank = inBuffer.getLong();
- QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
+ QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
" reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
assignedBank = assignBankLH(requestedAgentBank);
@@ -1722,7 +1752,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
addObject (agent->mgmtObject, 0);
remoteAgents[connectionRef] = agent;
- QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
+ QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
// Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1734,7 +1764,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
+ QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
" to=" << replyToKey << " seq=" << sequence);
}
@@ -1747,7 +1777,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
ft.decode(inBuffer);
- QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
+ QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
value = ft.get("_class");
if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -1776,7 +1806,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
sendCommandCompleteLH(replyToKey, sequence);
@@ -1821,7 +1851,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock
- QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
}
encodeHeader(outBuffer, 'g', sequence);
@@ -1837,7 +1867,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
if (outLen) {
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
sendCommandCompleteLH(replyToKey, sequence);
@@ -1853,7 +1883,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
Variant::Map headers;
MapCodec::decode(body, inMap);
- QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
+ QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
@@ -1935,7 +1965,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
ListCodec::encode(list_, content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);
return;
}
} else {
@@ -1989,12 +2019,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
ListCodec::encode(_list.front().asList(), content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
_list.pop_front();
- QPID_LOG(trace, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
+ QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
}
headers.erase("partial");
ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
+ QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
return;
}
@@ -2002,14 +2032,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
string content;
ListCodec::encode(Variant::List(), content);
sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(trace, "SENT QueryResponse (empty) to=" << replyTo);
+ QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);
}
void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
const string& cid)
{
- QPID_LOG(trace, "RCVD AgentLocateRequest");
+ QPID_LOG(debug, "RCVD AgentLocateRequest");
Variant::Map map;
Variant::Map headers;
@@ -2028,7 +2058,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo
sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
clientWasAdded = true;
- QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+ QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);
}
@@ -2171,7 +2201,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
}
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
}
return false;
@@ -2269,7 +2299,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
- QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package");
+ QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");
return result.first;
}
@@ -2639,12 +2669,13 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
}
namespace {
-bool isNotDeleted(const ManagementObjectMap::value_type& value) {
- return !value.second->isDeleted();
+bool isDeleted(const ManagementObjectMap::value_type& value) {
+ return value.second->isDeleted();
}
-size_t countNotDeleted(const ManagementObjectMap& map) {
- return std::count_if(map.begin(), map.end(), isNotDeleted);
+void summarizeMap(std::ostream& o, const char* name, const ManagementObjectMap& map) {
+ size_t deleted = std::count_if(map.begin(), map.end(), isDeleted);
+ o << map.size() << " " << name << " (" << deleted << " deleted), ";
}
void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
@@ -2657,13 +2688,18 @@ void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
string ManagementAgent::debugSnapshot() {
ostringstream msg;
- msg << " management snapshot:";
- for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
- i != remoteAgents.end(); ++i)
- msg << " " << i->second->routingKey;
- msg << " packages: " << packages.size();
- msg << " objects: " << countNotDeleted(managementObjects);
- msg << " new objects: " << countNotDeleted(newManagementObjects);
+ msg << " management snapshot: ";
+ if (!remoteAgents.empty()) {
+ msg << remoteAgents.size() << " agents(";
+ for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
+ i != remoteAgents.end(); ++i)
+ msg << " " << i->second->routingKey;
+ msg << "), ";
+ }
+ msg << packages.size() << " packages, ";
+ summarizeMap(msg, "objects", managementObjects);
+ summarizeMap(msg, "new objects ", newManagementObjects);
+ msg << pendingDeletedObjs.size() << " pending deletes" ;
return msg.str();
}
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 670a242c02..cfdd58ed53 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -262,6 +262,7 @@ void ManagementObject::setUpdateTime()
void ManagementObject::resourceDestroy()
{
+ QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());
destroyTime = sys::Duration(sys::EPOCH, sys::now());
deleted = true;
}
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
new file mode 100755
index 0000000000..160e15e628
--- /dev/null
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Functions for comparing broker log files, used by cluster_tests.py.
+
+import os, os.path, re, glob
+from itertools import izip
+
+def split_log(log):
+ """Split a broker log at checkpoints where a member joins.
+ Return the set of checkpoints discovered."""
+ checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:")
+ outfile = None
+ checkpoints = []
+ for l in open(log):
+ match = checkpoint_re.search(l)
+ if match:
+ checkpoint = match.groups()[0]
+ checkpoints.append(checkpoint)
+ if outfile: outfile.close()
+ outfile = open("%s.%s"%(log, checkpoint), 'w')
+
+ if outfile: outfile.write(l)
+ if outfile: outfile.close()
+ return checkpoints
+
+def filter_log(log):
+ """Filter the contents of a log file to remove data that is expected
+ to differ between brokers in a cluster. Filtered log contents between
+ the same checkpoints should match across the cluster."""
+ out = open("%s.filter"%(log), 'w')
+ for l in open(log):
+ # Lines to skip entirely
+ skip = "|".join([
+ 'local connection', # Only on local broker
+ 'UPDATER|UPDATEE|OFFER', # Ignore update process
+ 'stall for update|unstall, ignore update|cancelled offer .* unstall',
+ 'caught up',
+ 'active for links|Passivating links|Activating links',
+ 'info Connection.* connected to', # UpdateClient connection
+ 'warning Broker closed connection: 200, OK',
+ 'task late',
+ 'task overran'
+ ])
+ if re.compile(skip).search(l): continue
+
+ # Regex to match a UUID
+ uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'
+
+ # Regular expression substitutions to remove expected differences
+ for pattern,subst in [
+ (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d', ''), # Remove timestamp
+ (r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id
+ (r' local\)| shadow\)', ')'), # Remove local/shadow indication
+ (r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready.
+ # System UUID
+ (r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'),
+
+ # FIXME aconway 2010-12-20: substitutions to mask known problems
+ #(r' len=\d+', ' len=NN'), # buffer lengths
+ #(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name
+ #(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs
+ ]: l = re.sub(pattern,subst,l)
+ out.write(l)
+ out.close()
+
+def verify_logs(logs):
+ """Compare log files from cluster brokers, verify that they correspond correctly."""
+ for l in glob.glob("*.log"): filter_log(l)
+ checkpoints = set()
+ for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l)))
+ errors=[]
+ for c in checkpoints:
+ fragments = glob.glob("*.filter.%s"%(c))
+ fragments.sort(reverse=True, key=os.path.getsize)
+ while len(fragments) >= 2:
+ a = fragments.pop(0)
+ b = fragments[0]
+ for ab in izip(open(a), open(b)):
+ if ab[0] != ab[1]:
+ errors.append("\n %s %s"%(a, b))
+ break
+ if errors:
+ raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors))
+
+# Can be run as a script.
+if __name__ == "__main__":
+ verify_logs(glob.glob("*.log"))
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index daa47a6322..03913356ca 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,7 +18,7 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess
+import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
@@ -35,7 +35,7 @@ log = getLogger("qpid.cluster_tests")
# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
# and EXPECT_EXIT_FAIL in some of the tests below.
-# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error
+# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error
# should give non-0 exit status.
# Import scripts as modules
@@ -299,7 +299,10 @@ class LongTests(BrokerTest):
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self, args=[]):
- """Stress test: Run management clients and other clients concurrently."""
+ """
+ Stress test: Run management clients and other clients concurrently
+ while killing and restarting brokers.
+ """
class ClientLoop(StoppableThread):
"""Run a client executable in a loop."""
@@ -352,9 +355,9 @@ class LongTests(BrokerTest):
finally: self.lock.release()
StoppableThread.stop(self)
- # def test_management
- args += ["--mgmt-pub-interval", 1] # Publish management information every second.
- # FIXME aconway 2010-12-15: extra debugging
+ # body of test_management()
+
+ args += ["--mgmt-pub-interval", 1]
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
@@ -403,6 +406,10 @@ class LongTests(BrokerTest):
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
+ # Verify that logs are consistent
+ # FIXME aconway 2010-12-21: this is currently expected to fail due to
+ # known bugs, see https://issues.apache.org/jira/browse/QPID-2982
+ self.assertRaises(Exception, cluster_test_logs.verify_logs, glob.glob("*.log"))
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
@@ -506,7 +513,7 @@ class StoreTests(BrokerTest):
self.assertEqual(a.wait(), 0)
self.assertEqual(c.wait(), 0)
# Mix members from both shutdown events, they should fail
- # FIXME aconway 2010-03-11: can't predict the exit status of these
+ # TODO aconway 2010-03-11: can't predict the exit status of these
# as it depends on the order of delivery of initial-status messages.
# See comment at top of this file.
a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)