diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-04-15 19:37:20 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-04-15 19:37:20 +0000 |
commit | 233b73fc1586020e645f1ebc4ef380bdefbfc67b (patch) | |
tree | 88821ac953d6491cc4f75f72d1f1dbbe9cdc2295 /qpid/cpp/src | |
parent | a9f00579d155359c3717ae02cdc7481a3487783d (diff) | |
download | qpid-python-233b73fc1586020e645f1ebc4ef380bdefbfc67b.tar.gz |
QPID-2507: drop the userLock before calling exchange->route()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@934561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 150 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 21 |
2 files changed, 94 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 18cd0cdfee..42c96d4641 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -342,7 +342,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, + sendBufferLH(outBuffer, outLen, mExchange, "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); } @@ -372,7 +372,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi string content; MapCodec::encode(map_, content); - sendBuffer(content, "", headers, v2Topic, key.str()); + sendBufferLH(content, "", headers, "amqp/map", v2Topic, key.str()); QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); } @@ -393,6 +393,8 @@ void ManagementAgent::Periodic::fire () void ManagementAgent::clientAdded (const std::string& routingKey) { + sys::Mutex::ScopedLock lock(userLock); + if (routingKey.find("console") != 0) return; @@ -407,7 +409,7 @@ void ManagementAgent::clientAdded (const std::string& routingKey) encodeHeader(outBuffer, 'x'); outLen = outBuffer.getPosition(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey); + sendBufferLH(outBuffer, outLen, dExchange, aIter->second->routingKey); QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey); } } @@ -441,10 +443,11 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementAgent::sendBuffer(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - string routingKey) +// NOTE WELL: assumes userLock is held by caller (LH) +void ManagementAgent::sendBufferLH(Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) { if (suppressed) { QPID_LOG(trace, "Suppressed management message to " << routingKey); @@ -477,18 +480,24 @@ void ManagementAgent::sendBuffer(Buffer& buf, msg->getFrames().append(content); - DeliverableMessage deliverable (msg); - try { - exchange->route(deliverable, routingKey, 0); - } catch(exception&) {} + { + sys::Mutex::ScopedUnlock u(userLock); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} + } } -void ManagementAgent::sendBuffer(const std::string& data, - const std::string& cid, - const Variant::Map& headers, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey) +// NOTE WELL: assumes userLock is held by caller (LH) +void ManagementAgent::sendBufferLH(const std::string& data, + const std::string& cid, + const Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey) { Variant::Map::const_iterator i; @@ -517,6 +526,7 @@ void ManagementAgent::sendBuffer(const std::string& data, if (!cid.empty()) { props->setCorrelationId(cid); } + props->setContentType(content_type); for (i = headers.begin(); i != headers.end(); ++i) { msg->getOrInsertHeaders().setString(i->first, i->second.asString()); @@ -529,10 +539,14 @@ void ManagementAgent::sendBuffer(const std::string& data, msg->getFrames().append(content); - DeliverableMessage deliverable (msg); - try { - exchange->route(deliverable, routingKey, 0); - } catch(exception&) {} + { + sys::Mutex::ScopedUnlock u(userLock); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} + } } @@ -683,7 +697,7 @@ void ManagementAgent::periodicProcessing (void) msgBuffer.reset(); stringstream key; key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); } } @@ -701,7 +715,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBuffer(content, "", headers, v2Topic, key.str()); + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); } } @@ -732,7 +746,7 @@ void ManagementAgent::periodicProcessing (void) msgBuffer.reset (); stringstream key; key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, key.str()); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); } @@ -760,7 +774,7 @@ void ManagementAgent::periodicProcessing (void) string content; ListCodec::encode(list_, content); - sendBuffer(content, "", headers, v2Topic, key.str()); + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); } } @@ -784,7 +798,7 @@ void ManagementAgent::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); } @@ -804,7 +818,7 @@ void ManagementAgent::periodicProcessing (void) string content; MapCodec::encode(map, content); - sendBuffer(content, "", headers, v2Topic, addr_key); + sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } @@ -834,7 +848,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) msgBuffer.reset(); stringstream key; key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); } @@ -862,15 +876,15 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) string content; ListCodec::encode(list_, content); - sendBuffer(content, "", headers, v2Topic, key.str()); + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); } managementObjects.erase(oid); } -void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, - uint32_t code, string text) +void ManagementAgent::sendCommandCompleteLH(string replyToKey, uint32_t sequence, + uint32_t code, string text) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -880,7 +894,7 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, outBuffer.putShortString (text); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << replyToKey << " seq=" << sequence); } @@ -987,7 +1001,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outBuffer.putMediumString(i->second); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); return; } @@ -1003,7 +1017,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); return; } @@ -1035,7 +1049,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } @@ -1064,7 +1078,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid); return; } @@ -1087,7 +1101,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status_text"] = e.what(); MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid); return; } @@ -1100,7 +1114,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid); return; } @@ -1116,7 +1130,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status_text"] = i->second; MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid); return; } @@ -1133,7 +1147,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid); return; } @@ -1146,6 +1160,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep methodName << " replyTo=" << replyTo); try { + sys::Mutex::ScopedUnlock u(userLock); iter->second->doMethod(methodName, inArgs, outMap); } catch(exception& e) { outMap.clear(); @@ -1154,13 +1169,13 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status_text"] = e.what(); MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid); return; } MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid); } @@ -1177,7 +1192,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_ outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey); } @@ -1196,11 +1211,11 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t encodePackageIndication (outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence); } - sendCommandComplete (replyToKey, sequence); + sendCommandCompleteLH(replyToKey, sequence); } void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -1239,13 +1254,13 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, ui encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name << "(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence); } } } - sendCommandComplete(replyToKey, sequence); + sendCommandCompleteLH(replyToKey, sequence); } void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) @@ -1274,7 +1289,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin key.encode(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), to=" << replyToKey << " seq=" << sequence); @@ -1324,17 +1339,17 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, classInfo.appendSchema(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); } else - sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); + sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available"); } else - sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); + sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found"); } else - sendCommandComplete(replyToKey, sequence, 1, "Package not found"); + sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found"); } void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) @@ -1371,7 +1386,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, "schema.class"); + sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " to=schema.class"); } @@ -1448,7 +1463,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); + sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent"); return; } @@ -1488,7 +1503,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << " to=" << replyToKey << " seq=" << sequence); } @@ -1530,11 +1545,11 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } - sendCommandComplete(replyToKey, sequence); + sendCommandCompleteLH(replyToKey, sequence); return; } @@ -1561,13 +1576,13 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } } - sendCommandComplete(replyToKey, sequence); + sendCommandCompleteLH(replyToKey, sequence); } @@ -1636,7 +1651,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string repl list_.push_back(map_); ListCodec::encode(list_, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); } } } @@ -1659,7 +1674,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string repl list_.push_back(map_); ListCodec::encode(list_, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); } } } @@ -1669,7 +1684,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string repl list_.clear(); headers.erase("partial"); ListCodec::encode(list_, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid); } @@ -1692,7 +1707,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo string content; MapCodec::encode(map, content); - sendBuffer(content, cid, headers, v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); } @@ -1832,7 +1847,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) string content; MapCodec::encode(outMap, content); - sendBuffer(content, cid, headers, v2Direct, replyToKey); + sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey); } else { @@ -1844,7 +1859,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); } QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); @@ -1947,10 +1962,10 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string encodePackageIndication (outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - sendBuffer (outBuffer, outLen, mExchange, "schema.package"); - QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package") + sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); + QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package"); - return result.first; + return result.first; } void ManagementAgent::addClassLH(uint8_t kind, @@ -2312,6 +2327,7 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { Variant::List content; ListCodec::decode(buf, content); Variant::List::const_iterator l; + sys::Mutex::ScopedLock lock(userLock); for (l = content.begin(); l != content.end(); l++) { std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this)); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 0250f39dd6..2366446fe0 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -301,15 +301,16 @@ private: void deleteObjectNowLH(const ObjectId& oid); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendBuffer (framing::Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - std::string routingKey); - void sendBuffer(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey); + void sendBufferLH(framing::Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + std::string routingKey); + void sendBufferLH(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); @@ -330,7 +331,7 @@ private: uint32_t allocateNewBank (); uint32_t assignBankLH (uint32_t requestedPrefix); void deleteOrphanedAgentsLH(); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, + void sendCommandCompleteLH(std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); |