summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-12-01 21:31:36 +0000
committerAlan Conway <aconway@apache.org>2010-12-01 21:31:36 +0000
commitc7ffb7a9c913627a4104823e5384144b348d7623 (patch)
tree5465163d0c5797ba347cef532cab0a380ccbd218 /qpid/cpp/src/qpid/cluster
parent4101090c274118708f0183b436f3de68f1a32277 (diff)
downloadqpid-python-c7ffb7a9c913627a4104823e5384144b348d7623.tar.gz
Modified cluster_tests causes broker shut down with invalid-argument error.
Described in https://bugzilla.redhat.com/show_bug.cgi?id=655078. The management agent's deleted-object list was not being replicated to new members joining the cluster, so management generated fewer deleted object notifications on the newer member, causing it to fail with an invalid-argument error. The list is now being replicated correctly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1041181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp7
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp24
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp40
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp96
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.h81
6 files changed, 221 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index b7545ad706..37932be735 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -127,6 +127,7 @@
#include "qpid/cluster/UpdateClient.h"
#include "qpid/cluster/RetractClient.h"
#include "qpid/cluster/FailoverExchange.h"
+#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/cluster/UpdateExchange.h"
#include "qpid/cluster/ClusterTimer.h"
@@ -197,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 964709;
+const uint32_t Cluster::CLUSTER_VERSION = 1039478;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -289,6 +290,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+ // Update-data exchange is used for passing data that may be too large
+ // for single control frame.
+ broker.getExchanges().registerExchange(
+ boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent())));
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index e76cebf68d..7406d64bda 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/amqp_0_10/Codecs.h"
#include "Connection.h"
#include "UpdateClient.h"
#include "Cluster.h"
@@ -42,6 +43,7 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/ClusterSafe.h"
+#include "qpid/types/Variant.h"
#include "qpid/management/ManagementAgent.h"
#include <boost/current_function.hpp>
@@ -51,7 +53,8 @@ namespace cluster {
using namespace framing;
using namespace framing::cluster;
-
+using amqp_0_10::ListCodec;
+using types::Variant;
qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
@@ -626,15 +629,6 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) {
findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
}
-void Connection::managementSchema(const std::string& data) {
- management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
- if (!agent)
- throw Exception(QPID_MSG("Management schema update but management not enabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importSchemas(buf);
- QPID_LOG(debug, cluster << " updated management schemas");
-}
-
//
// This is the handler for incoming managementsetup messages.
//
@@ -648,15 +642,5 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence)
agent->setNextObjectId(objectNum);
agent->setBootSequence(bootSequence);
}
-
-void Connection::managementAgents(const std::string& data) {
- management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
- if (!agent)
- throw Exception(QPID_MSG("Management agent update but management not enabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importAgents(buf);
- QPID_LOG(debug, cluster << " updated management agents");
-}
-
}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 24b8c8532f..95d846eaf8 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -177,8 +177,6 @@ class Connection :
OutputInterceptor& getOutput() { return output; }
void addQueueListener(const std::string& queue, uint32_t listener);
- void managementSchema(const std::string& data);
- void managementAgents(const std::string& data);
void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
void setSecureConnection ( broker::SecureConnection * sc );
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index bc1b812a94..7d73f3c1db 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -18,12 +18,14 @@
* under the License.
*
*/
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/cluster/UpdateClient.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterMap.h"
#include "qpid/cluster/Connection.h"
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
+#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/SessionImpl.h"
@@ -52,6 +54,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/TypeCode.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Variant.h"
#include "qpid/Url.h"
#include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
#include <boost/bind.hpp>
@@ -62,12 +65,14 @@
namespace qpid {
namespace cluster {
+using amqp_0_10::ListCodec;
using broker::Broker;
using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
using broker::SemanticState;
+using types::Variant;
using namespace framing;
namespace arg=client::arg;
@@ -153,7 +158,6 @@ void UpdateClient::update() {
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
- session.close();
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
@@ -162,14 +166,16 @@ void UpdateClient::update() {
updateManagementAgent();
+ session.close();
+
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false);
client::ConnectionAccess::getImpl(connection)->handle(frame);
- // FIXME aconway 2010-06-16: Connection will be closed from the other end.
- // connection.close();
+ // NOTE: connection will be closed from the other end, don't close
+ // it here as that causes a race.
// FIXME aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
@@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent()
{
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- // Send management schemas and agents.
string data;
+
+ QPID_LOG(debug, updaterId << " updating management schemas. ")
agent->exportSchemas(data);
- ClusterConnectionProxy(session).managementSchema(data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+ QPID_LOG(debug, updaterId << " updating management agents. ")
agent->exportAgents(data);
- ClusterConnectionProxy(session).managementAgents(data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+ QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
+ DeletedObjectList deleted;
+ agent->exportDeletedObjects(deleted);
+ Variant::List list;
+ for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) {
+ string encoded;
+ (*i)->encode(encoded);
+ list.push_back(encoded);
+ }
+ ListCodec::encode(list, data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
new file mode 100644
index 0000000000..90a53f5531
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "UpdateDataExchange.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace cluster {
+
+const std::string UpdateDataExchange::EXCHANGE_NAME("qpid.cluster-update-data");
+const std::string UpdateDataExchange::EXCHANGE_TYPE("qpid.cluster-update-data");
+const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents");
+const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
+const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
+
+UpdateDataExchange::UpdateDataExchange(management::Manageable* parent,
+ management::ManagementAgent* agent_) :
+ Exchange(EXCHANGE_NAME, parent),
+ agent(agent_)
+{}
+
+void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
+ const qpid::framing::FieldTable* )
+{
+ std::string data = msg.getMessage().getFrames().getContent();
+ if (routingKey == MANAGEMENT_AGENTS_KEY)
+ managementAgents(data);
+ else if (routingKey == MANAGEMENT_SCHEMAS_KEY)
+ managementSchemas(data);
+ else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY)
+ managementDeletedObjects(data);
+ else
+ throw Exception(
+ QPID_MSG("Cluster update-data exchange received unknown routing-key: "
+ << routingKey));
+}
+
+void UpdateDataExchange::managementAgents(const std::string& data) {
+ if (!agent)
+ throw Exception(
+ QPID_MSG("Received management agent update but management is disabled."));
+ framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+ agent->importAgents(buf);
+ QPID_LOG(debug, " Updated management agents.");
+}
+
+void UpdateDataExchange::managementSchemas(const std::string& data) {
+ if (!agent)
+ throw Exception(
+ QPID_MSG("Received management schema update but management is disabled."));
+ framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+ agent->importSchemas(buf);
+ QPID_LOG(debug, " Updated management schemas");
+}
+
+void UpdateDataExchange::managementDeletedObjects(const std::string& data) {
+ using amqp_0_10::ListCodec;
+ using types::Variant;
+ if (!agent)
+ throw Exception(
+ QPID_MSG("Management agent update but management not enabled."));
+ Variant::List encoded;
+ ListCodec::decode(data, encoded);
+ management::ManagementAgent::DeletedObjectList objects;
+ for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) {
+ objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr(
+ new management::ManagementAgent::DeletedObject(*i)));
+ }
+ agent->importDeletedObjects(objects);
+ QPID_LOG(debug, " Updated management deleted objects.");
+}
+
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
new file mode 100644
index 0000000000..1c4022a4aa
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -0,0 +1,81 @@
+#ifndef QPID_CLUSTER_UPDATEDATAEXCHANGE_H
+#define QPID_CLUSTER_UPDATEDATAEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+
+namespace qpid {
+
+namespace management {
+class ManagementAgent;
+}
+
+namespace cluster {
+
+/**
+ * An exchange used to send data that is to large for a control
+ * during update. The routing key indicates the type of data.
+ */
+class UpdateDataExchange : public broker::Exchange
+{
+ public:
+ static const std::string EXCHANGE_NAME;
+ static const std::string EXCHANGE_TYPE;
+ static const std::string MANAGEMENT_AGENTS_KEY;
+ static const std::string MANAGEMENT_SCHEMAS_KEY;
+ static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
+
+ UpdateDataExchange(management::Manageable* parent, management::ManagementAgent*);
+
+ void route(broker::Deliverable& msg, const std::string& routingKey,
+ const framing::FieldTable* args);
+
+ // Not implemented
+ std::string getType() const { return EXCHANGE_TYPE; }
+
+ bool bind(boost::shared_ptr<broker::Queue>,
+ const std::string&,
+ const qpid::framing::FieldTable*)
+ { return false; }
+
+ bool unbind(boost::shared_ptr<broker::Queue>,
+ const std::string&,
+ const qpid::framing::FieldTable*)
+ { return false; }
+
+ bool isBound(boost::shared_ptr<broker::Queue>,
+ const std::string*,
+ const qpid::framing::FieldTable*)
+ { return false; }
+
+ private:
+ management::ManagementAgent* agent;
+
+ void managementAgents(const std::string&);
+ void managementSchemas(const std::string&);
+ void managementDeletedObjects(const std::string&);
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_UPDATEDATAEXCHANGE_H*/