summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/management/ManagementObject.h7
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp1
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp30
-rw-r--r--cpp/src/qpid/broker/Exchange.h3
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp1
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp1
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp1
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp79
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h11
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp20
-rw-r--r--cpp/src/qpid/xml/XmlExchange.h2
11 files changed, 130 insertions, 26 deletions
diff --git a/cpp/include/qpid/management/ManagementObject.h b/cpp/include/qpid/management/ManagementObject.h
index 6475ff5406..b1c70f64d6 100644
--- a/cpp/include/qpid/management/ManagementObject.h
+++ b/cpp/include/qpid/management/ManagementObject.h
@@ -27,12 +27,14 @@
#include <qpid/framing/Buffer.h>
#include "qpid/CommonImportExport.h"
#include <map>
+#include <vector>
namespace qpid {
namespace management {
class Manageable;
class ObjectId;
+class ManagementObject;
class AgentAttachment {
@@ -65,7 +67,9 @@ public:
QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; };
QPID_COMMON_EXTERN void encode(framing::Buffer& buffer) const;
QPID_COMMON_EXTERN void decode(framing::Buffer& buffer);
- QPID_COMMON_EXTERN void setV2Key(const std::string& key) { v2Key = key; }
+ QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; }
+ QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object);
+ QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const;
QPID_COMMON_EXTERN const std::string& getV2Key() const { return v2Key; }
friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const ObjectId&);
};
@@ -192,6 +196,7 @@ protected:
};
typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;
+typedef std::vector<ManagementObject*> ManagementObjectVector;
}}
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 094f59cdec..05179502e6 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -77,6 +77,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
if (exclusiveBinding) bk.queues.clear();
if (bk.queues.add_unless(b, MatchQueue(queue))) {
+ b->startManagement();
propagate = bk.fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 16eb75c88b..7bb70ed24a 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -306,9 +306,23 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons
(*iter)->propagateBinding(routingKey, tags, op, origin, extra_args);
}
-Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent,
- FieldTable _args, const string& origin)
- : queue(_queue), key(_key), args(_args), mgmtBinding(0)
+Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
+ FieldTable _args, const string& _origin)
+ : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0)
+{
+}
+
+Exchange::Binding::~Binding ()
+{
+ if (mgmtBinding != 0) {
+ ManagementObject* mo = queue->GetManagementObject();
+ if (mo != 0)
+ static_cast<_qmf::Queue*>(mo)->dec_bindingCount();
+ mgmtBinding->resourceDestroy ();
+ }
+}
+
+void Exchange::Binding::startManagement()
{
if (parent != 0)
{
@@ -333,16 +347,6 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
}
}
-Exchange::Binding::~Binding ()
-{
- if (mgmtBinding != 0) {
- ManagementObject* mo = queue->GetManagementObject();
- if (mo != 0)
- static_cast<_qmf::Queue*>(mo)->dec_bindingCount();
- mgmtBinding->resourceDestroy ();
- }
-}
-
ManagementObject* Exchange::Binding::GetManagementObject () const
{
return (ManagementObject*) mgmtBinding;
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index dfe69e2c04..23d044ffd3 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -45,14 +45,17 @@ public:
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
+ Exchange* parent;
Queue::shared_ptr queue;
const std::string key;
const framing::FieldTable args;
+ std::string origin;
qmf::org::apache::qpid::broker::Binding* mgmtBinding;
Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
~Binding();
+ void startManagement();
management::ManagementObject* GetManagementObject() const;
};
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 6d840b50df..ef410a9154 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -63,6 +63,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
+ binding->startManagement();
propagate = fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index e4a76a0bcd..640036e741 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -114,6 +114,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
BoundKey bk(binding);
if (bindings.add_unless(bk, MatchArgs(queue, args))) {
+ binding->startManagement();
propagate = bk.fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index dd57549b5d..6e53ef5fd2 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -207,6 +207,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
return false;
} else {
Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
+ binding->startManagement();
BoundKey& bk = bindings[routingPattern];
bk.bindingVector.push_back(binding);
propagate = bk.fedBinding.addOrigin(fedOrigin);
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index e21edb4051..8dd680997f 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -85,6 +85,12 @@ ManagementAgent::~ManagementAgent ()
delete object;
}
managementObjects.clear();
+
+ while (!deletedManagementObjects.empty()) {
+ ManagementObject* object = deletedManagementObjects.back();
+ delete object;
+ deletedManagementObjects.pop_back();
+ }
}
}
@@ -196,9 +202,20 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
}
ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
- objId.setV2Key(object->getKey());
+ objId.setV2Key(*object);
object->setObjectId(objId);
+ ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
+ if (destIter != newManagementObjects.end()) {
+ if (destIter->second->isDeleted()) {
+ newDeletedManagementObjects.push_back(destIter->second);
+ newManagementObjects.erase(destIter);
+ } else {
+ QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
+ " key=" << objId.getV2Key());
+ return objId;
+ }
+ }
newManagementObjects[objId] = object;
if (publishNow) {
@@ -344,9 +361,31 @@ void ManagementAgent::moveNewObjectsLH()
Mutex::ScopedLock lock (addLock);
for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
iter != newManagementObjects.end ();
- iter++)
- managementObjects[iter->first] = iter->second;
+ iter++) {
+ bool skip = false;
+ ManagementObjectMap::iterator destIter = managementObjects.find(iter->first);
+ if (destIter != managementObjects.end()) {
+ // We have an objectId collision with an existing object. If the old object
+ // is deleted, move it to the deleted list.
+ if (destIter->second->isDeleted()) {
+ deletedManagementObjects.push_back(destIter->second);
+ managementObjects.erase(destIter);
+ } else {
+ QPID_LOG(error, "ObjectId collision in moveNewObjects. class=" <<
+ iter->second->getClassName() << " key=" << iter->first.getV2Key());
+ skip = true;
+ }
+ }
+
+ if (!skip)
+ managementObjects[iter->first] = iter->second;
+ }
newManagementObjects.clear();
+
+ while (!newDeletedManagementObjects.empty()) {
+ deletedManagementObjects.push_back(newDeletedManagementObjects.back());
+ newDeletedManagementObjects.pop_back();
+ }
}
void ManagementAgent::periodicProcessing (void)
@@ -449,7 +488,23 @@ void ManagementAgent::periodicProcessing (void)
managementObjects.erase(iter->first);
}
- if (!deleteList.empty()) {
+ // Publish the deletion of objects created by insert-collision
+ bool collisionDeletions = false;
+ for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin();
+ cdIter != deletedManagementObjects.end(); cdIter++) {
+ collisionDeletions = true;
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'c');
+ (*cdIter)->writeProperties(msgBuffer);
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ stringstream key;
+ key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+ sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
+
+ if (!deleteList.empty() || collisionDeletions) {
deleteList.clear();
deleteOrphanedAgentsLH();
}
@@ -596,7 +651,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
}
}
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ ManagementObjectMap::iterator iter = numericFind(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
@@ -967,7 +1022,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
return;
ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = managementObjects.find(selector);
+ ManagementObjectMap::iterator iter = numericFind(selector);
if (iter != managementObjects.end()) {
ManagementObject* object = iter->second;
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1294,6 +1349,18 @@ size_t ManagementAgent::validateEventSchema(Buffer& inBuffer)
return end - start;
}
+ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid)
+{
+ ManagementObjectMap::iterator iter = managementObjects.begin();
+ for (; iter != managementObjects.end(); iter++) {
+ if (oid.equalV1(iter->first))
+ break;
+ }
+
+ return iter;
+}
+
+
void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
Mutex::ScopedLock lock (addLock);
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index ea04a6cb72..3e00ebeb81 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -217,8 +217,18 @@ private:
RemoteAgentMap remoteAgents;
PackageMap packages;
+
+ //
+ // Protected by userLock
+ //
ManagementObjectMap managementObjects;
+ ManagementObjectVector deletedManagementObjects;
+
+ //
+ // Protected by addLock
+ //
ManagementObjectMap newManagementObjects;
+ ManagementObjectVector newDeletedManagementObjects;
framing::Uuid uuid;
sys::Mutex addLock;
@@ -295,6 +305,7 @@ private:
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
size_t validateEventSchema(framing::Buffer&);
+ ManagementObjectMap::iterator numericFind(const ObjectId& oid);
void debugSnapshot(const char*);
};
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index 4ac6613419..4b87800174 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -109,16 +109,18 @@ void ObjectId::fromString(const std::string& text)
bool ObjectId::operator==(const ObjectId &other) const
{
- uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
-
- return first == otherFirst && second == other.second;
+ return v2Key == other.v2Key;
}
bool ObjectId::operator<(const ObjectId &other) const
{
- uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+ return v2Key < other.v2Key;
+}
- return (first < otherFirst) || ((first == otherFirst) && (second < other.second));
+bool ObjectId::equalV1(const ObjectId &other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+ return first == otherFirst && second == other.second;
}
void ObjectId::encode(framing::Buffer& buffer) const
@@ -136,6 +138,14 @@ void ObjectId::decode(framing::Buffer& buffer)
second = buffer.getLongLong();
}
+void ObjectId::setV2Key(const ManagementObject& object)
+{
+ std::stringstream oname;
+ oname << object.getPackageName() << "." << object.getClassName() << ":" << object.getKey();
+ v2Key = oname.str();
+}
+
+
namespace qpid {
namespace management {
diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h
index 802afddeab..4394ede5e7 100644
--- a/cpp/src/qpid/xml/XmlExchange.h
+++ b/cpp/src/qpid/xml/XmlExchange.h
@@ -53,7 +53,7 @@ class XmlExchange : public virtual Exchange {
const ::qpid::framing::FieldTable& _arguments, Query query):
Binding(key, queue, parent, _arguments),
xquery(query),
- parse_message_content(true) {}
+ parse_message_content(true) { startManagement(); }
};