diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 46 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.h | 8 |
5 files changed, 54 insertions, 37 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0013c370a7..5720f7fcc1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -265,7 +265,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), - updateDataExchange(new UpdateDataExchange(this)), + updateDataExchange(new UpdateDataExchange(*this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -356,7 +356,7 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(info, *this << " new shadow connection " << c->getId()); + QPID_LOG(debug, *this << " new shadow connection " << c->getId()); // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. @@ -749,7 +749,7 @@ struct AppendQueue { std::string Cluster::debugSnapshot() { assertClusterSafe(); std::ostringstream msg; - msg << "queue snapshot at " << map.getFrameSeq() << ":"; + msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:"; AppendQueue append(msg); broker.getQueues().eachQueue(append); return msg.str(); @@ -837,7 +837,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) checkUpdateIn(l); } else { - QPID_LOG(debug,*this << " unstall, ignore update " << updater + QPID_LOG(info, *this << " unstall, ignore update " << updater << " to " << updatee); deliverEventQueue.start(); // Not involved in update. } @@ -932,15 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) { // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); + discarding = false; // OK to set, we're stalled for update. + QPID_LOG(notice, *this << " update complete, starting catch-up."); + QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. if (mAgent) { // Update management agent now, after all update activity is complete. updateDataExchange->updateManagementAgent(mAgent); mAgent->suppress(false); // Enable management output. mAgent->clusterUpdate(); } - discarding = false; // OK to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up."); - QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } @@ -1111,7 +1111,7 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } - QPID_LOG(debug, *this << " cluster-uuid = " << clusterId); + QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index c52caf6aa9..6b324be4c5 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -78,6 +78,10 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { + return o << "cluster(" << c.updaterId << " UPDATER)"; +} + struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -142,7 +146,7 @@ void UpdateClient::run() { } void UpdateClient::update() { - QPID_LOG(debug, updaterId << " updating state to " << updateeId + QPID_LOG(debug, *this << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; @@ -177,14 +181,14 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - // FIXME aconway 2010-03-15: This sleep avoids the race condition + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the // Connection object. Remove when the bug is fixed. // sys::usleep(10*1000); - QPID_LOG(debug, updaterId << " update completed to " << updateeId + QPID_LOG(debug, *this << " update completed to " << updateeId << " at " << updateeUrl << ": " << membership); } @@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState() management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - QPID_LOG(debug, updaterId << " updating management setup-state."); + QPID_LOG(debug, *this << " updating management setup-state."); std::string vendor, product, instance; agent->getName(vendor, product, instance); ClusterConnectionProxy(session).managementSetupState( @@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent() if (!agent) return; string data; - QPID_LOG(debug, updaterId << " updating management schemas. ") + QPID_LOG(debug, *this << " updating management schemas. ") agent->exportSchemas(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management agents. ") + QPID_LOG(debug, *this << " updating management agents. ") agent->exportAgents(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management deleted objects. ") + QPID_LOG(debug, *this << " updating management deleted objects. ") typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; DeletedObjectList deleted; agent->exportDeletedObjects(deleted); @@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent() } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { - QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); + QPID_LOG(debug, *this << " updating exchange " << ex->getName()); ClusterConnectionProxy(session).exchange(encode(*ex)); } @@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { - QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); updateQueue(shadowSession, q); } void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { if (!q->hasExclusiveOwner()) { - QPID_LOG(debug, updaterId << " updating queue " << q->getName()); + QPID_LOG(debug, *this << " updating queue " << q->getName()); updateQueue(session, q); }//else queue will be updated as part of session state of owning session } @@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + QPID_LOG(debug, *this << " updating output task " << ci->getName() << " channel=" << channel); } void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { - QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); + QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); @@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda updateConnection->getOutput().getSendMax() ); shadowConnection.close(); - QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); + QPID_LOG(debug, *this << " updated connection " << *updateConnection); } void UpdateClient::updateSession(broker::SessionHandler& sh) { broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. - QPID_LOG(debug, updaterId << " updating session " << ss->getId()); + QPID_LOG(debug, *this << " updating session " << ss->getId()); // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); @@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - QPID_LOG(debug, updaterId << " updating exclusive queues."); + QPID_LOG(debug, *this << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - QPID_LOG(debug, updaterId << " updating consumers."); + QPID_LOG(debug, *this << " updating consumers."); ss->getSemanticState().eachConsumer( boost::bind(&UpdateClient::updateConsumer, this, _1)); - QPID_LOG(debug, updaterId << " updating unacknowledged messages."); + QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); @@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { if (inProgress) { inProgress->getFrames().map(simpl->out); } - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); + QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId()); } void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; @@ -485,7 +489,7 @@ void UpdateClient::updateConsumer( ); consumerNumbering.add(ci); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } @@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { }; void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, updaterId << " updating TX transaction state."); + QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index be09af7e81..76621cd7ba 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -30,7 +30,7 @@ #include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" #include <boost/shared_ptr.hpp> - +#include <iosfwd> namespace qpid { @@ -114,8 +114,11 @@ class UpdateClient : public sys::Runnable { boost::function<void()> done; boost::function<void(const std::exception& e)> failed; client::ConnectionSettings connectionSettings; + + friend std::ostream& operator<<(std::ostream&, const UpdateClient&); }; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_UPDATECLIENT_H*/ diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp index 2f242b3024..2a079b8881 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -19,6 +19,7 @@ * */ #include "UpdateDataExchange.h" +#include "Cluster.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" @@ -35,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents") const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); -UpdateDataExchange::UpdateDataExchange(management::Manageable* parent) : - Exchange(EXCHANGE_NAME, parent) +std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) { + return o << "cluster(" << c.clusterId << " UPDATER)"; +} + +UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : + Exchange(EXCHANGE_NAME, &cluster), + clusterId(cluster.getId()) {} void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, @@ -56,11 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size()); agent->importAgents(buf1); - QPID_LOG(debug, " Updated management agents."); + QPID_LOG(debug, *this << " updated management agents."); framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size()); agent->importSchemas(buf2); - QPID_LOG(debug, " Updated management schemas"); + QPID_LOG(debug, *this << " updated management schemas."); using amqp_0_10::ListCodec; using types::Variant; @@ -72,7 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen new management::ManagementAgent::DeletedObject(*i))); } agent->importDeletedObjects(objects); - QPID_LOG(debug, " Updated management deleted objects."); + QPID_LOG(debug, *this << " updated management deleted objects."); } diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h index 27a98548f3..8c493e400a 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -23,6 +23,8 @@ */ #include "qpid/broker/Exchange.h" +#include "types.h" +#include <iosfwd> namespace qpid { @@ -31,6 +33,7 @@ class ManagementAgent; } namespace cluster { +class Cluster; /** * An exchange used to send data that is to large for a control @@ -45,7 +48,7 @@ class UpdateDataExchange : public broker::Exchange static const std::string MANAGEMENT_SCHEMAS_KEY; static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; - UpdateDataExchange(management::Manageable* parent); + UpdateDataExchange(Cluster& parent); void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -71,10 +74,11 @@ class UpdateDataExchange : public broker::Exchange void updateManagementAgent(management::ManagementAgent* agent); private: - + MemberId clusterId; std::string managementAgents; std::string managementSchemas; std::string managementDeletedObjects; + friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&); }; }} // namespace qpid::cluster |