summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-11-21 20:17:22 +0000
committerTed Ross <tross@apache.org>2008-11-21 20:17:22 +0000
commita00e575e242f6c5c9673948efc905226f5a0bfd2 (patch)
tree8fb608856ca15d5286541844a1dcd513bbd7030f
parent284d5014e4881605ee63f774a42bb54f05ea9f22 (diff)
downloadqpid-python-a00e575e242f6c5c9673948efc905226f5a0bfd2.tar.gz
Fixed several problems related to qmf update timestamps:
- Timestamps were set at update send time regardless of whether the object's contents were actually changed. Now timestamps are set at the time of the change. - Agent heartbeat messages are now being sent after periodic updates, not before. Cleaned up the Agent object in qmf.console. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719699 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/schema.py4
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp27
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.cpp25
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h12
-rw-r--r--qpid/python/qmf/console.py26
6 files changed, 50 insertions, 46 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index 692a474992..350a271dd4 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -145,6 +145,7 @@ class SchemaType:
stream.write (" " + prefix + varName + "Max = val;\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n")
if self.style != "mma":
stream.write (" inline " + self.asArg + " get_" + varName + "() {\n");
@@ -157,6 +158,7 @@ class SchemaType:
stream.write (" presenceMask[presenceByte_%s] &= ~presenceMask_%s;\n" % (varName, varName))
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n")
stream.write (" inline bool isSet_" + varName + "() {\n")
stream.write (" return (presenceMask[presenceByte_%s] & presenceMask_%s) != 0;\n" % (varName, varName))
@@ -171,6 +173,7 @@ class SchemaType:
stream.write (" " + varName + "High = " + varName + ";\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n");
stream.write (" inline void dec_" + varName + " (" + self.asArg + " by = 1) {\n");
if not self.perThread:
@@ -181,6 +184,7 @@ class SchemaType:
stream.write (" " + varName + "Low = " + varName + ";\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" setUpdateTime();\n")
stream.write (" }\n");
def genHiLoStatResets (self, stream, varName):
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index dc9122664d..1f7d2569fb 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -687,18 +687,6 @@ void ManagementAgentImpl::periodicProcessing()
if (!connected)
return;
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
- }
-
moveNewObjectsLH();
if (debugLevel >= DEBUG_PUBLISH) {
@@ -715,9 +703,6 @@ void ManagementAgentImpl::periodicProcessing()
}
}
- if (managementObjects.empty())
- return;
-
//
// Clear the been-here flag on all objects in the map.
//
@@ -792,6 +777,18 @@ void ManagementAgentImpl::periodicProcessing()
}
deleteList.clear();
+
+ {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(uint64_t(Duration(now())));
+ stringstream key;
+ key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
+
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
+ }
}
void ManagementAgentImpl::ConnectionThread::run()
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
index 48b73546b3..b800d62fb6 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
@@ -344,17 +344,6 @@ void ManagementBroker::periodicProcessing (void)
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "console.heartbeat.1.0";
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
moveNewObjectsLH();
if (clientWasAdded) {
@@ -367,9 +356,6 @@ void ManagementBroker::periodicProcessing (void)
}
}
- if (managementObjects.empty ())
- return;
-
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
@@ -416,6 +402,17 @@ void ManagementBroker::periodicProcessing (void)
deleteList.clear();
deleteOrphanedAgentsLH();
}
+
+ {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(uint64_t(Duration(now())));
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "console.heartbeat.1.0";
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ }
}
void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 5a40e3f619..f4c45de126 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -163,7 +163,7 @@ void ManagementObject::writeTimestamps (framing::Buffer& buf)
buf.putShortString (getPackageName ());
buf.putShortString (getClassName ());
buf.putBin128 (getMd5Sum ());
- buf.putLongLong (uint64_t (sys::Duration (sys::now ())));
+ buf.putLongLong (updateTime);
buf.putLongLong (createTime);
buf.putLongLong (destroyTime);
objectId.encode(buf);
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index a34f50ab8f..6f229353e3 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -114,6 +114,7 @@ class ManagementObject : public ManagementItem
uint64_t createTime;
uint64_t destroyTime;
+ uint64_t updateTime;
ObjectId objectId;
bool configChanged;
bool instChanged;
@@ -132,11 +133,11 @@ class ManagementObject : public ManagementItem
public:
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
- ManagementObject (ManagementAgent* _agent, Manageable* _core) :
- destroyTime(0), configChanged(true),
- instChanged(true), deleted(false), coreObject(_core), agent(_agent)
- { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
- virtual ~ManagementObject () {}
+ ManagementObject(ManagementAgent* _agent, Manageable* _core) :
+ createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))),
+ destroyTime(0), updateTime(createTime), configChanged(true),
+ instChanged(true), deleted(false), coreObject(_core), agent(_agent) {}
+ virtual ~ManagementObject() {}
virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
virtual void writeProperties(qpid::framing::Buffer& buf) = 0;
@@ -159,6 +160,7 @@ class ManagementObject : public ManagementItem
configChanged = true;
instChanged = true;
}
+ inline void setUpdateTime() { updateTime = (uint64_t(sys::Duration(sys::now()))); }
inline void resourceDestroy (void) {
destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
diff --git a/qpid/python/qmf/console.py b/qpid/python/qmf/console.py
index 4e74cac4ee..8ccbefaf47 100644
--- a/qpid/python/qmf/console.py
+++ b/qpid/python/qmf/console.py
@@ -340,7 +340,7 @@ class Session:
self.cv.release()
broker._setHeader(sendCodec, 'G', seq)
sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
broker._send(smsg)
starttime = time()
@@ -507,7 +507,7 @@ class Session:
agent = broker.getAgent(brokerBank, agentBank)
timestamp = codec.read_uint64()
- if self.console != None:
+ if self.console != None and agent != None:
self.console.heartbeat(agent, timestamp)
def _handleEventInd(self, broker, codec, seq):
@@ -1135,7 +1135,7 @@ class Broker:
self.authPass = authPass
self.topicCredits = topicCredits
self.agents = {}
- self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent")
+ self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
@@ -1162,7 +1162,7 @@ class Broker:
return 1
def getAgent(self, brokerBank, agentBank):
- bankKey = "%d.%d" % (brokerBank, agentBank)
+ bankKey = (brokerBank, agentBank)
if bankKey in self.agents:
return self.agents[bankKey]
return None
@@ -1250,10 +1250,10 @@ class Broker:
self.error = "Connect Failed %d - %s" % (e[0], e[1])
def _updateAgent(self, obj):
- bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+ bankKey = (obj.brokerBank, obj.agentBank)
if obj._deleteTime == 0:
if bankKey not in self.agents:
- agent = Agent(self, bankKey, obj.label)
+ agent = Agent(self, obj.agentBank, obj.label)
self.agents[bankKey] = agent
if self.session.console != None:
self.session.console.newAgent(agent)
@@ -1377,19 +1377,23 @@ class Broker:
class Agent:
""" """
- def __init__(self, broker, bank, label):
+ def __init__(self, broker, agentBank, label):
self.broker = broker
- self.bank = bank
- self.label = label
+ self.brokerBank = broker.getBrokerBank()
+ self.agentBank = agentBank
+ self.label = label
def __repr__(self):
- return "Agent at bank %s (%s)" % (self.bank, self.label)
+ return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label)
def getBroker(self):
return self.broker
+ def getBrokerBank(self):
+ return self.brokerBank
+
def getAgentBank(self):
- return self.bank
+ return self.agentBank
class Event:
""" """