diff options
author | Alan Conway <aconway@apache.org> | 2010-12-01 21:31:36 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-12-01 21:31:36 +0000 |
commit | c7ffb7a9c913627a4104823e5384144b348d7623 (patch) | |
tree | 5465163d0c5797ba347cef532cab0a380ccbd218 /qpid/cpp/src | |
parent | 4101090c274118708f0183b436f3de68f1a32277 (diff) | |
download | qpid-python-c7ffb7a9c913627a4104823e5384144b348d7623.tar.gz |
Modified cluster_tests causes broker shut down with invalid-argument error.
Described in https://bugzilla.redhat.com/show_bug.cgi?id=655078. The
management agent's deleted-object list was not being replicated to new
members joining the cluster, so management generated fewer deleted
object notifications on the newer member, causing it to fail with an
invalid-argument error. The list is now being replicated correctly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1041181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/cluster.mk | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp | 96 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateDataExchange.h | 81 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 315 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 41 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 7 |
10 files changed, 576 insertions, 41 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 2a648e968c..a791b2d41a 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -93,7 +93,9 @@ cluster_la_SOURCES = \ qpid/cluster/SecureConnectionFactory.h \ qpid/cluster/SecureConnectionFactory.cpp \ qpid/cluster/StoreStatus.h \ - qpid/cluster/StoreStatus.cpp + qpid/cluster/StoreStatus.cpp \ + qpid/cluster/UpdateDataExchange.h \ + qpid/cluster/UpdateDataExchange.cpp cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index b7545ad706..37932be735 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -127,6 +127,7 @@ #include "qpid/cluster/UpdateClient.h" #include "qpid/cluster/RetractClient.h" #include "qpid/cluster/FailoverExchange.h" +#include "qpid/cluster/UpdateDataExchange.h" #include "qpid/cluster/UpdateExchange.h" #include "qpid/cluster/ClusterTimer.h" @@ -197,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 964709; +const uint32_t Cluster::CLUSTER_VERSION = 1039478; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -289,6 +290,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Update-data exchange is used for passing data that may be too large + // for single control frame. + broker.getExchanges().registerExchange( + boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent()))); // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index e76cebf68d..7406d64bda 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/amqp_0_10/Codecs.h" #include "Connection.h" #include "UpdateClient.h" #include "Cluster.h" @@ -42,6 +43,7 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/ClusterSafe.h" +#include "qpid/types/Variant.h" #include "qpid/management/ManagementAgent.h" #include <boost/current_function.hpp> @@ -51,7 +53,8 @@ namespace cluster { using namespace framing; using namespace framing::cluster; - +using amqp_0_10::ListCodec; +using types::Variant; qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); @@ -626,15 +629,6 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) { findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]); } -void Connection::managementSchema(const std::string& data) { - management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); - if (!agent) - throw Exception(QPID_MSG("Management schema update but management not enabled.")); - framing::Buffer buf(const_cast<char*>(data.data()), data.size()); - agent->importSchemas(buf); - QPID_LOG(debug, cluster << " updated management schemas"); -} - // // This is the handler for incoming managementsetup messages. // @@ -648,15 +642,5 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) agent->setNextObjectId(objectNum); agent->setBootSequence(bootSequence); } - -void Connection::managementAgents(const std::string& data) { - management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); - if (!agent) - throw Exception(QPID_MSG("Management agent update but management not enabled.")); - framing::Buffer buf(const_cast<char*>(data.data()), data.size()); - agent->importAgents(buf); - QPID_LOG(debug, cluster << " updated management agents"); -} - }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 24b8c8532f..95d846eaf8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -177,8 +177,6 @@ class Connection : OutputInterceptor& getOutput() { return output; } void addQueueListener(const std::string& queue, uint32_t listener); - void managementSchema(const std::string& data); - void managementAgents(const std::string& data); void managementSetupState(uint64_t objectNum, uint16_t bootSequence); void setSecureConnection ( broker::SecureConnection * sc ); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index bc1b812a94..7d73f3c1db 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -18,12 +18,14 @@ * under the License. * */ +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/cluster/UpdateClient.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterMap.h" #include "qpid/cluster/Connection.h" #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" +#include "qpid/cluster/UpdateDataExchange.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/client/SessionImpl.h" @@ -52,6 +54,7 @@ #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" +#include "qpid/types/Variant.h" #include "qpid/Url.h" #include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/bind.hpp> @@ -62,12 +65,14 @@ namespace qpid { namespace cluster { +using amqp_0_10::ListCodec; using broker::Broker; using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; using broker::SemanticState; +using types::Variant; using namespace framing; namespace arg=client::arg; @@ -153,7 +158,6 @@ void UpdateClient::update() { std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); - session.close(); // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); @@ -162,14 +166,16 @@ void UpdateClient::update() { updateManagementAgent(); + session.close(); + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false); client::ConnectionAccess::getImpl(connection)->handle(frame); - // FIXME aconway 2010-06-16: Connection will be closed from the other end. - // connection.close(); + // NOTE: connection will be closed from the other end, don't close + // it here as that causes a race. // FIXME aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. @@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent() { management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - // Send management schemas and agents. string data; + + QPID_LOG(debug, updaterId << " updating management schemas. ") agent->exportSchemas(data); - ClusterConnectionProxy(session).managementSchema(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management agents. ") agent->exportAgents(data); - ClusterConnectionProxy(session).managementAgents(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management deleted objects. ") + typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; + DeletedObjectList deleted; + agent->exportDeletedObjects(deleted); + Variant::List list; + for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) { + string encoded; + (*i)->encode(encoded); + list.push_back(encoded); + } + ListCodec::encode(list, data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp new file mode 100644 index 0000000000..90a53f5531 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "UpdateDataExchange.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/Deliverable.h" +#include "qpid/broker/Message.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace cluster { + +const std::string UpdateDataExchange::EXCHANGE_NAME("qpid.cluster-update-data"); +const std::string UpdateDataExchange::EXCHANGE_TYPE("qpid.cluster-update-data"); +const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents"); +const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); +const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); + +UpdateDataExchange::UpdateDataExchange(management::Manageable* parent, + management::ManagementAgent* agent_) : + Exchange(EXCHANGE_NAME, parent), + agent(agent_) +{} + +void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, + const qpid::framing::FieldTable* ) +{ + std::string data = msg.getMessage().getFrames().getContent(); + if (routingKey == MANAGEMENT_AGENTS_KEY) + managementAgents(data); + else if (routingKey == MANAGEMENT_SCHEMAS_KEY) + managementSchemas(data); + else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY) + managementDeletedObjects(data); + else + throw Exception( + QPID_MSG("Cluster update-data exchange received unknown routing-key: " + << routingKey)); +} + +void UpdateDataExchange::managementAgents(const std::string& data) { + if (!agent) + throw Exception( + QPID_MSG("Received management agent update but management is disabled.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importAgents(buf); + QPID_LOG(debug, " Updated management agents."); +} + +void UpdateDataExchange::managementSchemas(const std::string& data) { + if (!agent) + throw Exception( + QPID_MSG("Received management schema update but management is disabled.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importSchemas(buf); + QPID_LOG(debug, " Updated management schemas"); +} + +void UpdateDataExchange::managementDeletedObjects(const std::string& data) { + using amqp_0_10::ListCodec; + using types::Variant; + if (!agent) + throw Exception( + QPID_MSG("Management agent update but management not enabled.")); + Variant::List encoded; + ListCodec::decode(data, encoded); + management::ManagementAgent::DeletedObjectList objects; + for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) { + objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr( + new management::ManagementAgent::DeletedObject(*i))); + } + agent->importDeletedObjects(objects); + QPID_LOG(debug, " Updated management deleted objects."); +} + + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h new file mode 100644 index 0000000000..1c4022a4aa --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -0,0 +1,81 @@ +#ifndef QPID_CLUSTER_UPDATEDATAEXCHANGE_H +#define QPID_CLUSTER_UPDATEDATAEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Exchange.h" + +namespace qpid { + +namespace management { +class ManagementAgent; +} + +namespace cluster { + +/** + * An exchange used to send data that is to large for a control + * during update. The routing key indicates the type of data. + */ +class UpdateDataExchange : public broker::Exchange +{ + public: + static const std::string EXCHANGE_NAME; + static const std::string EXCHANGE_TYPE; + static const std::string MANAGEMENT_AGENTS_KEY; + static const std::string MANAGEMENT_SCHEMAS_KEY; + static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; + + UpdateDataExchange(management::Manageable* parent, management::ManagementAgent*); + + void route(broker::Deliverable& msg, const std::string& routingKey, + const framing::FieldTable* args); + + // Not implemented + std::string getType() const { return EXCHANGE_TYPE; } + + bool bind(boost::shared_ptr<broker::Queue>, + const std::string&, + const qpid::framing::FieldTable*) + { return false; } + + bool unbind(boost::shared_ptr<broker::Queue>, + const std::string&, + const qpid::framing::FieldTable*) + { return false; } + + bool isBound(boost::shared_ptr<broker::Queue>, + const std::string*, + const qpid::framing::FieldTable*) + { return false; } + + private: + management::ManagementAgent* agent; + + void managementAgents(const std::string&); + void managementSchemas(const std::string&); + void managementDeletedObjects(const std::string&); +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_UPDATEDATAEXCHANGE_H*/ diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 11e65efd64..6295f56226 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -655,6 +655,11 @@ void ManagementAgent::periodicProcessing (void) iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + + if (object->isDeleted()) { + deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); + } + object->setFlags(0); if (clientWasAdded) { object->setForcePublish(true); @@ -663,6 +668,52 @@ void ManagementAgent::periodicProcessing (void) clientWasAdded = false; + // Remove Deleted objects, and save for later publishing... + // + for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) { + + ManagementObject* delObj = iter->second; + DeletedObject::shared_ptr dptr(new DeletedObject()); + std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName()); + bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish())); + + dptr->packageName = delObj->getPackageName(); + dptr->className = delObj->getClassName(); + delObj->getObjectId().encode(dptr->objectId); + + if (qmf1Support) { + delObj->writeProperties(dptr->encodedV1Config); + if (send_stats) { + delObj->writeStatistics(dptr->encodedV1Inst); + } + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + delObj->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(), + delObj->getClassName(), + "_data", + delObj->getMd5Sum()); + delObj->writeTimestamps(map_); + delObj->mapEncodeValues(values, true, send_stats); + map_["_values"] = values; + + dptr->encodedV2 = map_; + } + + pendingDeletedObjs[classkey].push_back(dptr); + + delete iter->second; + managementObjects.erase(iter->first); + } + // // Process the entire object map. Remember: we drop the userLock each time we call // sendBuffer(). This allows the managementObjects map to be altered during the @@ -711,7 +762,13 @@ void ManagementAgent::periodicProcessing (void) if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + // skip any objects marked deleted since our first pass. Deal with them + // on the next periodic cycle... + if (object->isDeleted()) { + continue; + } + + send_props = (object->getConfigChanged() || object->getForcePublish()); send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); if (send_props && qmf1Support) { @@ -749,8 +806,6 @@ void ManagementAgent::periodicProcessing (void) if (send_props) pcount++; if (send_stats) scount++; - if (object->isDeleted()) - deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); if ((qmf1Support && (msgBuffer.available() < HEADROOM)) || @@ -796,12 +851,114 @@ void ManagementAgent::periodicProcessing (void) } } // end processing updates for all objects - // Delete flagged objects - for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); - iter != deleteList.rend(); - iter++) { - delete iter->second; - managementObjects.erase(iter->first); + + // now send the pending deletes. Make a temporary copy of the pending deletes so dropping the + // lock when the buffer is sent is safe. + // + if (!pendingDeletedObjs.empty()) { + PendingDeletedObjsMap tmp(pendingDeletedObjs); + pendingDeletedObjs.clear(); + + for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { + std::string packageName; + std::string className; + Buffer msgBuffer(msgChars, BUFSIZE); + uint32_t v1Objs = 0; + uint32_t v2Objs = 0; + Variant::List list_; + + size_t pos = mIter->first.find(":"); + packageName = mIter->first.substr(0, pos); + className = mIter->first.substr(pos+1); + + for (DeletedObjectList::iterator lIter = mIter->second.begin(); + lIter != mIter->second.end(); lIter++) { + + if (!(*lIter)->encodedV1Config.empty()) { + encodeHeader(msgBuffer, 'c'); + msgBuffer.putRawData((*lIter)->encodedV1Config); + v1Objs++; + } + if (!(*lIter)->encodedV1Inst.empty()) { + encodeHeader(msgBuffer, 'i'); + msgBuffer.putRawData((*lIter)->encodedV1Inst); + v1Objs++; + } + if (v1Objs && msgBuffer.available() < HEADROOM) { + v1Objs = 0; + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str()); + } + + if (!(*lIter)->encodedV2.empty()) { + list_.push_back((*lIter)->encodedV2); + if (++v2Objs >= maxV2ReplyObjs) { + v2Objs = 0; + + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } + } // end current list + + // send any remaining objects... + + if (v1Objs) { + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str()); + } + + if (!list_.empty()) { + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } // end map } if (!deleteList.empty()) { @@ -2700,3 +2857,143 @@ Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) return out; } + +// Build up a list of the current set of deleted objects that are pending their +// next (last) publish-ment. +void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) +{ + sys::Mutex::ScopedLock lock (userLock); + list<pair<ObjectId, ManagementObject*> > deleteList; + + moveNewObjectsLH(); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + + if (object->isDeleted()) { + deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); + } + } + + // Remove Deleted objects, and save for later publishing... + // + for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) { + + ManagementObject* delObj = iter->second; + DeletedObject::shared_ptr dptr(new DeletedObject()); + std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName()); + bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish())); + + dptr->packageName = delObj->getPackageName(); + dptr->className = delObj->getClassName(); + delObj->getObjectId().encode(dptr->objectId); + + if (qmf1Support) { + delObj->writeProperties(dptr->encodedV1Config); + if (send_stats) { + delObj->writeStatistics(dptr->encodedV1Inst); + } + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + delObj->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(), + delObj->getClassName(), + "_data", + delObj->getMd5Sum()); + delObj->writeTimestamps(map_); + delObj->mapEncodeValues(values, true, send_stats); + map_["_values"] = values; + + dptr->encodedV2 = map_; + } + + pendingDeletedObjs[classkey].push_back(dptr); + + delete iter->second; + managementObjects.erase(iter->first); + } + + // now copy the pending deletes into the outList + + for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin(); + mIter != pendingDeletedObjs.end(); mIter++) { + for (DeletedObjectList::iterator lIter = mIter->second.begin(); + lIter != mIter->second.end(); lIter++) { + outList.push_back(*lIter); + } + } +} + + +// Merge this list's deleted objects to the management Agent's list of deleted +// objects waiting for next (last) publish-ment. +void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) +{ + sys::Mutex::ScopedLock lock (userLock); + + for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) { + + std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className); + DeletedObjectList& dList = pendingDeletedObjs[classkey]; + + // not sure if this is necessary - merge by objectid.... + bool found = false; + for (DeletedObjectList::iterator dIter = dList.begin(); dIter != dList.end(); dIter++) { + if ((*dIter)->objectId == (*lIter)->objectId) { + found = true; + break; + } + } + if (!found) { + dList.push_back(*lIter); + } + } +} + + +// construct a DeletedObject from an encoded representation. Used by +// clustering to move deleted objects between clustered brokers. See +// DeletedObject::encode() for the reverse. +ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded) +{ + qpid::types::Variant::Map map_; + MapCodec::decode(encoded, map_); + + packageName = map_["_package_name"].getString(); + className = map_["_class_name"].getString(); + objectId = map_["_object_id"].getString(); + + encodedV1Config = map_["_v1_config"].getString(); + encodedV1Inst = map_["_v1_inst"].getString(); + encodedV2 = map_["_v2_data"].asMap(); +} + + +// encode a DeletedObject to a string buffer. Used by +// clustering to move deleted objects between clustered brokers. See +// DeletedObject(const std::string&) for the reverse. +void ManagementAgent::DeletedObject::encode(std::string& toBuffer) +{ + qpid::types::Variant::Map map_; + + + map_["_package_name"] = packageName; + map_["_class_name"] = className; + map_["_object_id"] = objectId; + + map_["_v1_config"] = encodedV1Config; + map_["_v1_inst"] = encodedV1Inst; + map_["_v2_data"] = encodedV2; + + MapCodec::encode(map_, toBuffer); +} diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index f4d3c8c299..9829094a0f 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -148,6 +148,40 @@ public: static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); + // For Clustering: management objects that have been marked as + // "deleted", but are waiting for their last published object + // update are not visible to the cluster replication code. These + // interfaces allow clustering to gather up all the management + // objects that are deleted in order to allow all clustered + // brokers to publish the same set of deleted objects. + + class DeletedObject { + public: + typedef boost::shared_ptr<DeletedObject> shared_ptr; + DeletedObject() {}; + DeletedObject( const std::string &encoded ); + ~DeletedObject() {}; + void encode( std::string& toBuffer ); + + private: + friend class ManagementAgent; + + std::string packageName; + std::string className; + std::string objectId; + + std::string encodedV1Config; // qmfv1 properties + std::string encodedV1Inst; // qmfv1 statistics + qpid::types::Variant::Map encodedV2; + }; + + typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList; + + /** returns a snapshot of all currently deleted management objects. */ + void exportDeletedObjects( DeletedObjectList& outList ); + + /** Import a list of deleted objects to send on next publish interval. */ + void importDeletedObjects( const DeletedObjectList& inList ); private: struct Periodic : public qpid::sys::TimerTask @@ -293,6 +327,13 @@ private: // message. uint32_t maxV2ReplyObjs; + // list of objects that have been deleted, but have yet to be published + // one final time. + // Indexed by a string composed of the object's package and class name. + // Protected by userLock. + typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; + PendingDeletedObjsMap pendingDeletedObjs; + # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 9f70121b74..99474da2f2 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -298,7 +298,7 @@ class LongTests(BrokerTest): receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() - def test_management(self): + def test_management(self, args=[]): """Stress test: Run management clients and other clients concurrently.""" class ClientLoop(StoppableThread): @@ -353,7 +353,7 @@ class LongTests(BrokerTest): StoppableThread.stop(self) # def test_management - args = ["--mgmt-pub-interval", 1] # Publish management information every second. + args += ["--mgmt-pub-interval", 1] # Publish management information every second. # Use store if present. if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] cluster = self.cluster(3, args) @@ -402,6 +402,9 @@ class LongTests(BrokerTest): for c in chain(mclients, *clients): c.stop() + def test_management_qmf2(self): + self.test_management(args=["--mgmt-qmf2=yes"]) + class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. |