summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/cluster.mk4
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp7
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp24
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp40
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp96
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.h81
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp315
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h41
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py7
-rw-r--r--qpid/cpp/xml/cluster.xml10
11 files changed, 576 insertions, 51 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 2a648e968c..a791b2d41a 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -93,7 +93,9 @@ cluster_la_SOURCES = \
qpid/cluster/SecureConnectionFactory.h \
qpid/cluster/SecureConnectionFactory.cpp \
qpid/cluster/StoreStatus.h \
- qpid/cluster/StoreStatus.cpp
+ qpid/cluster/StoreStatus.cpp \
+ qpid/cluster/UpdateDataExchange.h \
+ qpid/cluster/UpdateDataExchange.cpp
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index b7545ad706..37932be735 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -127,6 +127,7 @@
#include "qpid/cluster/UpdateClient.h"
#include "qpid/cluster/RetractClient.h"
#include "qpid/cluster/FailoverExchange.h"
+#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/cluster/UpdateExchange.h"
#include "qpid/cluster/ClusterTimer.h"
@@ -197,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 964709;
+const uint32_t Cluster::CLUSTER_VERSION = 1039478;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -289,6 +290,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+ // Update-data exchange is used for passing data that may be too large
+ // for single control frame.
+ broker.getExchanges().registerExchange(
+ boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent())));
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index e76cebf68d..7406d64bda 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/amqp_0_10/Codecs.h"
#include "Connection.h"
#include "UpdateClient.h"
#include "Cluster.h"
@@ -42,6 +43,7 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/ClusterSafe.h"
+#include "qpid/types/Variant.h"
#include "qpid/management/ManagementAgent.h"
#include <boost/current_function.hpp>
@@ -51,7 +53,8 @@ namespace cluster {
using namespace framing;
using namespace framing::cluster;
-
+using amqp_0_10::ListCodec;
+using types::Variant;
qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
@@ -626,15 +629,6 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) {
findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
}
-void Connection::managementSchema(const std::string& data) {
- management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
- if (!agent)
- throw Exception(QPID_MSG("Management schema update but management not enabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importSchemas(buf);
- QPID_LOG(debug, cluster << " updated management schemas");
-}
-
//
// This is the handler for incoming managementsetup messages.
//
@@ -648,15 +642,5 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence)
agent->setNextObjectId(objectNum);
agent->setBootSequence(bootSequence);
}
-
-void Connection::managementAgents(const std::string& data) {
- management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
- if (!agent)
- throw Exception(QPID_MSG("Management agent update but management not enabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importAgents(buf);
- QPID_LOG(debug, cluster << " updated management agents");
-}
-
}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 24b8c8532f..95d846eaf8 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -177,8 +177,6 @@ class Connection :
OutputInterceptor& getOutput() { return output; }
void addQueueListener(const std::string& queue, uint32_t listener);
- void managementSchema(const std::string& data);
- void managementAgents(const std::string& data);
void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
void setSecureConnection ( broker::SecureConnection * sc );
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index bc1b812a94..7d73f3c1db 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -18,12 +18,14 @@
* under the License.
*
*/
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/cluster/UpdateClient.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterMap.h"
#include "qpid/cluster/Connection.h"
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
+#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/SessionImpl.h"
@@ -52,6 +54,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/TypeCode.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Variant.h"
#include "qpid/Url.h"
#include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
#include <boost/bind.hpp>
@@ -62,12 +65,14 @@
namespace qpid {
namespace cluster {
+using amqp_0_10::ListCodec;
using broker::Broker;
using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
using broker::SemanticState;
+using types::Variant;
using namespace framing;
namespace arg=client::arg;
@@ -153,7 +158,6 @@ void UpdateClient::update() {
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
- session.close();
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
@@ -162,14 +166,16 @@ void UpdateClient::update() {
updateManagementAgent();
+ session.close();
+
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false);
client::ConnectionAccess::getImpl(connection)->handle(frame);
- // FIXME aconway 2010-06-16: Connection will be closed from the other end.
- // connection.close();
+ // NOTE: connection will be closed from the other end, don't close
+ // it here as that causes a race.
// FIXME aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
@@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent()
{
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- // Send management schemas and agents.
string data;
+
+ QPID_LOG(debug, updaterId << " updating management schemas. ")
agent->exportSchemas(data);
- ClusterConnectionProxy(session).managementSchema(data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+ QPID_LOG(debug, updaterId << " updating management agents. ")
agent->exportAgents(data);
- ClusterConnectionProxy(session).managementAgents(data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+ QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
+ DeletedObjectList deleted;
+ agent->exportDeletedObjects(deleted);
+ Variant::List list;
+ for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) {
+ string encoded;
+ (*i)->encode(encoded);
+ list.push_back(encoded);
+ }
+ ListCodec::encode(list, data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
new file mode 100644
index 0000000000..90a53f5531
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "UpdateDataExchange.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace cluster {
+
+const std::string UpdateDataExchange::EXCHANGE_NAME("qpid.cluster-update-data");
+const std::string UpdateDataExchange::EXCHANGE_TYPE("qpid.cluster-update-data");
+const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents");
+const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
+const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
+
+UpdateDataExchange::UpdateDataExchange(management::Manageable* parent,
+ management::ManagementAgent* agent_) :
+ Exchange(EXCHANGE_NAME, parent),
+ agent(agent_)
+{}
+
+void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
+ const qpid::framing::FieldTable* )
+{
+ std::string data = msg.getMessage().getFrames().getContent();
+ if (routingKey == MANAGEMENT_AGENTS_KEY)
+ managementAgents(data);
+ else if (routingKey == MANAGEMENT_SCHEMAS_KEY)
+ managementSchemas(data);
+ else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY)
+ managementDeletedObjects(data);
+ else
+ throw Exception(
+ QPID_MSG("Cluster update-data exchange received unknown routing-key: "
+ << routingKey));
+}
+
+void UpdateDataExchange::managementAgents(const std::string& data) {
+ if (!agent)
+ throw Exception(
+ QPID_MSG("Received management agent update but management is disabled."));
+ framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+ agent->importAgents(buf);
+ QPID_LOG(debug, " Updated management agents.");
+}
+
+void UpdateDataExchange::managementSchemas(const std::string& data) {
+ if (!agent)
+ throw Exception(
+ QPID_MSG("Received management schema update but management is disabled."));
+ framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+ agent->importSchemas(buf);
+ QPID_LOG(debug, " Updated management schemas");
+}
+
+void UpdateDataExchange::managementDeletedObjects(const std::string& data) {
+ using amqp_0_10::ListCodec;
+ using types::Variant;
+ if (!agent)
+ throw Exception(
+ QPID_MSG("Management agent update but management not enabled."));
+ Variant::List encoded;
+ ListCodec::decode(data, encoded);
+ management::ManagementAgent::DeletedObjectList objects;
+ for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) {
+ objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr(
+ new management::ManagementAgent::DeletedObject(*i)));
+ }
+ agent->importDeletedObjects(objects);
+ QPID_LOG(debug, " Updated management deleted objects.");
+}
+
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
new file mode 100644
index 0000000000..1c4022a4aa
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -0,0 +1,81 @@
+#ifndef QPID_CLUSTER_UPDATEDATAEXCHANGE_H
+#define QPID_CLUSTER_UPDATEDATAEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+
+namespace qpid {
+
+namespace management {
+class ManagementAgent;
+}
+
+namespace cluster {
+
+/**
+ * An exchange used to send data that is to large for a control
+ * during update. The routing key indicates the type of data.
+ */
+class UpdateDataExchange : public broker::Exchange
+{
+ public:
+ static const std::string EXCHANGE_NAME;
+ static const std::string EXCHANGE_TYPE;
+ static const std::string MANAGEMENT_AGENTS_KEY;
+ static const std::string MANAGEMENT_SCHEMAS_KEY;
+ static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
+
+ UpdateDataExchange(management::Manageable* parent, management::ManagementAgent*);
+
+ void route(broker::Deliverable& msg, const std::string& routingKey,
+ const framing::FieldTable* args);
+
+ // Not implemented
+ std::string getType() const { return EXCHANGE_TYPE; }
+
+ bool bind(boost::shared_ptr<broker::Queue>,
+ const std::string&,
+ const qpid::framing::FieldTable*)
+ { return false; }
+
+ bool unbind(boost::shared_ptr<broker::Queue>,
+ const std::string&,
+ const qpid::framing::FieldTable*)
+ { return false; }
+
+ bool isBound(boost::shared_ptr<broker::Queue>,
+ const std::string*,
+ const qpid::framing::FieldTable*)
+ { return false; }
+
+ private:
+ management::ManagementAgent* agent;
+
+ void managementAgents(const std::string&);
+ void managementSchemas(const std::string&);
+ void managementDeletedObjects(const std::string&);
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_UPDATEDATAEXCHANGE_H*/
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 11e65efd64..6295f56226 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -655,6 +655,11 @@ void ManagementAgent::periodicProcessing (void)
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+
+ if (object->isDeleted()) {
+ deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
+ }
+
object->setFlags(0);
if (clientWasAdded) {
object->setForcePublish(true);
@@ -663,6 +668,52 @@ void ManagementAgent::periodicProcessing (void)
clientWasAdded = false;
+ // Remove Deleted objects, and save for later publishing...
+ //
+ for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++) {
+
+ ManagementObject* delObj = iter->second;
+ DeletedObject::shared_ptr dptr(new DeletedObject());
+ std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName());
+ bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish()));
+
+ dptr->packageName = delObj->getPackageName();
+ dptr->className = delObj->getClassName();
+ delObj->getObjectId().encode(dptr->objectId);
+
+ if (qmf1Support) {
+ delObj->writeProperties(dptr->encodedV1Config);
+ if (send_stats) {
+ delObj->writeStatistics(dptr->encodedV1Inst);
+ }
+ }
+
+ if (qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ delObj->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(),
+ delObj->getClassName(),
+ "_data",
+ delObj->getMd5Sum());
+ delObj->writeTimestamps(map_);
+ delObj->mapEncodeValues(values, true, send_stats);
+ map_["_values"] = values;
+
+ dptr->encodedV2 = map_;
+ }
+
+ pendingDeletedObjs[classkey].push_back(dptr);
+
+ delete iter->second;
+ managementObjects.erase(iter->first);
+ }
+
//
// Process the entire object map. Remember: we drop the userLock each time we call
// sendBuffer(). This allows the managementObjects map to be altered during the
@@ -711,7 +762,13 @@ void ManagementAgent::periodicProcessing (void)
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ // skip any objects marked deleted since our first pass. Deal with them
+ // on the next periodic cycle...
+ if (object->isDeleted()) {
+ continue;
+ }
+
+ send_props = (object->getConfigChanged() || object->getForcePublish());
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
if (send_props && qmf1Support) {
@@ -749,8 +806,6 @@ void ManagementAgent::periodicProcessing (void)
if (send_props) pcount++;
if (send_stats) scount++;
- if (object->isDeleted())
- deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
@@ -796,12 +851,114 @@ void ManagementAgent::periodicProcessing (void)
}
} // end processing updates for all objects
- // Delete flagged objects
- for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
- iter != deleteList.rend();
- iter++) {
- delete iter->second;
- managementObjects.erase(iter->first);
+
+ // now send the pending deletes. Make a temporary copy of the pending deletes so dropping the
+ // lock when the buffer is sent is safe.
+ //
+ if (!pendingDeletedObjs.empty()) {
+ PendingDeletedObjsMap tmp(pendingDeletedObjs);
+ pendingDeletedObjs.clear();
+
+ for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
+ std::string packageName;
+ std::string className;
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ uint32_t v1Objs = 0;
+ uint32_t v2Objs = 0;
+ Variant::List list_;
+
+ size_t pos = mIter->first.find(":");
+ packageName = mIter->first.substr(0, pos);
+ className = mIter->first.substr(pos+1);
+
+ for (DeletedObjectList::iterator lIter = mIter->second.begin();
+ lIter != mIter->second.end(); lIter++) {
+
+ if (!(*lIter)->encodedV1Config.empty()) {
+ encodeHeader(msgBuffer, 'c');
+ msgBuffer.putRawData((*lIter)->encodedV1Config);
+ v1Objs++;
+ }
+ if (!(*lIter)->encodedV1Inst.empty()) {
+ encodeHeader(msgBuffer, 'i');
+ msgBuffer.putRawData((*lIter)->encodedV1Inst);
+ v1Objs++;
+ }
+ if (v1Objs && msgBuffer.available() < HEADROOM) {
+ v1Objs = 0;
+ contentSize = BUFSIZE - msgBuffer.available();
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ msgBuffer.reset();
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str());
+ }
+
+ if (!(*lIter)->encodedV2.empty()) {
+ list_.push_back((*lIter)->encodedV2);
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ }
+ } // end current list
+
+ // send any remaining objects...
+
+ if (v1Objs) {
+ contentSize = BUFSIZE - msgBuffer.available();
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ msgBuffer.reset();
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str());
+ }
+
+ if (!list_.empty()) {
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ } // end map
}
if (!deleteList.empty()) {
@@ -2700,3 +2857,143 @@ Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
return out;
}
+
+// Build up a list of the current set of deleted objects that are pending their
+// next (last) publish-ment.
+void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
+{
+ sys::Mutex::ScopedLock lock (userLock);
+ list<pair<ObjectId, ManagementObject*> > deleteList;
+
+ moveNewObjectsLH();
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+
+ if (object->isDeleted()) {
+ deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
+ }
+ }
+
+ // Remove Deleted objects, and save for later publishing...
+ //
+ for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++) {
+
+ ManagementObject* delObj = iter->second;
+ DeletedObject::shared_ptr dptr(new DeletedObject());
+ std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName());
+ bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish()));
+
+ dptr->packageName = delObj->getPackageName();
+ dptr->className = delObj->getClassName();
+ delObj->getObjectId().encode(dptr->objectId);
+
+ if (qmf1Support) {
+ delObj->writeProperties(dptr->encodedV1Config);
+ if (send_stats) {
+ delObj->writeStatistics(dptr->encodedV1Inst);
+ }
+ }
+
+ if (qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ delObj->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(),
+ delObj->getClassName(),
+ "_data",
+ delObj->getMd5Sum());
+ delObj->writeTimestamps(map_);
+ delObj->mapEncodeValues(values, true, send_stats);
+ map_["_values"] = values;
+
+ dptr->encodedV2 = map_;
+ }
+
+ pendingDeletedObjs[classkey].push_back(dptr);
+
+ delete iter->second;
+ managementObjects.erase(iter->first);
+ }
+
+ // now copy the pending deletes into the outList
+
+ for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
+ mIter != pendingDeletedObjs.end(); mIter++) {
+ for (DeletedObjectList::iterator lIter = mIter->second.begin();
+ lIter != mIter->second.end(); lIter++) {
+ outList.push_back(*lIter);
+ }
+ }
+}
+
+
+// Merge this list's deleted objects to the management Agent's list of deleted
+// objects waiting for next (last) publish-ment.
+void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
+{
+ sys::Mutex::ScopedLock lock (userLock);
+
+ for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) {
+
+ std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className);
+ DeletedObjectList& dList = pendingDeletedObjs[classkey];
+
+ // not sure if this is necessary - merge by objectid....
+ bool found = false;
+ for (DeletedObjectList::iterator dIter = dList.begin(); dIter != dList.end(); dIter++) {
+ if ((*dIter)->objectId == (*lIter)->objectId) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ dList.push_back(*lIter);
+ }
+ }
+}
+
+
+// construct a DeletedObject from an encoded representation. Used by
+// clustering to move deleted objects between clustered brokers. See
+// DeletedObject::encode() for the reverse.
+ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded)
+{
+ qpid::types::Variant::Map map_;
+ MapCodec::decode(encoded, map_);
+
+ packageName = map_["_package_name"].getString();
+ className = map_["_class_name"].getString();
+ objectId = map_["_object_id"].getString();
+
+ encodedV1Config = map_["_v1_config"].getString();
+ encodedV1Inst = map_["_v1_inst"].getString();
+ encodedV2 = map_["_v2_data"].asMap();
+}
+
+
+// encode a DeletedObject to a string buffer. Used by
+// clustering to move deleted objects between clustered brokers. See
+// DeletedObject(const std::string&) for the reverse.
+void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
+{
+ qpid::types::Variant::Map map_;
+
+
+ map_["_package_name"] = packageName;
+ map_["_class_name"] = className;
+ map_["_object_id"] = objectId;
+
+ map_["_v1_config"] = encodedV1Config;
+ map_["_v1_inst"] = encodedV1Inst;
+ map_["_v2_data"] = encodedV2;
+
+ MapCodec::encode(map_, toBuffer);
+}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index f4d3c8c299..9829094a0f 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -148,6 +148,40 @@ public:
static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
+ // For Clustering: management objects that have been marked as
+ // "deleted", but are waiting for their last published object
+ // update are not visible to the cluster replication code. These
+ // interfaces allow clustering to gather up all the management
+ // objects that are deleted in order to allow all clustered
+ // brokers to publish the same set of deleted objects.
+
+ class DeletedObject {
+ public:
+ typedef boost::shared_ptr<DeletedObject> shared_ptr;
+ DeletedObject() {};
+ DeletedObject( const std::string &encoded );
+ ~DeletedObject() {};
+ void encode( std::string& toBuffer );
+
+ private:
+ friend class ManagementAgent;
+
+ std::string packageName;
+ std::string className;
+ std::string objectId;
+
+ std::string encodedV1Config; // qmfv1 properties
+ std::string encodedV1Inst; // qmfv1 statistics
+ qpid::types::Variant::Map encodedV2;
+ };
+
+ typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
+
+ /** returns a snapshot of all currently deleted management objects. */
+ void exportDeletedObjects( DeletedObjectList& outList );
+
+ /** Import a list of deleted objects to send on next publish interval. */
+ void importDeletedObjects( const DeletedObjectList& inList );
private:
struct Periodic : public qpid::sys::TimerTask
@@ -293,6 +327,13 @@ private:
// message.
uint32_t maxV2ReplyObjs;
+ // list of objects that have been deleted, but have yet to be published
+ // one final time.
+ // Indexed by a string composed of the object's package and class name.
+ // Protected by userLock.
+ typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
+ PendingDeletedObjsMap pendingDeletedObjs;
+
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 9f70121b74..99474da2f2 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -298,7 +298,7 @@ class LongTests(BrokerTest):
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
- def test_management(self):
+ def test_management(self, args=[]):
"""Stress test: Run management clients and other clients concurrently."""
class ClientLoop(StoppableThread):
@@ -353,7 +353,7 @@ class LongTests(BrokerTest):
StoppableThread.stop(self)
# def test_management
- args = ["--mgmt-pub-interval", 1] # Publish management information every second.
+ args += ["--mgmt-pub-interval", 1] # Publish management information every second.
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
cluster = self.cluster(3, args)
@@ -402,6 +402,9 @@ class LongTests(BrokerTest):
for c in chain(mclients, *clients):
c.stop()
+ def test_management_qmf2(self):
+ self.test_management(args=["--mgmt-qmf2=yes"])
+
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 9cbad82d61..dfe9554ad1 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -263,20 +263,10 @@
<field name="consumer" type="uint32"/>
</control>
- <!-- Replicate management agent schema -->
- <control name="management-schema" code="0x35">
- <field name="data" type="vbin32"/>
- </control>
-
<!-- added by jrd. propagate a management-setup-state widget -->
<control name="management-setup-state" code="0x36">
<field name="objectNum" type="uint64"/>
<field name="bootSequence" type="uint16"/>
</control>
-
- <!-- Replicate management agent's remote-agent map -->
- <control name="management-agents" code="0x37">
- <field name="data" type="vbin32"/>
- </control>
</class>
</amqp>