diff options
Diffstat (limited to 'cpp/src/qpid/management/ManagementAgent.cpp')
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 1110 |
1 files changed, 447 insertions, 663 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 474c86ed48..86e9d0be8d 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -28,12 +28,15 @@ #include "qpid/management/ManagementObject.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" -#include <qpid/broker/Message.h> +#include "qpid/broker/Message.h" +#include "qpid/broker/Broker.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" #include "qpid/types/Variant.h" @@ -46,6 +49,9 @@ #include <sstream> #include <typeinfo> +#include <boost/bind.hpp> +#include <boost/function.hpp> + namespace qpid { namespace management { @@ -62,22 +68,23 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { - const string defaultVendorName("vendor"); - const string defaultProductName("product"); +const size_t qmfV1BufferSize(65536); +const string defaultVendorName("vendor"); +const string defaultProductName("product"); - // Create a valid binding key substring by - // replacing all '.' chars with '_' - const string keyifyNameStr(const string& name) - { - string n2 = name; +// Create a valid binding key substring by +// replacing all '.' chars with '_' +const string keyifyNameStr(const string& name) +{ + string n2 = name; - size_t pos = n2.find('.'); - while (pos != n2.npos) { - n2.replace(pos, 1, "_"); - pos = n2.find('.', pos); - } - return n2; + size_t pos = n2.find('.'); + while (pos != n2.npos) { + n2.replace(pos, 1, "_"); + pos = n2.find('.', pos); } + return n2; +} struct ScopedManagementContext { @@ -90,6 +97,32 @@ struct ScopedManagementContext setManagementExecutionContext(0); } }; + +typedef boost::function0<void> FireFunction; +struct Periodic : public qpid::sys::TimerTask +{ + FireFunction fireFunction; + qpid::sys::Timer* timer; + + Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t seconds); + virtual ~Periodic (); + void fire (); +}; + +Periodic::Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t _seconds) + : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), + "ManagementAgent::periodicProcessing"), + fireFunction(f), timer(t) {} + +Periodic::~Periodic() {} + +void Periodic::fire() +{ + setupNextFire(); + timer->add(this); + fireFunction(); +} + } @@ -113,9 +146,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent () QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); if (mgmtObject != 0) { mgmtObject->resourceDestroy(); - agent.deleteObjectNowLH(mgmtObject->getObjectId()); - delete mgmtObject; - mgmtObject = 0; + agent.deleteObjectNow(mgmtObject->getObjectId()); + mgmtObject.reset(); } } @@ -124,8 +156,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), - qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), - msgBuffer(MA_BUFFER_SIZE), memstat(0) + qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100) { nextObjectId = 1; brokerBank = 1; @@ -136,7 +167,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : attrMap["_vendor"] = defaultVendorName; attrMap["_product"] = defaultProductName; - memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"); + memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker")); addObject(memstat, "amqp-broker"); } @@ -155,15 +186,6 @@ ManagementAgent::~ManagementAgent () v2Direct.reset(); remoteAgents.clear(); - - moveNewObjectsLH(); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - delete object; - } - managementObjects.clear(); } } @@ -176,6 +198,11 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t broker = _broker; threadPoolSize = _threads; ManagementObject::maxThreads = threadPoolSize; + sendQueue.reset( + new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller())); + sendQueue->start(); + timer = &broker->getTimer(); + timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval)); // Get from file or generate and save to file. if (dataDir.empty()) @@ -218,13 +245,6 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t } } -void ManagementAgent::pluginsInitialized() { - // Do this here so cluster plugin has the chance to set up the timer. - timer = &broker->getClusterTimer(); - timer->add(new Periodic(*this, interval)); -} - - void ManagementAgent::setName(const string& vendor, const string& product, const string& instance) { if (vendor.find(':') != vendor.npos) { @@ -245,13 +265,13 @@ void ManagementAgent::setName(const string& vendor, const string& product, const } else inst = instance; - name_address = vendor + ":" + product + ":" + inst; - attrMap["_instance"] = inst; - attrMap["_name"] = name_address; + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; - vendorNameKey = keyifyNameStr(vendor); - productNameKey = keyifyNameStr(product); - instanceNameKey = keyifyNameStr(inst); + vendorNameKey = keyifyNameStr(vendor); + productNameKey = keyifyNameStr(product); + instanceNameKey = keyifyNameStr(inst); } @@ -316,11 +336,12 @@ void ManagementAgent::registerEvent (const string& packageName, } // Deprecated: V1 objects -ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent) +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent) { uint16_t sequence; uint64_t objectNum; + sys::Mutex::ScopedLock lock(addLock); sequence = persistent ? 0 : bootSequence; objectNum = persistId ? persistId : nextObjectId++; @@ -329,17 +350,14 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId object->setObjectId(objId); - { - sys::Mutex::ScopedLock lock(addLock); - newManagementObjects.push_back(object); - } + newManagementObjects.push_back(object); QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); return objId; } -ObjectId ManagementAgent::addObject(ManagementObject* object, +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, const string& key, bool persistent) { @@ -369,12 +387,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi "emerg", "alert", "crit", "error", "warn", "note", "info", "debug" }; - sys::Mutex::ScopedLock lock (userLock); uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; if (qmf1Support) { - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + char buffer[qmfV1BufferSize]; + Buffer outBuffer(buffer, qmfV1BufferSize); encodeHeader(outBuffer, 'e'); outBuffer.putShortString(event.getPackageName()); @@ -385,9 +402,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi string sBuf; event.encode(sBuf); outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, + sendBuffer(outBuffer, mExchange, "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); } @@ -426,25 +441,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi Variant::List list_; list_.push_back(map_); ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str()); QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); } } -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), - "ManagementAgent::periodicProcessing"), - agent(_agent) {} - -ManagementAgent::Periodic::~Periodic() {} - -void ManagementAgent::Periodic::fire() -{ - setupNextFire(); - agent.timer->add(this); - agent.periodicProcessing(); -} - void ManagementAgent::clientAdded (const string& routingKey) { sys::Mutex::ScopedLock lock(userLock); @@ -480,28 +481,14 @@ void ManagementAgent::clientAdded (const string& routingKey) while (rkeys.size()) { char localBuffer[16]; Buffer outBuffer(localBuffer, 16); - uint32_t outLen; encodeHeader(outBuffer, 'x'); - outLen = outBuffer.getPosition(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); + sendBuffer(outBuffer, dExchange, rkeys.front()); QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); rkeys.pop_front(); } } -void ManagementAgent::clusterUpdate() { - // Called on all cluster memebers when a new member joins a cluster. - // Set clientWasAdded so that on the next periodicProcessing we will do - // a full update on all cluster members. - sys::Mutex::ScopedLock l(userLock); - moveNewObjectsLH(); // keep lists consistent with updater/updatee. - moveDeletedObjectsLH(); - clientWasAdded = true; - debugSnapshot("Cluster member joined"); -} - void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); @@ -523,12 +510,9 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey) +void ManagementAgent::sendBuffer(Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey) { if (suppressed) { QPID_LOG(debug, "Suppressing management message to " << routingKey); @@ -541,6 +525,8 @@ void ManagementAgent::sendBufferLH(Buffer& buf, AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); + size_t length = buf.getPosition(); + buf.reset(); content.castBody<AMQContentBody>()->decode(buf, length); method.setEof(false); @@ -560,42 +546,30 @@ void ManagementAgent::sendBufferLH(Buffer& buf, dp->setRoutingKey(routingKey); transfer->getFrames().append(content); - Message msg(transfer, transfer); msg.setIsManagementMessage(true); - - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg, 0); - try { - exchange->route(deliverable); - } catch(exception&) {} - } + sendQueue->push(make_pair(exchange, msg)); buf.reset(); } -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - const string& exchange, - const string& routingKey) +void ManagementAgent::sendBuffer(Buffer& buf, + const string& exchange, + const string& routingKey) { qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); if (ex.get() != 0) - sendBufferLH(buf, length, ex, routingKey); + sendBuffer(buf, ex, routingKey); } -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey, - uint64_t ttl_msec) +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey, + uint64_t ttl_msec) { Variant::Map::const_iterator i; @@ -643,34 +617,27 @@ void ManagementAgent::sendBufferLH(const string& data, msg.setIsManagementMessage(true); msg.computeExpiration(broker->getExpiryPolicy()); - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg, 0); - try { - exchange->route(deliverable); - } catch(exception&) {} - } + sendQueue->push(make_pair(exchange, msg)); } -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - const string& exchange, - const string& routingKey, - uint64_t ttl_msec) +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + const string& exchange, + const string& routingKey, + uint64_t ttl_msec) { qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); if (ex.get() != 0) - sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec); + sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec); } /** Objects that have been added since the last periodic poll are temporarily * saved in the newManagementObjects list. This allows objects to be - * added without needing to block on the userLock (addLock is used instead). + * added without needing to block on the userLock (objectLock is used instead). * These new objects need to be integrated into the object database * (managementObjects) *before* they can be properly managed. This routine * performs the integration. @@ -680,34 +647,33 @@ void ManagementAgent::sendBufferLH(const string& data, * duplicate object ids. To avoid clashes, don't put deleted objects * into the active object database. */ -void ManagementAgent::moveNewObjectsLH() +void ManagementAgent::moveNewObjects() { - sys::Mutex::ScopedLock lock (addLock); + sys::Mutex::ScopedLock lock(addLock); + sys::Mutex::ScopedLock objLock (objectLock); while (!newManagementObjects.empty()) { - ManagementObject *object = newManagementObjects.back(); + ManagementObject::shared_ptr object = newManagementObjects.back(); newManagementObjects.pop_back(); if (object->isDeleted()) { DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete object; } else { // add to active object list, check for duplicates. ObjectId oid = object->getObjectId(); ManagementObjectMap::iterator destIter = managementObjects.find(oid); if (destIter != managementObjects.end()) { // duplicate found. It is OK if the old object has been marked // deleted, just replace the old with the new. - ManagementObject *oldObj = destIter->second; - if (oldObj->isDeleted()) { - DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); - pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete oldObj; - } else { + ManagementObject::shared_ptr oldObj = destIter->second; + if (!oldObj->isDeleted()) { // Duplicate non-deleted objects? This is a user error - oids must be unique. // for now, leak the old object (safer than deleting - may still be referenced) // and complain loudly... QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); + oldObj->resourceDestroy(); } + DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); // QPID-3666: be sure to replace the -index- also, as non-key members of // the index object may be different for the new object! So erase the // entry, rather than []= assign here: @@ -720,32 +686,41 @@ void ManagementAgent::moveNewObjectsLH() void ManagementAgent::periodicProcessing (void) { -#define BUFSIZE 65536 #define HEADROOM 4096 debugSnapshot("Management agent periodic processing"); sys::Mutex::ScopedLock lock (userLock); - uint32_t contentSize; string routingKey; string sBuf; - moveNewObjectsLH(); + moveNewObjects(); // // If we're publishing updates, get the latest memory statistics and uptime now // if (publish) { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); - qpid::sys::MemStat::loadMemInfo(memstat); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + qpid::sys::MemStat::loadMemInfo(memstat.get()); + } + + // + // Use a copy of the management object map to avoid holding the objectLock + // + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); } // // Clear the been-here flag on all objects in the map. // - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); iter++) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = *iter; object->setFlags(0); if (clientWasAdded) { object->setForcePublish(true); @@ -760,22 +735,25 @@ void ManagementAgent::periodicProcessing (void) // if we sent the active update first, _then_ the delete update, clients // would incorrectly think the object was deleted. See QPID-2997 // - bool objectsDeleted = moveDeletedObjectsLH(); + bool objectsDeleted = moveDeletedObjects(); + PendingDeletedObjsMap localPendingDeletedObjs; + { + sys::Mutex::ScopedLock objLock(objectLock); + localPendingDeletedObjs.swap(pendingDeletedObjs); + } // // If we are not publishing updates, just clear the pending deletes. There's no // need to tell anybody. // if (!publish) - pendingDeletedObjs.clear(); - - if (!pendingDeletedObjs.empty()) { - // use a temporary copy of the pending deletes so dropping the lock when - // the buffer is sent is safe. - PendingDeletedObjsMap tmp(pendingDeletedObjs); - pendingDeletedObjs.clear(); + localPendingDeletedObjs.clear(); - for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { + ResizableBuffer msgBuffer(qmfV1BufferSize); + if (!localPendingDeletedObjs.empty()) { + for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin(); + mIter != localPendingDeletedObjs.end(); + mIter++) { std::string packageName; std::string className; msgBuffer.reset(); @@ -807,11 +785,10 @@ void ManagementAgent::periodicProcessing (void) } if (v1Objs >= maxReplyObjs) { v1Objs = 0; - contentSize = msgBuffer.getSize(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } @@ -840,7 +817,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } @@ -850,11 +827,10 @@ void ManagementAgent::periodicProcessing (void) // send any remaining objects... if (v1Objs) { - contentSize = BUFSIZE - msgBuffer.available(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } @@ -877,7 +853,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } @@ -885,9 +861,7 @@ void ManagementAgent::periodicProcessing (void) } // - // Process the entire object map. Remember: we drop the userLock each time we call - // sendBuffer(). This allows the managementObjects map to be altered during the - // sendBuffer() call, so always restart the search after a sendBuffer() call + // Process the entire object map. // // If publish is disabled, don't send any updates. // @@ -897,14 +871,14 @@ void ManagementAgent::periodicProcessing (void) uint32_t pcount; uint32_t scount; uint32_t v1Objs, v2Objs; - ManagementObjectMap::iterator baseIter; + ManagementObjectVector::iterator baseIter; std::string packageName; std::string className; - for (baseIter = managementObjects.begin(); - baseIter != managementObjects.end(); + for (baseIter = localManagementObjects.begin(); + baseIter != localManagementObjects.end(); baseIter++) { - ManagementObject* baseObject = baseIter->second; + ManagementObject::shared_ptr baseObject = *baseIter; // // Skip until we find a base object requiring processing... // @@ -915,7 +889,7 @@ void ManagementAgent::periodicProcessing (void) } } - if (baseIter == managementObjects.end()) + if (baseIter == localManagementObjects.end()) break; // done - all objects processed pcount = scount = 0; @@ -924,12 +898,12 @@ void ManagementAgent::periodicProcessing (void) list_.clear(); msgBuffer.reset(); - for (ManagementObjectMap::iterator iter = baseIter; - iter != managementObjects.end(); + for (ManagementObjectVector::iterator iter = baseIter; + iter != localManagementObjects.end(); iter++) { msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space - ManagementObject* baseObject = baseIter->second; - ManagementObject* object = iter->second; + ManagementObject::shared_ptr baseObject = *baseIter; + ManagementObject::shared_ptr object = *iter; bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); @@ -1004,12 +978,11 @@ void ManagementAgent::periodicProcessing (void) if (pcount || scount) { if (qmf1Support) { - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { + if (msgBuffer.getPosition() > 0) { stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount @@ -1035,7 +1008,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount @@ -1045,21 +1018,21 @@ void ManagementAgent::periodicProcessing (void) } } // end processing updates for all objects - if (objectsDeleted) deleteOrphanedAgentsLH(); + if (objectsDeleted) { + sys::Mutex::ScopedLock lock (userLock); + deleteOrphanedAgentsLH(); + } // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled. if (qmf1Support) { - uint32_t contentSize; - char msgChars[BUFSIZE]; - Buffer msgBuffer(msgChars, BUFSIZE); + char msgChars[qmfV1BufferSize]; + Buffer msgBuffer(msgChars, qmfV1BufferSize); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now()))); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; - sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); + sendBuffer(msgBuffer, mExchange, routingKey); QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); } @@ -1087,23 +1060,26 @@ void ManagementAgent::periodicProcessing (void) // Set TTL (in msecs) on outgoing heartbeat indications based on the interval // time to prevent stale heartbeats from getting to the consoles. - sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); + sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); } } -void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) +void ManagementAgent::deleteObjectNow(const ObjectId& oid) { - ManagementObjectMap::iterator iter = managementObjects.find(oid); - if (iter == managementObjects.end()) - return; - ManagementObject* object = iter->second; - if (!object->isDeleted()) - return; + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(oid); + if (iter == managementObjects.end()) + return; + object = iter->second; + if (!object->isDeleted()) + return; + managementObjects.erase(oid); + } - // since sendBufferLH drops the userLock, don't call it until we - // are done manipulating the object. #define DNOW_BUFSIZE 2048 char msgChars[DNOW_BUFSIZE]; Buffer msgBuffer(msgChars, DNOW_BUFSIZE); @@ -1139,15 +1115,12 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) v2key << "." << instanceNameKey; } - object = 0; - managementObjects.erase(oid); + object.reset(); // object deleted, ok to drop lock now. if (publish && qmf1Support) { - uint32_t contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); + sendBuffer(msgBuffer, mExchange, v1key.str()); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); } @@ -1160,29 +1133,26 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) string content; ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); + sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); } } -void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence, - uint32_t code, const string& text) +void ManagementAgent::sendCommandComplete(const string& replyToKey, uint32_t sequence, + uint32_t code, const string& text) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'z', sequence); outBuffer.putLong (code); outBuffer.putShortString (text); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid, - const string& text, uint32_t code, bool viaLocal) +void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid, + const string& text, uint32_t code, bool viaLocal) { static const string addr_exchange("qmf.default.direct"); @@ -1200,7 +1170,7 @@ void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, cons map["_values"] = values; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); } @@ -1211,7 +1181,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const bool topic, int qmfVersion) { - sys::Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); if (topic && qmfVersion == 1) { @@ -1225,23 +1194,23 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // schema.# if (routingKey == "broker") { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return false; } if (routingKey.length() > 6) { if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return false; } if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); + return authorizeAgentMessage(msg); } if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return true; } } @@ -1253,7 +1222,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // Intercept messages bound to: // "console.ind.locate.# - process these messages, and also allow them to be forwarded. if (routingKey == "console.request.agent_locate") { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return true; } @@ -1264,7 +1233,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // "<name_address>" - the broker agent's proper name // and do not forward them futher if (routingKey == "broker" || routingKey == name_address) { - dispatchAgentCommandLH(msg, routingKey == "broker"); + dispatchAgentCommand(msg, routingKey == "broker"); return false; } } @@ -1273,16 +1242,15 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, return true; } -void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) { - moveNewObjectsLH(); + moveNewObjects(); string methodName; string packageName; string className; uint8_t hash[16]; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); AclModule* acl = broker->getAcl(); string inArgs; @@ -1304,9 +1272,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (disallowAllV1Methods) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2"); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); return; } @@ -1315,9 +1281,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (i != disallowed.end()) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(i->second); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); return; } @@ -1331,30 +1295,34 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); return; } } - ManagementObjectMap::iterator iter = numericFind(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (!object || object->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { + if ((object->getPackageName() != packageName) || + (object->getClassName() != className)) { outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); } else { uint32_t pos = outBuffer.getPosition(); try { - sys::Mutex::ScopedUnlock u(userLock); string outBuf; - iter->second->doMethod(methodName, inArgs, outBuf, userId); + object->doMethod(methodName, inArgs, outBuf, userId); outBuffer.putRawData(outBuf); } catch(exception& e) { outBuffer.setPosition(pos);; @@ -1364,17 +1332,15 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl } } - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk, - const string& cid, const ConnectionToken* connToken, bool viaLocal) +void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk, + const string& cid, const ConnectionToken* connToken, bool viaLocal) { - moveNewObjectsLH(); + moveNewObjects(); string methodName; Variant::Map inMap; @@ -1393,8 +1359,8 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r if ((oid = inMap.find("_object_id")) == inMap.end() || (mid = inMap.find("_method_name")) == inMap.end()) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), - Manageable::STATUS_PARAMETER_INVALID, viaLocal); + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), + Manageable::STATUS_PARAMETER_INVALID, viaLocal); return; } @@ -1412,16 +1378,22 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r inArgs = (mid->second).asMap(); } } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } - ManagementObjectMap::iterator iter = managementObjects.find(objId); + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } - if (iter == managementObjects.end() || iter->second->isDeleted()) { + if (!object || object->isDeleted()) { stringstream estr; estr << "No object found with ID=" << objId; - sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal); + sendException(rte, rtk, cid, estr.str(), 1, viaLocal); return; } @@ -1429,34 +1401,33 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r AclModule* acl = broker->getAcl(); DisallowedMethods::const_iterator i; - i = disallowed.find(make_pair(iter->second->getClassName(), methodName)); + i = disallowed.find(make_pair(object->getClassName(), methodName)); if (i != disallowed.end()) { - sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); + sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); return; } string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); if (acl != 0) { map<acl::Property, string> params; - params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); - params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); + params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName(); + params[acl::PROP_SCHEMACLASS] = object->getClassName(); if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), - Manageable::STATUS_FORBIDDEN, viaLocal); + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + Manageable::STATUS_FORBIDDEN, viaLocal); return; } } // invoke the method - QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() - << ":" << iter->second->getClassName() << " method=" << + QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName() + << ":" << object->getClassName() << " method=" << methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs); try { - sys::Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inArgs, callMap, userId); + object->doMethod(methodName, inArgs, callMap, userId); errorCode = callMap["_status_code"].asUint32(); if (errorCode == 0) { outMap["_arguments"] = Variant::Map(); @@ -1467,62 +1438,59 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r } else error = callMap["_status_text"].asString(); } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } if (errorCode != 0) { - sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal); + sendException(rte, rtk, cid, error, errorCode, viaLocal); return; } MapCodec::encode(outMap, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap); } -void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleBrokerRequest (Buffer&, const string& replyToKey, uint32_t sequence) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); } -void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageQuery (Buffer&, const string& replyToKey, uint32_t sequence) { QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); - for (PackageMap::iterator pIter = packages.begin (); - pIter != packages.end (); - pIter++) { - encodeHeader (outBuffer, 'p', sequence); - encodePackageIndication (outBuffer, pIter); + sys::Mutex::ScopedLock lock(userLock); + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); + } } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageInd (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { string packageName; @@ -1530,10 +1498,11 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); findOrAddPackageLH(packageName); } -void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleClassQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { string packageName; @@ -1541,40 +1510,39 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) + typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; + std::list<_ckeyType> classes; { - typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; - std::list<_ckeyType> classes; - ClassMap &cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); - cIter != cMap.end(); - cIter++) { - if (cIter->second.hasSchema()) { - classes.push_back(make_pair(cIter->first, cIter->second.kind)); + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) + { + ClassMap &cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); + cIter++) { + if (cIter->second.hasSchema()) { + classes.push_back(make_pair(cIter->first, cIter->second.kind)); + } } } + } - while (classes.size()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + while (classes.size()) { + ResizableBuffer outBuffer(qmfV1BufferSize); - encodeHeader(outBuffer, 'q', sequence); - encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); - - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << - "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); - classes.pop_front(); - } + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << + "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); + classes.pop_front(); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t) +void ManagementAgent::handleClassInd (Buffer& inBuffer, const string& replyToKey, uint32_t) { string packageName; SchemaClassKey key; @@ -1587,20 +1555,18 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = findOrAddPackageLH(packageName); ClassMap::iterator cIter = pIter->second.find(key); if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); uint32_t sequence = nextRequestSequence++; // Schema Request encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); key.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), to=" << replyToKey << " seq=" << sequence); @@ -1625,7 +1591,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } -void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) +void ManagementAgent::handleSchemaRequest(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -1636,34 +1602,32 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << rte << "/" << rtk << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); SchemaClass& classInfo = cIter->second; if (classInfo.hasSchema()) { encodeHeader(outBuffer, 's', sequence); classInfo.appendSchema(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); + sendBuffer(outBuffer, rte, rtk); QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence); } else - sendCommandCompleteLH(rtk, sequence, 1, "Schema not available"); + sendCommandComplete(rtk, sequence, 1, "Schema not available"); } else - sendCommandCompleteLH(rtk, sequence, 1, "Class key not found"); + sendCommandComplete(rtk, sequence, 1, "Class key not found"); } else - sendCommandCompleteLH(rtk, sequence, 1, "Package not found"); + sendCommandComplete(rtk, sequence, 1, "Package not found"); } -void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) +void ManagementAgent::handleSchemaResponse(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -1676,6 +1640,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap& cMap = pIter->second; @@ -1690,14 +1655,11 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length); // Publish a class-indication message - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); encodeHeader(outBuffer, 'q'); encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); + sendBuffer(outBuffer, mExchange, "schema.class"); QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " to=schema.class"); } @@ -1756,7 +1718,7 @@ void ManagementAgent::deleteOrphanedAgentsLH() remoteAgents.erase(*dIter); } -void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBrokerBank, requestedAgentBank; @@ -1764,12 +1726,14 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; - moveNewObjectsLH(); + moveNewObjects(); + + sys::Mutex::ScopedLock lock(userLock); deleteOrphanedAgentsLH(); RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent"); + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); return; } @@ -1788,7 +1752,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep agent->agentBank = assignedBank; agent->routingKey = replyToKey; agent->connectionRef = connectionRef; - agent->mgmtObject = new _qmf::Agent (this, agent.get()); + agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get())); agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); @@ -1801,25 +1765,22 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); // Send an Attach Response - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << " to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { FieldTable ft; FieldTable::ValuePtr value; - moveNewObjectsLH(); + moveNewObjects(); ft.decode(inBuffer); @@ -1832,11 +1793,17 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe return; ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = numericFind(selector); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(selector); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (object) { + ResizableBuffer outBuffer (qmfV1BufferSize); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -1849,89 +1816,80 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe sBuf.clear(); object->writeStatistics(sBuf, true); outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); return; } string className (value->get<string>()); - std::list<ObjectId>matches; + std::list<ManagementObject::shared_ptr> matches; if (className == "memory") - qpid::sys::MemStat::loadMemInfo(memstat); + qpid::sys::MemStat::loadMemInfo(memstat.get()); if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } // build up a set of all objects to be dumped - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName () == className) { - matches.push_back(object->getObjectId()); + { + sys::Mutex::ScopedLock lock(objectLock); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) { + matches.push_back(object); + } } } - // send them (as sendBufferLH drops the userLock) - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + // send them + ResizableBuffer outBuffer (qmfV1BufferSize); while (matches.size()) { - ObjectId objId = matches.front(); - ManagementObjectMap::iterator oIter = managementObjects.find( objId ); - if (oIter != managementObjects.end()) { - ManagementObject* object = oIter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - if (!object->isDeleted()) { - string sProps, sStats; - object->writeProperties(sProps); - object->writeStatistics(sStats, true); - - size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. - if (len > MA_BUFFER_SIZE) { - QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!"); - } else { - if (outBuffer.available() < len) { // not enough room in current buffer, send it. - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock - QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); - continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. - } - encodeHeader(outBuffer, 'g', sequence); - outBuffer.putRawData(sProps); - outBuffer.putRawData(sStats); + ManagementObject::shared_ptr object = matches.front(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + string sProps, sStats; + object->writeProperties(sProps); + object->writeStatistics(sStats, true); + + size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. + if (len > qmfV1BufferSize) { + QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!"); + } else { + if (outBuffer.available() < len) { // not enough room in current buffer, send it. + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. } + encodeHeader(outBuffer, 'g', sequence); + outBuffer.putRawData(sProps); + outBuffer.putRawData(sStats); } } matches.pop_front(); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) +void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) { - moveNewObjectsLH(); + moveNewObjects(); Variant::Map inMap; Variant::Map::const_iterator i; @@ -1950,17 +1908,17 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co */ i = inMap.find("_what"); if (i == inMap.end()) { - sendExceptionLH(rte, rtk, cid, "_what element missing in Query"); + sendException(rte, rtk, cid, "_what element missing in Query"); return; } if (i->second.getType() != qpid::types::VAR_STRING) { - sendExceptionLH(rte, rtk, cid, "_what element is not a string"); + sendException(rte, rtk, cid, "_what element is not a string"); return; } if (i->second.asString() != "OBJECT") { - sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); return; } @@ -1984,11 +1942,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co } if (className == "memory") - qpid::sys::MemStat::loadMemInfo(memstat); + qpid::sys::MemStat::loadMemInfo(memstat.get()); if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } /* @@ -2000,10 +1958,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::List list_; ObjectId objId(i->second.asMap()); - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock (objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + if (object) { if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -2027,7 +1989,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co string content; ListCodec::encode(list_, content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); return; } @@ -2037,10 +1999,18 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::List _subList; unsigned int objCount = 0; - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); + } + + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); iter++) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = *iter; if (object->getClassName() == className && (packageName.empty() || object->getPackageName() == packageName)) { @@ -2055,7 +2025,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co object->writeTimestamps(map_); object->mapEncodeValues(values, true, true); // write both stats and properties - iter->first.mapEncode(oidMap); + object->getObjectId().mapEncode(oidMap); map_["_values"] = values; map_["_object_id"] = oidMap; @@ -2080,13 +2050,13 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co string content; while (_list.size() > 1) { ListCodec::encode(_list.front().asList(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); _list.pop_front(); QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); } headers.erase("partial"); ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); return; } @@ -2094,12 +2064,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co // Unrecognized query - Send empty message to indicate CommandComplete string content; ListCodec::encode(Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk); } -void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid) +void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid) { QPID_LOG(debug, "RCVD AgentLocateRequest"); @@ -2117,16 +2087,17 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, co string content; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); clientWasAdded = true; QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); } -bool ManagementAgent::authorizeAgentMessageLH(Message& msg) +bool ManagementAgent::authorizeAgentMessage(Message& msg) { - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + sys::Mutex::ScopedLock lock(userLock); + ResizableBuffer inBuffer (qmfV1BufferSize); uint32_t sequence = 0; bool methodReq = false; bool mapMsg = false; @@ -2140,7 +2111,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) // authorized or not. In this case, return true (authorized) if there is no ACL in place, // otherwise return false; // - if (msg.getContentSize() > MA_BUFFER_SIZE) + if (msg.getContentSize() > qmfV1BufferSize) return broker->getAcl() == 0; inBuffer.putRawData(msg.getContent()); @@ -2149,7 +2120,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; @@ -2193,11 +2164,11 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) } // look up schema for object to get package and class name - + sys::Mutex::ScopedLock lock(objectLock); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { - QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " << + QPID_LOG(debug, "ManagementAgent::authorizeAgentMessage: stale object id " << objId); return false; } @@ -2256,19 +2227,16 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) cid = p->getCorrelationId(); if (mapMsg) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), - Manageable::STATUS_FORBIDDEN, false); + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + Manageable::STATUS_FORBIDDEN, false); } else { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); encodeHeader(outBuffer, 'm', sequence); outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); + sendBuffer(outBuffer, rte, rtk); } QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); @@ -2280,7 +2248,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) return true; } -void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) +void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) { string rte; string rtk; @@ -2295,10 +2263,10 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) else return; - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + ResizableBuffer inBuffer(qmfV1BufferSize); uint8_t opcode; - if (msg.getContentSize() > MA_BUFFER_SIZE) { + if (msg.getContentSize() > qmfV1BufferSize) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.getContentSize()); return; @@ -2317,39 +2285,38 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) string body; string cid; inBuffer.getRawData(body, bufferLen); + { + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } - if (p && p->hasCorrelationId()) { - cid = p->getCorrelationId(); + if (opcode == "_method_request") + return handleMethodRequest(body, rte, rtk, cid, msg.getPublisher(), viaLocal); + else if (opcode == "_query_request") + return handleGetQuery(body, rte, rtk, cid, viaLocal); + else if (opcode == "_agent_locate_request") + return handleLocateRequest(body, rte, rtk, cid); } - - if (opcode == "_method_request") - return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal); - else if (opcode == "_query_request") - return handleGetQueryLH(body, rte, rtk, cid, viaLocal); - else if (opcode == "_agent_locate_request") - return handleLocateRequestLH(body, rte, rtk, cid); - QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); return; } // old preV2 binary messages - while (inBuffer.getPosition() < bufferLen) { uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) return; - if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence); - else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); - else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); + if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence); + else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence); + else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence); + else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence); + else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence); + else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence); + else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence); + else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisher()); + else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence); + else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisher()); } } @@ -2365,14 +2332,11 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string QPID_LOG (debug, "ManagementAgent added package " << name); // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'p'); encodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); + sendBuffer(outBuffer, mExchange, "schema.package"); QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); return result.first; @@ -2587,70 +2551,6 @@ void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) { } } -void ManagementAgent::exportSchemas(string& out) { - Variant::List list_; - Variant::Map map_, kmap, cmap; - - for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) { - string name = i->first; - const ClassMap& classes = i ->second; - for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) { - const SchemaClassKey& key = j->first; - const SchemaClass& klass = j->second; - if (klass.writeSchemaCall == 0) { // Ignore built-in schemas. - // Encode name, schema-key, schema-class - - map_.clear(); - kmap.clear(); - cmap.clear(); - - key.mapEncode(kmap); - klass.mapEncode(cmap); - - map_["_pname"] = name; - map_["_key"] = kmap; - map_["_class"] = cmap; - list_.push_back(map_); - } - } - } - - ListCodec::encode(list_, out); -} - -void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) { - - string buf(inBuf.getPointer(), inBuf.available()); - Variant::List content; - ListCodec::decode(buf, content); - Variant::List::const_iterator l; - - - for (l = content.begin(); l != content.end(); l++) { - string package; - SchemaClassKey key; - SchemaClass klass; - Variant::Map map_, kmap, cmap; - Variant::Map::const_iterator i; - - map_ = l->asMap(); - - if ((i = map_.find("_pname")) != map_.end()) { - package = i->second.asString(); - - if ((i = map_.find("_key")) != map_.end()) { - key.mapDecode(i->second.asMap()); - - if ((i = map_.find("_class")) != map_.end()) { - klass.mapDecode(i->second.asMap()); - - packages[package][key] = klass; - } - } - } - } -} - void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const { Variant::Map _objId, _values; @@ -2684,7 +2584,7 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { connectionRef.mapDecode(i->second.asMap()); } - mgmtObject = new _qmf::Agent(&agent, this); + mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this)); if ((i = map_.find("_values")) != map_.end()) { mgmtObject->mapDecodeValues(i->second.asMap()); @@ -2694,52 +2594,6 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { mgmtObject->set_connectionRef(connectionRef); } -void ManagementAgent::exportAgents(string& out) { - Variant::List list_; - Variant::Map map_, omap, amap; - - for (RemoteAgentMap::const_iterator i = remoteAgents.begin(); - i != remoteAgents.end(); - ++i) - { - // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode - boost::shared_ptr<RemoteAgent> agent(i->second); - - map_.clear(); - amap.clear(); - - agent->mapEncode(amap); - map_["_remote_agent"] = amap; - list_.push_back(map_); - } - - ListCodec::encode(list_, out); -} - -void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { - string buf(inBuf.getPointer(), inBuf.available()); - Variant::List content; - ListCodec::decode(buf, content); - Variant::List::const_iterator l; - sys::Mutex::ScopedLock lock(userLock); - - for (l = content.begin(); l != content.end(); l++) { - boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this)); - Variant::Map map_; - Variant::Map::const_iterator i; - - map_ = l->asMap(); - - if ((i = map_.find("_remote_agent")) != map_.end()) { - - agent->mapDecode(i->second.asMap()); - - addObject (agent->mgmtObject, 0, false); - remoteAgents[agent->connectionRef] = agent; - } - } -} - namespace { bool isDeletedMap(const ManagementObjectMap::value_type& value) { return value.second->isDeleted(); @@ -2818,56 +2672,8 @@ Variant::Map ManagementAgent::toMap(const FieldTable& from) return map; } - -// Build up a list of the current set of deleted objects that are pending their -// next (last) publish-ment. -void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) -{ - outList.clear(); - - sys::Mutex::ScopedLock lock (userLock); - - moveNewObjectsLH(); - moveDeletedObjectsLH(); - - // now copy the pending deletes into the outList - for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin(); - mIter != pendingDeletedObjs.end(); mIter++) { - for (DeletedObjectList::iterator lIter = mIter->second.begin(); - lIter != mIter->second.end(); lIter++) { - outList.push_back(*lIter); - } - } -} - -// Called by cluster to reset the management agent's list of deleted -// objects to match the rest of the cluster. -void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) -{ - sys::Mutex::ScopedLock lock (userLock); - // Clear out any existing deleted objects - moveNewObjectsLH(); - pendingDeletedObjs.clear(); - ManagementObjectMap::iterator i = managementObjects.begin(); - // Silently drop any deleted objects left over from receiving the update. - while (i != managementObjects.end()) { - ManagementObject* object = i->second; - if (object->isDeleted()) { - delete object; - managementObjects.erase(i++); - } - else ++i; - } - for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) { - - std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className); - pendingDeletedObjs[classkey].push_back(*lIter); - } -} - - // construct a DeletedObject from a management object. -ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2) +ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2) : packageName(src->getPackageName()), className(src->getClassName()) { @@ -2903,54 +2709,18 @@ ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bo } } +// Remove Deleted objects, and save for later publishing... +bool ManagementAgent::moveDeletedObjects() { + typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList; + sys::Mutex::ScopedLock lock (objectLock); -// construct a DeletedObject from an encoded representation. Used by -// clustering to move deleted objects between clustered brokers. See -// DeletedObject::encode() for the reverse. -ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded) -{ - qpid::types::Variant::Map map_; - MapCodec::decode(encoded, map_); - - packageName = map_["_package_name"].getString(); - className = map_["_class_name"].getString(); - objectId = map_["_object_id"].getString(); - - encodedV1Config = map_["_v1_config"].getString(); - encodedV1Inst = map_["_v1_inst"].getString(); - encodedV2 = map_["_v2_data"].asMap(); -} - - -// encode a DeletedObject to a string buffer. Used by -// clustering to move deleted objects between clustered brokers. See -// DeletedObject(const std::string&) for the reverse. -void ManagementAgent::DeletedObject::encode(std::string& toBuffer) -{ - qpid::types::Variant::Map map_; - - - map_["_package_name"] = packageName; - map_["_class_name"] = className; - map_["_object_id"] = objectId; - - map_["_v1_config"] = encodedV1Config; - map_["_v1_inst"] = encodedV1Inst; - map_["_v2_data"] = encodedV2; - - MapCodec::encode(map_, toBuffer); -} - -// Remove Deleted objects, and save for later publishing... -bool ManagementAgent::moveDeletedObjectsLH() { - typedef vector<pair<ObjectId, ManagementObject*> > DeleteList; DeleteList deleteList; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); ++iter) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = iter->second; if (object->isDeleted()) deleteList.push_back(*iter); } @@ -2959,17 +2729,31 @@ bool ManagementAgent::moveDeletedObjectsLH() { iter != deleteList.rend(); iter++) { - ManagementObject* delObj = iter->second; + ManagementObject::shared_ptr delObj = iter->second; assert(delObj->isDeleted()); DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); pendingDeletedObjs[dptr->getKey()].push_back(dptr); managementObjects.erase(iter->first); - delete iter->second; } return !deleteList.empty(); } +ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents( + const EventQueue::Batch& batch) +{ + EventQueue::Batch::const_iterator i; + for (i = batch.begin(); i != batch.end(); ++i) { + DeliverableMessage deliverable (i->second, 0); + try { + i->first->route(deliverable); + } catch(exception& e) { + QPID_LOG(warning, "ManagementAgent failed to route event: " << e.what()); + } + } + return i; +} + namespace { QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; } |