diff options
author | Ted Ross <tross@apache.org> | 2011-05-03 20:15:22 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-05-03 20:15:22 +0000 |
commit | 41d2254bda3dc0e2c85099277f6a9c642ae09bcc (patch) | |
tree | bcbee11a2c09f0086ddac45b0c3bc3faaeb84d37 | |
parent | 06d1d55323fb13f3b9b572ab42d1544251663f37 (diff) | |
download | qpid-python-41d2254bda3dc0e2c85099277f6a9c642ae09bcc.tar.gz |
QPID-3241 - Deadlock in qmf agent triggered by producer flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1099225 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 342 |
1 files changed, 178 insertions, 164 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 593d403a11..633401ef5b 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -305,43 +305,47 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se "emerg", "alert", "crit", "error", "warn", "note", "info", "debug" }; - sys::Mutex::ScopedLock lock(agentLock); - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; + string content; stringstream key; - - // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << - // event.getPackageName() << "." << event.getEventName(); - key << "agent.ind.event." << keyifyNameStr(event.getPackageName()) - << "." << keyifyNameStr(event.getEventName()) - << "." << severityStr[sev] - << "." << vendorNameKey - << "." << productNameKey - << "." << instanceNameKey; - - Variant::Map map_; - Variant::Map schemaId; - Variant::Map values; Variant::Map headers; - string content; - map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), - event.getEventName(), - event.getMd5Sum(), - ManagementItem::CLASS_KIND_EVENT); - event.mapEncode(values); - map_["_values"] = values; - map_["_timestamp"] = uint64_t(Duration(EPOCH, now())); - map_["_severity"] = sev; + { + sys::Mutex::ScopedLock lock(agentLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; + + // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << + // event.getPackageName() << "." << event.getEventName(); + key << "agent.ind.event." << keyifyNameStr(event.getPackageName()) + << "." << keyifyNameStr(event.getEventName()) + << "." << severityStr[sev] + << "." << vendorNameKey + << "." << productNameKey + << "." << instanceNameKey; + + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + event.getMd5Sum(), + ManagementItem::CLASS_KIND_EVENT); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(EPOCH, now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_event"; - headers["qmf.agent"] = name_address; + Variant::List list; + list.push_back(map_); + ListCodec::encode(list, content); + } - Variant::List list; - list.push_back(map_); - ListCodec::encode(list, content); connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list"); } @@ -521,9 +525,12 @@ void ManagementAgentImpl::sendException(const string& rte, const string& rtk, co void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk) { - sys::Mutex::ScopedLock lock(agentLock); string packageName; SchemaClassKey key; + uint32_t outLen(0); + char localBuffer[MA_BUFFER_SIZE]; + Buffer outBuffer(localBuffer, MA_BUFFER_SIZE); + bool found(false); inBuffer.getShortString(packageName); inBuffer.getShortString(key.name); @@ -531,26 +538,30 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) { - ClassMap& cMap = pIter->second; - ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end()) { - SchemaClass& schema = cIter->second; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - string body; - - encodeHeader(outBuffer, 's', sequence); - schema.writeSchemaCall(body); - outBuffer.putRawData(body); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk); - - QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); + { + sys::Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + SchemaClass& schema = cIter->second; + string body; + + encodeHeader(outBuffer, 's', sequence); + schema.writeSchemaCall(body); + outBuffer.putRawData(body); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + found = true; + } } } + + if (found) { + connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk); + QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); + } } void ManagementAgentImpl::handleConsoleAddedIndication() @@ -969,18 +980,6 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage( pair<PackageMap::iterator, bool> result = packages.insert(pair<string, ClassMap>(name, ClassMap())); - if (connected) { - // Publish a package-indication message - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'p'); - encodePackageIndication(outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "schema.package"); - } - return result.first; } @@ -1038,131 +1037,146 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name); } +struct MessageItem { + string content; + Variant::Map headers; + string key; + MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {} +}; + void ManagementAgentImpl::periodicProcessing() { string addr_key_base = "agent.ind.data."; - sys::Mutex::ScopedLock lock(agentLock); list<ObjectId> deleteList; - - if (!connected) - return; + list<boost::shared_ptr<MessageItem> > message_list; sendHeartbeat(); - moveNewObjectsLH(); - - // - // Clear the been-here flag on all objects in the map. - // - for (ObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second.get(); - object->setFlags(0); - if (publishAllData) { - object->setForcePublish(true); - } - } - - publishAllData = false; + { + sys::Mutex::ScopedLock lock(agentLock); - // - // Process the entire object map. - // - uint32_t v2Objs = 0; + if (!connected) + return; - for (ObjectMap::iterator baseIter = managementObjects.begin(); - baseIter != managementObjects.end(); - baseIter++) { - ManagementObject* baseObject = baseIter->second.get(); + moveNewObjectsLH(); // - // Skip until we find a base object requiring a sent message. + // Clear the been-here flag on all objects in the map. // - if (baseObject->getFlags() == 1 || - (!baseObject->getConfigChanged() && - !baseObject->getInstChanged() && - !baseObject->getForcePublish() && - !baseObject->isDeleted())) - continue; - - std::string packageName = baseObject->getPackageName(); - std::string className = baseObject->getClassName(); - - Variant::List list_; - string content; - std::stringstream addr_key; - Variant::Map headers; - - addr_key << addr_key_base; - addr_key << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey - << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - for (ObjectMap::iterator iter = baseIter; + for (ObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second.get(); - bool send_stats, send_props; - if (baseObject->isSameClass(*object) && object->getFlags() == 0) { - object->setFlags(1); - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); + object->setFlags(0); + if (publishAllData) { + object->setForcePublish(true); + } + } - send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); - send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); - - if (send_stats || send_props) { - Variant::Map map_; - Variant::Map values; - Variant::Map oid; - - object->getObjectId().mapEncode(oid); - map_["_object_id"] = oid; - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - object->getMd5Sum()); - object->writeTimestamps(map_); - object->mapEncodeValues(values, send_props, send_stats); - map_["_values"] = values; - list_.push_back(map_); - - if (++v2Objs >= maxV2ReplyObjs) { - v2Objs = 0; - ListCodec::encode(list_, content); - - connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list"); - list_.clear(); - content.clear(); - QPID_LOG(trace, "SENT DataIndication"); + publishAllData = false; + + // + // Process the entire object map. + // + uint32_t v2Objs = 0; + + for (ObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second.get(); + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->getForcePublish() && + !baseObject->isDeleted())) + continue; + + std::string packageName = baseObject->getPackageName(); + std::string className = baseObject->getClassName(); + + Variant::List list_; + std::stringstream addr_key; + Variant::Map headers; + + addr_key << addr_key_base; + addr_key << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey + << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + for (ObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second.get(); + bool send_stats, send_props; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_stats || send_props) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->writeTimestamps(map_); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); + + if (++v2Objs >= maxV2ReplyObjs) { + v2Objs = 0; + boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str())); + ListCodec::encode(list_, item->content); + message_list.push_back(item); + list_.clear(); + } } + + if (object->isDeleted()) + deleteList.push_back(iter->first); + object->setForcePublish(false); } + } - if (object->isDeleted()) - deleteList.push_back(iter->first); - object->setForcePublish(false); + if (!list_.empty()) { + boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str())); + ListCodec::encode(list_, item->content); + message_list.push_back(item); } } - if (!list_.empty()) { - ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list"); - QPID_LOG(trace, "SENT DataIndication"); - } + // Delete flagged objects + for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) + managementObjects.erase(*iter); } - // Delete flagged objects - for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin(); - iter != deleteList.rend(); - iter++) - managementObjects.erase(*iter); + while (!message_list.empty()) { + boost::shared_ptr<MessageItem> item(message_list.front()); + message_list.pop_front(); + connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list"); + QPID_LOG(trace, "SENT DataIndication"); + } } |