summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-12-08 19:21:05 +0000
committerAlan Conway <aconway@apache.org>2010-12-08 19:21:05 +0000
commitf789f7e7a0e5a312d09eccc1029a3131d4549d25 (patch)
treee4d96f7f60401ea87a6e2d57c3d9d7fae1ccb7ff /qpid/cpp/src/qpid/cluster
parentc03731903e9687396163ae70361b8a80a615008b (diff)
downloadqpid-python-f789f7e7a0e5a312d09eccc1029a3131d4549d25.tar.gz
Defer update of managaement agent to end of update process.
Move updating of the management agent to the very end of the update process, after all objects used by the update process itself have been deleted. Before the fix deletions from the update process itself (deleting the qpid.cluster-update queue and its binding to the default exchange) were sporadically appearing as extra delete messages on the updatees management agent and causing inconsistency. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1043621 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp15
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp49
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateDataExchange.h11
5 files changed, 36 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 37932be735..4f98c60cad 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -264,6 +264,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
+ failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
+ updateDataExchange(new UpdateDataExchange(this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -283,17 +285,16 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
// Failover exchange provides membership updates to clients.
- failoverExchange.reset(new FailoverExchange(broker.GetVhostObject(), &broker));
broker.getExchanges().registerExchange(failoverExchange);
// Update exchange is used during updates to replicate messages
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
// Update-data exchange is used for passing data that may be too large
// for single control frame.
- broker.getExchanges().registerExchange(
- boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent())));
+ broker.getExchanges().registerExchange(updateDataExchange);
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
@@ -931,11 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
- if (mAgent) mAgent->suppress(false); // Enable management output.
+ if (mAgent) {
+ // Update management agent now, after all update activity is complete.
+ updateDataExchange->updateManagementAgent(mAgent);
+ mAgent->suppress(false); // Enable management output.
+ mAgent->clusterUpdate();
+ }
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
- if (mAgent) mAgent->clusterUpdate();
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index c046874f1c..8f73c6acca 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -68,6 +68,7 @@ namespace cluster {
class Connection;
class EventFrame;
class ClusterTimer;
+class UpdateDataExchange;
/**
* Connection to the cluster
@@ -255,6 +256,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
PollableEventQueue deliverEventQueue;
PollableFrameQueue deliverFrameQueue;
boost::shared_ptr<FailoverExchange> failoverExchange;
+ boost::shared_ptr<UpdateDataExchange> updateDataExchange;
Quorum quorum;
LockedConnectionMap localConnections;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 7406d64bda..2a98800484 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -463,15 +463,15 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
const framing::SequenceNumber& frameSeq)
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
updateIn.consumerNumbering.clear();
closeUpdated();
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
}
void Connection::retractOffer() {
QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
- cluster.updateInRetracted();
closeUpdated();
+ cluster.updateInRetracted();
}
void Connection::closeUpdated() {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index 90a53f5531..2f242b3024 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -35,54 +35,37 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")
const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
-UpdateDataExchange::UpdateDataExchange(management::Manageable* parent,
- management::ManagementAgent* agent_) :
- Exchange(EXCHANGE_NAME, parent),
- agent(agent_)
+UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) :
+ Exchange(EXCHANGE_NAME, parent)
{}
void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
const qpid::framing::FieldTable* )
{
std::string data = msg.getMessage().getFrames().getContent();
- if (routingKey == MANAGEMENT_AGENTS_KEY)
- managementAgents(data);
- else if (routingKey == MANAGEMENT_SCHEMAS_KEY)
- managementSchemas(data);
- else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY)
- managementDeletedObjects(data);
- else
- throw Exception(
- QPID_MSG("Cluster update-data exchange received unknown routing-key: "
- << routingKey));
+ if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data;
+ else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data;
+ else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY) managementDeletedObjects = data;
+ else throw Exception(
+ QPID_MSG("Cluster update-data exchange received unknown routing-key: "
+ << routingKey));
}
-void UpdateDataExchange::managementAgents(const std::string& data) {
- if (!agent)
- throw Exception(
- QPID_MSG("Received management agent update but management is disabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importAgents(buf);
+void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agent) {
+ if (!agent) return;
+
+ framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
+ agent->importAgents(buf1);
QPID_LOG(debug, " Updated management agents.");
-}
-void UpdateDataExchange::managementSchemas(const std::string& data) {
- if (!agent)
- throw Exception(
- QPID_MSG("Received management schema update but management is disabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importSchemas(buf);
+ framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
+ agent->importSchemas(buf2);
QPID_LOG(debug, " Updated management schemas");
-}
-void UpdateDataExchange::managementDeletedObjects(const std::string& data) {
using amqp_0_10::ListCodec;
using types::Variant;
- if (!agent)
- throw Exception(
- QPID_MSG("Management agent update but management not enabled."));
Variant::List encoded;
- ListCodec::decode(data, encoded);
+ ListCodec::decode(managementDeletedObjects, encoded);
management::ManagementAgent::DeletedObjectList objects;
for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) {
objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr(
diff --git a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
index 1c4022a4aa..27a98548f3 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -45,7 +45,7 @@ class UpdateDataExchange : public broker::Exchange
static const std::string MANAGEMENT_SCHEMAS_KEY;
static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
- UpdateDataExchange(management::Manageable* parent, management::ManagementAgent*);
+ UpdateDataExchange(management::Manageable* parent);
void route(broker::Deliverable& msg, const std::string& routingKey,
const framing::FieldTable* args);
@@ -68,12 +68,13 @@ class UpdateDataExchange : public broker::Exchange
const qpid::framing::FieldTable*)
{ return false; }
+ void updateManagementAgent(management::ManagementAgent* agent);
+
private:
- management::ManagementAgent* agent;
- void managementAgents(const std::string&);
- void managementSchemas(const std::string&);
- void managementDeletedObjects(const std::string&);
+ std::string managementAgents;
+ std::string managementSchemas;
+ std::string managementDeletedObjects;
};
}} // namespace qpid::cluster