summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-04-15 19:37:20 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-04-15 19:37:20 +0000
commit233b73fc1586020e645f1ebc4ef380bdefbfc67b (patch)
tree88821ac953d6491cc4f75f72d1f1dbbe9cdc2295 /qpid
parenta9f00579d155359c3717ae02cdc7481a3487783d (diff)
downloadqpid-python-233b73fc1586020e645f1ebc4ef380bdefbfc67b.tar.gz
QPID-2507: drop the userLock before calling exchange->route()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@934561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp150
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h21
2 files changed, 94 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 18cd0cdfee..42c96d4641 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -342,7 +342,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
outBuffer.putRawData(sBuf);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, mExchange,
+ sendBufferLH(outBuffer, outLen, mExchange,
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
}
@@ -372,7 +372,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
string content;
MapCodec::encode(map_, content);
- sendBuffer(content, "", headers, v2Topic, key.str());
+ sendBufferLH(content, "", headers, "amqp/map", v2Topic, key.str());
QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
}
@@ -393,6 +393,8 @@ void ManagementAgent::Periodic::fire ()
void ManagementAgent::clientAdded (const std::string& routingKey)
{
+ sys::Mutex::ScopedLock lock(userLock);
+
if (routingKey.find("console") != 0)
return;
@@ -407,7 +409,7 @@ void ManagementAgent::clientAdded (const std::string& routingKey)
encodeHeader(outBuffer, 'x');
outLen = outBuffer.getPosition();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey);
+ sendBufferLH(outBuffer, outLen, dExchange, aIter->second->routingKey);
QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
}
}
@@ -441,10 +443,11 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementAgent::sendBuffer(Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+// NOTE WELL: assumes userLock is held by caller (LH)
+void ManagementAgent::sendBufferLH(Buffer& buf,
+ uint32_t length,
+ qpid::broker::Exchange::shared_ptr exchange,
+ string routingKey)
{
if (suppressed) {
QPID_LOG(trace, "Suppressed management message to " << routingKey);
@@ -477,18 +480,24 @@ void ManagementAgent::sendBuffer(Buffer& buf,
msg->getFrames().append(content);
- DeliverableMessage deliverable (msg);
- try {
- exchange->route(deliverable, routingKey, 0);
- } catch(exception&) {}
+ {
+ sys::Mutex::ScopedUnlock u(userLock);
+
+ DeliverableMessage deliverable (msg);
+ try {
+ exchange->route(deliverable, routingKey, 0);
+ } catch(exception&) {}
+ }
}
-void ManagementAgent::sendBuffer(const std::string& data,
- const std::string& cid,
- const Variant::Map& headers,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey)
+// NOTE WELL: assumes userLock is held by caller (LH)
+void ManagementAgent::sendBufferLH(const std::string& data,
+ const std::string& cid,
+ const Variant::Map& headers,
+ const std::string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey)
{
Variant::Map::const_iterator i;
@@ -517,6 +526,7 @@ void ManagementAgent::sendBuffer(const std::string& data,
if (!cid.empty()) {
props->setCorrelationId(cid);
}
+ props->setContentType(content_type);
for (i = headers.begin(); i != headers.end(); ++i) {
msg->getOrInsertHeaders().setString(i->first, i->second.asString());
@@ -529,10 +539,14 @@ void ManagementAgent::sendBuffer(const std::string& data,
msg->getFrames().append(content);
- DeliverableMessage deliverable (msg);
- try {
- exchange->route(deliverable, routingKey, 0);
- } catch(exception&) {}
+ {
+ sys::Mutex::ScopedUnlock u(userLock);
+
+ DeliverableMessage deliverable (msg);
+ try {
+ exchange->route(deliverable, routingKey, 0);
+ } catch(exception&) {}
+ }
}
@@ -683,7 +697,7 @@ void ManagementAgent::periodicProcessing (void)
msgBuffer.reset();
stringstream key;
key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
}
}
@@ -701,7 +715,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- sendBuffer(content, "", headers, v2Topic, key.str());
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
}
}
@@ -732,7 +746,7 @@ void ManagementAgent::periodicProcessing (void)
msgBuffer.reset ();
stringstream key;
key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
}
@@ -760,7 +774,7 @@ void ManagementAgent::periodicProcessing (void)
string content;
ListCodec::encode(list_, content);
- sendBuffer(content, "", headers, v2Topic, key.str());
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
}
}
@@ -784,7 +798,7 @@ void ManagementAgent::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
@@ -804,7 +818,7 @@ void ManagementAgent::periodicProcessing (void)
string content;
MapCodec::encode(map, content);
- sendBuffer(content, "", headers, v2Topic, addr_key);
+ sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key);
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
@@ -834,7 +848,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
msgBuffer.reset();
stringstream key;
key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
}
@@ -862,15 +876,15 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
string content;
ListCodec::encode(list_, content);
- sendBuffer(content, "", headers, v2Topic, key.str());
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
}
managementObjects.erase(oid);
}
-void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
- uint32_t code, string text)
+void ManagementAgent::sendCommandCompleteLH(string replyToKey, uint32_t sequence,
+ uint32_t code, string text)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -880,7 +894,7 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
outBuffer.putShortString (text);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
replyToKey << " seq=" << sequence);
}
@@ -987,7 +1001,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outBuffer.putMediumString(i->second);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
return;
}
@@ -1003,7 +1017,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
return;
}
@@ -1035,7 +1049,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
@@ -1064,7 +1078,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid);
return;
}
@@ -1087,7 +1101,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status_text"] = e.what();
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid);
return;
}
@@ -1100,7 +1114,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid);
return;
}
@@ -1116,7 +1130,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status_text"] = i->second;
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid);
return;
}
@@ -1133,7 +1147,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid);
return;
}
@@ -1146,6 +1160,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
methodName << " replyTo=" << replyTo);
try {
+ sys::Mutex::ScopedUnlock u(userLock);
iter->second->doMethod(methodName, inArgs, outMap);
} catch(exception& e) {
outMap.clear();
@@ -1154,13 +1169,13 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status_text"] = e.what();
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid);
return;
}
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid);
}
@@ -1177,7 +1192,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
}
@@ -1196,11 +1211,11 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t
encodePackageIndication (outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
}
- sendCommandComplete (replyToKey, sequence);
+ sendCommandCompleteLH(replyToKey, sequence);
}
void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -1239,13 +1254,13 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, ui
encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
"(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence);
}
}
}
- sendCommandComplete(replyToKey, sequence);
+ sendCommandCompleteLH(replyToKey, sequence);
}
void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
@@ -1274,7 +1289,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin
key.encode(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), to=" << replyToKey << " seq=" << sequence);
@@ -1324,17 +1339,17 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey,
classInfo.appendSchema(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
}
else
- sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
+ sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
}
else
- sendCommandComplete(replyToKey, sequence, 1, "Class key not found");
+ sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found");
}
else
- sendCommandComplete(replyToKey, sequence, 1, "Package not found");
+ sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found");
}
void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
@@ -1371,7 +1386,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK
encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, mExchange, "schema.class");
+ sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
" to=schema.class");
}
@@ -1448,7 +1463,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
+ sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
@@ -1488,7 +1503,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
outBuffer.putLong (assignedBank);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
" to=" << replyToKey << " seq=" << sequence);
}
@@ -1530,11 +1545,11 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
outBuffer.putRawData(sBuf);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
- sendCommandComplete(replyToKey, sequence);
+ sendCommandCompleteLH(replyToKey, sequence);
return;
}
@@ -1561,13 +1576,13 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
outBuffer.putRawData(sBuf);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
}
- sendCommandComplete(replyToKey, sequence);
+ sendCommandCompleteLH(replyToKey, sequence);
}
@@ -1636,7 +1651,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string repl
list_.push_back(map_);
ListCodec::encode(list_, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
}
}
}
@@ -1659,7 +1674,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string repl
list_.push_back(map_);
ListCodec::encode(list_, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
}
}
}
@@ -1669,7 +1684,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string repl
list_.clear();
headers.erase("partial");
ListCodec::encode(list_, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
}
@@ -1692,7 +1707,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo
string content;
MapCodec::encode(map, content);
- sendBuffer(content, cid, headers, v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
}
@@ -1832,7 +1847,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
string content;
MapCodec::encode(outMap, content);
- sendBuffer(content, cid, headers, v2Direct, replyToKey);
+ sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
} else {
@@ -1844,7 +1859,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
}
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -1947,10 +1962,10 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string
encodePackageIndication (outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- sendBuffer (outBuffer, outLen, mExchange, "schema.package");
- QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package")
+ sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
+ QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package");
- return result.first;
+ return result.first;
}
void ManagementAgent::addClassLH(uint8_t kind,
@@ -2312,6 +2327,7 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
Variant::List content;
ListCodec::decode(buf, content);
Variant::List::const_iterator l;
+ sys::Mutex::ScopedLock lock(userLock);
for (l = content.begin(); l != content.end(); l++) {
std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this));
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 0250f39dd6..2366446fe0 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -301,15 +301,16 @@ private:
void deleteObjectNowLH(const ObjectId& oid);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendBuffer (framing::Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- std::string routingKey);
- void sendBuffer(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey);
+ void sendBufferLH(framing::Buffer& buf,
+ uint32_t length,
+ qpid::broker::Exchange::shared_ptr exchange,
+ std::string routingKey);
+ void sendBufferLH(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
void moveNewObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);
@@ -330,7 +331,7 @@ private:
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
void deleteOrphanedAgentsLH();
- void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ void sendCommandCompleteLH(std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);