summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorNuno Santos <nsantos@apache.org>2008-05-16 01:53:17 +0000
committerNuno Santos <nsantos@apache.org>2008-05-16 01:53:17 +0000
commitcc9a1e5a248ce3cc35d7af76d965196228e3a980 (patch)
tree68c3a841d96bcdf3baa54da484f07218b82054de /cpp
parent652be6b00e5b02deca44e291de640762b26a66e3 (diff)
downloadqpid-python-cc9a1e5a248ce3cc35d7af76d965196228e3a980.tar.gz
QPID-1065: Management messages may lost if client attach hits a small time window -- patch supplied by Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@656920 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp45
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h1
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp8
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h4
4 files changed, 42 insertions, 16 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index da6a6fd742..ba53e66f1e 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea
nextObjectId = 1;
bootSequence = 1;
nextRemoteBank = 10;
+ clientWasAdded = false;
// Get from file or generate and save to file.
if (dataDir.empty ())
@@ -129,16 +130,16 @@ void ManagementBroker::shutdown (void)
}
void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange,
- broker::Exchange::shared_ptr _dexchange)
+ broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
void ManagementBroker::RegisterClass (string packageName,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock (userLock);
PackageMap::iterator pIter = FindOrAddPackage (packageName);
@@ -146,15 +147,22 @@ void ManagementBroker::RegisterClass (string packageName,
}
void ManagementBroker::addObject (ManagementObject::shared_ptr object,
- uint32_t persistId,
- uint32_t persistBank)
+ uint32_t persistId,
+ uint32_t persistBank)
{
Mutex::ScopedLock lock (userLock);
uint64_t objectId;
if (persistId == 0)
+ {
objectId = ((uint64_t) bootSequence) << 48 |
((uint64_t) localBank) << 24 | nextObjectId++;
+ if ((nextObjectId & 0xFF000000) != 0)
+ {
+ nextObjectId = 1;
+ localBank++;
+ }
+ }
else
objectId = ((uint64_t) persistBank) << 24 | persistId;
@@ -175,13 +183,9 @@ void ManagementBroker::Periodic::fire ()
void ManagementBroker::clientAdded (void)
{
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
- object->setAllChanged ();
- }
+ Mutex::ScopedLock lock (userLock);
+
+ clientWasAdded = true;
}
void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -258,6 +262,18 @@ void ManagementBroker::PeriodicProcessing (void)
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
+ if (clientWasAdded)
+ {
+ clientWasAdded = false;
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ object->setAllChanged ();
+ }
+ }
+
if (managementObjects.empty ())
return;
@@ -542,9 +558,6 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-
- clientAdded ();
- // TODO: Send client-added to each remote agent.
}
}
}
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 2e02cb2a43..70d39c0ca8 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -155,6 +155,7 @@ class ManagementBroker : public ManagementAgent
uint32_t localBank;
uint32_t nextObjectId;
uint32_t nextRemoteBank;
+ bool clientWasAdded;
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index 28e6fb8d0a..b4824549ed 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -53,6 +53,14 @@ void ManagementExchange::route (Deliverable& msg,
TopicExchange::route (msg, routingKey, args);
}
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args)
+{
+ managementAgent->clientAdded ();
+ return TopicExchange::bind (queue, routingKey, args);
+}
+
void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
{
managementAgent = agent;
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index 28066b1e80..d54db1a74e 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -46,6 +46,10 @@ class ManagementExchange : public virtual TopicExchange
const string& routingKey,
const qpid::framing::FieldTable* args);
+ virtual bool bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
void setManagmentAgent (management::ManagementBroker* agent);
virtual ~ManagementExchange();