summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp46
1 files changed, 25 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index c52caf6aa9..6b324be4c5 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -78,6 +78,10 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
+ return o << "cluster(" << c.updaterId << " UPDATER)";
+}
+
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -142,7 +146,7 @@ void UpdateClient::run() {
}
void UpdateClient::update() {
- QPID_LOG(debug, updaterId << " updating state to " << updateeId
+ QPID_LOG(debug, *this << " updating state to " << updateeId
<< " at " << updateeUrl);
Broker& b = updaterBroker;
@@ -177,14 +181,14 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
- // FIXME aconway 2010-03-15: This sleep avoids the race condition
+ // TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
// Connection object. Remove when the bug is fixed.
//
sys::usleep(10*1000);
- QPID_LOG(debug, updaterId << " update completed to " << updateeId
+ QPID_LOG(debug, *this << " update completed to " << updateeId
<< " at " << updateeUrl << ": " << membership);
}
@@ -205,7 +209,7 @@ void UpdateClient::updateManagementSetupState()
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- QPID_LOG(debug, updaterId << " updating management setup-state.");
+ QPID_LOG(debug, *this << " updating management setup-state.");
std::string vendor, product, instance;
agent->getName(vendor, product, instance);
ClusterConnectionProxy(session).managementSetupState(
@@ -219,19 +223,19 @@ void UpdateClient::updateManagementAgent()
if (!agent) return;
string data;
- QPID_LOG(debug, updaterId << " updating management schemas. ")
+ QPID_LOG(debug, *this << " updating management schemas. ")
agent->exportSchemas(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management agents. ")
+ QPID_LOG(debug, *this << " updating management agents. ")
agent->exportAgents(data);
session.messageTransfer(
arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
arg::destination=UpdateDataExchange::EXCHANGE_NAME);
- QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ QPID_LOG(debug, *this << " updating management deleted objects. ")
typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
DeletedObjectList deleted;
agent->exportDeletedObjects(deleted);
@@ -248,7 +252,7 @@ void UpdateClient::updateManagementAgent()
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
+ QPID_LOG(debug, *this << " updating exchange " << ex->getName());
ClusterConnectionProxy(session).exchange(encode(*ex));
}
@@ -341,13 +345,13 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
- QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
updateQueue(shadowSession, q);
}
void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
if (!q->hasExclusiveOwner()) {
- QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+ QPID_LOG(debug, *this << " updating queue " << q->getName());
updateQueue(session, q);
}//else queue will be updated as part of session state of owning session
}
@@ -362,12 +366,12 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ QPID_LOG(debug, *this << " updating output task " << ci->getName()
<< " channel=" << channel);
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
- QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
@@ -398,14 +402,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
updateConnection->getOutput().getSendMax()
);
shadowConnection.close();
- QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
+ QPID_LOG(debug, *this << " updated connection " << *updateConnection);
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
- QPID_LOG(debug, updaterId << " updating session " << ss->getId());
+ QPID_LOG(debug, *this << " updating session " << ss->getId());
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
@@ -416,14 +420,14 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
- QPID_LOG(debug, updaterId << " updating exclusive queues.");
+ QPID_LOG(debug, *this << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- QPID_LOG(debug, updaterId << " updating consumers.");
+ QPID_LOG(debug, *this << " updating consumers.");
ss->getSemanticState().eachConsumer(
boost::bind(&UpdateClient::updateConsumer, this, _1));
- QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
+ QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
boost::bind(&UpdateClient::updateUnacked, this, _1));
@@ -454,13 +458,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
- QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
+ QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId());
}
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
using namespace message;
@@ -485,7 +489,7 @@ void UpdateClient::updateConsumer(
);
consumerNumbering.add(ci);
- QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
@@ -552,7 +556,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
};
void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, updaterId << " updating TX transaction state.");
+ QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();