summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management/ManagementAgent.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/management/ManagementAgent.cpp')
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp1110
1 files changed, 447 insertions, 663 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 474c86ed48..86e9d0be8d 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -28,12 +28,15 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/Thread.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/AclModule.h"
#include "qpid/types/Variant.h"
@@ -46,6 +49,9 @@
#include <sstream>
#include <typeinfo>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
namespace qpid {
namespace management {
@@ -62,22 +68,23 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace {
- const string defaultVendorName("vendor");
- const string defaultProductName("product");
+const size_t qmfV1BufferSize(65536);
+const string defaultVendorName("vendor");
+const string defaultProductName("product");
- // Create a valid binding key substring by
- // replacing all '.' chars with '_'
- const string keyifyNameStr(const string& name)
- {
- string n2 = name;
+// Create a valid binding key substring by
+// replacing all '.' chars with '_'
+const string keyifyNameStr(const string& name)
+{
+ string n2 = name;
- size_t pos = n2.find('.');
- while (pos != n2.npos) {
- n2.replace(pos, 1, "_");
- pos = n2.find('.', pos);
- }
- return n2;
+ size_t pos = n2.find('.');
+ while (pos != n2.npos) {
+ n2.replace(pos, 1, "_");
+ pos = n2.find('.', pos);
}
+ return n2;
+}
struct ScopedManagementContext
{
@@ -90,6 +97,32 @@ struct ScopedManagementContext
setManagementExecutionContext(0);
}
};
+
+typedef boost::function0<void> FireFunction;
+struct Periodic : public qpid::sys::TimerTask
+{
+ FireFunction fireFunction;
+ qpid::sys::Timer* timer;
+
+ Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t seconds);
+ virtual ~Periodic ();
+ void fire ();
+};
+
+Periodic::Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t _seconds)
+ : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
+ "ManagementAgent::periodicProcessing"),
+ fireFunction(f), timer(t) {}
+
+Periodic::~Periodic() {}
+
+void Periodic::fire()
+{
+ setupNextFire();
+ timer->add(this);
+ fireFunction();
+}
+
}
@@ -113,9 +146,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
- agent.deleteObjectNowLH(mgmtObject->getObjectId());
- delete mgmtObject;
- mgmtObject = 0;
+ agent.deleteObjectNow(mgmtObject->getObjectId());
+ mgmtObject.reset();
}
}
@@ -124,8 +156,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
- qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
- msgBuffer(MA_BUFFER_SIZE), memstat(0)
+ qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100)
{
nextObjectId = 1;
brokerBank = 1;
@@ -136,7 +167,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
attrMap["_vendor"] = defaultVendorName;
attrMap["_product"] = defaultProductName;
- memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker");
+ memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"));
addObject(memstat, "amqp-broker");
}
@@ -155,15 +186,6 @@ ManagementAgent::~ManagementAgent ()
v2Direct.reset();
remoteAgents.clear();
-
- moveNewObjectsLH();
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++) {
- ManagementObject* object = iter->second;
- delete object;
- }
- managementObjects.clear();
}
}
@@ -176,6 +198,11 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t
broker = _broker;
threadPoolSize = _threads;
ManagementObject::maxThreads = threadPoolSize;
+ sendQueue.reset(
+ new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller()));
+ sendQueue->start();
+ timer = &broker->getTimer();
+ timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval));
// Get from file or generate and save to file.
if (dataDir.empty())
@@ -218,13 +245,6 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t
}
}
-void ManagementAgent::pluginsInitialized() {
- // Do this here so cluster plugin has the chance to set up the timer.
- timer = &broker->getClusterTimer();
- timer->add(new Periodic(*this, interval));
-}
-
-
void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
{
if (vendor.find(':') != vendor.npos) {
@@ -245,13 +265,13 @@ void ManagementAgent::setName(const string& vendor, const string& product, const
} else
inst = instance;
- name_address = vendor + ":" + product + ":" + inst;
- attrMap["_instance"] = inst;
- attrMap["_name"] = name_address;
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
- vendorNameKey = keyifyNameStr(vendor);
- productNameKey = keyifyNameStr(product);
- instanceNameKey = keyifyNameStr(inst);
+ vendorNameKey = keyifyNameStr(vendor);
+ productNameKey = keyifyNameStr(product);
+ instanceNameKey = keyifyNameStr(inst);
}
@@ -316,11 +336,12 @@ void ManagementAgent::registerEvent (const string& packageName,
}
// Deprecated: V1 objects
-ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent)
+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent)
{
uint16_t sequence;
uint64_t objectNum;
+ sys::Mutex::ScopedLock lock(addLock);
sequence = persistent ? 0 : bootSequence;
objectNum = persistId ? persistId : nextObjectId++;
@@ -329,17 +350,14 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
object->setObjectId(objId);
- {
- sys::Mutex::ScopedLock lock(addLock);
- newManagementObjects.push_back(object);
- }
+ newManagementObjects.push_back(object);
QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
return objId;
}
-ObjectId ManagementAgent::addObject(ManagementObject* object,
+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object,
const string& key,
bool persistent)
{
@@ -369,12 +387,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
"emerg", "alert", "crit", "error", "warn",
"note", "info", "debug"
};
- sys::Mutex::ScopedLock lock (userLock);
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
if (qmf1Support) {
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ char buffer[qmfV1BufferSize];
+ Buffer outBuffer(buffer, qmfV1BufferSize);
encodeHeader(outBuffer, 'e');
outBuffer.putShortString(event.getPackageName());
@@ -385,9 +402,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
string sBuf;
event.encode(sBuf);
outBuffer.putRawData(sBuf);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, mExchange,
+ sendBuffer(outBuffer, mExchange,
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
}
@@ -426,25 +441,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
Variant::List list_;
list_.push_back(map_);
ListCodec::encode(list_, content);
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str());
QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
}
}
-ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
- "ManagementAgent::periodicProcessing"),
- agent(_agent) {}
-
-ManagementAgent::Periodic::~Periodic() {}
-
-void ManagementAgent::Periodic::fire()
-{
- setupNextFire();
- agent.timer->add(this);
- agent.periodicProcessing();
-}
-
void ManagementAgent::clientAdded (const string& routingKey)
{
sys::Mutex::ScopedLock lock(userLock);
@@ -480,28 +481,14 @@ void ManagementAgent::clientAdded (const string& routingKey)
while (rkeys.size()) {
char localBuffer[16];
Buffer outBuffer(localBuffer, 16);
- uint32_t outLen;
encodeHeader(outBuffer, 'x');
- outLen = outBuffer.getPosition();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, rkeys.front());
+ sendBuffer(outBuffer, dExchange, rkeys.front());
QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());
rkeys.pop_front();
}
}
-void ManagementAgent::clusterUpdate() {
- // Called on all cluster memebers when a new member joins a cluster.
- // Set clientWasAdded so that on the next periodicProcessing we will do
- // a full update on all cluster members.
- sys::Mutex::ScopedLock l(userLock);
- moveNewObjectsLH(); // keep lists consistent with updater/updatee.
- moveDeletedObjectsLH();
- clientWasAdded = true;
- debugSnapshot("Cluster member joined");
-}
-
void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
@@ -523,12 +510,9 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-// NOTE WELL: assumes userLock is held by caller (LH)
-// NOTE EVEN WELLER: drops this lock when delivering the message!!!
-void ManagementAgent::sendBufferLH(Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- const string& routingKey)
+void ManagementAgent::sendBuffer(Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const string& routingKey)
{
if (suppressed) {
QPID_LOG(debug, "Suppressing management message to " << routingKey);
@@ -541,6 +525,8 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
+ size_t length = buf.getPosition();
+ buf.reset();
content.castBody<AMQContentBody>()->decode(buf, length);
method.setEof(false);
@@ -560,42 +546,30 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
dp->setRoutingKey(routingKey);
transfer->getFrames().append(content);
-
Message msg(transfer, transfer);
msg.setIsManagementMessage(true);
-
- {
- sys::Mutex::ScopedUnlock u(userLock);
-
- DeliverableMessage deliverable (msg, 0);
- try {
- exchange->route(deliverable);
- } catch(exception&) {}
- }
+ sendQueue->push(make_pair(exchange, msg));
buf.reset();
}
-void ManagementAgent::sendBufferLH(Buffer& buf,
- uint32_t length,
- const string& exchange,
- const string& routingKey)
+void ManagementAgent::sendBuffer(Buffer& buf,
+ const string& exchange,
+ const string& routingKey)
{
qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
if (ex.get() != 0)
- sendBufferLH(buf, length, ex, routingKey);
+ sendBuffer(buf, ex, routingKey);
}
-// NOTE WELL: assumes userLock is held by caller (LH)
-// NOTE EVEN WELLER: drops this lock when delivering the message!!!
-void ManagementAgent::sendBufferLH(const string& data,
- const string& cid,
- const Variant::Map& headers,
- const string& content_type,
- qpid::broker::Exchange::shared_ptr exchange,
- const string& routingKey,
- uint64_t ttl_msec)
+void ManagementAgent::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
{
Variant::Map::const_iterator i;
@@ -643,34 +617,27 @@ void ManagementAgent::sendBufferLH(const string& data,
msg.setIsManagementMessage(true);
msg.computeExpiration(broker->getExpiryPolicy());
- {
- sys::Mutex::ScopedUnlock u(userLock);
-
- DeliverableMessage deliverable (msg, 0);
- try {
- exchange->route(deliverable);
- } catch(exception&) {}
- }
+ sendQueue->push(make_pair(exchange, msg));
}
-void ManagementAgent::sendBufferLH(const string& data,
- const string& cid,
- const Variant::Map& headers,
- const string& content_type,
- const string& exchange,
- const string& routingKey,
- uint64_t ttl_msec)
+void ManagementAgent::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ const string& exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
{
qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
if (ex.get() != 0)
- sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec);
+ sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec);
}
/** Objects that have been added since the last periodic poll are temporarily
* saved in the newManagementObjects list. This allows objects to be
- * added without needing to block on the userLock (addLock is used instead).
+ * added without needing to block on the userLock (objectLock is used instead).
* These new objects need to be integrated into the object database
* (managementObjects) *before* they can be properly managed. This routine
* performs the integration.
@@ -680,34 +647,33 @@ void ManagementAgent::sendBufferLH(const string& data,
* duplicate object ids. To avoid clashes, don't put deleted objects
* into the active object database.
*/
-void ManagementAgent::moveNewObjectsLH()
+void ManagementAgent::moveNewObjects()
{
- sys::Mutex::ScopedLock lock (addLock);
+ sys::Mutex::ScopedLock lock(addLock);
+ sys::Mutex::ScopedLock objLock (objectLock);
while (!newManagementObjects.empty()) {
- ManagementObject *object = newManagementObjects.back();
+ ManagementObject::shared_ptr object = newManagementObjects.back();
newManagementObjects.pop_back();
if (object->isDeleted()) {
DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support));
pendingDeletedObjs[dptr->getKey()].push_back(dptr);
- delete object;
} else { // add to active object list, check for duplicates.
ObjectId oid = object->getObjectId();
ManagementObjectMap::iterator destIter = managementObjects.find(oid);
if (destIter != managementObjects.end()) {
// duplicate found. It is OK if the old object has been marked
// deleted, just replace the old with the new.
- ManagementObject *oldObj = destIter->second;
- if (oldObj->isDeleted()) {
- DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support));
- pendingDeletedObjs[dptr->getKey()].push_back(dptr);
- delete oldObj;
- } else {
+ ManagementObject::shared_ptr oldObj = destIter->second;
+ if (!oldObj->isDeleted()) {
// Duplicate non-deleted objects? This is a user error - oids must be unique.
// for now, leak the old object (safer than deleting - may still be referenced)
// and complain loudly...
QPID_LOG(error, "Detected two management objects with the same identifier: " << oid);
+ oldObj->resourceDestroy();
}
+ DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support));
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
// QPID-3666: be sure to replace the -index- also, as non-key members of
// the index object may be different for the new object! So erase the
// entry, rather than []= assign here:
@@ -720,32 +686,41 @@ void ManagementAgent::moveNewObjectsLH()
void ManagementAgent::periodicProcessing (void)
{
-#define BUFSIZE 65536
#define HEADROOM 4096
debugSnapshot("Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
- uint32_t contentSize;
string routingKey;
string sBuf;
- moveNewObjectsLH();
+ moveNewObjects();
//
// If we're publishing updates, get the latest memory statistics and uptime now
//
if (publish) {
uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
- qpid::sys::MemStat::loadMemInfo(memstat);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
+ }
+
+ //
+ // Use a copy of the management object map to avoid holding the objectLock
+ //
+ ManagementObjectVector localManagementObjects;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ std::transform(managementObjects.begin(), managementObjects.end(),
+ std::back_inserter(localManagementObjects),
+ boost::bind(&ManagementObjectMap::value_type::second, _1));
}
//
// Clear the been-here flag on all objects in the map.
//
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
+ for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
+ iter != localManagementObjects.end();
iter++) {
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr object = *iter;
object->setFlags(0);
if (clientWasAdded) {
object->setForcePublish(true);
@@ -760,22 +735,25 @@ void ManagementAgent::periodicProcessing (void)
// if we sent the active update first, _then_ the delete update, clients
// would incorrectly think the object was deleted. See QPID-2997
//
- bool objectsDeleted = moveDeletedObjectsLH();
+ bool objectsDeleted = moveDeletedObjects();
+ PendingDeletedObjsMap localPendingDeletedObjs;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ localPendingDeletedObjs.swap(pendingDeletedObjs);
+ }
//
// If we are not publishing updates, just clear the pending deletes. There's no
// need to tell anybody.
//
if (!publish)
- pendingDeletedObjs.clear();
-
- if (!pendingDeletedObjs.empty()) {
- // use a temporary copy of the pending deletes so dropping the lock when
- // the buffer is sent is safe.
- PendingDeletedObjsMap tmp(pendingDeletedObjs);
- pendingDeletedObjs.clear();
+ localPendingDeletedObjs.clear();
- for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
+ ResizableBuffer msgBuffer(qmfV1BufferSize);
+ if (!localPendingDeletedObjs.empty()) {
+ for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin();
+ mIter != localPendingDeletedObjs.end();
+ mIter++) {
std::string packageName;
std::string className;
msgBuffer.reset();
@@ -807,11 +785,10 @@ void ManagementAgent::periodicProcessing (void)
}
if (v1Objs >= maxReplyObjs) {
v1Objs = 0;
- contentSize = msgBuffer.getSize();
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
<< key.str() << " len=" << contentSize);
}
@@ -840,7 +817,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
@@ -850,11 +827,10 @@ void ManagementAgent::periodicProcessing (void)
// send any remaining objects...
if (v1Objs) {
- contentSize = BUFSIZE - msgBuffer.available();
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
}
@@ -877,7 +853,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
@@ -885,9 +861,7 @@ void ManagementAgent::periodicProcessing (void)
}
//
- // Process the entire object map. Remember: we drop the userLock each time we call
- // sendBuffer(). This allows the managementObjects map to be altered during the
- // sendBuffer() call, so always restart the search after a sendBuffer() call
+ // Process the entire object map.
//
// If publish is disabled, don't send any updates.
//
@@ -897,14 +871,14 @@ void ManagementAgent::periodicProcessing (void)
uint32_t pcount;
uint32_t scount;
uint32_t v1Objs, v2Objs;
- ManagementObjectMap::iterator baseIter;
+ ManagementObjectVector::iterator baseIter;
std::string packageName;
std::string className;
- for (baseIter = managementObjects.begin();
- baseIter != managementObjects.end();
+ for (baseIter = localManagementObjects.begin();
+ baseIter != localManagementObjects.end();
baseIter++) {
- ManagementObject* baseObject = baseIter->second;
+ ManagementObject::shared_ptr baseObject = *baseIter;
//
// Skip until we find a base object requiring processing...
//
@@ -915,7 +889,7 @@ void ManagementAgent::periodicProcessing (void)
}
}
- if (baseIter == managementObjects.end())
+ if (baseIter == localManagementObjects.end())
break; // done - all objects processed
pcount = scount = 0;
@@ -924,12 +898,12 @@ void ManagementAgent::periodicProcessing (void)
list_.clear();
msgBuffer.reset();
- for (ManagementObjectMap::iterator iter = baseIter;
- iter != managementObjects.end();
+ for (ManagementObjectVector::iterator iter = baseIter;
+ iter != localManagementObjects.end();
iter++) {
msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
- ManagementObject* baseObject = baseIter->second;
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr baseObject = *baseIter;
+ ManagementObject::shared_ptr object = *iter;
bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
@@ -1004,12 +978,11 @@ void ManagementAgent::periodicProcessing (void)
if (pcount || scount) {
if (qmf1Support) {
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
+ if (msgBuffer.getPosition() > 0) {
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
<< " props=" << pcount
<< " stats=" << scount
@@ -1035,7 +1008,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
<< " props=" << pcount
<< " stats=" << scount
@@ -1045,21 +1018,21 @@ void ManagementAgent::periodicProcessing (void)
}
} // end processing updates for all objects
- if (objectsDeleted) deleteOrphanedAgentsLH();
+ if (objectsDeleted) {
+ sys::Mutex::ScopedLock lock (userLock);
+ deleteOrphanedAgentsLH();
+ }
// heartbeat generation. Note that heartbeats need to be sent even if publish is disabled.
if (qmf1Support) {
- uint32_t contentSize;
- char msgChars[BUFSIZE];
- Buffer msgBuffer(msgChars, BUFSIZE);
+ char msgChars[qmfV1BufferSize];
+ Buffer msgBuffer(msgChars, qmfV1BufferSize);
encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now())));
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
- sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer(msgBuffer, mExchange, routingKey);
QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
}
@@ -1087,23 +1060,26 @@ void ManagementAgent::periodicProcessing (void)
// Set TTL (in msecs) on outgoing heartbeat indications based on the interval
// time to prevent stale heartbeats from getting to the consoles.
- sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
+ sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
}
}
-void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
+void ManagementAgent::deleteObjectNow(const ObjectId& oid)
{
- ManagementObjectMap::iterator iter = managementObjects.find(oid);
- if (iter == managementObjects.end())
- return;
- ManagementObject* object = iter->second;
- if (!object->isDeleted())
- return;
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(oid);
+ if (iter == managementObjects.end())
+ return;
+ object = iter->second;
+ if (!object->isDeleted())
+ return;
+ managementObjects.erase(oid);
+ }
- // since sendBufferLH drops the userLock, don't call it until we
- // are done manipulating the object.
#define DNOW_BUFSIZE 2048
char msgChars[DNOW_BUFSIZE];
Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
@@ -1139,15 +1115,12 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
v2key << "." << instanceNameKey;
}
- object = 0;
- managementObjects.erase(oid);
+ object.reset();
// object deleted, ok to drop lock now.
if (publish && qmf1Support) {
- uint32_t contentSize = msgBuffer.getPosition();
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
+ sendBuffer(msgBuffer, mExchange, v1key.str());
QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
}
@@ -1160,29 +1133,26 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
string content;
ListCodec::encode(list_, content);
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str());
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0);
QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());
}
}
-void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence,
- uint32_t code, const string& text)
+void ManagementAgent::sendCommandComplete(const string& replyToKey, uint32_t sequence,
+ uint32_t code, const string& text)
{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
encodeHeader (outBuffer, 'z', sequence);
outBuffer.putLong (code);
outBuffer.putShortString (text);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
replyToKey << " seq=" << sequence);
}
-void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid,
- const string& text, uint32_t code, bool viaLocal)
+void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid,
+ const string& text, uint32_t code, bool viaLocal)
{
static const string addr_exchange("qmf.default.direct");
@@ -1200,7 +1170,7 @@ void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, cons
map["_values"] = values;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
}
@@ -1211,7 +1181,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
const bool topic,
int qmfVersion)
{
- sys::Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
if (topic && qmfVersion == 1) {
@@ -1225,23 +1194,23 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
// schema.#
if (routingKey == "broker") {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return false;
}
if (routingKey.length() > 6) {
if (routingKey.compare(0, 9, "agent.1.0") == 0) {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return false;
}
if (routingKey.compare(0, 8, "agent.1.") == 0) {
- return authorizeAgentMessageLH(msg);
+ return authorizeAgentMessage(msg);
}
if (routingKey.compare(0, 7, "schema.") == 0) {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return true;
}
}
@@ -1253,7 +1222,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
// Intercept messages bound to:
// "console.ind.locate.# - process these messages, and also allow them to be forwarded.
if (routingKey == "console.request.agent_locate") {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return true;
}
@@ -1264,7 +1233,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
// "<name_address>" - the broker agent's proper name
// and do not forward them futher
if (routingKey == "broker" || routingKey == name_address) {
- dispatchAgentCommandLH(msg, routingKey == "broker");
+ dispatchAgentCommand(msg, routingKey == "broker");
return false;
}
}
@@ -1273,16 +1242,15 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
return true;
}
-void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
+void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
- moveNewObjectsLH();
+ moveNewObjects();
string methodName;
string packageName;
string className;
uint8_t hash[16];
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
AclModule* acl = broker->getAcl();
string inArgs;
@@ -1304,9 +1272,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
if (disallowAllV1Methods) {
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2");
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
return;
}
@@ -1315,9 +1281,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
if (i != disallowed.end()) {
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString(i->second);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
return;
}
@@ -1331,30 +1295,34 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
return;
}
}
- ManagementObjectMap::iterator iter = numericFind(objId);
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = numericFind(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (!object || object->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- if ((iter->second->getPackageName() != packageName) ||
- (iter->second->getClassName() != className)) {
+ if ((object->getPackageName() != packageName) ||
+ (object->getClassName() != className)) {
outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
}
else {
uint32_t pos = outBuffer.getPosition();
try {
- sys::Mutex::ScopedUnlock u(userLock);
string outBuf;
- iter->second->doMethod(methodName, inArgs, outBuf, userId);
+ object->doMethod(methodName, inArgs, outBuf, userId);
outBuffer.putRawData(outBuf);
} catch(exception& e) {
outBuffer.setPosition(pos);;
@@ -1364,17 +1332,15 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
}
}
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
-void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk,
- const string& cid, const ConnectionToken* connToken, bool viaLocal)
+void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk,
+ const string& cid, const ConnectionToken* connToken, bool viaLocal)
{
- moveNewObjectsLH();
+ moveNewObjects();
string methodName;
Variant::Map inMap;
@@ -1393,8 +1359,8 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end()) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
- Manageable::STATUS_PARAMETER_INVALID, viaLocal);
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ Manageable::STATUS_PARAMETER_INVALID, viaLocal);
return;
}
@@ -1412,16 +1378,22 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
inArgs = (mid->second).asMap();
}
} catch(exception& e) {
- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ if (!object || object->isDeleted()) {
stringstream estr;
estr << "No object found with ID=" << objId;
- sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal);
+ sendException(rte, rtk, cid, estr.str(), 1, viaLocal);
return;
}
@@ -1429,34 +1401,33 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
AclModule* acl = broker->getAcl();
DisallowedMethods::const_iterator i;
- i = disallowed.find(make_pair(iter->second->getClassName(), methodName));
+ i = disallowed.find(make_pair(object->getClassName(), methodName));
if (i != disallowed.end()) {
- sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
+ sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
if (acl != 0) {
map<acl::Property, string> params;
- params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
- params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
+ params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName();
+ params[acl::PROP_SCHEMACLASS] = object->getClassName();
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
- Manageable::STATUS_FORBIDDEN, viaLocal);
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
}
// invoke the method
- QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
- << ":" << iter->second->getClassName() << " method=" <<
+ QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName()
+ << ":" << object->getClassName() << " method=" <<
methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
try {
- sys::Mutex::ScopedUnlock u(userLock);
- iter->second->doMethod(methodName, inArgs, callMap, userId);
+ object->doMethod(methodName, inArgs, callMap, userId);
errorCode = callMap["_status_code"].asUint32();
if (errorCode == 0) {
outMap["_arguments"] = Variant::Map();
@@ -1467,62 +1438,59 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
} else
error = callMap["_status_text"].asString();
} catch(exception& e) {
- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
if (errorCode != 0) {
- sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal);
+ sendException(rte, rtk, cid, error, errorCode, viaLocal);
return;
}
MapCodec::encode(outMap, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
}
-void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleBrokerRequest (Buffer&, const string& replyToKey, uint32_t sequence)
{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
}
-void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handlePackageQuery (Buffer&, const string& replyToKey, uint32_t sequence)
{
QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
- for (PackageMap::iterator pIter = packages.begin ();
- pIter != packages.end ();
- pIter++)
{
- encodeHeader (outBuffer, 'p', sequence);
- encodePackageIndication (outBuffer, pIter);
+ sys::Mutex::ScopedLock lock(userLock);
+ for (PackageMap::iterator pIter = packages.begin ();
+ pIter != packages.end ();
+ pIter++)
+ {
+ encodeHeader (outBuffer, 'p', sequence);
+ encodePackageIndication (outBuffer, pIter);
+ }
}
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- if (outLen) {
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ if (outBuffer.getPosition() > 0) {
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
-void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handlePackageInd (Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
{
string packageName;
@@ -1530,10 +1498,11 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT
QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ sys::Mutex::ScopedLock lock(userLock);
findOrAddPackageLH(packageName);
}
-void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleClassQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
{
string packageName;
@@ -1541,40 +1510,39 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo
QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end())
+ typedef std::pair<SchemaClassKey, uint8_t> _ckeyType;
+ std::list<_ckeyType> classes;
{
- typedef std::pair<SchemaClassKey, uint8_t> _ckeyType;
- std::list<_ckeyType> classes;
- ClassMap &cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin();
- cIter != cMap.end();
- cIter++) {
- if (cIter->second.hasSchema()) {
- classes.push_back(make_pair(cIter->first, cIter->second.kind));
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end())
+ {
+ ClassMap &cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin();
+ cIter != cMap.end();
+ cIter++) {
+ if (cIter->second.hasSchema()) {
+ classes.push_back(make_pair(cIter->first, cIter->second.kind));
+ }
}
}
+ }
- while (classes.size()) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ while (classes.size()) {
+ ResizableBuffer outBuffer(qmfV1BufferSize);
- encodeHeader(outBuffer, 'q', sequence);
- encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second);
-
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
- "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
- classes.pop_front();
- }
+ encodeHeader(outBuffer, 'q', sequence);
+ encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second);
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
+ "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
+ classes.pop_front();
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
-void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t)
+void ManagementAgent::handleClassInd (Buffer& inBuffer, const string& replyToKey, uint32_t)
{
string packageName;
SchemaClassKey key;
@@ -1587,20 +1555,18 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK
QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
ClassMap::iterator cIter = pIter->second.find(key);
if (cIter == pIter->second.end() || !cIter->second.hasSchema()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
uint32_t sequence = nextRequestSequence++;
// Schema Request
encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
key.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), to=" << replyToKey << " seq=" << sequence);
@@ -1625,7 +1591,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
-void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
+void ManagementAgent::handleSchemaRequest(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -1636,34 +1602,32 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte,
QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer(qmfV1BufferSize);
SchemaClass& classInfo = cIter->second;
if (classInfo.hasSchema()) {
encodeHeader(outBuffer, 's', sequence);
classInfo.appendSchema(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, rte, rtk);
+ sendBuffer(outBuffer, rte, rtk);
QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
}
else
- sendCommandCompleteLH(rtk, sequence, 1, "Schema not available");
+ sendCommandComplete(rtk, sequence, 1, "Schema not available");
}
else
- sendCommandCompleteLH(rtk, sequence, 1, "Class key not found");
+ sendCommandComplete(rtk, sequence, 1, "Class key not found");
}
else
- sendCommandCompleteLH(rtk, sequence, 1, "Package not found");
+ sendCommandComplete(rtk, sequence, 1, "Package not found");
}
-void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
+void ManagementAgent::handleSchemaResponse(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -1676,6 +1640,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
ClassMap& cMap = pIter->second;
@@ -1690,14 +1655,11 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length);
// Publish a class-indication message
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer(qmfV1BufferSize);
encodeHeader(outBuffer, 'q');
encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
+ sendBuffer(outBuffer, mExchange, "schema.class");
QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
" to=schema.class");
}
@@ -1756,7 +1718,7 @@ void ManagementAgent::deleteOrphanedAgentsLH()
remoteAgents.erase(*dIter);
}
-void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
+void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
@@ -1764,12 +1726,14 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
- moveNewObjectsLH();
+ moveNewObjects();
+
+ sys::Mutex::ScopedLock lock(userLock);
deleteOrphanedAgentsLH();
RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent");
+ sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
@@ -1788,7 +1752,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
agent->agentBank = assignedBank;
agent->routingKey = replyToKey;
agent->connectionRef = connectionRef;
- agent->mgmtObject = new _qmf::Agent (this, agent.get());
+ agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get()));
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
@@ -1801,25 +1765,22 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
// Send an Attach Response
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
encodeHeader (outBuffer, 'a', sequence);
outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
" to=" << replyToKey << " seq=" << sequence);
}
-void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
{
FieldTable ft;
FieldTable::ValuePtr value;
- moveNewObjectsLH();
+ moveNewObjects();
ft.decode(inBuffer);
@@ -1832,11 +1793,17 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
return;
ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = numericFind(selector);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = numericFind(selector);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (object) {
+ ResizableBuffer outBuffer (qmfV1BufferSize);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -1849,89 +1816,80 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
sBuf.clear();
object->writeStatistics(sBuf, true);
outBuffer.putRawData(sBuf);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
return;
}
string className (value->get<string>());
- std::list<ObjectId>matches;
+ std::list<ManagementObject::shared_ptr> matches;
if (className == "memory")
- qpid::sys::MemStat::loadMemInfo(memstat);
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
// build up a set of all objects to be dumped
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName () == className) {
- matches.push_back(object->getObjectId());
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject::shared_ptr object = iter->second;
+ if (object->getClassName () == className) {
+ matches.push_back(object);
+ }
}
}
- // send them (as sendBufferLH drops the userLock)
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ // send them
+ ResizableBuffer outBuffer (qmfV1BufferSize);
while (matches.size()) {
- ObjectId objId = matches.front();
- ManagementObjectMap::iterator oIter = managementObjects.find( objId );
- if (oIter != managementObjects.end()) {
- ManagementObject* object = oIter->second;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- if (!object->isDeleted()) {
- string sProps, sStats;
- object->writeProperties(sProps);
- object->writeStatistics(sStats, true);
-
- size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes.
- if (len > MA_BUFFER_SIZE) {
- QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!");
- } else {
- if (outBuffer.available() < len) { // not enough room in current buffer, send it.
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock
- QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
- continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
- }
- encodeHeader(outBuffer, 'g', sequence);
- outBuffer.putRawData(sProps);
- outBuffer.putRawData(sStats);
+ ManagementObject::shared_ptr object = matches.front();
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ string sProps, sStats;
+ object->writeProperties(sProps);
+ object->writeStatistics(sStats, true);
+
+ size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes.
+ if (len > qmfV1BufferSize) {
+ QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!");
+ } else {
+ if (outBuffer.available() < len) { // not enough room in current buffer, send it.
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
}
+ encodeHeader(outBuffer, 'g', sequence);
+ outBuffer.putRawData(sProps);
+ outBuffer.putRawData(sStats);
}
}
matches.pop_front();
}
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- if (outLen) {
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ if (outBuffer.getPosition() > 0) {
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
-void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
+void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
{
- moveNewObjectsLH();
+ moveNewObjects();
Variant::Map inMap;
Variant::Map::const_iterator i;
@@ -1950,17 +1908,17 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
*/
i = inMap.find("_what");
if (i == inMap.end()) {
- sendExceptionLH(rte, rtk, cid, "_what element missing in Query");
+ sendException(rte, rtk, cid, "_what element missing in Query");
return;
}
if (i->second.getType() != qpid::types::VAR_STRING) {
- sendExceptionLH(rte, rtk, cid, "_what element is not a string");
+ sendException(rte, rtk, cid, "_what element is not a string");
return;
}
if (i->second.asString() != "OBJECT") {
- sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
return;
}
@@ -1984,11 +1942,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
}
if (className == "memory")
- qpid::sys::MemStat::loadMemInfo(memstat);
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
/*
@@ -2000,10 +1958,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
Variant::List list_;
ObjectId objId(i->second.asMap());
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
-
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock (objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+ if (object) {
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -2027,7 +1989,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
string content;
ListCodec::encode(list_, content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
return;
}
@@ -2037,10 +1999,18 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
Variant::List _subList;
unsigned int objCount = 0;
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
+ ManagementObjectVector localManagementObjects;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ std::transform(managementObjects.begin(), managementObjects.end(),
+ std::back_inserter(localManagementObjects),
+ boost::bind(&ManagementObjectMap::value_type::second, _1));
+ }
+
+ for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
+ iter != localManagementObjects.end();
iter++) {
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr object = *iter;
if (object->getClassName() == className &&
(packageName.empty() || object->getPackageName() == packageName)) {
@@ -2055,7 +2025,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
object->writeTimestamps(map_);
object->mapEncodeValues(values, true, true); // write both stats and properties
- iter->first.mapEncode(oidMap);
+ object->getObjectId().mapEncode(oidMap);
map_["_values"] = values;
map_["_object_id"] = oidMap;
@@ -2080,13 +2050,13 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
string content;
while (_list.size() > 1) {
ListCodec::encode(_list.front().asList(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
_list.pop_front();
QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
}
headers.erase("partial");
ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
return;
}
@@ -2094,12 +2064,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
// Unrecognized query - Send empty message to indicate CommandComplete
string content;
ListCodec::encode(Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
}
-void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid)
+void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid)
{
QPID_LOG(debug, "RCVD AgentLocateRequest");
@@ -2117,16 +2087,17 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, co
string content;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
clientWasAdded = true;
QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
}
-bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
+bool ManagementAgent::authorizeAgentMessage(Message& msg)
{
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ sys::Mutex::ScopedLock lock(userLock);
+ ResizableBuffer inBuffer (qmfV1BufferSize);
uint32_t sequence = 0;
bool methodReq = false;
bool mapMsg = false;
@@ -2140,7 +2111,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
// authorized or not. In this case, return true (authorized) if there is no ACL in place,
// otherwise return false;
//
- if (msg.getContentSize() > MA_BUFFER_SIZE)
+ if (msg.getContentSize() > qmfV1BufferSize)
return broker->getAcl() == 0;
inBuffer.putRawData(msg.getContent());
@@ -2149,7 +2120,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
@@ -2193,11 +2164,11 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
}
// look up schema for object to get package and class name
-
+ sys::Mutex::ScopedLock lock(objectLock);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
- QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " <<
+ QPID_LOG(debug, "ManagementAgent::authorizeAgentMessage: stale object id " <<
objId);
return false;
}
@@ -2256,19 +2227,16 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
cid = p->getCorrelationId();
if (mapMsg) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
- Manageable::STATUS_FORBIDDEN, false);
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ Manageable::STATUS_FORBIDDEN, false);
} else {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer(qmfV1BufferSize);
encodeHeader(outBuffer, 'm', sequence);
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, rte, rtk);
+ sendBuffer(outBuffer, rte, rtk);
}
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -2280,7 +2248,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
return true;
}
-void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
+void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
{
string rte;
string rtk;
@@ -2295,10 +2263,10 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
else
return;
- Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ ResizableBuffer inBuffer(qmfV1BufferSize);
uint8_t opcode;
- if (msg.getContentSize() > MA_BUFFER_SIZE) {
+ if (msg.getContentSize() > qmfV1BufferSize) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
msg.getContentSize());
return;
@@ -2317,39 +2285,38 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
string body;
string cid;
inBuffer.getRawData(body, bufferLen);
+ {
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
- if (p && p->hasCorrelationId()) {
- cid = p->getCorrelationId();
+ if (opcode == "_method_request")
+ return handleMethodRequest(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
+ else if (opcode == "_query_request")
+ return handleGetQuery(body, rte, rtk, cid, viaLocal);
+ else if (opcode == "_agent_locate_request")
+ return handleLocateRequest(body, rte, rtk, cid);
}
-
- if (opcode == "_method_request")
- return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
- else if (opcode == "_query_request")
- return handleGetQueryLH(body, rte, rtk, cid, viaLocal);
- else if (opcode == "_agent_locate_request")
- return handleLocateRequestLH(body, rte, rtk, cid);
-
QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
}
// old preV2 binary messages
-
while (inBuffer.getPosition() < bufferLen) {
uint32_t sequence;
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence);
- else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
- else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
+ if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence);
+ else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence);
+ else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence);
+ else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence);
+ else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
+ else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisher());
+ else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisher());
}
}
@@ -2365,14 +2332,11 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string
QPID_LOG (debug, "ManagementAgent added package " << name);
// Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
encodeHeader (outBuffer, 'p');
encodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
+ sendBuffer(outBuffer, mExchange, "schema.package");
QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");
return result.first;
@@ -2587,70 +2551,6 @@ void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) {
}
}
-void ManagementAgent::exportSchemas(string& out) {
- Variant::List list_;
- Variant::Map map_, kmap, cmap;
-
- for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
- string name = i->first;
- const ClassMap& classes = i ->second;
- for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) {
- const SchemaClassKey& key = j->first;
- const SchemaClass& klass = j->second;
- if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
- // Encode name, schema-key, schema-class
-
- map_.clear();
- kmap.clear();
- cmap.clear();
-
- key.mapEncode(kmap);
- klass.mapEncode(cmap);
-
- map_["_pname"] = name;
- map_["_key"] = kmap;
- map_["_class"] = cmap;
- list_.push_back(map_);
- }
- }
- }
-
- ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
-
- string buf(inBuf.getPointer(), inBuf.available());
- Variant::List content;
- ListCodec::decode(buf, content);
- Variant::List::const_iterator l;
-
-
- for (l = content.begin(); l != content.end(); l++) {
- string package;
- SchemaClassKey key;
- SchemaClass klass;
- Variant::Map map_, kmap, cmap;
- Variant::Map::const_iterator i;
-
- map_ = l->asMap();
-
- if ((i = map_.find("_pname")) != map_.end()) {
- package = i->second.asString();
-
- if ((i = map_.find("_key")) != map_.end()) {
- key.mapDecode(i->second.asMap());
-
- if ((i = map_.find("_class")) != map_.end()) {
- klass.mapDecode(i->second.asMap());
-
- packages[package][key] = klass;
- }
- }
- }
- }
-}
-
void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const {
Variant::Map _objId, _values;
@@ -2684,7 +2584,7 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
connectionRef.mapDecode(i->second.asMap());
}
- mgmtObject = new _qmf::Agent(&agent, this);
+ mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this));
if ((i = map_.find("_values")) != map_.end()) {
mgmtObject->mapDecodeValues(i->second.asMap());
@@ -2694,52 +2594,6 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
mgmtObject->set_connectionRef(connectionRef);
}
-void ManagementAgent::exportAgents(string& out) {
- Variant::List list_;
- Variant::Map map_, omap, amap;
-
- for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
- i != remoteAgents.end();
- ++i)
- {
- // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
- boost::shared_ptr<RemoteAgent> agent(i->second);
-
- map_.clear();
- amap.clear();
-
- agent->mapEncode(amap);
- map_["_remote_agent"] = amap;
- list_.push_back(map_);
- }
-
- ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
- string buf(inBuf.getPointer(), inBuf.available());
- Variant::List content;
- ListCodec::decode(buf, content);
- Variant::List::const_iterator l;
- sys::Mutex::ScopedLock lock(userLock);
-
- for (l = content.begin(); l != content.end(); l++) {
- boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this));
- Variant::Map map_;
- Variant::Map::const_iterator i;
-
- map_ = l->asMap();
-
- if ((i = map_.find("_remote_agent")) != map_.end()) {
-
- agent->mapDecode(i->second.asMap());
-
- addObject (agent->mgmtObject, 0, false);
- remoteAgents[agent->connectionRef] = agent;
- }
- }
-}
-
namespace {
bool isDeletedMap(const ManagementObjectMap::value_type& value) {
return value.second->isDeleted();
@@ -2818,56 +2672,8 @@ Variant::Map ManagementAgent::toMap(const FieldTable& from)
return map;
}
-
-// Build up a list of the current set of deleted objects that are pending their
-// next (last) publish-ment.
-void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
-{
- outList.clear();
-
- sys::Mutex::ScopedLock lock (userLock);
-
- moveNewObjectsLH();
- moveDeletedObjectsLH();
-
- // now copy the pending deletes into the outList
- for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
- mIter != pendingDeletedObjs.end(); mIter++) {
- for (DeletedObjectList::iterator lIter = mIter->second.begin();
- lIter != mIter->second.end(); lIter++) {
- outList.push_back(*lIter);
- }
- }
-}
-
-// Called by cluster to reset the management agent's list of deleted
-// objects to match the rest of the cluster.
-void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
-{
- sys::Mutex::ScopedLock lock (userLock);
- // Clear out any existing deleted objects
- moveNewObjectsLH();
- pendingDeletedObjs.clear();
- ManagementObjectMap::iterator i = managementObjects.begin();
- // Silently drop any deleted objects left over from receiving the update.
- while (i != managementObjects.end()) {
- ManagementObject* object = i->second;
- if (object->isDeleted()) {
- delete object;
- managementObjects.erase(i++);
- }
- else ++i;
- }
- for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) {
-
- std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className);
- pendingDeletedObjs[classkey].push_back(*lIter);
- }
-}
-
-
// construct a DeletedObject from a management object.
-ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2)
+ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2)
: packageName(src->getPackageName()),
className(src->getClassName())
{
@@ -2903,54 +2709,18 @@ ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bo
}
}
+// Remove Deleted objects, and save for later publishing...
+bool ManagementAgent::moveDeletedObjects() {
+ typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList;
+ sys::Mutex::ScopedLock lock (objectLock);
-// construct a DeletedObject from an encoded representation. Used by
-// clustering to move deleted objects between clustered brokers. See
-// DeletedObject::encode() for the reverse.
-ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded)
-{
- qpid::types::Variant::Map map_;
- MapCodec::decode(encoded, map_);
-
- packageName = map_["_package_name"].getString();
- className = map_["_class_name"].getString();
- objectId = map_["_object_id"].getString();
-
- encodedV1Config = map_["_v1_config"].getString();
- encodedV1Inst = map_["_v1_inst"].getString();
- encodedV2 = map_["_v2_data"].asMap();
-}
-
-
-// encode a DeletedObject to a string buffer. Used by
-// clustering to move deleted objects between clustered brokers. See
-// DeletedObject(const std::string&) for the reverse.
-void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
-{
- qpid::types::Variant::Map map_;
-
-
- map_["_package_name"] = packageName;
- map_["_class_name"] = className;
- map_["_object_id"] = objectId;
-
- map_["_v1_config"] = encodedV1Config;
- map_["_v1_inst"] = encodedV1Inst;
- map_["_v2_data"] = encodedV2;
-
- MapCodec::encode(map_, toBuffer);
-}
-
-// Remove Deleted objects, and save for later publishing...
-bool ManagementAgent::moveDeletedObjectsLH() {
- typedef vector<pair<ObjectId, ManagementObject*> > DeleteList;
DeleteList deleteList;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
++iter)
{
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr object = iter->second;
if (object->isDeleted()) deleteList.push_back(*iter);
}
@@ -2959,17 +2729,31 @@ bool ManagementAgent::moveDeletedObjectsLH() {
iter != deleteList.rend();
iter++)
{
- ManagementObject* delObj = iter->second;
+ ManagementObject::shared_ptr delObj = iter->second;
assert(delObj->isDeleted());
DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support));
pendingDeletedObjs[dptr->getKey()].push_back(dptr);
managementObjects.erase(iter->first);
- delete iter->second;
}
return !deleteList.empty();
}
+ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents(
+ const EventQueue::Batch& batch)
+{
+ EventQueue::Batch::const_iterator i;
+ for (i = batch.begin(); i != batch.end(); ++i) {
+ DeliverableMessage deliverable (i->second, 0);
+ try {
+ i->first->route(deliverable);
+ } catch(exception& e) {
+ QPID_LOG(warning, "ManagementAgent failed to route event: " << e.what());
+ }
+ }
+ return i;
+}
+
namespace {
QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
}