summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-05-03 20:15:22 +0000
committerTed Ross <tross@apache.org>2011-05-03 20:15:22 +0000
commit41d2254bda3dc0e2c85099277f6a9c642ae09bcc (patch)
treebcbee11a2c09f0086ddac45b0c3bc3faaeb84d37
parent06d1d55323fb13f3b9b572ab42d1544251663f37 (diff)
downloadqpid-python-41d2254bda3dc0e2c85099277f6a9c642ae09bcc.tar.gz
QPID-3241 - Deadlock in qmf agent triggered by producer flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1099225 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp342
1 files changed, 178 insertions, 164 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 593d403a11..633401ef5b 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -305,43 +305,47 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
"emerg", "alert", "crit", "error", "warn",
"note", "info", "debug"
};
- sys::Mutex::ScopedLock lock(agentLock);
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+ string content;
stringstream key;
-
- // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
- // event.getPackageName() << "." << event.getEventName();
- key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
- << "." << keyifyNameStr(event.getEventName())
- << "." << severityStr[sev]
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
- Variant::Map map_;
- Variant::Map schemaId;
- Variant::Map values;
Variant::Map headers;
- string content;
- map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
- event.getEventName(),
- event.getMd5Sum(),
- ManagementItem::CLASS_KIND_EVENT);
- event.mapEncode(values);
- map_["_values"] = values;
- map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
- map_["_severity"] = sev;
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+
+ // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+ // event.getPackageName() << "." << event.getEventName();
+ key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
+ << "." << keyifyNameStr(event.getEventName())
+ << "." << severityStr[sev]
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ Variant::Map map_;
+ Variant::Map schemaId;
+ Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ event.getMd5Sum(),
+ ManagementItem::CLASS_KIND_EVENT);
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_event";
- headers["qmf.agent"] = name_address;
+ Variant::List list;
+ list.push_back(map_);
+ ListCodec::encode(list, content);
+ }
- Variant::List list;
- list.push_back(map_);
- ListCodec::encode(list, content);
connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
}
@@ -521,9 +525,12 @@ void ManagementAgentImpl::sendException(const string& rte, const string& rtk, co
void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
{
- sys::Mutex::ScopedLock lock(agentLock);
string packageName;
SchemaClassKey key;
+ uint32_t outLen(0);
+ char localBuffer[MA_BUFFER_SIZE];
+ Buffer outBuffer(localBuffer, MA_BUFFER_SIZE);
+ bool found(false);
inBuffer.getShortString(packageName);
inBuffer.getShortString(key.name);
@@ -531,26 +538,30 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end()) {
- ClassMap& cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end()) {
- SchemaClass& schema = cIter->second;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- string body;
-
- encodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(body);
- outBuffer.putRawData(body);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
-
- QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap& cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end()) {
+ SchemaClass& schema = cIter->second;
+ string body;
+
+ encodeHeader(outBuffer, 's', sequence);
+ schema.writeSchemaCall(body);
+ outBuffer.putRawData(body);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ found = true;
+ }
}
}
+
+ if (found) {
+ connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
+ QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+ }
}
void ManagementAgentImpl::handleConsoleAddedIndication()
@@ -969,18 +980,6 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(
pair<PackageMap::iterator, bool> result =
packages.insert(pair<string, ClassMap>(name, ClassMap()));
- if (connected) {
- // Publish a package-indication message
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'p');
- encodePackageIndication(outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "schema.package");
- }
-
return result.first;
}
@@ -1038,131 +1037,146 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name);
}
+struct MessageItem {
+ string content;
+ Variant::Map headers;
+ string key;
+ MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {}
+};
+
void ManagementAgentImpl::periodicProcessing()
{
string addr_key_base = "agent.ind.data.";
- sys::Mutex::ScopedLock lock(agentLock);
list<ObjectId> deleteList;
-
- if (!connected)
- return;
+ list<boost::shared_ptr<MessageItem> > message_list;
sendHeartbeat();
- moveNewObjectsLH();
-
- //
- // Clear the been-here flag on all objects in the map.
- //
- for (ObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second.get();
- object->setFlags(0);
- if (publishAllData) {
- object->setForcePublish(true);
- }
- }
-
- publishAllData = false;
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
- //
- // Process the entire object map.
- //
- uint32_t v2Objs = 0;
+ if (!connected)
+ return;
- for (ObjectMap::iterator baseIter = managementObjects.begin();
- baseIter != managementObjects.end();
- baseIter++) {
- ManagementObject* baseObject = baseIter->second.get();
+ moveNewObjectsLH();
//
- // Skip until we find a base object requiring a sent message.
+ // Clear the been-here flag on all objects in the map.
//
- if (baseObject->getFlags() == 1 ||
- (!baseObject->getConfigChanged() &&
- !baseObject->getInstChanged() &&
- !baseObject->getForcePublish() &&
- !baseObject->isDeleted()))
- continue;
-
- std::string packageName = baseObject->getPackageName();
- std::string className = baseObject->getClassName();
-
- Variant::List list_;
- string content;
- std::stringstream addr_key;
- Variant::Map headers;
-
- addr_key << addr_key_base;
- addr_key << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- for (ObjectMap::iterator iter = baseIter;
+ for (ObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second.get();
- bool send_stats, send_props;
- if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
- object->setFlags(1);
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
+ object->setFlags(0);
+ if (publishAllData) {
+ object->setForcePublish(true);
+ }
+ }
- send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
- send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
-
- if (send_stats || send_props) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oid;
-
- object->getObjectId().mapEncode(oid);
- map_["_object_id"] = oid;
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- object->getMd5Sum());
- object->writeTimestamps(map_);
- object->mapEncodeValues(values, send_props, send_stats);
- map_["_values"] = values;
- list_.push_back(map_);
-
- if (++v2Objs >= maxV2ReplyObjs) {
- v2Objs = 0;
- ListCodec::encode(list_, content);
-
- connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
- list_.clear();
- content.clear();
- QPID_LOG(trace, "SENT DataIndication");
+ publishAllData = false;
+
+ //
+ // Process the entire object map.
+ //
+ uint32_t v2Objs = 0;
+
+ for (ObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second.get();
+
+ //
+ // Skip until we find a base object requiring a sent message.
+ //
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->getForcePublish() &&
+ !baseObject->isDeleted()))
+ continue;
+
+ std::string packageName = baseObject->getPackageName();
+ std::string className = baseObject->getClassName();
+
+ Variant::List list_;
+ std::stringstream addr_key;
+ Variant::Map headers;
+
+ addr_key << addr_key_base;
+ addr_key << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ for (ObjectMap::iterator iter = baseIter;
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second.get();
+ bool send_stats, send_props;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_stats || send_props) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->writeTimestamps(map_);
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+ boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+ ListCodec::encode(list_, item->content);
+ message_list.push_back(item);
+ list_.clear();
+ }
}
+
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
+ object->setForcePublish(false);
}
+ }
- if (object->isDeleted())
- deleteList.push_back(iter->first);
- object->setForcePublish(false);
+ if (!list_.empty()) {
+ boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+ ListCodec::encode(list_, item->content);
+ message_list.push_back(item);
}
}
- if (!list_.empty()) {
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
- QPID_LOG(trace, "SENT DataIndication");
- }
+ // Delete flagged objects
+ for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++)
+ managementObjects.erase(*iter);
}
- // Delete flagged objects
- for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
- iter != deleteList.rend();
- iter++)
- managementObjects.erase(*iter);
+ while (!message_list.empty()) {
+ boost::shared_ptr<MessageItem> item(message_list.front());
+ message_list.pop_front();
+ connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list");
+ QPID_LOG(trace, "SENT DataIndication");
+ }
}