diff options
author | Alan Conway <aconway@apache.org> | 2010-12-01 21:31:36 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-12-01 21:31:36 +0000 |
commit | c7ffb7a9c913627a4104823e5384144b348d7623 (patch) | |
tree | 5465163d0c5797ba347cef532cab0a380ccbd218 /qpid/cpp/src/qpid/cluster | |
parent | 4101090c274118708f0183b436f3de68f1a32277 (diff) | |
download | qpid-python-c7ffb7a9c913627a4104823e5384144b348d7623.tar.gz |
Modified cluster_tests causes broker shut down with invalid-argument error.
Described in https://bugzilla.redhat.com/show_bug.cgi?id=655078. The
management agent's deleted-object list was not being replicated to new
members joining the cluster, so management generated fewer deleted
object notifications on the newer member, causing it to fail with an
invalid-argument error. The list is now being replicated correctly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1041181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp | 96 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateDataExchange.h | 81 |
6 files changed, 221 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index b7545ad706..37932be735 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -127,6 +127,7 @@ #include "qpid/cluster/UpdateClient.h" #include "qpid/cluster/RetractClient.h" #include "qpid/cluster/FailoverExchange.h" +#include "qpid/cluster/UpdateDataExchange.h" #include "qpid/cluster/UpdateExchange.h" #include "qpid/cluster/ClusterTimer.h" @@ -197,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 964709; +const uint32_t Cluster::CLUSTER_VERSION = 1039478; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -289,6 +290,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Update-data exchange is used for passing data that may be too large + // for single control frame. + broker.getExchanges().registerExchange( + boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent()))); // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index e76cebf68d..7406d64bda 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/amqp_0_10/Codecs.h" #include "Connection.h" #include "UpdateClient.h" #include "Cluster.h" @@ -42,6 +43,7 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/ClusterSafe.h" +#include "qpid/types/Variant.h" #include "qpid/management/ManagementAgent.h" #include <boost/current_function.hpp> @@ -51,7 +53,8 @@ namespace cluster { using namespace framing; using namespace framing::cluster; - +using amqp_0_10::ListCodec; +using types::Variant; qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); @@ -626,15 +629,6 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) { findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]); } -void Connection::managementSchema(const std::string& data) { - management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); - if (!agent) - throw Exception(QPID_MSG("Management schema update but management not enabled.")); - framing::Buffer buf(const_cast<char*>(data.data()), data.size()); - agent->importSchemas(buf); - QPID_LOG(debug, cluster << " updated management schemas"); -} - // // This is the handler for incoming managementsetup messages. // @@ -648,15 +642,5 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) agent->setNextObjectId(objectNum); agent->setBootSequence(bootSequence); } - -void Connection::managementAgents(const std::string& data) { - management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); - if (!agent) - throw Exception(QPID_MSG("Management agent update but management not enabled.")); - framing::Buffer buf(const_cast<char*>(data.data()), data.size()); - agent->importAgents(buf); - QPID_LOG(debug, cluster << " updated management agents"); -} - }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 24b8c8532f..95d846eaf8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -177,8 +177,6 @@ class Connection : OutputInterceptor& getOutput() { return output; } void addQueueListener(const std::string& queue, uint32_t listener); - void managementSchema(const std::string& data); - void managementAgents(const std::string& data); void managementSetupState(uint64_t objectNum, uint16_t bootSequence); void setSecureConnection ( broker::SecureConnection * sc ); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index bc1b812a94..7d73f3c1db 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -18,12 +18,14 @@ * under the License. * */ +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/cluster/UpdateClient.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterMap.h" #include "qpid/cluster/Connection.h" #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" +#include "qpid/cluster/UpdateDataExchange.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/client/SessionImpl.h" @@ -52,6 +54,7 @@ #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" +#include "qpid/types/Variant.h" #include "qpid/Url.h" #include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/bind.hpp> @@ -62,12 +65,14 @@ namespace qpid { namespace cluster { +using amqp_0_10::ListCodec; using broker::Broker; using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; using broker::SemanticState; +using types::Variant; using namespace framing; namespace arg=client::arg; @@ -153,7 +158,6 @@ void UpdateClient::update() { std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); - session.close(); // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); @@ -162,14 +166,16 @@ void UpdateClient::update() { updateManagementAgent(); + session.close(); + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false); client::ConnectionAccess::getImpl(connection)->handle(frame); - // FIXME aconway 2010-06-16: Connection will be closed from the other end. - // connection.close(); + // NOTE: connection will be closed from the other end, don't close + // it here as that causes a race. // FIXME aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. @@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent() { management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - // Send management schemas and agents. string data; + + QPID_LOG(debug, updaterId << " updating management schemas. ") agent->exportSchemas(data); - ClusterConnectionProxy(session).managementSchema(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management agents. ") agent->exportAgents(data); - ClusterConnectionProxy(session).managementAgents(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management deleted objects. ") + typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; + DeletedObjectList deleted; + agent->exportDeletedObjects(deleted); + Variant::List list; + for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) { + string encoded; + (*i)->encode(encoded); + list.push_back(encoded); + } + ListCodec::encode(list, data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp new file mode 100644 index 0000000000..90a53f5531 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "UpdateDataExchange.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/Deliverable.h" +#include "qpid/broker/Message.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace cluster { + +const std::string UpdateDataExchange::EXCHANGE_NAME("qpid.cluster-update-data"); +const std::string UpdateDataExchange::EXCHANGE_TYPE("qpid.cluster-update-data"); +const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents"); +const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); +const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); + +UpdateDataExchange::UpdateDataExchange(management::Manageable* parent, + management::ManagementAgent* agent_) : + Exchange(EXCHANGE_NAME, parent), + agent(agent_) +{} + +void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, + const qpid::framing::FieldTable* ) +{ + std::string data = msg.getMessage().getFrames().getContent(); + if (routingKey == MANAGEMENT_AGENTS_KEY) + managementAgents(data); + else if (routingKey == MANAGEMENT_SCHEMAS_KEY) + managementSchemas(data); + else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY) + managementDeletedObjects(data); + else + throw Exception( + QPID_MSG("Cluster update-data exchange received unknown routing-key: " + << routingKey)); +} + +void UpdateDataExchange::managementAgents(const std::string& data) { + if (!agent) + throw Exception( + QPID_MSG("Received management agent update but management is disabled.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importAgents(buf); + QPID_LOG(debug, " Updated management agents."); +} + +void UpdateDataExchange::managementSchemas(const std::string& data) { + if (!agent) + throw Exception( + QPID_MSG("Received management schema update but management is disabled.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importSchemas(buf); + QPID_LOG(debug, " Updated management schemas"); +} + +void UpdateDataExchange::managementDeletedObjects(const std::string& data) { + using amqp_0_10::ListCodec; + using types::Variant; + if (!agent) + throw Exception( + QPID_MSG("Management agent update but management not enabled.")); + Variant::List encoded; + ListCodec::decode(data, encoded); + management::ManagementAgent::DeletedObjectList objects; + for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) { + objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr( + new management::ManagementAgent::DeletedObject(*i))); + } + agent->importDeletedObjects(objects); + QPID_LOG(debug, " Updated management deleted objects."); +} + + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h new file mode 100644 index 0000000000..1c4022a4aa --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -0,0 +1,81 @@ +#ifndef QPID_CLUSTER_UPDATEDATAEXCHANGE_H +#define QPID_CLUSTER_UPDATEDATAEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Exchange.h" + +namespace qpid { + +namespace management { +class ManagementAgent; +} + +namespace cluster { + +/** + * An exchange used to send data that is to large for a control + * during update. The routing key indicates the type of data. + */ +class UpdateDataExchange : public broker::Exchange +{ + public: + static const std::string EXCHANGE_NAME; + static const std::string EXCHANGE_TYPE; + static const std::string MANAGEMENT_AGENTS_KEY; + static const std::string MANAGEMENT_SCHEMAS_KEY; + static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; + + UpdateDataExchange(management::Manageable* parent, management::ManagementAgent*); + + void route(broker::Deliverable& msg, const std::string& routingKey, + const framing::FieldTable* args); + + // Not implemented + std::string getType() const { return EXCHANGE_TYPE; } + + bool bind(boost::shared_ptr<broker::Queue>, + const std::string&, + const qpid::framing::FieldTable*) + { return false; } + + bool unbind(boost::shared_ptr<broker::Queue>, + const std::string&, + const qpid::framing::FieldTable*) + { return false; } + + bool isBound(boost::shared_ptr<broker::Queue>, + const std::string*, + const qpid::framing::FieldTable*) + { return false; } + + private: + management::ManagementAgent* agent; + + void managementAgents(const std::string&); + void managementSchemas(const std::string&); + void managementDeletedObjects(const std::string&); +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_UPDATEDATAEXCHANGE_H*/ |