diff options
author | Ted Ross <tross@apache.org> | 2008-11-21 20:17:22 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-11-21 20:17:22 +0000 |
commit | a00e575e242f6c5c9673948efc905226f5a0bfd2 (patch) | |
tree | 8fb608856ca15d5286541844a1dcd513bbd7030f | |
parent | 284d5014e4881605ee63f774a42bb54f05ea9f22 (diff) | |
download | qpid-python-a00e575e242f6c5c9673948efc905226f5a0bfd2.tar.gz |
Fixed several problems related to qmf update timestamps:
- Timestamps were set at update send time regardless of whether
the object's contents were actually changed. Now timestamps are
set at the time of the change.
- Agent heartbeat messages are now being sent after periodic
updates, not before.
Cleaned up the Agent object in qmf.console.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719699 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/managementgen/qmfgen/schema.py | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 12 | ||||
-rw-r--r-- | qpid/python/qmf/console.py | 26 |
6 files changed, 50 insertions, 46 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index 692a474992..350a271dd4 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -145,6 +145,7 @@ class SchemaType: stream.write (" " + prefix + varName + "Max = val;\n") if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") + stream.write (" setUpdateTime();\n") stream.write (" }\n") if self.style != "mma": stream.write (" inline " + self.asArg + " get_" + varName + "() {\n"); @@ -157,6 +158,7 @@ class SchemaType: stream.write (" presenceMask[presenceByte_%s] &= ~presenceMask_%s;\n" % (varName, varName)) if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") + stream.write (" setUpdateTime();\n") stream.write (" }\n") stream.write (" inline bool isSet_" + varName + "() {\n") stream.write (" return (presenceMask[presenceByte_%s] & presenceMask_%s) != 0;\n" % (varName, varName)) @@ -171,6 +173,7 @@ class SchemaType: stream.write (" " + varName + "High = " + varName + ";\n") if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") + stream.write (" setUpdateTime();\n") stream.write (" }\n"); stream.write (" inline void dec_" + varName + " (" + self.asArg + " by = 1) {\n"); if not self.perThread: @@ -181,6 +184,7 @@ class SchemaType: stream.write (" " + varName + "Low = " + varName + ";\n") if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") + stream.write (" setUpdateTime();\n") stream.write (" }\n"); def genHiLoStatResets (self, stream, varName): diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index dc9122664d..1f7d2569fb 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -687,18 +687,6 @@ void ManagementAgentImpl::periodicProcessing() if (!connected) return; - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); - } - moveNewObjectsLH(); if (debugLevel >= DEBUG_PUBLISH) { @@ -715,9 +703,6 @@ void ManagementAgentImpl::periodicProcessing() } } - if (managementObjects.empty()) - return; - // // Clear the been-here flag on all objects in the map. // @@ -792,6 +777,18 @@ void ManagementAgentImpl::periodicProcessing() } deleteList.clear(); + + { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + stringstream key; + key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; + + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); + } } void ManagementAgentImpl::ConnectionThread::run() diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 48b73546b3..b800d62fb6 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -344,17 +344,6 @@ void ManagementBroker::periodicProcessing (void) string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "console.heartbeat.1.0"; - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - moveNewObjectsLH(); if (clientWasAdded) { @@ -367,9 +356,6 @@ void ManagementBroker::periodicProcessing (void) } } - if (managementObjects.empty ()) - return; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) @@ -416,6 +402,17 @@ void ManagementBroker::periodicProcessing (void) deleteList.clear(); deleteOrphanedAgentsLH(); } + + { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "console.heartbeat.1.0"; + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } } void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 5a40e3f619..f4c45de126 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -163,7 +163,7 @@ void ManagementObject::writeTimestamps (framing::Buffer& buf) buf.putShortString (getPackageName ()); buf.putShortString (getClassName ()); buf.putBin128 (getMd5Sum ()); - buf.putLongLong (uint64_t (sys::Duration (sys::now ()))); + buf.putLongLong (updateTime); buf.putLongLong (createTime); buf.putLongLong (destroyTime); objectId.encode(buf); diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index a34f50ab8f..6f229353e3 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -114,6 +114,7 @@ class ManagementObject : public ManagementItem uint64_t createTime; uint64_t destroyTime; + uint64_t updateTime; ObjectId objectId; bool configChanged; bool instChanged; @@ -132,11 +133,11 @@ class ManagementObject : public ManagementItem public: typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject (ManagementAgent* _agent, Manageable* _core) : - destroyTime(0), configChanged(true), - instChanged(true), deleted(false), coreObject(_core), agent(_agent) - { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } - virtual ~ManagementObject () {} + ManagementObject(ManagementAgent* _agent, Manageable* _core) : + createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))), + destroyTime(0), updateTime(createTime), configChanged(true), + instChanged(true), deleted(false), coreObject(_core), agent(_agent) {} + virtual ~ManagementObject() {} virtual writeSchemaCall_t getWriteSchemaCall (void) = 0; virtual void writeProperties(qpid::framing::Buffer& buf) = 0; @@ -159,6 +160,7 @@ class ManagementObject : public ManagementItem configChanged = true; instChanged = true; } + inline void setUpdateTime() { updateTime = (uint64_t(sys::Duration(sys::now()))); } inline void resourceDestroy (void) { destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); diff --git a/qpid/python/qmf/console.py b/qpid/python/qmf/console.py index 4e74cac4ee..8ccbefaf47 100644 --- a/qpid/python/qmf/console.py +++ b/qpid/python/qmf/console.py @@ -340,7 +340,7 @@ class Session: self.cv.release() broker._setHeader(sendCodec, 'G', seq) sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank) + smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank)) broker._send(smsg) starttime = time() @@ -507,7 +507,7 @@ class Session: agent = broker.getAgent(brokerBank, agentBank) timestamp = codec.read_uint64() - if self.console != None: + if self.console != None and agent != None: self.console.heartbeat(agent, timestamp) def _handleEventInd(self, broker, codec, seq): @@ -1135,7 +1135,7 @@ class Broker: self.authPass = authPass self.topicCredits = topicCredits self.agents = {} - self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent") + self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False @@ -1162,7 +1162,7 @@ class Broker: return 1 def getAgent(self, brokerBank, agentBank): - bankKey = "%d.%d" % (brokerBank, agentBank) + bankKey = (brokerBank, agentBank) if bankKey in self.agents: return self.agents[bankKey] return None @@ -1250,10 +1250,10 @@ class Broker: self.error = "Connect Failed %d - %s" % (e[0], e[1]) def _updateAgent(self, obj): - bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank) + bankKey = (obj.brokerBank, obj.agentBank) if obj._deleteTime == 0: if bankKey not in self.agents: - agent = Agent(self, bankKey, obj.label) + agent = Agent(self, obj.agentBank, obj.label) self.agents[bankKey] = agent if self.session.console != None: self.session.console.newAgent(agent) @@ -1377,19 +1377,23 @@ class Broker: class Agent: """ """ - def __init__(self, broker, bank, label): + def __init__(self, broker, agentBank, label): self.broker = broker - self.bank = bank - self.label = label + self.brokerBank = broker.getBrokerBank() + self.agentBank = agentBank + self.label = label def __repr__(self): - return "Agent at bank %s (%s)" % (self.bank, self.label) + return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label) def getBroker(self): return self.broker + def getBrokerBank(self): + return self.brokerBank + def getAgentBank(self): - return self.bank + return self.agentBank class Event: """ """ |