diff options
author | Nuno Santos <nsantos@apache.org> | 2008-05-16 01:53:17 +0000 |
---|---|---|
committer | Nuno Santos <nsantos@apache.org> | 2008-05-16 01:53:17 +0000 |
commit | cc9a1e5a248ce3cc35d7af76d965196228e3a980 (patch) | |
tree | 68c3a841d96bcdf3baa54da484f07218b82054de /cpp | |
parent | 652be6b00e5b02deca44e291de640762b26a66e3 (diff) | |
download | qpid-python-cc9a1e5a248ce3cc35d7af76d965196228e3a980.tar.gz |
QPID-1065: Management messages may lost if client attach hits a small time window -- patch supplied by Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@656920 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.h | 4 |
4 files changed, 42 insertions, 16 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index da6a6fd742..ba53e66f1e 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea nextObjectId = 1; bootSequence = 1; nextRemoteBank = 10; + clientWasAdded = false; // Get from file or generate and save to file. if (dataDir.empty ()) @@ -129,16 +130,16 @@ void ManagementBroker::shutdown (void) } void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange, - broker::Exchange::shared_ptr _dexchange) + broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; } void ManagementBroker::RegisterClass (string packageName, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock (userLock); PackageMap::iterator pIter = FindOrAddPackage (packageName); @@ -146,15 +147,22 @@ void ManagementBroker::RegisterClass (string packageName, } void ManagementBroker::addObject (ManagementObject::shared_ptr object, - uint32_t persistId, - uint32_t persistBank) + uint32_t persistId, + uint32_t persistBank) { Mutex::ScopedLock lock (userLock); uint64_t objectId; if (persistId == 0) + { objectId = ((uint64_t) bootSequence) << 48 | ((uint64_t) localBank) << 24 | nextObjectId++; + if ((nextObjectId & 0xFF000000) != 0) + { + nextObjectId = 1; + localBank++; + } + } else objectId = ((uint64_t) persistBank) << 24 | persistId; @@ -175,13 +183,9 @@ void ManagementBroker::Periodic::fire () void ManagementBroker::clientAdded (void) { - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = iter->second; - object->setAllChanged (); - } + Mutex::ScopedLock lock (userLock); + + clientWasAdded = true; } void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) @@ -258,6 +262,18 @@ void ManagementBroker::PeriodicProcessing (void) SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } + if (clientWasAdded) + { + clientWasAdded = false; + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + object->setAllChanged (); + } + } + if (managementObjects.empty ()) return; @@ -542,9 +558,6 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } - - clientAdded (); - // TODO: Send client-added to each remote agent. } } } diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 2e02cb2a43..70d39c0ca8 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -155,6 +155,7 @@ class ManagementBroker : public ManagementAgent uint32_t localBank; uint32_t nextObjectId; uint32_t nextRemoteBank; + bool clientWasAdded; # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index 28e6fb8d0a..b4824549ed 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -53,6 +53,14 @@ void ManagementExchange::route (Deliverable& msg, TopicExchange::route (msg, routingKey, args); } +bool ManagementExchange::bind (Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args) +{ + managementAgent->clientAdded (); + return TopicExchange::bind (queue, routingKey, args); +} + void ManagementExchange::setManagmentAgent (ManagementBroker* agent) { managementAgent = agent; diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index 28066b1e80..d54db1a74e 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -46,6 +46,10 @@ class ManagementExchange : public virtual TopicExchange const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool bind (Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + void setManagmentAgent (management::ManagementBroker* agent); virtual ~ManagementExchange(); |