diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.cpp | 49 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 32 |
6 files changed, 51 insertions, 62 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 37932be735..4f98c60cad 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -264,6 +264,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), + failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), + updateDataExchange(new UpdateDataExchange(this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -283,17 +285,16 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); // Failover exchange provides membership updates to clients. - failoverExchange.reset(new FailoverExchange(broker.GetVhostObject(), &broker)); broker.getExchanges().registerExchange(failoverExchange); // Update exchange is used during updates to replicate messages // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Update-data exchange is used for passing data that may be too large // for single control frame. - broker.getExchanges().registerExchange( - boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent()))); + broker.getExchanges().registerExchange(updateDataExchange); // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { @@ -931,11 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) { // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); - if (mAgent) mAgent->suppress(false); // Enable management output. + if (mAgent) { + // Update management agent now, after all update activity is complete. + updateDataExchange->updateManagementAgent(mAgent); + mAgent->suppress(false); // Enable management output. + mAgent->clusterUpdate(); + } discarding = false; // OK to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. - if (mAgent) mAgent->clusterUpdate(); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index c046874f1c..8f73c6acca 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -68,6 +68,7 @@ namespace cluster { class Connection; class EventFrame; class ClusterTimer; +class UpdateDataExchange; /** * Connection to the cluster @@ -255,6 +256,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { PollableEventQueue deliverEventQueue; PollableFrameQueue deliverFrameQueue; boost::shared_ptr<FailoverExchange> failoverExchange; + boost::shared_ptr<UpdateDataExchange> updateDataExchange; Quorum quorum; LockedConnectionMap localConnections; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 7406d64bda..2a98800484 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -463,15 +463,15 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members const framing::SequenceNumber& frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); updateIn.consumerNumbering.clear(); closeUpdated(); + cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); } void Connection::retractOffer() { QPID_LOG(info, cluster << " incoming update retracted on connection " << *this); - cluster.updateInRetracted(); closeUpdated(); + cluster.updateInRetracted(); } void Connection::closeUpdated() { diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp index 90a53f5531..2f242b3024 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -35,54 +35,37 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents") const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); -UpdateDataExchange::UpdateDataExchange(management::Manageable* parent, - management::ManagementAgent* agent_) : - Exchange(EXCHANGE_NAME, parent), - agent(agent_) +UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) : + Exchange(EXCHANGE_NAME, parent) {} void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* ) { std::string data = msg.getMessage().getFrames().getContent(); - if (routingKey == MANAGEMENT_AGENTS_KEY) - managementAgents(data); - else if (routingKey == MANAGEMENT_SCHEMAS_KEY) - managementSchemas(data); - else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY) - managementDeletedObjects(data); - else - throw Exception( - QPID_MSG("Cluster update-data exchange received unknown routing-key: " - << routingKey)); + if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data; + else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data; + else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY) managementDeletedObjects = data; + else throw Exception( + QPID_MSG("Cluster update-data exchange received unknown routing-key: " + << routingKey)); } -void UpdateDataExchange::managementAgents(const std::string& data) { - if (!agent) - throw Exception( - QPID_MSG("Received management agent update but management is disabled.")); - framing::Buffer buf(const_cast<char*>(data.data()), data.size()); - agent->importAgents(buf); +void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agent) { + if (!agent) return; + + framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size()); + agent->importAgents(buf1); QPID_LOG(debug, " Updated management agents."); -} -void UpdateDataExchange::managementSchemas(const std::string& data) { - if (!agent) - throw Exception( - QPID_MSG("Received management schema update but management is disabled.")); - framing::Buffer buf(const_cast<char*>(data.data()), data.size()); - agent->importSchemas(buf); + framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size()); + agent->importSchemas(buf2); QPID_LOG(debug, " Updated management schemas"); -} -void UpdateDataExchange::managementDeletedObjects(const std::string& data) { using amqp_0_10::ListCodec; using types::Variant; - if (!agent) - throw Exception( - QPID_MSG("Management agent update but management not enabled.")); Variant::List encoded; - ListCodec::decode(data, encoded); + ListCodec::decode(managementDeletedObjects, encoded); management::ManagementAgent::DeletedObjectList objects; for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) { objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr( diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h index 1c4022a4aa..27a98548f3 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -45,7 +45,7 @@ class UpdateDataExchange : public broker::Exchange static const std::string MANAGEMENT_SCHEMAS_KEY; static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; - UpdateDataExchange(management::Manageable* parent, management::ManagementAgent*); + UpdateDataExchange(management::Manageable* parent); void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -68,12 +68,13 @@ class UpdateDataExchange : public broker::Exchange const qpid::framing::FieldTable*) { return false; } + void updateManagementAgent(management::ManagementAgent* agent); + private: - management::ManagementAgent* agent; - void managementAgents(const std::string&); - void managementSchemas(const std::string&); - void managementDeletedObjects(const std::string&); + std::string managementAgents; + std::string managementSchemas; + std::string managementDeletedObjects; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 9d9cfb5164..900f80fce6 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -2940,29 +2940,27 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) } } - -// Merge this list's deleted objects to the management Agent's list of deleted -// objects waiting for next (last) publish-ment. +// Called by cluster to reset the management agent's list of deleted +// objects to match the rest of the cluster. void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) { sys::Mutex::ScopedLock lock (userLock); - + // Clear out any existing deleted objects + moveNewObjectsLH(); + pendingDeletedObjs.clear(); + ManagementObjectMap::iterator i = managementObjects.begin(); + while (i != managementObjects.end()) { + ManagementObject* object = i->second; + if (object->isDeleted()) { + delete object; + managementObjects.erase(i++); + } + else ++i; + } for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) { std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className); - DeletedObjectList& dList = pendingDeletedObjs[classkey]; - - // not sure if this is necessary - merge by objectid.... - bool found = false; - for (DeletedObjectList::iterator dIter = dList.begin(); dIter != dList.end(); dIter++) { - if ((*dIter)->objectId == (*lIter)->objectId) { - found = true; - break; - } - } - if (!found) { - dList.push_back(*lIter); - } + pendingDeletedObjs[classkey].push_back(*lIter); } } |