summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/console/Broker.cpp32
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp95
2 files changed, 79 insertions, 48 deletions
diff --git a/cpp/src/qpid/console/Broker.cpp b/cpp/src/qpid/console/Broker.cpp
index 1a3172fff6..789b90eaaf 100644
--- a/cpp/src/qpid/console/Broker.cpp
+++ b/cpp/src/qpid/console/Broker.cpp
@@ -92,25 +92,29 @@ bool Broker::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) const
void Broker::received(qpid::client::Message& msg)
{
+#define QMF_HEADER_SIZE 8
string data = msg.getData();
Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
uint8_t opcode;
uint32_t sequence;
- if (checkHeader(inBuffer, &opcode, &sequence)) {
- QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence);
-
- if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence);
- else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence);
- else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence);
- else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence);
- else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence);
- else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence);
- else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence);
- else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence);
- else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false);
- else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true);
- else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true);
+ while (inBuffer.available() >= QMF_HEADER_SIZE) {
+ if (checkHeader(inBuffer, &opcode, &sequence)) {
+ QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence);
+
+ if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence);
+ else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence);
+ else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence);
+ else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence);
+ else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence);
+ else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence);
+ else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence);
+ else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence);
+ else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false);
+ else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true);
+ else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true);
+ } else
+ return;
}
}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 4998b274e8..8fcc5264e4 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -310,6 +310,7 @@ void ManagementAgent::moveNewObjectsLH()
void ManagementAgent::periodicProcessing (void)
{
#define BUFSIZE 65536
+#define HEADROOM 4096
Mutex::ScopedLock lock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
@@ -321,49 +322,75 @@ void ManagementAgent::periodicProcessing (void)
moveNewObjectsLH();
- if (clientWasAdded) {
- clientWasAdded = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++) {
- ManagementObject* object = iter->second;
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ object->setFlags(0);
+ if (clientWasAdded) {
object->setForcePublish(true);
}
}
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++) {
- ManagementObject* object = iter->second;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
+ clientWasAdded = false;
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
- Buffer msgBuffer (msgChars, BUFSIZE);
- encodeHeader (msgBuffer, 'c');
- object->writeProperties(msgBuffer);
+ //
+ // Process the entire object map.
+ //
+ for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second;
+
+ //
+ // Skip until we find a base object requiring a sent message.
+ //
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->getForcePublish() &&
+ !baseObject->isDeleted()))
+ continue;
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ for (ManagementObjectMap::iterator iter = baseIter;
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+ }
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
- Buffer msgBuffer (msgChars, BUFSIZE);
- encodeHeader (msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+ encodeHeader(msgBuffer, 'i');
+ object->writeStatistics(msgBuffer);
+ }
+
+ if (object->isDeleted())
+ deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
+ object->setForcePublish(false);
+
+ if (msgBuffer.available() < HEADROOM)
+ break;
+ }
}
- if (object->isDeleted())
- deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
- object->setForcePublish(false);
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ }
}
// Delete flagged objects