summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-19 22:34:15 +0000
committerAlan Conway <aconway@apache.org>2014-08-19 22:34:15 +0000
commit269580b80a3242f7fb6e7dbfff9859e43bbad8fd (patch)
tree9f8fa5faa01ff9234cb5570e087bf1664b10c876 /qpid/cpp/src/qpid/ha/Primary.cpp
parent451bda18227dccc91a08fe1ade559d0f91be932d (diff)
downloadqpid-python-269580b80a3242f7fb6e7dbfff9859e43bbad8fd.tar.gz
QPID-6020: HA logging improvements - log prefix with status and ID.
Include broker status and ID in (almost) all logging messages. Makes it much easier to track broker state and interactions. Sundry other logging improvements including: - Demote noisy messages to trace - connections from rgmanager status checks, searching for primary. - Rationalise start-up messages. - Improved queue state detail replicating subscription and queue guard initialization. - Fail to prepare TX is error. - Collect all primary TX errors into one. - Fix status of catchup brokers in primary membership for logging. - Add process name/PID info to client connection messages. - Various minor message tweaks. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp64
1 files changed, 41 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index dd41f74790..3790d14626 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -38,6 +38,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
+#include "qpid/types/Variant.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
@@ -54,6 +55,10 @@ using namespace framing;
namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+
class PrimaryConnectionObserver : public broker::ConnectionObserver
{
public:
@@ -90,7 +95,7 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
public:
- PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+ PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
void connectionException(framing::connection::CloseCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
@@ -104,17 +109,15 @@ class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
- void detach() {
- QPID_LOG(debug, logPrefix << "Session detached.");
- }
+ void detach() {}
private:
- std::string logPrefix;
+ const LogPrefix& logPrefix;
};
class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
public:
- PrimarySessionHandlerObserver(const std::string& logPrefix)
+ PrimarySessionHandlerObserver(const LogPrefix& logPrefix)
: errorListener(new PrimaryErrorListener(logPrefix)) {}
void newSessionHandler(broker::SessionHandler& sh) {
BrokerInfo info;
@@ -133,7 +136,7 @@ class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
- logPrefix("Primary: "), active(false),
+ logPrefix(hb.logPrefix), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
@@ -142,6 +145,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// So we are safe from client interference while we set up the primary.
hb.getMembership().setStatus(RECOVERING);
+ QPID_LOG(notice, logPrefix << "Promoted to primary");
// Process all QueueReplicators, handles auto-delete queues.
QueueReplicator::Vector qrs;
@@ -152,10 +156,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// NOTE: RemoteBackups must be created before we set the BrokerObserver
// or ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
- QPID_LOG(notice, logPrefix << "Promoted and recovering, waiting for backups: "
- << expect);
+ QPID_LOG(notice, logPrefix << "Recovering backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, logPrefix));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
setCatchupQueues(backup, true); // Create guards
@@ -173,7 +176,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
- haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
+ haBroker.getObserver()->setObserver(connectionObserver);
}
Primary::~Primary() {
@@ -191,8 +194,8 @@ void Primary::checkReady() {
activate = active = true;
}
if (activate) {
- QPID_LOG(notice, logPrefix << "Promoted and active.");
membership.setStatus(ACTIVE); // Outside of lock.
+ QPID_LOG(notice, logPrefix << "All backups recovered.");
}
}
@@ -205,7 +208,7 @@ void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
info.setStatus(READY);
membership.add(info);
if (expectedBackups.erase(backup)) {
- QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+ QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info);
ready = true;
}
else
@@ -229,7 +232,7 @@ void Primary::timeoutExpectedBackups() {
boost::shared_ptr<RemoteBackup> backup = *j;
if (!backup->getConnection()) {
BrokerInfo info = backup->getBrokerInfo();
- QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
+ QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info);
backupDisconnect(backup, l); // Calls erase(j)
// Keep broker in membership but downgrade status to CATCHUP.
// The broker will get this status change when it eventually connects.
@@ -303,6 +306,8 @@ void Primary::queueCreate(const QueuePtr& q) {
ReplicateLevel level = replicationTest.useLevel(*q);
q->addArgument(QPID_REPLICATE, printable(level).str());
if (level) {
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
@@ -312,8 +317,6 @@ void Primary::queueCreate(const QueuePtr& q) {
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueCreate(q);
}
- QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
- << " replication: " << printable(level));
checkReady(); // Outside lock
}
}
@@ -358,7 +361,7 @@ void Primary::exchangeDestroy(const ExchangePtr& ex) {
shared_ptr<RemoteBackup> Primary::backupConnect(
const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
{
- shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+ shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection, logPrefix));
queueLimits.addBackup(backup);
backups[info.getSystemId()] = backup;
return backup;
@@ -382,7 +385,15 @@ void Primary::opened(broker::Connection& connection) {
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
if (i == backups.end()) {
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
QPID_LOG(info, logPrefix << "New backup connection: " << info);
backup = backupConnect(info, connection, l);
}
@@ -397,13 +408,20 @@ void Primary::opened(broker::Connection& connection) {
i->second->setConnection(&connection);
backup = i->second;
}
- if (info.getStatus() == JOINING) {
- info.setStatus(CATCHUP);
- membership.add(info);
+ }
+ else {
+ const types::Variant::Map& properties = connection.getClientProperties();
+ std::ostringstream pinfo;
+ types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME);
+ // FIXME aconway 2014-08-13: Conditional on logging.
+ if (i != properties.end()) {
+ pinfo << " " << i->second;
+ i = properties.find(CLIENT_PID);
+ if (i != properties.end())
+ pinfo << "(" << i->second << ")";
}
+ QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str());
}
- else
- QPID_LOG(debug, logPrefix << "Accepted client connection " << connection.getMgmtId());
// Outside lock
if (backup) {
@@ -448,7 +466,7 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI
}
Role* Primary::promote() {
- QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ QPID_LOG(info, logPrefix << "Ignoring promotion, already primary");
return 0;
}