diff options
-rw-r--r-- | cpp/src/qpid/console/Broker.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 95 |
2 files changed, 79 insertions, 48 deletions
diff --git a/cpp/src/qpid/console/Broker.cpp b/cpp/src/qpid/console/Broker.cpp index 1a3172fff6..789b90eaaf 100644 --- a/cpp/src/qpid/console/Broker.cpp +++ b/cpp/src/qpid/console/Broker.cpp @@ -92,25 +92,29 @@ bool Broker::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) const void Broker::received(qpid::client::Message& msg) { +#define QMF_HEADER_SIZE 8 string data = msg.getData(); Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; uint32_t sequence; - if (checkHeader(inBuffer, &opcode, &sequence)) { - QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence); - - if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence); - else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence); - else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence); - else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence); - else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence); - else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence); - else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence); - else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence); - else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false); - else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true); - else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true); + while (inBuffer.available() >= QMF_HEADER_SIZE) { + if (checkHeader(inBuffer, &opcode, &sequence)) { + QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence); + + if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence); + else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence); + else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence); + else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence); + else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence); + else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence); + else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence); + else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence); + else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false); + else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true); + else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true); + } else + return; } } diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 4998b274e8..8fcc5264e4 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -310,6 +310,7 @@ void ManagementAgent::moveNewObjectsLH() void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 +#define HEADROOM 4096 Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; @@ -321,49 +322,75 @@ void ManagementAgent::periodicProcessing (void) moveNewObjectsLH(); - if (clientWasAdded) { - clientWasAdded = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; + // + // Clear the been-here flag on all objects in the map. + // + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + object->setFlags(0); + if (clientWasAdded) { object->setForcePublish(true); } } - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); + clientWasAdded = false; - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - Buffer msgBuffer (msgChars, BUFSIZE); - encodeHeader (msgBuffer, 'c'); - object->writeProperties(msgBuffer); + // + // Process the entire object map. + // + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->getForcePublish() && + !baseObject->isDeleted())) + continue; - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } + Buffer msgBuffer(msgChars, BUFSIZE); + for (ManagementObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + } - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { - Buffer msgBuffer (msgChars, BUFSIZE); - encodeHeader (msgBuffer, 'i'); - object->writeStatistics(msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + encodeHeader(msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + } + + if (object->isDeleted()) + deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); + object->setForcePublish(false); + + if (msgBuffer.available() < HEADROOM) + break; + } } - if (object->isDeleted()) - deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); - object->setForcePublish(false); + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + } } // Delete flagged objects |