summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp15
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.cpp49
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.h11
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp32
6 files changed, 51 insertions, 62 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 37932be735..4f98c60cad 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -264,6 +264,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
+ failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
+ updateDataExchange(new UpdateDataExchange(this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -283,17 +285,16 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
// Failover exchange provides membership updates to clients.
- failoverExchange.reset(new FailoverExchange(broker.GetVhostObject(), &broker));
broker.getExchanges().registerExchange(failoverExchange);
// Update exchange is used during updates to replicate messages
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
// Update-data exchange is used for passing data that may be too large
// for single control frame.
- broker.getExchanges().registerExchange(
- boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent())));
+ broker.getExchanges().registerExchange(updateDataExchange);
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
@@ -931,11 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
- if (mAgent) mAgent->suppress(false); // Enable management output.
+ if (mAgent) {
+ // Update management agent now, after all update activity is complete.
+ updateDataExchange->updateManagementAgent(mAgent);
+ mAgent->suppress(false); // Enable management output.
+ mAgent->clusterUpdate();
+ }
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
- if (mAgent) mAgent->clusterUpdate();
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index c046874f1c..8f73c6acca 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -68,6 +68,7 @@ namespace cluster {
class Connection;
class EventFrame;
class ClusterTimer;
+class UpdateDataExchange;
/**
* Connection to the cluster
@@ -255,6 +256,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
PollableEventQueue deliverEventQueue;
PollableFrameQueue deliverFrameQueue;
boost::shared_ptr<FailoverExchange> failoverExchange;
+ boost::shared_ptr<UpdateDataExchange> updateDataExchange;
Quorum quorum;
LockedConnectionMap localConnections;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 7406d64bda..2a98800484 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -463,15 +463,15 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
const framing::SequenceNumber& frameSeq)
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
updateIn.consumerNumbering.clear();
closeUpdated();
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
}
void Connection::retractOffer() {
QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
- cluster.updateInRetracted();
closeUpdated();
+ cluster.updateInRetracted();
}
void Connection::closeUpdated() {
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index 90a53f5531..2f242b3024 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -35,54 +35,37 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")
const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
-UpdateDataExchange::UpdateDataExchange(management::Manageable* parent,
- management::ManagementAgent* agent_) :
- Exchange(EXCHANGE_NAME, parent),
- agent(agent_)
+UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) :
+ Exchange(EXCHANGE_NAME, parent)
{}
void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
const qpid::framing::FieldTable* )
{
std::string data = msg.getMessage().getFrames().getContent();
- if (routingKey == MANAGEMENT_AGENTS_KEY)
- managementAgents(data);
- else if (routingKey == MANAGEMENT_SCHEMAS_KEY)
- managementSchemas(data);
- else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY)
- managementDeletedObjects(data);
- else
- throw Exception(
- QPID_MSG("Cluster update-data exchange received unknown routing-key: "
- << routingKey));
+ if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data;
+ else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data;
+ else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY) managementDeletedObjects = data;
+ else throw Exception(
+ QPID_MSG("Cluster update-data exchange received unknown routing-key: "
+ << routingKey));
}
-void UpdateDataExchange::managementAgents(const std::string& data) {
- if (!agent)
- throw Exception(
- QPID_MSG("Received management agent update but management is disabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importAgents(buf);
+void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agent) {
+ if (!agent) return;
+
+ framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
+ agent->importAgents(buf1);
QPID_LOG(debug, " Updated management agents.");
-}
-void UpdateDataExchange::managementSchemas(const std::string& data) {
- if (!agent)
- throw Exception(
- QPID_MSG("Received management schema update but management is disabled."));
- framing::Buffer buf(const_cast<char*>(data.data()), data.size());
- agent->importSchemas(buf);
+ framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
+ agent->importSchemas(buf2);
QPID_LOG(debug, " Updated management schemas");
-}
-void UpdateDataExchange::managementDeletedObjects(const std::string& data) {
using amqp_0_10::ListCodec;
using types::Variant;
- if (!agent)
- throw Exception(
- QPID_MSG("Management agent update but management not enabled."));
Variant::List encoded;
- ListCodec::decode(data, encoded);
+ ListCodec::decode(managementDeletedObjects, encoded);
management::ManagementAgent::DeletedObjectList objects;
for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) {
objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr(
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h
index 1c4022a4aa..27a98548f3 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -45,7 +45,7 @@ class UpdateDataExchange : public broker::Exchange
static const std::string MANAGEMENT_SCHEMAS_KEY;
static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
- UpdateDataExchange(management::Manageable* parent, management::ManagementAgent*);
+ UpdateDataExchange(management::Manageable* parent);
void route(broker::Deliverable& msg, const std::string& routingKey,
const framing::FieldTable* args);
@@ -68,12 +68,13 @@ class UpdateDataExchange : public broker::Exchange
const qpid::framing::FieldTable*)
{ return false; }
+ void updateManagementAgent(management::ManagementAgent* agent);
+
private:
- management::ManagementAgent* agent;
- void managementAgents(const std::string&);
- void managementSchemas(const std::string&);
- void managementDeletedObjects(const std::string&);
+ std::string managementAgents;
+ std::string managementSchemas;
+ std::string managementDeletedObjects;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 9d9cfb5164..900f80fce6 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -2940,29 +2940,27 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
}
}
-
-// Merge this list's deleted objects to the management Agent's list of deleted
-// objects waiting for next (last) publish-ment.
+// Called by cluster to reset the management agent's list of deleted
+// objects to match the rest of the cluster.
void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
{
sys::Mutex::ScopedLock lock (userLock);
-
+ // Clear out any existing deleted objects
+ moveNewObjectsLH();
+ pendingDeletedObjs.clear();
+ ManagementObjectMap::iterator i = managementObjects.begin();
+ while (i != managementObjects.end()) {
+ ManagementObject* object = i->second;
+ if (object->isDeleted()) {
+ delete object;
+ managementObjects.erase(i++);
+ }
+ else ++i;
+ }
for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) {
std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className);
- DeletedObjectList& dList = pendingDeletedObjs[classkey];
-
- // not sure if this is necessary - merge by objectid....
- bool found = false;
- for (DeletedObjectList::iterator dIter = dList.begin(); dIter != dList.end(); dIter++) {
- if ((*dIter)->objectId == (*lIter)->objectId) {
- found = true;
- break;
- }
- }
- if (!found) {
- dList.push_back(*lIter);
- }
+ pendingDeletedObjs[classkey].push_back(*lIter);
}
}