summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-26 17:01:23 +0000
committerTed Ross <tross@apache.org>2010-03-26 17:01:23 +0000
commit0c72a9c689407001ab0ccab00d091166b93ed9ec (patch)
treebf2b3f450cd8da479e56c12beec264896207d912
parent412859fac1743869f40cf56598ca09dbcfb06379 (diff)
downloadqpid-python-0c72a9c689407001ab0ccab00d091166b93ed9ec.tar.gz
Updating qmf branch with latest updates:
- Implemented V2 query in Python - Cleaned up map formatting in c++ agents - Updated for changes in the protocol wiki - Significant cleanup and refactoring in the pure-Python console git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@927970 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/management/ManagementObject.h3
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/schema.py2
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp267
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h8
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp10
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp5
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py855
7 files changed, 727 insertions, 423 deletions
diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h
index 2a5e5b6e52..1bb3c57ed2 100644
--- a/qpid/cpp/include/qpid/management/ManagementObject.h
+++ b/qpid/cpp/include/qpid/management/ManagementObject.h
@@ -59,7 +59,8 @@ protected:
void fromString(const std::string&);
public:
QPID_COMMON_EXTERN ObjectId() : agent(0), first(0) {}
- QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) : agent(0) { mapDecode(map.asMap()); }
+ QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) :
+ agent(0), first(0), agentEpoch(0) { mapDecode(map.asMap()); }
QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker);
QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq);
QPID_COMMON_EXTERN ObjectId(std::istream&);
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index ca6628c366..a827799b4b 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -682,7 +682,7 @@ class SchemaStatistic:
def genMap (self, stream):
if self.type.type.perThread:
- self.type.type.genMap(stream, "totals." + self.name)
+ self.type.type.genMap(stream, "totals." + self.name, key=self.name)
else:
self.type.type.genMap(stream, self.name)
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 1633e77a4f..5966b0766c 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -44,6 +44,7 @@ using std::ifstream;
using std::string;
using std::cout;
using std::endl;
+using qpid::messaging::Variant;
Mutex ManagementAgent::Singleton::lock;
bool ManagementAgent::Singleton::disabled = false;
@@ -179,7 +180,7 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
void ManagementAgentImpl::registerClass(const string& packageName,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -189,7 +190,7 @@ void ManagementAgentImpl::registerClass(const string& packageName,
void ManagementAgentImpl::registerEvent(const string& packageName,
const string& eventName,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -239,12 +240,12 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
event.getPackageName() << "." << event.getEventName();
- ::qpid::messaging::Message msg;
- ::qpid::messaging::MapContent content(msg);
- ::qpid::messaging::VariantMap &map_ = content.asMap();
- ::qpid::messaging::VariantMap schemaId;
- ::qpid::messaging::VariantMap values;
- ::qpid::messaging::VariantMap headers;
+ qpid::messaging::Message msg;
+ qpid::messaging::MapContent content(msg);
+ Variant::Map &map_ = content.asMap();
+ Variant::Map schemaId;
+ Variant::Map values;
+ Variant::Map headers;
map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
event.getEventName(),
@@ -379,73 +380,29 @@ void ManagementAgentImpl::sendHeartbeat()
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
-void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
- uint32_t code, string text)
+void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+ const string& text, uint32_t code)
{
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'z', sequence);
- outBuffer.putLong(code);
- outBuffer.putShortString(text);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
- QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
-}
-
-void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
-{
- Mutex::ScopedLock lock(agentLock);
-
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
-
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- storeData();
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
+ static const string addr_exchange("qmf.default.direct");
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
+ messaging::Message msg;
+ messaging::MapContent content(msg);
+ messaging::Variant::Map& map(content.asMap());
+ messaging::Variant::Map headers;
+ messaging::Variant::Map values;
- // Bind to qpid.management to receive commands
- connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_exception";
+ headers["qmf.agent"] = name_address;
- // Send package indications for all local packages
- for (PackageMap::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ values["error_code"] = code;
+ values["error_text"] = text;
+ map["_values"] = values;
- encodeHeader(outBuffer, 'p');
- encodePackageIndication(outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ content.encode();
+ connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyToKey);
- // Send class indications for all local classes
- ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
- outBuffer.reset();
- encodeHeader(outBuffer, 'q');
- encodeClassIndication(outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
- }
- }
+ QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
@@ -509,7 +466,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
} else {
string methodName;
ObjectId objId;
- qpid::messaging::Variant::Map inArgs;
+ Variant::Map inArgs;
try {
// coversions will throw if input is invalid.
@@ -539,7 +496,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
}
}
- qpid::messaging::Variant::Map headers;
+ Variant::Map headers;
headers["method"] = "response";
headers["qmf.agent"] = name_address;
if (failed)
@@ -551,20 +508,14 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
}
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& contentType,
- const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
{
moveNewObjectsLH();
- if (contentType != "_query_v1") {
- QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
- return;
- }
-
qpid::messaging::Message inMsg(body);
qpid::messaging::MapView inMap(inMsg);
qpid::messaging::MapView::const_iterator i;
- ::qpid::messaging::Variant::Map headers;
+ Variant::Map headers;
QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
@@ -572,71 +523,111 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- headers["partial"];
+ headers["partial"] = Variant();
+
+ qpid::messaging::Message outMsg;
+ qpid::messaging::ListContent content(outMsg);
+ Variant::List &list_ = content.asList();
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oidMap;
+
+ /*
+ * Unpack the _what element of the query. Currently we only support OBJECT queries.
+ */
+ i = inMap.find("_what");
+ if (i == inMap.end()) {
+ sendException(replyTo, cid, "_what element missing in Query");
+ return;
+ }
+
+ if (i->second.getType() != qpid::messaging::VAR_STRING) {
+ sendException(replyTo, cid, "_what element is not a string");
+ return;
+ }
+
+ if (i->second.asString() != "OBJECT") {
+ sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ return;
+ }
- ::qpid::messaging::Message outMsg;
- ::qpid::messaging::ListContent content(outMsg);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
string className;
+ string packageName;
- i = inMap.find("_class");
- if (i != inMap.end())
- try {
- className = i->second.asString();
- } catch(exception& e) {
- className.clear();
- QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
- }
+ /*
+ * Handle the _schema_id element, if supplied.
+ */
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
- if (className.empty()) {
- ObjectId objId;
- i = inMap.find("_object_id");
- if (i != inMap.end()) {
-
- try {
- objId = ObjectId(i->second.asMap());
- } catch (exception &e) {
- objId = ObjectId(); // empty object id - won't find a match (I hope).
- QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
- }
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING)
+ className = s_iter->second.asString();
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING)
+ packageName = s_iter->second.asString();
+ }
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) {
+ ObjectId objId(i->second);
- object->mapEncodeValues(values, true, true); // write both stats and properties
- map_["_values"] = values;
- list_.push_back(map_);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
- content.encode();
- connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
- }
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ objId.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+ headers.erase("partial");
+
+ content.encode();
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ return;
}
} else {
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
- if (object->getClassName() == className) {
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
// @todo support multiple object reply per message
values.clear();
list_.clear();
+ oidMap.clear();
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
object->mapEncodeValues(values, true, true); // write both stats and properties
+ iter->first.mapEncode(oidMap);
map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
list_.push_back(map_);
content.encode();
- connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
}
}
}
@@ -645,8 +636,8 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte
list_.clear();
headers.erase("partial");
content.encode();
- connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
- QPID_LOG(trace, "SENT ObjectInd");
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
}
void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
@@ -718,9 +709,7 @@ void ManagementAgentImpl::received(Message& msg)
if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey);
- else if (opcode == "_query_request") handleGetQuery(msg.getData(),
- mp.getApplicationHeaders().getAsString("qmf.content"),
- cid, replyToKey);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey);
else {
QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
}
@@ -737,9 +726,9 @@ void ManagementAgentImpl::received(Message& msg)
if (checkHeader(inBuffer, &opcode, &sequence))
{
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
+ if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
else if (opcode == 'x') handleConsoleAddedIndication();
+ else
QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
}
}
@@ -754,15 +743,15 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
buf.putLong (seq);
}
-qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
- const string& cname,
- const uint8_t *md5Sum)
+Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+ const string& cname,
+ const uint8_t *md5Sum)
{
- qpid::messaging::Variant::Map map_;
+ Variant::Map map_;
map_["_package_name"] = pname;
map_["_class_name"] = cname;
- map_["_hash_str"] = messaging::Uuid(md5Sum);
+ map_["_hash"] = messaging::Uuid(md5Sum);
return map_;
}
@@ -821,7 +810,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind,
PackageMap::iterator pIter,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -902,9 +891,9 @@ void ManagementAgentImpl::periodicProcessing()
!baseObject->isDeleted()))
continue;
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
+ qpid::messaging::Message m;
+ qpid::messaging::ListContent content(m);
+ Variant::List &list_ = content.asList();
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
@@ -920,9 +909,9 @@ void ManagementAgentImpl::periodicProcessing()
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
if (send_stats || send_props) {
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map oid;
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
object->getObjectId().mapEncode(oid);
map_["_object_id"] = oid;
@@ -943,7 +932,7 @@ void ManagementAgentImpl::periodicProcessing()
content.encode();
const string &str = m.getContent();
if (str.length()) {
- ::qpid::messaging::Variant::Map headers;
+ Variant::Map headers;
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
@@ -1068,13 +1057,13 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
const string& cid,
- const qpid::messaging::VariantMap headers,
+ const Variant::Map headers,
const string& exchange,
const string& routingKey,
const string& contentType)
{
Message msg;
- qpid::messaging::VariantMap::const_iterator i;
+ Variant::Map::const_iterator i;
if (!cid.empty())
msg.getMessageProperties().setCorrelationId(cid);
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index 69a891a1b2..43b9f36c31 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -259,16 +259,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
const uint8_t *md5Sum);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendHeartbeat();
- void sendCommandComplete (std::string replyToKey, uint32_t sequence,
- uint32_t code = 0, std::string text = std::string("OK"));
- void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+ void sendException(const std::string& replyToKey, const std::string& cid,
+ const std::string& text, uint32_t code=1);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo);
- void handleGetQuery (const std::string& body, const std::string& content_type,
- const std::string& cid, const std::string& replyTo);
+ void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo);
void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleConsoleAddedIndication();
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index b1802293b7..7e8dd7e764 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -62,8 +62,7 @@ static qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname,
map_["_package_name"] = pname;
map_["_class_name"] = cname;
map_["_type"] = type;
- map_["_hash_str"] = std::string((const char *)md5Sum,
- qpid::management::ManagementObject::MD5_LEN);
+ map_["_hash"] = qpid::messaging::Uuid(md5Sum);
return map_;
}
@@ -1896,9 +1895,8 @@ void ManagementAgent::disallow(const std::string& className, const std::string&
}
void ManagementAgent::SchemaClassKey::mapEncode(qpid::messaging::Variant::Map& _map) const {
- const std::string hash_str((const char *)hash, sizeof(hash));
_map["_cname"] = name;
- _map["_hash"] = hash_str;
+ _map["_hash"] = qpid::messaging::Uuid(hash);
}
void ManagementAgent::SchemaClassKey::mapDecode(const qpid::messaging::Variant::Map& _map) {
@@ -1909,8 +1907,8 @@ void ManagementAgent::SchemaClassKey::mapDecode(const qpid::messaging::Variant::
}
if ((i = _map.find("_hash")) != _map.end()) {
- const std::string s = i->second.asString();
- memcpy(hash, s.data(), sizeof(hash));
+ const qpid::messaging::Uuid& uuid = i->second.asUuid();
+ memcpy(hash, uuid.data(), uuid.size());
}
}
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 9f9216824c..68fcdaca71 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -184,9 +184,6 @@ void ObjectId::mapDecode(const messaging::VariantMap& map)
else
throw Exception("Required _object_name field missing.");
- if ((i = map.find("_first")) != map.end())
- first = i->second.asUint64();
-
if ((i = map.find("_agent_name")) != map.end())
agentName = i->second.asString();
@@ -270,7 +267,7 @@ void ManagementObject::writeTimestamps (messaging::VariantMap& map) const
sid["_package_name"] = getPackageName();
sid["_class_name"] = getClassName();
- sid["_hash_str"] = std::string((const char *)getMd5Sum(), MD5_LEN);
+ sid["_hash"] = qpid::messaging::Uuid(getMd5Sum());
map["_schema_id"] = sid;
objectId.mapEncode(oid);
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 0b0ec417d0..4bbe69655e 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -41,6 +41,9 @@ from cStringIO import StringIO
#import qpid.log
#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG)
+#===================================================================================================
+# CONSOLE
+#===================================================================================================
class Console:
""" To access the asynchronous operations, a class must be derived from
Console with overrides of any combination of the available methods. """
@@ -94,6 +97,10 @@ class Console:
""" Invoked when a method response from an asynchronous method call is received. """
pass
+
+#===================================================================================================
+# BrokerURL
+#===================================================================================================
class BrokerURL(URL):
def __init__(self, text):
URL.__init__(self, text)
@@ -115,13 +122,22 @@ class BrokerURL(URL):
def match(self, host, port):
return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4]
+
+#===================================================================================================
+# Object
+#===================================================================================================
class Object(object):
- """ This class defines a 'proxy' object representing a real managed object on an agent.
- Actions taken on this proxy are remotely affected on the real managed object.
"""
- def __init__(self, session, broker, schema, codec=None, prop=None, stat=None, managed=True, v2Map=None, agentName=None, kwargs={}):
- self._session = session
- self._broker = broker
+ This class defines a 'proxy' object representing a real managed object on an agent.
+ Actions taken on this proxy are remotely affected on the real managed object.
+ """
+ def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}):
+ self._agent = agent
+ self._session = None
+ self._broker = None
+ if agent:
+ self._session = agent.session
+ self._broker = agent.broker
self._schema = schema
self._properties = []
self._statistics = []
@@ -129,8 +145,7 @@ class Object(object):
self.v2Init(v2Map, agentName)
return
- self._managed = managed
- if self._managed:
+ if self._agent:
self._currentTime = codec.read_uint64()
self._createTime = codec.read_uint64()
self._deleteTime = codec.read_uint64()
@@ -176,10 +191,9 @@ class Object(object):
if '_subtypes' in omap:
self._subtypes = omap['_subtypes']
if '_object_id' in omap:
- self._managed = True
self._objectId = ObjectId(omap['_object_id'], agentName=agentName)
else:
- self._managed = None
+ self._objectId = None
def getBroker(self):
""" Return the broker from which this object was sent """
@@ -211,7 +225,7 @@ class Object(object):
def isManaged(self):
""" Return True iff this object is a proxy for a managed object on an agent. """
- return self._managed
+ return self._objectId and self._agent
def getIndex(self):
""" Return a string describing this object's primary key. """
@@ -250,7 +264,7 @@ class Object(object):
""" Contact the agent and retrieve the lastest property and statistic values for this object. """
if not self.isManaged():
raise Exception("Object is not managed")
- obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
+ obj = self._agent.getObjects(_objectId=self._objectId)
if obj:
self.mergeUpdate(obj[0])
else:
@@ -423,6 +437,10 @@ class Object(object):
bit = 0
return excludeList
+
+#===================================================================================================
+# Session
+#===================================================================================================
class Session:
"""
An instance of the Session class represents a console session running
@@ -495,132 +513,26 @@ class Session:
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
- """
- ##
- ## v2_data_queues is used to store object data received from QMFv2 agents.
- ## It is stored here in case we need to go and query schema data from the
- ## agent before reporting to the user.
- ##
- ## v2_data_queues is a map, keyed by agent address of queues of entries
- ## The format of entries in the queue is a data map
- ## This list must be protected by self.cv
- ##
- """
- self.v2_data_queues = {}
- self.v2_pending_queues = {}
-
def _getBrokerForAgentAddr(self, agent_addr):
- broker = None
try:
self.cv.acquire()
key = (1, agent_addr)
for b in self.brokers:
if key in b.agents:
- broker = b
- finally:
- self.cv.release()
- return broker
-
- def _processV2Data(self):
- """
- Attempt to make progress on the entries in the v2_data_queue. If an entry has a schema
- that is in our schema cache, process it. Otherwise, send a request for the schema information
- to the agent that manages the object.
- """
- try:
- self.cv.acquire()
- pop_list = []
- for agent_addr in self.v2_data_queues:
- entries = self.v2_data_queues[agent_addr]
- keep_going = True
- while keep_going and len(entries) > 0:
- schemaId = self._getSchemaIdforV2ObjectLH(entries[0])
- schema = self.schemaCache.getSchema(schemaId)
- if schema:
- broker = self._getBrokerForAgentAddr(agent_addr)
- obj = Object(self, broker, schema, v2Map=entries[0], agentName=agent_addr)
- entries.pop(0)
-
- """
- TODO: This following code assumes that the data indication came unsolicited.
- This needs to be enhanced to handle the case of a query response.
- """
- if self.console:
- self.console.objectProps(broker, obj)
-
- else:
- """
- We have no schema for this data object, move the queue to the pending map and request
- schema data from the agent
- """
- self.v2_pending_queues[agent_addr] = self.v2_data_queues[agent_addr]
- pop_list.append(agent_addr)
- self._v2SendSchemaRequest(agent_addr, schemaId)
- keep_going = None
- for agent_addr in pop_list:
- self.v2_data_queues.pop(agent_addr)
- finally:
- self.cv.release()
-
- def _addV2Data(self, agent_addr, data_map):
- """
- Add data-for-processing to the work queue
- """
- process = None
- try:
- self.cv.acquire()
- if agent_addr in self.v2_pending_queues:
- self.v2_pending_queues[agent_addr].append(data_map)
- else:
- if agent_addr not in self.v2_data_queues:
- self.v2_data_queues[agent_addr] = []
- self.v2_data_queues[agent_addr].append(data_map)
- process = True
- finally:
- self.cv.release()
-
- if process:
- self._processV2Data()
-
- def _removeV2Agent(self, agent):
- """
- Remove entries in the data queues related to a lost agent.
- """
- agent_name = agent.getAgentBank()
- try:
- self.cv.acquire()
- if agent_name in self.v2_data_queues:
- self.v2_data_queues.pop(agent_name)
- if agent_name in self.v2_pending_queues:
- self.v2_pending_queues.pop(agent_name)
+ return b
finally:
self.cv.release()
+ return None
- def _schemaInfoFromV2Agent(self, agent_addr):
- """
- We have just received new schema information from an agent. Check to see if there's
- more work that can now be done.
- """
- re_process = None
+ def _getAgentForAgentAddr(self, agent_addr):
try:
self.cv.acquire()
- if agent_addr in self.v2_pending_queues:
- self.v2_data_queues[agent_addr] = self.v2_pending_queues.pop(agent_addr)
- re_process = True
+ key = agent_addr
+ for b in self.brokers:
+ if key in b.agents:
+ return b.agents[key]
finally:
self.cv.release()
-
- if re_process:
- self._processV2Data()
-
- def _getSchemaIdforV2ObjectLH(self, data):
- """
- Given a data map, extract the schema-identifier.
- """
- if data.__class__ != dict:
- return None
- if '_schema_id' in data:
- return ClassKey(data['_schema_id'])
return None
def __repr__(self):
@@ -642,7 +554,6 @@ class Session:
returned from the addBroker call """
if self.console:
for agent in broker.getAgents():
- self.console.removev2Agent(agent)
self.console.delAgent(agent)
broker._shutdown()
self.brokers.remove(broker)
@@ -711,12 +622,12 @@ class Session:
agentList.append(a)
return agentList
- def makeObject(self, classKey, broker=None, **kwargs):
+ def makeObject(self, classKey, **kwargs):
""" Create a new, unmanaged object of the schema indicated by classKey """
schema = self.getSchema(classKey)
if schema == None:
raise Exception("Schema not found for classKey")
- return Object(self, broker, schema, None, True, True, False, kwargs)
+ return Object(None, schema, None, True, True, kwargs)
def getObjects(self, **kwargs):
""" Get a list of objects from QMF agents.
@@ -886,7 +797,6 @@ class Session:
def _handleBrokerDisconnect(self, broker):
if self.console:
for agent in broker.getAgents():
- self.session._removeV2Agent(agent)
self.console.delAgent(agent)
self.console.brokerDisconnected(broker)
@@ -955,31 +865,6 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
- def _handleMethodResp(self, broker, codec, seq):
- code = codec.read_uint32()
- text = codec.read_str16()
- outArgs = {}
- pair = self.seqMgr._release(seq)
- if pair == None:
- return
- method, synchronous = pair
- if code == 0:
- for arg in method.arguments:
- if arg.dir.find("O") != -1:
- outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
- result = MethodResult(code, text, outArgs)
- if synchronous:
- try:
- broker.cv.acquire()
- broker.syncResult = result
- broker.syncInFlight = False
- broker.cv.notify()
- finally:
- broker.cv.release()
- else:
- if self.console:
- self.console.methodResponse(broker, seq, result)
-
def _handleHeartbeatInd(self, broker, codec, seq, msg):
brokerBank = 1
agentBank = 0
@@ -1003,58 +888,32 @@ class Session:
self.console.heartbeat(agent, timestamp)
broker._ageAgents()
- def _handleEventInd(self, broker, codec, seq):
- if self.console != None:
- event = Event(self, broker, codec)
- self.console.event(broker, event)
-
def _handleSchemaResp(self, broker, codec, seq, agent_addr):
kind = codec.read_uint8()
classKey = ClassKey(codec)
_class = SchemaClass(kind, classKey, codec, self)
self.schemaCache.declareClass(classKey, _class)
- self.seqMgr._release(seq)
- broker._decOutstanding()
+ ctx = self.seqMgr._release(seq)
+ if ctx:
+ broker._decOutstanding()
if self.console != None:
self.console.newClass(kind, classKey)
- if agent_addr:
- self._schemaInfoFromV2Agent(agent_addr)
-
- def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
- classKey = ClassKey(codec)
- schema = self.schemaCache.getSchema(classKey)
- if not schema:
- return
-
- object = Object(self, broker, schema, codec, prop, stat)
- if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
- broker._updateAgent(object)
-
- try:
- self.cv.acquire()
- if seq in self.syncSequenceList:
- if object.getTimestamps()[2] == 0 and self._selectMatch(object):
- self.getResult.append(object)
- return
- finally:
- self.cv.release()
-
- if self.console and self.rcvObjects:
- if prop:
- self.console.objectProps(broker, object)
- if stat:
- self.console.objectStats(broker, object)
+ if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
+ agent = self._getAgentForAgentAddr(agent_addr)
+ if agent:
+ agent._schemaInfoFromV2Agent()
def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
- brokerBank = 1
- agentName = ah["qmf.agent"]
- values = content["_values"]
- timestamp = values["timestamp"]
- interval = values["heartbeat_interval"]
- if agentName == None:
+ try:
+ agentName = ah["qmf.agent"]
+ values = content["_values"]
+ timestamp = values["timestamp"]
+ interval = values["heartbeat_interval"]
+ except:
return
- agent = broker.getAgent(brokerBank, agentName)
+
+ agent = broker.getAgent(1, agentName)
if agent == None:
agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
broker._addAgent(agentName, agent)
@@ -1067,44 +926,6 @@ class Session:
def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
self._v2HandleHeartbeatInd(broker, mp, ah, content)
- def _v2HandleDataInd(self, broker, mp, ah, content):
- kind = "_data"
- if "qmf.content" in ah:
- kind = ah["qmf.content"]
- agent_addr = ah["qmf.agent"]
- if content.__class__ != list:
- return
- if kind == "_data":
- for omap in content:
- self._addV2Data(agent_addr, omap)
-
- def _v2HandleQueryRsp(self, broker, mp, ah, content):
- pass
-
- def _v2HandleMethodRsp(self, broker, mp, ah, content):
- pass
-
- def _v2HandleException(self, broker, mp, ah, content):
- pass
-
- def _v2SendSchemaRequest(self, agent_addr, schemaId):
- """
- Send a query to an agent to request details on a particular schema class.
- IMPORTANT: This function currently sends a QMFv1 schema-request to the address of
- the agent. The agent will send its response to amq.direct/<our-key>.
- Eventually, this will be converted to a proper QMFv2 schema query.
- """
- broker = self._getBrokerForAgentAddr(agent_addr)
- if not broker:
- return
-
- sendCodec = Codec()
- seq = self.seqMgr._reserve(None)
- broker._setHeader(sendCodec, 'S', seq)
- schemaId.encode(sendCodec)
- smsg = broker._message(sendCodec.encoded, agent_addr)
- broker._send(smsg, "qmf.default.direct")
-
def _handleError(self, error):
try:
self.cv.acquire()
@@ -1343,6 +1164,10 @@ class Session:
return seq
return None
+
+#===================================================================================================
+# SchemaCache
+#===================================================================================================
class SchemaCache(object):
"""
The SchemaCache is a data structure that stores learned schema information.
@@ -1419,6 +1244,10 @@ class SchemaCache(object):
self.lock.release()
return True
+
+#===================================================================================================
+# ClassKey
+#===================================================================================================
class ClassKey:
""" A ClassKey uniquely identifies a class from the schema. """
def __init__(self, constructor):
@@ -1443,7 +1272,7 @@ class ClassKey:
try:
self.pname = constructor['_package_name']
self.cname = constructor['_class_name']
- self.hash = constructor['_hash_str']
+ self.hash = constructor['_hash']
except:
raise Exception("Invalid ClassKey map format")
else:
@@ -1458,6 +1287,9 @@ class ClassKey:
codec.write_str8(self.cname)
codec.write_bin128(self.hash.bytes)
+ def asMap(self):
+ return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash}
+
def getPackageName(self):
return self.pname
@@ -1476,6 +1308,10 @@ class ClassKey:
def __repr__(self):
return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+
+#===================================================================================================
+# SchemaClass
+#===================================================================================================
class SchemaClass:
""" """
CLASS_KIND_TABLE = 1
@@ -1558,6 +1394,10 @@ class SchemaClass:
else:
return self.arguments + self.session.getSchema(self.superTypeKey).getArguments()
+
+#===================================================================================================
+# SchemaProperty
+#===================================================================================================
class SchemaProperty:
""" """
def __init__(self, codec):
@@ -1587,6 +1427,10 @@ class SchemaProperty:
def __repr__(self):
return self.name
+
+#===================================================================================================
+# SchemaStatistic
+#===================================================================================================
class SchemaStatistic:
""" """
def __init__(self, codec):
@@ -1603,6 +1447,10 @@ class SchemaStatistic:
def __repr__(self):
return self.name
+
+#===================================================================================================
+# SchemaMethod
+#===================================================================================================
class SchemaMethod:
""" """
def __init__(self, codec):
@@ -1631,6 +1479,10 @@ class SchemaMethod:
result += ")"
return result
+
+#===================================================================================================
+# SchemaArgument
+#===================================================================================================
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -1658,6 +1510,10 @@ class SchemaArgument:
elif key == "refPackage" : self.refPackage = value
elif key == "refClass" : self.refClass = value
+
+#===================================================================================================
+# ObjectId
+#===================================================================================================
class ObjectId:
""" Object that represents QMF object identifiers """
def __init__(self, constructor, first=0, second=0, agentName=None):
@@ -1742,12 +1598,22 @@ class ObjectId:
codec.write_uint64(first)
codec.write_uint64(second)
+ def asMap(self):
+ omap = {'_agent_name': self.agentName, '_object_name': self.objectName}
+ if self.agentEpoch != 0:
+ omap['_agent_epoch'] = self.agentEpoch
+ return omap
+
def __hash__(self):
return self.__repr__().__hash__()
def __eq__(self, other):
return self.__repr__().__eq__(other)
+
+#===================================================================================================
+# MethodResult
+#===================================================================================================
class MethodResult(object):
""" """
def __init__(self, status, text, outArgs):
@@ -1763,6 +1629,10 @@ class MethodResult(object):
def __repr__(self):
return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
+
+#===================================================================================================
+# ManagedConnection
+#===================================================================================================
class ManagedConnection(Thread):
""" Thread class for managing a connection. """
DELAY_MIN = 1
@@ -1825,6 +1695,10 @@ class ManagedConnection(Thread):
finally:
self.cv.release()
+
+#===================================================================================================
+# Broker
+#===================================================================================================
class Broker:
""" This object represents a connection (or potential connection) to a QMF broker. """
SYNC_TIME = 60
@@ -1872,7 +1746,7 @@ class Broker:
def getAgent(self, brokerBank, agentBank):
""" Return the agent object associated with a particular broker and agent bank value."""
- bankKey = (brokerBank, agentBank)
+ bankKey = agentBank
try:
self.cv.acquire()
if bankKey in self.agents:
@@ -1923,7 +1797,7 @@ class Broker:
try:
self.cv.acquire()
self.agents = {}
- self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+ self.agents[0] = Agent(self, 0, "BrokerAgent")
finally:
self.cv.release()
@@ -1960,7 +1834,7 @@ class Broker:
self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
+ self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL)
@@ -1970,7 +1844,7 @@ class Broker:
self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("tdest").listen(self._replyCb)
+ self.amqpSession.incoming("tdest").listen(self._v1Cb)
self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
@@ -2013,7 +1887,7 @@ class Broker:
raise
def _updateAgent(self, obj):
- bankKey = (obj.brokerBank, obj.agentBank)
+ bankKey = obj.agentBank
agent = None
if obj._deleteTime == 0:
try:
@@ -2037,7 +1911,7 @@ class Broker:
def _addAgent(self, name, agent):
try:
self.cv.acquire()
- self.agents[(1, name)] = agent
+ self.agents[name] = agent
finally:
self.cv.release()
if self.session.console:
@@ -2057,7 +1931,6 @@ class Broker:
self.cv.release()
if self.session.console:
for agent in to_notify:
- self.session._removeV2Agent(agent)
self.session.console.delAgent(agent)
def _v2SendAgentLocate(self, predicate={}):
@@ -2167,51 +2040,88 @@ class Broker:
finally:
self.cv.release()
- def _replyCb(self, msg):
+ def _v1Cb(self, msg):
+ """
+ This is the general message handler for messages received via the QMFv1 exchanges.
+ """
+ agent = None
agent_addr = None
mp = msg.get("message_properties")
ah = mp.application_headers
if ah and 'qmf.agent' in ah:
agent_addr = ah['qmf.agent']
+
+ if not agent_addr:
+ #
+ # See if we can determine the agent identity from the routing key
+ #
+ dp = msg.get("delivery_properties")
+ rkey = None
+ if dp.routing_key:
+ rkey = dp.routing_key
+ items = rkey.split('.')
+ if len(items) >= 4:
+ if items[0] == 'console' and items[3].isdigit():
+ agent_addr = int(items[3]) # The QMFv1 Agent Bank
+ if agent_addr and agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+
codec = Codec(msg.body)
while True:
opcode, seq = self._checkHeader(codec)
if opcode == None: return
if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
- elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
- elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
- elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
- elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
- elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
- elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
- self.session.receiver._completed.add(msg.id)
- self.session.channel.session_completed(self.session.receiver._completed)
+ elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
+ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
+ elif agent:
+ agent._handleQmfV1Message(opcode, mp, ah, codec)
+
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
def _v2Cb(self, msg):
- dp = msg.get("delivery_properties")
+ """
+ This is the general message handler for messages received via QMFv2 exchanges.
+ """
mp = msg.get("message_properties")
ah = mp["application_headers"]
- opcode = ah["qmf.opcode"]
codec = Codec(msg.body)
- if mp.content_type == "amqp/list":
- content = codec.read_list()
- elif mp.content_type == "amqp/map":
- content = codec.read_map()
- else:
- return
-
- if opcode == None: return
- elif opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
- elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
- elif opcode == '_data_indication': self.session._v2HandleDataInd(self, mp, ah, content)
- elif opcode == '_query_response': self.session._v2HandleQueryRsp(self, mp, ah, content)
- elif opcode == '_method_response': self.session._v2HandleMethodRsp(self, mp, ah, content)
- elif opcode == '_exception': self.session._v2HandleException(self, mp, ah, content)
+ if 'qmf.opcode' in ah:
+ opcode = ah['qmf.opcode']
+ if mp.content_type == "amqp/list":
+ content = codec.read_list()
+ if not content:
+ content = []
+ elif mp.content_type == "amqp/map":
+ content = codec.read_map()
+ if not content:
+ content = {}
+ else:
+ content = None
+
+ if content != None:
+ ##
+ ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
+ ## used to maintain the broker's list of agent proxies.
+ ##
+ if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+ elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+ else:
+ ##
+ ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
+ ## of the message.
+ ##
+ agent_addr = ah['qmf.agent']
+ if agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+ agent._handleQmfV2Message(opcode, mp, ah, content)
+
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
def _exceptionCb(self, data):
self.connected = False
@@ -2227,20 +2137,46 @@ class Broker:
if self.thread:
self.thread.disconnected()
+
+#===================================================================================================
+# Agent
+#===================================================================================================
class Agent:
- """ """
+ """
+ This class represents a proxy for a remote agent being managed
+ """
def __init__(self, broker, agentBank, label, isV2=False, interval=0):
self.broker = broker
+ self.session = broker.session
+ self.schemaCache = self.session.schemaCache
self.brokerBank = broker.getBrokerBank()
self.agentBank = agentBank
self.label = label
self.isV2 = isV2
self.heartbeatInterval = interval
+ self.lock = Lock()
+ self.seqMgr = self.session.seqMgr
+ self.contextMap = {}
+ self.unsolicitedContext = RequestContext(self, self)
self.lastSeenTime = time()
+
+ def __call__(self, **kwargs):
+ """
+ This is the handler for unsolicited stuff received from the agent
+ """
+ if 'qmf_object' in kwargs:
+ if self.session.console:
+ self.session.console.objectProps(self.broker, kwargs['qmf_object'])
+ if 'qmf_object_stats' in kwargs:
+ if self.session.console:
+ self.session.console.objectStats(self.broker, kwargs['qmf_object_stats'])
+
+
def touch(self):
self.lastSeenTime = time()
+
def isOld(self):
if self.heartbeatInterval == 0:
return None
@@ -2248,6 +2184,7 @@ class Agent:
return True
return None
+
def __repr__(self):
if self.isV2:
ver = "v2"
@@ -2255,15 +2192,392 @@ class Agent:
ver = "v1"
return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label)
+
def getBroker(self):
return self.broker
+
def getBrokerBank(self):
return self.brokerBank
+
def getAgentBank(self):
return self.agentBank
+
+ def getObjects(self, notifiable=None, **kwargs):
+ """ Get a list of objects from QMF agents.
+ All arguments are passed by name(keyword).
+
+ If 'notifiable' is None (default), this call will block until completion or timeout.
+ If supplied, notifiable is assumed to be a callable object that will be called when the
+ list of queried objects arrives. The single argument to the call shall be a list of
+ the returned objects.
+
+ The class for queried objects may be specified in one of the following ways:
+
+ _schema = <schema> - supply a schema object returned from getSchema.
+ _key = <key> - supply a classKey from the list returned by getClasses.
+ _class = <name> - supply a class name as a string. If the class name exists
+ in multiple packages, a _package argument may also be supplied.
+ _objectId = <id> - get the object referenced by the object-id
+
+ The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ use the following argument:
+
+ _timeout = <time in seconds>
+
+ If additional arguments are supplied, they are used as property selectors. For example,
+ if the argument name="test" is supplied, only objects whose "name" property is "test"
+ will be returned in the result.
+ """
+ if notifiable:
+ if not callable(notifiable):
+ raise Exception("notifiable object must be callable")
+
+ #
+ # Allocate a context to track this asynchronous request.
+ #
+ context = RequestContext(self, notifiable)
+ sequence = self.seqMgr._reserve(context)
+ try:
+ self.lock.acquire()
+ self.contextMap[sequence] = context
+ finally:
+ self.lock.release()
+
+ #
+ # Compose and send the query message to the agent using the appropriate protocol for the
+ # agent's QMF version.
+ #
+ if self.isV2:
+ self._v2SendGetQuery(sequence, kwargs)
+ else:
+ self._v1SendGetQuery(sequence, kwargs)
+
+ #
+ # If this is a synchronous call, block and wait for completion.
+ #
+ if not notifiable:
+ timeout = 60
+ if '_timeout' in kwargs:
+ timeout = kwargs['_timeout']
+ context.waitForSignal(timeout)
+ if context.exception:
+ raise Exception(context.exception)
+ result = context.queryResults
+ self.contextMap.pop(sequence)
+ return result
+
+
+ def _schemaInfoFromV2Agent(self):
+ """
+ We have just received new schema information from this agent. Check to see if there's
+ more work that can now be done.
+ """
+ try:
+ self.lock.acquire()
+ copy_of_map = self.contextMap
+ finally:
+ self.lock.release()
+
+ self.unsolicitedContext.reprocess()
+ for context in copy_of_map:
+ copy_of_map[context].reprocess()
+
+
+ def _v1HandleMethodResp(self, codec, seq):
+ """
+ Handle a QMFv1 method response
+ """
+ code = codec.read_uint32()
+ text = codec.read_str16()
+ outArgs = {}
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+ if code == 0:
+ for arg in method.arguments:
+ if arg.dir.find("O") != -1:
+ outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
+ result = MethodResult(code, text, outArgs)
+ if synchronous:
+ try:
+ broker.cv.acquire()
+ broker.syncResult = result
+ broker.syncInFlight = False
+ broker.cv.notify()
+ finally:
+ broker.cv.release()
+ else:
+ if self.console:
+ self.console.methodResponse(broker, seq, result)
+
+
+ def _v1HandleEventInd(self, broker, codec, seq):
+ """
+ Handle a QMFv1 event indication
+ """
+ if self.console != None:
+ event = Event(self, broker, codec)
+ self.console.event(broker, event)
+
+
+ def _v1HandleContentInd(self, broker, codec, seq, prop=False, stat=False):
+ """
+ Handle a QMFv1 content indication
+ """
+ classKey = ClassKey(codec)
+ schema = self.schemaCache.getSchema(classKey)
+ if not schema:
+ return
+
+ obj = Object(self, broker, schema, codec, prop, stat)
+ if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
+ broker._updateAgent(obj)
+
+ try:
+ self.lock.acquire()
+ if seq in self.syncSequenceList:
+ if object.getTimestamps()[2] == 0 and self._selectMatch(object):
+ self.getResult.append(object)
+ return
+ finally:
+ self.lock.release()
+
+ if self.console and self.rcvObjects:
+ if prop:
+ self.console.objectProps(broker, object)
+ if stat:
+ self.console.objectStats(broker, object)
+
+
+ def _v2HandleDataInd(self, mp, ah, content):
+ """
+ Handle a QMFv2 data indication from the agent
+ """
+ if mp.correlation_id:
+ sequence = int(mp.correlation_id)
+ if sequence not in self.contextMap:
+ return
+ context = self.contextMap[sequence]
+ else:
+ context = self.unsolicitedContext
+
+ kind = "_data"
+ if "qmf.content" in ah:
+ kind = ah["qmf.content"]
+ if kind == "_data":
+ if content.__class__ != list:
+ return
+ for omap in content:
+ context.addV2QueryResult(omap)
+ context.processV2Data()
+
+ if 'partial' not in ah:
+ context.signal()
+
+
+ def _v2HandleMethodRsp(self, mp, ah, content):
+ pass
+
+
+ def _v2HandleException(self, mp, ah, content):
+ pass
+
+
+ def _v1SendGetQuery(self, kwargs):
+ pass
+
+
+ def _v2SendGetQuery(self, sequence, kwargs):
+ """
+ Send a get query to a QMFv2 agent.
+ """
+ #
+ # Build the query map
+ #
+ query = {'_what': 'OBJECT'}
+ if '_class' in kwargs:
+ schemaMap = {'_class_name': kwargs['_class']}
+ if '_package' in kwargs:
+ schemaMap['_package_name'] = kwargs['_package']
+ query['_schema_id'] = schemaMap
+ elif '_key' in kwargs:
+ query['_schema_id'] = kwargs['_key'].asMap()
+ elif '_objectId' in kwargs:
+ query['_object_id'] = kwargs['_objectId'].asMap
+
+ #
+ # Construct and transmit the message
+ #
+ dp = self.broker.amqpSession.delivery_properties()
+ dp.routing_key = self.agentBank
+ mp = self.broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.broker.authUser
+ mp.correlation_id = str(sequence)
+ mp.app_id = "qmf2"
+ mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_query_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(query)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self.broker._send(msg, "qmf.default.direct")
+
+
+ def _v2SendSchemaRequest(self, schemaId):
+ """
+ Send a query to an agent to request details on a particular schema class.
+ IMPORTANT: This function currently sends a QMFv1 schema-request to the address of
+ the agent. The agent will send its response to amq.direct/<our-key>.
+ Eventually, this will be converted to a proper QMFv2 schema query.
+ """
+ sendCodec = Codec()
+ seq = self.seqMgr._reserve(None)
+ self.broker._setHeader(sendCodec, 'S', seq)
+ schemaId.encode(sendCodec)
+ smsg = self.broker._message(sendCodec.encoded, self.agentBank)
+ self.broker._send(smsg, "qmf.default.direct")
+
+
+ def _handleQmfV1Message(self, opcode, mp, ah, codec):
+ """
+ Process QMFv1 messages arriving from an agent.
+ """
+ if opcode == 'm': self._v1HandleMethodResp(codec, seq)
+ elif opcode == 'e': self._v1HandleEventInd(codec, seq)
+ elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True)
+ elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True)
+ elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True)
+
+
+ def _handleQmfV2Message(self, opcode, mp, ah, content):
+ """
+ Process QMFv2 messages arriving from an agent.
+ """
+ if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
+ elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content)
+ elif opcode == '_method_response': self._v2HandleMethodRsp(mp, ah, content)
+ elif opcode == '_exception': self._v2HandleException(mp, ah, content)
+
+
+#===================================================================================================
+# RequestContext
+#===================================================================================================
+class RequestContext(object):
+ """
+ This class tracks an asynchronous request sent to an agent.
+ """
+ def __init__(self, agent, notifiable):
+ self.agent = agent
+ self.schemaCache = self.agent.schemaCache
+ self.notifiable = notifiable
+ self.startTime = time()
+ self.rawQueryResults = []
+ self.queryResults = []
+ self.exception = None
+ self.waitingForSchema = None
+ self.cv = Condition()
+ self.blocked = notifiable == None
+
+
+ def addV2QueryResult(self, data):
+ self.rawQueryResults.append(data)
+
+
+ def setException(self, ex):
+ self.exception = ex
+
+
+ def getAge(self):
+ return time() - self.startTime
+
+
+ def waitForSignal(self, timeout):
+ try:
+ self.cv.acquire()
+ while self.blocked:
+ if (time() - self.startTime) > timeout:
+ self.exception = "Request timed out after %d seconds" % timeout
+ return
+ self.cv.wait(1)
+ finally:
+ self.cv.release()
+
+
+ def signal(self):
+ try:
+ self.cv.acquire()
+ self.blocked = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+
+
+ def processV2Data(self):
+ """
+ Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema
+ that is in our schema cache, process it. Otherwise, send a request for the schema information
+ to the agent that manages the object.
+ """
+ schemaId = None
+ queryResults = []
+ try:
+ self.cv.acquire()
+ if self.waitingForSchema:
+ return
+ while (not self.waitingForSchema) and len(self.rawQueryResults) > 0:
+ head = self.rawQueryResults[0]
+ schemaId = self._getSchemaIdforV2ObjectLH(head)
+ schema = self.schemaCache.getSchema(schemaId)
+ if schema:
+ obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank)
+ queryResults.append(obj)
+ self.rawQueryResults.pop(0)
+ else:
+ self.waitingForSchema = True
+ finally:
+ self.cv.release()
+
+ if self.waitingForSchema:
+ self.agent._v2SendSchemaRequest(schemaId)
+
+ for result in queryResults:
+ if self.notifiable:
+ self.notifiable(qmf_object=result)
+ else:
+ self.queryResults.append(result)
+
+
+ def reprocess(self):
+ """
+ New schema information has been added to the schema-cache. Clear our 'waiting' status
+ and see if we can make more progress on the raw query list.
+ """
+ try:
+ self.cv.acquire()
+ self.waitingForSchema = None
+ finally:
+ self.cv.release()
+ self.processV2Data()
+
+
+ def _getSchemaIdforV2ObjectLH(self, data):
+ """
+ Given a data map, extract the schema-identifier.
+ """
+ if data.__class__ != dict:
+ return None
+ if '_schema_id' in data:
+ return ClassKey(data['_schema_id'])
+ return None
+
+
+#===================================================================================================
+# Event
+#===================================================================================================
class Event:
""" """
def __init__(self, session, broker, codec):
@@ -2318,6 +2632,10 @@ class Event:
def getSchema(self):
return self.schema
+
+#===================================================================================================
+# SequenceManager
+#===================================================================================================
class SequenceManager:
""" Manage sequence numbers for asynchronous method calls """
def __init__(self):
@@ -2349,6 +2667,9 @@ class SequenceManager:
return data
+#===================================================================================================
+# DebugConsole
+#===================================================================================================
class DebugConsole(Console):
""" """
def brokerConnected(self, broker):