diff options
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index c52caf6aa9..6b324be4c5 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -78,6 +78,10 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { + return o << "cluster(" << c.updaterId << " UPDATER)"; +} + struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -142,7 +146,7 @@ void UpdateClient::run() { } void UpdateClient::update() { - QPID_LOG(debug, updaterId << " updating state to " << updateeId + QPID_LOG(debug, *this << " updating state to " << updateeId << " at " << updateeUrl); Broker& b = updaterBroker; @@ -177,14 +181,14 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - // FIXME aconway 2010-03-15: This sleep avoids the race condition + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the // Connection object. Remove when the bug is fixed. // sys::usleep(10*1000); - QPID_LOG(debug, updaterId << " update completed to " << updateeId + QPID_LOG(debug, *this << " update completed to " << updateeId << " at " << updateeUrl << ": " << membership); } @@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState() management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - QPID_LOG(debug, updaterId << " updating management setup-state."); + QPID_LOG(debug, *this << " updating management setup-state."); std::string vendor, product, instance; agent->getName(vendor, product, instance); ClusterConnectionProxy(session).managementSetupState( @@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent() if (!agent) return; string data; - QPID_LOG(debug, updaterId << " updating management schemas. ") + QPID_LOG(debug, *this << " updating management schemas. ") agent->exportSchemas(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management agents. ") + QPID_LOG(debug, *this << " updating management agents. ") agent->exportAgents(data); session.messageTransfer( arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), arg::destination=UpdateDataExchange::EXCHANGE_NAME); - QPID_LOG(debug, updaterId << " updating management deleted objects. ") + QPID_LOG(debug, *this << " updating management deleted objects. ") typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; DeletedObjectList deleted; agent->exportDeletedObjects(deleted); @@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent() } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { - QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); + QPID_LOG(debug, *this << " updating exchange " << ex->getName()); ClusterConnectionProxy(session).exchange(encode(*ex)); } @@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { - QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); updateQueue(shadowSession, q); } void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { if (!q->hasExclusiveOwner()) { - QPID_LOG(debug, updaterId << " updating queue " << q->getName()); + QPID_LOG(debug, *this << " updating queue " << q->getName()); updateQueue(session, q); }//else queue will be updated as part of session state of owning session } @@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + QPID_LOG(debug, *this << " updating output task " << ci->getName() << " channel=" << channel); } void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { - QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); + QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); @@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda updateConnection->getOutput().getSendMax() ); shadowConnection.close(); - QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); + QPID_LOG(debug, *this << " updated connection " << *updateConnection); } void UpdateClient::updateSession(broker::SessionHandler& sh) { broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. - QPID_LOG(debug, updaterId << " updating session " << ss->getId()); + QPID_LOG(debug, *this << " updating session " << ss->getId()); // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); @@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - QPID_LOG(debug, updaterId << " updating exclusive queues."); + QPID_LOG(debug, *this << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - QPID_LOG(debug, updaterId << " updating consumers."); + QPID_LOG(debug, *this << " updating consumers."); ss->getSemanticState().eachConsumer( boost::bind(&UpdateClient::updateConsumer, this, _1)); - QPID_LOG(debug, updaterId << " updating unacknowledged messages."); + QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); @@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { if (inProgress) { inProgress->getFrames().map(simpl->out); } - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); + QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId()); } void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; @@ -485,7 +489,7 @@ void UpdateClient::updateConsumer( ); consumerNumbering.add(ci); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } @@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { }; void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, updaterId << " updating TX transaction state."); + QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); |