summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/tests/BrokerMgmtAgent.cpp371
1 files changed, 362 insertions, 9 deletions
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index 26d62ff952..6a31dc5cfc 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -45,6 +45,9 @@ namespace qpid {
typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr;
typedef std::vector<TestObjectPtr> TestObjectVector;
+ // Instantiates a broker and its internal management agent. Provides
+ // factories for constructing Receivers for object indication messages.
+ //
class AgentFixture
{
MessagingFixture *mFix;
@@ -87,7 +90,6 @@ namespace qpid {
"key: 'agent.ind.data.")
+ p + std::string(".") + k
+ std::string("'}]}}"));
-
};
};
@@ -101,7 +103,7 @@ namespace qpid {
TestManageable(management::ManagementAgent *agent) {
_qmf::TestObject *tmp = new _qmf::TestObject(agent, this);
- // see it with some default values...
+ // seed it with some default values...
tmp->set_string1("This is a test string!");
tmp->set_bool1(true);
qpid::types::Variant::Map vMap;
@@ -116,6 +118,7 @@ namespace qpid {
management::ManagementObject* GetManagementObject() const { return mgmtObj; };
static void validateTestObjectProperties(_qmf::TestObject& to)
{
+ // verify the default values are as expected
BOOST_CHECK(to.get_string1() == std::string("This is a test string!"));
BOOST_CHECK(to.get_bool1() == true);
BOOST_CHECK(to.get_map1().size() == 3);
@@ -192,7 +195,7 @@ namespace qpid {
//
QPID_AUTO_TEST_CASE(v1ObjPublish)
{
- AgentFixture* fix = new AgentFixture(5);
+ AgentFixture* fix = new AgentFixture(3);
management::ManagementAgent* agent;
agent = fix->getBrokerAgent();
@@ -202,11 +205,11 @@ namespace qpid {
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent->addObject(tm->GetManagementObject(), "testobj-1");
+ agent->addObject(tm->GetManagementObject(), 1);
// wait for the object to be published
Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 10));
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
TestObjectVector objs;
decodeV1ObjectUpdates(m1, objs, objLen);
@@ -228,7 +231,7 @@ namespace qpid {
// wait for the deleted object to be published
bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 10)) {
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
decodeV1ObjectUpdates(m1, objs, objLen);
BOOST_CHECK(objs.size() > 0);
@@ -255,7 +258,7 @@ namespace qpid {
//
QPID_AUTO_TEST_CASE(v2ObjPublish)
{
- AgentFixture* fix = new AgentFixture(5, true);
+ AgentFixture* fix = new AgentFixture(3, true);
management::ManagementAgent* agent;
agent = fix->getBrokerAgent();
@@ -267,7 +270,7 @@ namespace qpid {
// wait for the object to be published
Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 10));
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
TestObjectVector objs;
decodeV2ObjectUpdates(m1, objs);
@@ -289,7 +292,7 @@ namespace qpid {
// wait for the deleted object to be published
bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 10)) {
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
decodeV2ObjectUpdates(m1, objs);
BOOST_CHECK(objs.size() > 0);
@@ -312,6 +315,356 @@ namespace qpid {
delete tm;
}
+
+ // verify that a deleted object is exported correctly using the
+ // exportDeletedObjects() method. V1 testcase.
+ //
+ QPID_AUTO_TEST_CASE(v1ExportDelObj)
+ {
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ // create a manageable test object
+ TestManageable *tm = new TestManageable(agent);
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
+
+ Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+
+ agent->addObject(tm->GetManagementObject(), 1);
+
+ // wait for the object to be published
+ Message m1;
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
+
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
+
+ // destroy the object, then immediately export (before the next poll cycle)
+
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ tm->GetManagementObject()->resourceDestroy();
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 1);
+
+ // wait for the deleted object to be published
+
+ bool isDeleted = false;
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
+
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+
+ TestManageable::validateTestObjectProperties(**oIter);
+
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ isDeleted = true;
+ }
+ }
+
+ BOOST_CHECK(isDeleted);
+
+ // verify there are no deleted objects to export now.
+
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
+
+ r1.close();
+ delete fix;
+ delete tm;
+ }
+
+
+ // verify that a deleted object is imported correctly using the
+ // importDeletedObjects() method. V1 testcase.
+ //
+ QPID_AUTO_TEST_CASE(v1ImportDelObj)
+ {
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ // create a manageable test object
+ TestManageable *tm = new TestManageable(agent);
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
+
+ Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+
+ agent->addObject(tm->GetManagementObject(), 1);
+
+ // wait for the object to be published
+ Message m1;
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
+
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
+
+ // destroy the object, then immediately export (before the next poll cycle)
+
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ tm->GetManagementObject()->resourceDestroy();
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 1);
+
+ // destroy the broker, and reinistantiate a new one without populating it
+ // with a TestObject.
+
+ r1.close();
+ delete fix;
+ delete tm; // should no longer be necessary
+
+ fix = new AgentFixture(3);
+ r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ agent = fix->getBrokerAgent();
+ agent->importDeletedObjects( delObjs );
+
+ // wait for the deleted object to be published
+
+ bool isDeleted = false;
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
+
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+
+ TestManageable::validateTestObjectProperties(**oIter);
+
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ isDeleted = true;
+ }
+ }
+
+ BOOST_CHECK(isDeleted);
+
+ // verify there are no deleted objects to export now.
+
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
+
+ r1.close();
+ delete fix;
+ }
+
+
+ // verify that an object that is added and deleted prior to the
+ // first poll cycle is accounted for by the export
+ //
+ QPID_AUTO_TEST_CASE(v1ExportFastDelObj)
+ {
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ // create a manageable test object
+ TestManageable *tm = new TestManageable(agent);
+
+ // add, then immediately delete and export the object...
+
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ agent->addObject(tm->GetManagementObject(), 999);
+ tm->GetManagementObject()->resourceDestroy();
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 1);
+
+ delete fix;
+ delete tm;
+ }
+
+
+ // Verify that we can export and import multiple deleted objects correctly.
+ //
+ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
+ {
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+
+ // populate the agent with multiple test objects
+ const size_t objCount = 50;
+ std::vector<TestManageable *> tmv;
+ uint32_t objLen;
+
+ for (size_t i = 0; i < objCount; i++) {
+ TestManageable *tm = new TestManageable(agent);
+ objLen = tm->GetManagementObject()->writePropertiesSize();
+ agent->addObject(tm->GetManagementObject(), i + 1);
+ tmv.push_back(tm);
+ }
+
+ // wait for the objects to be published
+ Message m1;
+ uint32_t msgCount = 0;
+ while(r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ msgCount += objs.size();
+ }
+
+ BOOST_CHECK_EQUAL(msgCount, objCount);
+
+ // destroy some of the objects, then immediately export (before the next poll cycle)
+
+ int delCount = 0;
+ for (size_t i = 0; i < objCount; i += 2) {
+ tmv[i]->GetManagementObject()->resourceDestroy();
+ delCount++;
+ }
+
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK_EQUAL(delObjs.size(), delCount);
+
+ // destroy the broker, and reinistantiate a new one without populating it
+ // with TestObjects.
+
+ r1.close();
+ delete fix;
+ while (tmv.size()) {
+ delete tmv.back();
+ tmv.pop_back();
+ }
+
+ fix = new AgentFixture(3);
+ r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ agent = fix->getBrokerAgent();
+ agent->importDeletedObjects( delObjs );
+
+ // wait for the deleted object to be published, verify the count
+
+ int countDels = 0;
+ while (r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
+
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+
+ TestManageable::validateTestObjectProperties(**oIter);
+
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ countDels++;
+ }
+ }
+
+ // make sure we get the correct # of deleted objects
+ BOOST_CHECK_EQUAL(countDels, delCount);
+
+ // verify there are no deleted objects to export now.
+
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
+
+ r1.close();
+ delete fix;
+ }
+
+ // Verify that we can export and import multiple deleted objects correctly.
+ // QMF V2 variant
+ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
+ {
+ AgentFixture* fix = new AgentFixture(3, true);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+
+ // populate the agent with multiple test objects
+ const size_t objCount = 50;
+ std::vector<TestManageable *> tmv;
+ uint32_t objLen;
+
+ for (size_t i = 0; i < objCount; i++) {
+ std::stringstream key;
+ key << "testobj-" << i;
+ TestManageable *tm = new TestManageable(agent);
+ objLen = tm->GetManagementObject()->writePropertiesSize();
+ agent->addObject(tm->GetManagementObject(), key.str());
+ tmv.push_back(tm);
+ }
+
+ // wait for the objects to be published
+ Message m1;
+ uint32_t msgCount = 0;
+ while(r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV2ObjectUpdates(m1, objs);
+ msgCount += objs.size();
+ }
+
+ BOOST_CHECK_EQUAL(msgCount, objCount);
+
+ // destroy some of the objects, then immediately export (before the next poll cycle)
+
+ int delCount = 0;
+ for (size_t i = 0; i < objCount; i += 2) {
+ tmv[i]->GetManagementObject()->resourceDestroy();
+ delCount++;
+ }
+
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK_EQUAL(delObjs.size(), delCount);
+
+ // destroy the broker, and reinistantiate a new one without populating it
+ // with TestObjects.
+
+ r1.close();
+ delete fix;
+ while (tmv.size()) {
+ delete tmv.back();
+ tmv.pop_back();
+ }
+
+ fix = new AgentFixture(3, true);
+ r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ agent = fix->getBrokerAgent();
+ agent->importDeletedObjects( delObjs );
+
+ // wait for the deleted object to be published, verify the count
+
+ int countDels = 0;
+ while (r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV2ObjectUpdates(m1, objs);
+ BOOST_CHECK(objs.size() > 0);
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+
+ TestManageable::validateTestObjectProperties(**oIter);
+
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ countDels++;
+ }
+ }
+
+ // make sure we get the correct # of deleted objects
+ BOOST_CHECK_EQUAL(countDels, delCount);
+
+ // verify there are no deleted objects to export now.
+
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
+
+ r1.close();
+ delete fix;
+ }
+
QPID_AUTO_TEST_SUITE_END()
}
}