summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/CMakeLists.txt20
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h5
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h8
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h9
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp66
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h3
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h5
-rw-r--r--qpid/cpp/src/qpid/ha/FailoverExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/IdSetter.h12
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.cpp35
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.h75
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp25
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h3
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp64
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h4
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h5
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp19
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h5
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h5
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp29
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h5
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h5
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h3
-rw-r--r--qpid/cpp/src/qpid/ha/Role.h3
-rw-r--r--qpid/cpp/src/qpid/ha/StandAlone.h4
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h6
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp4
38 files changed, 368 insertions, 203 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index ebdff57961..568cec84b8 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -522,7 +522,6 @@ option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
if (BUILD_HA)
set (ha_SOURCES
- qpid/ha/QueueSnapshot.h
qpid/ha/AlternateExchangeSetter.h
qpid/ha/Backup.cpp
qpid/ha/Backup.h
@@ -533,24 +532,29 @@ if (BUILD_HA)
qpid/ha/BrokerReplicator.h
qpid/ha/ConnectionObserver.cpp
qpid/ha/ConnectionObserver.h
- qpid/ha/Event.cpp
- qpid/ha/Event.h
+ qpid/ha/Event.cpp
+ qpid/ha/Event.h
qpid/ha/FailoverExchange.cpp
qpid/ha/FailoverExchange.h
qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h
qpid/ha/HaPlugin.cpp
- qpid/ha/IdSetter.h
- qpid/ha/QueueSnapshot.h
+ qpid/ha/IdSetter.h
+ qpid/ha/LogPrefix.cpp
+ qpid/ha/LogPrefix.h
qpid/ha/Membership.cpp
qpid/ha/Membership.h
qpid/ha/Primary.cpp
qpid/ha/Primary.h
qpid/ha/PrimaryQueueLimits.h
+ qpid/ha/PrimaryTxObserver.cpp
+ qpid/ha/PrimaryTxObserver.h
qpid/ha/QueueGuard.cpp
qpid/ha/QueueGuard.h
qpid/ha/QueueReplicator.cpp
qpid/ha/QueueReplicator.h
+ qpid/ha/QueueSnapshot.h
+ qpid/ha/QueueSnapshot.h
qpid/ha/RemoteBackup.cpp
qpid/ha/RemoteBackup.h
qpid/ha/ReplicatingSubscription.cpp
@@ -564,11 +568,9 @@ if (BUILD_HA)
qpid/ha/StatusCheck.h
qpid/ha/TxReplicatingSubscription.cpp
qpid/ha/TxReplicatingSubscription.h
- qpid/ha/PrimaryTxObserver.cpp
- qpid/ha/PrimaryTxObserver.h
+ qpid/ha/TxReplicator.cpp
+ qpid/ha/TxReplicator.h
qpid/ha/types.cpp
- qpid/ha/TxReplicator.cpp
- qpid/ha/TxReplicator.h
qpid/ha/types.h
)
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
index 6dc2f5c2f4..f7552f16a3 100644
--- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
@@ -96,7 +96,8 @@ std::string TxBuffer::endCommit(TransactionalStore* const store) {
void TxBuffer::setError(const std::string& e) {
QPID_LOG(error, "Asynchronous transaction error: " << e);
sys::Mutex::ScopedLock l(errorLock);
- error = e;
+ if (!error.empty()) error += " ";
+ error += e;
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 9d50b1c665..f1b6eadd75 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -49,7 +49,7 @@ using std::string;
using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+ logPrefix(hb.logPrefix), membership(hb.getMembership()), stopped(false),
haBroker(hb), broker(hb.getBroker()), settings(s),
statusCheck(new StatusCheck(hb))
{}
@@ -60,7 +60,7 @@ void Backup::setBrokerUrl(const Url& brokers) {
if (stopped) return;
if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
if (!link) { // Not yet initialized
- QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+ QPID_LOG(info, logPrefix << "Connecting to cluster: " << brokers);
string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
types::Uuid uuid(true);
link = broker.getLinks().declare(
@@ -78,7 +78,6 @@ void Backup::setBrokerUrl(const Url& brokers) {
void Backup::stop(Mutex::ScopedLock&) {
if (stopped) return;
stopped = true;
- QPID_LOG(debug, logPrefix << "Leaving backup role.");
if (link) link->close();
if (replicator.get()) {
replicator->shutdown();
@@ -106,8 +105,7 @@ Role* Backup::promote() {
case JOINING:
if (statusCheck->canPromote()) return recover(l);
else {
- QPID_LOG(error,
- logPrefix << "Joining active cluster, cannot be promoted.");
+ QPID_LOG(error, logPrefix << "Joining active cluster, cannot be promoted.");
throw Exception("Joining active cluster, cannot be promoted.");
}
break;
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 88194158ce..47c44aa59c 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "Role.h"
#include "Settings.h"
#include "qpid/Url.h"
@@ -53,8 +54,6 @@ class Backup : public Role
Backup(HaBroker&, const Settings&);
~Backup();
- std::string getLogPrefix() const { return logPrefix; }
-
void setBrokerUrl(const Url&);
Role* promote();
@@ -65,7 +64,7 @@ class Backup : public Role
void stop(sys::Mutex::ScopedLock&);
Role* recover(sys::Mutex::ScopedLock&);
- std::string logPrefix;
+ const LogPrefix& logPrefix;
Membership& membership;
sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index 5a67cde922..a58e666fa7 100644
--- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
@@ -35,12 +36,17 @@ namespace ha {
class BackupConnectionExcluder : public broker::ConnectionObserver
{
public:
+ BackupConnectionExcluder(const LogPrefix& lp) : logPrefix(lp) {}
+
void opened(broker::Connection& connection) {
- QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
+ QPID_LOG(trace, logPrefix << "Rejected connection "+connection.getMgmtId());
connection.abort();
}
void closed(broker::Connection&) {}
+
+ private:
+ const LogPrefix& logPrefix;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index a13451e179..c8a652a7ab 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -92,7 +92,7 @@ void BrokerInfo::assign(const Variant::Map& m) {
}
std::ostream& BrokerInfo::printId(std::ostream& o) const {
- o << getSystemId().str().substr(0,8);
+ o << shortStr(getSystemId());
if (getAddress() != empty) o << "@" << getAddress();
return o;
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index 87d51f5113..92556a5c4b 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -63,9 +63,14 @@ class BrokerInfo
void assign(const types::Variant::Map&);
// So it can be put in a set.
- bool operator<(const BrokerInfo x) const { return systemId < x.systemId; }
+ bool operator<(const BrokerInfo& x) const { return systemId < x.systemId; }
- // Print just the identifying information, not the status.
+ bool operator==(const BrokerInfo& x) const
+ { return address == x.address && systemId == x.systemId && status == x.status; }
+
+ bool operator!=(const BrokerInfo& x) const { return !(*this == x); }
+
+ // Print just the identifying information (shortId@address), not the status.
std::ostream& printId(std::ostream& o) const;
private:
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 3957ef5a0c..a62080932d 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -175,7 +175,7 @@ Variant::Map asMapVoid(const Variant& value) {
// Report errors on the broker replication session.
class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+ ErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
void connectionException(framing::connection::CloseCode code, const std::string& msg) {
QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
@@ -189,12 +189,10 @@ class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorList
void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
- void detach() {
- QPID_LOG(debug, logPrefix << "Session detached.");
- }
+ void detach() {}
private:
- std::string logPrefix;
+ const LogPrefix& logPrefix;
};
/** Keep track of queues or exchanges during the update process to solve 2
@@ -213,8 +211,9 @@ class BrokerReplicator::UpdateTracker {
typedef boost::function<void (const std::string&)> CleanFn;
UpdateTracker(const std::string& type_, // "queue" or "exchange"
- CleanFn f)
- : type(type_), cleanFn(f) {}
+ CleanFn f,
+ const LogPrefix& lp)
+ : type(type_), cleanFn(f), logPrefix(lp) {}
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
@@ -224,7 +223,7 @@ class BrokerReplicator::UpdateTracker {
boost::bind(&UpdateTracker::clean, this, _1));
}
catch (const std::exception& e) {
- QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+ QPID_LOG(error, logPrefix << "Error in cleanup of lost objects: " << e.what());
}
}
@@ -251,7 +250,7 @@ class BrokerReplicator::UpdateTracker {
private:
void clean(const std::string& name) {
- QPID_LOG(debug, "Backup: Deleted " << type << " " << name <<
+ QPID_LOG(debug, logPrefix << "Deleted " << type << " " << name <<
": no longer exists on primary");
try { cleanFn(name); }
catch (const framing::NotFoundException&) {}
@@ -260,6 +259,7 @@ class BrokerReplicator::UpdateTracker {
std::string type;
Names initial, events;
CleanFn cleanFn;
+ const LogPrefix& logPrefix;
};
namespace {
@@ -279,7 +279,7 @@ boost::shared_ptr<BrokerReplicator> BrokerReplicator::create(
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup: "), replicationTest(NONE),
+ logPrefix(hb.logPrefix), replicationTest(NONE),
haBroker(hb), broker(hb.getBroker()),
exchanges(broker.getExchanges()), queues(broker.getQueues()),
link(l),
@@ -372,19 +372,20 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
- QPID_LOG(notice, logPrefix << (initialized ? "Failing over" : "Connecting")
- << " to primary " << primary
- << " status:" << printable(haBroker.getStatus()));
+ QPID_LOG(info, logPrefix << (initialized ? "Failing over" : "Connecting")
+ << " to primary " << primary);
initialized = true;
exchangeTracker.reset(
new UpdateTracker("exchange",
- boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+ logPrefix));
exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
queueTracker.reset(
new UpdateTracker("queue",
- boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+ logPrefix));
queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
framing::AMQP_ServerProxy peer(sessionHandler.out);
@@ -417,14 +418,14 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
// Called for each queue in existence when the backup connects to a primary.
void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
if (replicationTest.getLevel(*q)) {
- QPID_LOG(debug, "Existing queue: " << q->getName());
+ QPID_LOG(debug, logPrefix << "Existing queue: " << q->getName());
queueTracker->addQueue(q);
}
}
void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) {
if (replicationTest.getLevel(*ex)) {
- QPID_LOG(debug, "Existing exchange: " << ex->getName());
+ QPID_LOG(debug, logPrefix << "Existing exchange: " << ex->getName());
exchangeTracker->addExchange(ex);
}
}
@@ -447,7 +448,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "Broker replicator event: " << map);
+ QPID_LOG(trace, logPrefix << "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
std::string key = (schema[PACKAGE_NAME].asString() +
@@ -459,7 +460,7 @@ void BrokerReplicator::route(Deliverable& msg) {
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "Broker replicator response: " << map);
+ QPID_LOG(trace, logPrefix << "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
@@ -691,8 +692,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
if (exchange &&
exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
{
- QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
- << name);
+ QPID_LOG(warning, logPrefix << "Exchange response replacing (UUID mismatch): " << name);
deleteExchange(name);
}
CreateExchangeResult result = createExchange(
@@ -793,21 +793,17 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
}
void BrokerReplicator::deleteExchange(const std::string& name) {
- try {
- boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
- if (!exchange) {
- QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
- return;
- }
- if (exchange->inUseAsAlternate()) {
- QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name);
- return;
- }
- broker.deleteExchange(name, userId, remoteHost);
- QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
- } catch (const framing::NotFoundException&) {
- QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
+ boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
+ if (!exchange) {
+ QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
+ return;
+ }
+ if (exchange->inUseAsAlternate()) {
+ QPID_LOG(warning, logPrefix << "Cannot delete exchange, in use as alternate: " << name);
+ return;
}
+ broker.deleteExchange(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
}
boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 1e051878ae..44e80263de 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -52,6 +52,7 @@ class FieldTable;
}
namespace ha {
+class LogPrefix;
class HaBroker;
class QueueReplicator;
@@ -155,7 +156,7 @@ class BrokerReplicator : public broker::Exchange,
void setMembership(const types::Variant::List&); // Set membership from list.
- std::string logPrefix;
+ const LogPrefix& logPrefix;
ReplicationTest replicationTest;
std::string userId, remoteHost;
HaBroker& haBroker;
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index c9c5c2e576..a824adb871 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -31,7 +31,7 @@ namespace qpid {
namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix("Backup: "), self(uuid) {}
+ : haBroker(hb), logPrefix(hb.logPrefix), self(uuid) {}
bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG);
@@ -55,11 +55,10 @@ bool ConnectionObserver::getAddress(const broker::Connection& connection, Addres
return false;
}
-void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
+void ConnectionObserver::setObserver(const ObserverPtr& o)
{
sys::Mutex::ScopedLock l(lock);
observer = o;
- logPrefix = newlogPrefix;
}
ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
@@ -83,21 +82,21 @@ void ConnectionObserver::opened(broker::Connection& connection) {
// Set my own address if there is an address header.
Address addr;
if (getAddress(connection, addr)) haBroker.setAddress(addr);
- QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+ QPID_LOG(trace, logPrefix << "Rejected self connection "+connection.getMgmtId());
connection.abort();
return;
}
if (connection.isLink()) return; // Allow outgoing links.
if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) {
- QPID_LOG(debug, logPrefix << "Accepted admin connection: "
- << connection.getMgmtId());
+ QPID_LOG(trace, logPrefix << "Accepted admin connection: " << connection.getMgmtId());
return; // No need to call observer, always allow admins.
}
ObserverPtr o(getObserver());
if (o) o->opened(connection);
}
catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Open error: " << e.what());
+ QPID_LOG(error, logPrefix << "Error on incoming connection " << connection.getMgmtId()
+ << ": " << e.what());
throw;
}
}
@@ -109,7 +108,8 @@ void ConnectionObserver::closed(broker::Connection& connection) {
if (o) o->closed(connection);
}
catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Close error: " << e.what());
+ QPID_LOG(error, logPrefix << "Error closing incoming connection " << connection.getMgmtId()
+ << ": " << e.what());
throw;
}
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
index 079dc43be6..f447d479f0 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -34,6 +34,7 @@ struct Address;
namespace ha {
class BrokerInfo;
class HaBroker;
+class LogPrefix;
/**
* Observes connections, delegates to another ConnectionObserver for
@@ -59,7 +60,7 @@ class ConnectionObserver : public broker::ConnectionObserver
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
- void setObserver(const ObserverPtr&, const std::string& logPrefix);
+ void setObserver(const ObserverPtr&);
ObserverPtr getObserver();
void reset();
@@ -72,7 +73,7 @@ class ConnectionObserver : public broker::ConnectionObserver
sys::Mutex lock;
HaBroker& haBroker;
- std::string logPrefix;
+ const LogPrefix& logPrefix;
ObserverPtr observer;
types::Uuid self;
};
diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
index f1b87c63c8..9bda5ea5bf 100644
--- a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
@@ -63,7 +63,6 @@ ostream& operator<<(ostream& o, const OstreamUrls& urls) {
FailoverExchange::FailoverExchange(management::Manageable& parent, Broker& b)
: Exchange(typeName, &parent, &b)
{
- QPID_LOG(debug, typeName << " created.");
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
@@ -114,7 +113,7 @@ bool FailoverExchange::hasBindings() {
}
void FailoverExchange::route(Deliverable&) {
- QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
+ QPID_LOG(warning, typeName << " unexpected message, ignored.");
}
void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::ScopedLock&) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index f154e45a22..7699b0e1d2 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -69,10 +69,16 @@ using boost::dynamic_pointer_cast;
//
class HaBroker::BrokerObserver : public broker::BrokerObserver {
public:
+ BrokerObserver(const LogPrefix& lp) : logPrefix(lp) {}
+
void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
- q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName())));
+ q->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, q->getName())));
}
+
+ private:
+ const LogPrefix& logPrefix;
};
// Called in Plugin::earlyInitialize
@@ -83,20 +89,19 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
- membership(BrokerInfo(systemId, STANDALONE), *this),
+ membership(BrokerInfo(systemId, STANDALONE), *this), // Sets logPrefix
failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, "Backup starting, rejecting client connections.");
- shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
- observer->setObserver(excluder, "Backup: ");
+ shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder(logPrefix));
+ observer->setObserver(excluder);
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
- broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
+ broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver(logPrefix)));
}
namespace {
@@ -107,8 +112,8 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; }
// Called in Plugin::initialize
void HaBroker::initialize() {
if (settings.cluster) {
+ QPID_LOG(info, logPrefix << "Starting HA broker");
membership.setStatus(JOINING);
- QPID_LOG(info, "Initializing HA broker: " << membership.getSelf());
}
// Set up the management object.
@@ -138,7 +143,6 @@ void HaBroker::initialize() {
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, role->getLogPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
@@ -160,7 +164,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
@@ -195,7 +199,7 @@ void HaBroker::setPublicUrl(const Url& url) {
knownBrokers.push_back(url);
vector<Url> urls(1, url);
failoverExchange->updateUrls(urls);
- QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url);
+ QPID_LOG(debug, logPrefix << "Public URL set to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
@@ -203,7 +207,7 @@ void HaBroker::setBrokerUrl(const Url& url) {
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+ QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
}
role->setBrokerUrl(url); // Oustside lock
}
@@ -214,7 +218,7 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
}
void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, "Shutting down: " << message);
+ QPID_LOG(critical, logPrefix << "Shutting down: " << message);
broker.shutdown();
throw Exception(message);
}
@@ -224,7 +228,7 @@ BrokerStatus HaBroker::getStatus() const {
}
void HaBroker::setAddress(const Address& a) {
- QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
+ QPID_LOG(info, logPrefix << "Set self address to: " << a);
membership.setSelfAddress(a);
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 9fadd4f35c..023706e7e3 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -25,6 +25,7 @@
#include "BrokerInfo.h"
#include "Membership.h"
#include "types.h"
+#include "LogPrefix.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "FailoverExchange.h"
@@ -101,6 +102,9 @@ class HaBroker : public management::Manageable
/** Authenticated user ID for queue create/delete */
std::string getUserId() const { return userId; }
+ /** logPrefix is thread safe and used by other classes (Membership) */
+ LogPrefix logPrefix;
+
private:
class BrokerObserver;
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index a958b0d29c..913a9b5084 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -76,7 +76,7 @@ struct HaPlugin : public Plugin {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && (settings.cluster || settings.queueReplication)) {
if (!broker->getManagementAgent()) {
- QPID_LOG(warning, "HA plugin disabled because management is disabled");
+ QPID_LOG(warning, "Cannot start HA: management is disabled");
if (settings.cluster)
throw Exception("Cannot start HA: management is disabled");
} else {
diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h
index 0350bf1519..f0629c99bb 100644
--- a/qpid/cpp/src/qpid/ha/IdSetter.h
+++ b/qpid/cpp/src/qpid/ha/IdSetter.h
@@ -33,6 +33,7 @@
namespace qpid {
namespace ha {
+class LogPrefix;
/**
* A MessageInterceptor that sets the ReplicationId on each message as it is
@@ -43,16 +44,16 @@ namespace ha {
class IdSetter : public broker::MessageInterceptor
{
public:
- IdSetter(const std::string& q, ReplicationId firstId=1) : queue(q), nextId(firstId) {
- QPID_LOG(debug, "Replication-ID will be set for " << queue << " from " << firstId);
- }
+ IdSetter(const LogPrefix& lp, const std::string& q, ReplicationId firstId=1) :
+ logPrefix(lp), queue(q), nextId(firstId)
+ {}
void record(broker::Message& m) {
// Record is called when a message is first delivered to a queue, before it has
// been enqueued or saved in a transaction buffer. This is when we normally want
// to assign a replication-id.
m.setReplicationId(nextId++);
- QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
+ QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
}
void publish(broker::Message& m) {
@@ -62,11 +63,12 @@ class IdSetter : public broker::MessageInterceptor
// store record() is not called, so set the ID now if not already set.
if (!m.hasReplicationId()) {
m.setReplicationId(nextId++);
- QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m));
+ QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m));
}
}
private:
+ const LogPrefix& logPrefix;
std::string queue;
sys::AtomicValue<uint32_t> nextId;
};
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
new file mode 100644
index 0000000000..c1ccf050c1
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LogPrefix.h"
+#include <iostream>
+
+namespace qpid {
+namespace ha {
+
+std::ostream& operator<<(std::ostream& o, const LogPrefix& lp) {
+ return o << lp.get();
+}
+
+std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp) {
+ return o << lp.prePrefix.get() << lp.get();
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/LogPrefix.h
new file mode 100644
index 0000000000..3b6bb17d99
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.h
@@ -0,0 +1,75 @@
+#ifndef QPID_HA_LOGPREFIX_H
+#define QPID_HA_LOGPREFIX_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/sys/Mutex.h>
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Thread safe string holder to hold a string that may be read and modified concurrently.
+ */
+class LogPrefix
+{
+ public:
+ explicit LogPrefix(const std::string& s=std::string()) : prefix(s) {}
+ void set(const std::string& s) { sys::RWlock::ScopedWlock l(lock); prefix = s; }
+ std::string get() const { sys::RWlock::ScopedRlock l(lock); return prefix; }
+
+ LogPrefix& operator=(const std::string& s) { set(s); return *this; }
+ operator std::string() const { return get(); }
+
+ private:
+ // Undefined, not copyable.
+ LogPrefix(const LogPrefix& lp);
+ LogPrefix& operator=(const LogPrefix&);
+
+ mutable sys::RWlock lock;
+ std::string prefix;
+};
+std::ostream& operator<<(std::ostream& o, const LogPrefix& lp);
+
+/**
+ * A two-part log prefix with a reference to a pre-prefix and a post-prefix.
+ * Operator << will print both parts, get/set just manage the post-prefix.
+ */
+class LogPrefix2 : public LogPrefix {
+ public:
+ const LogPrefix& prePrefix;
+ explicit LogPrefix2(const LogPrefix& lp, const std::string& s=std::string()) : LogPrefix(s), prePrefix(lp) {}
+ LogPrefix2& operator=(const std::string& s) { set(s); return *this; }
+
+ private:
+ // Undefined, not copyable.
+ LogPrefix2(const LogPrefix2& lp);
+ LogPrefix2& operator=(const LogPrefix2&);
+};
+std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp);
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_LOGPREFIX_H*/
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index fdb47014d9..92a0b7db70 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -43,9 +43,14 @@ Membership::Membership(const BrokerInfo& info, HaBroker& b)
: haBroker(b), self(info.getSystemId())
{
brokers[self] = info;
+ setPrefix();
oldStatus = info.getStatus();
}
+void Membership::setPrefix() {
+ haBroker.logPrefix = Msg() << shortStr(brokers[self].getSystemId())
+ << "(" << printable(brokers[self].getStatus()) << ") ";
+}
void Membership::clear() {
Mutex::ScopedLock l(lock);
BrokerInfo me = brokers[self];
@@ -57,7 +62,7 @@ void Membership::add(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
assert(b.getSystemId() != self);
brokers[b.getSystemId()] = b;
- update(l);
+ update(true, l);
}
@@ -67,7 +72,7 @@ void Membership::remove(const types::Uuid& id) {
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- update(l);
+ update(true, l);
}
}
@@ -83,7 +88,7 @@ void Membership::assign(const types::Variant::List& list) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
- update(l);
+ update(true, l);
}
types::Variant::List Membership::asList() const {
@@ -144,8 +149,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
}
} // namespace
-void Membership::update(Mutex::ScopedLock& l) {
- QPID_LOG(info, "Membership: " << brokers);
+void Membership::update(bool log, Mutex::ScopedLock& l) {
// Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
@@ -171,27 +175,30 @@ void Membership::update(Mutex::ScopedLock& l) {
// Check status transitions
if (oldStatus != newStatus) {
- QPID_LOG(info, "Status change: "
+ QPID_LOG(info, haBroker.logPrefix << "Status change: "
<< printable(oldStatus) << " -> " << printable(newStatus));
if (!checkTransition(oldStatus, newStatus)) {
haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus)
<< " -> " << printable(newStatus)));
}
oldStatus = newStatus;
+ setPrefix();
+ if (newStatus == READY) QPID_LOG(notice, haBroker.logPrefix << "Backup is ready");
}
+ if (log) QPID_LOG(info, haBroker.logPrefix << "Membership update: " << brokers);
}
void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
Mutex::ScopedLock l(lock);
mgmtObject = mo;
- update(l);
+ update(false, l);
}
void Membership::setStatus(BrokerStatus newStatus) {
Mutex::ScopedLock l(lock);
brokers[self].setStatus(newStatus);
- update(l);
+ update(false, l);
}
BrokerStatus Membership::getStatus() const {
@@ -215,7 +222,7 @@ BrokerInfo Membership::getSelf() const {
void Membership::setSelfAddress(const Address& a) {
Mutex::ScopedLock l(lock);
brokers[self].setAddress(a);
- update(l);
+ update(false, l);
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 5b2b72e2fc..5590d255fa 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -85,7 +85,8 @@ class Membership
void setSelfAddress(const Address&);
private:
- void update(sys::Mutex::ScopedLock&);
+ void setPrefix();
+ void update(bool log, sys::Mutex::ScopedLock&);
BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
types::Variant::List asList(sys::Mutex::ScopedLock&) const;
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index dd41f74790..3790d14626 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -38,6 +38,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
+#include "qpid/types/Variant.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
@@ -54,6 +55,10 @@ using namespace framing;
namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+
class PrimaryConnectionObserver : public broker::ConnectionObserver
{
public:
@@ -90,7 +95,7 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
public:
- PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+ PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
void connectionException(framing::connection::CloseCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
@@ -104,17 +109,15 @@ class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
- void detach() {
- QPID_LOG(debug, logPrefix << "Session detached.");
- }
+ void detach() {}
private:
- std::string logPrefix;
+ const LogPrefix& logPrefix;
};
class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
public:
- PrimarySessionHandlerObserver(const std::string& logPrefix)
+ PrimarySessionHandlerObserver(const LogPrefix& logPrefix)
: errorListener(new PrimaryErrorListener(logPrefix)) {}
void newSessionHandler(broker::SessionHandler& sh) {
BrokerInfo info;
@@ -133,7 +136,7 @@ class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
- logPrefix("Primary: "), active(false),
+ logPrefix(hb.logPrefix), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
@@ -142,6 +145,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// So we are safe from client interference while we set up the primary.
hb.getMembership().setStatus(RECOVERING);
+ QPID_LOG(notice, logPrefix << "Promoted to primary");
// Process all QueueReplicators, handles auto-delete queues.
QueueReplicator::Vector qrs;
@@ -152,10 +156,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// NOTE: RemoteBackups must be created before we set the BrokerObserver
// or ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
- QPID_LOG(notice, logPrefix << "Promoted and recovering, waiting for backups: "
- << expect);
+ QPID_LOG(notice, logPrefix << "Recovering backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, logPrefix));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
setCatchupQueues(backup, true); // Create guards
@@ -173,7 +176,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
- haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
+ haBroker.getObserver()->setObserver(connectionObserver);
}
Primary::~Primary() {
@@ -191,8 +194,8 @@ void Primary::checkReady() {
activate = active = true;
}
if (activate) {
- QPID_LOG(notice, logPrefix << "Promoted and active.");
membership.setStatus(ACTIVE); // Outside of lock.
+ QPID_LOG(notice, logPrefix << "All backups recovered.");
}
}
@@ -205,7 +208,7 @@ void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
info.setStatus(READY);
membership.add(info);
if (expectedBackups.erase(backup)) {
- QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+ QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info);
ready = true;
}
else
@@ -229,7 +232,7 @@ void Primary::timeoutExpectedBackups() {
boost::shared_ptr<RemoteBackup> backup = *j;
if (!backup->getConnection()) {
BrokerInfo info = backup->getBrokerInfo();
- QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
+ QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info);
backupDisconnect(backup, l); // Calls erase(j)
// Keep broker in membership but downgrade status to CATCHUP.
// The broker will get this status change when it eventually connects.
@@ -303,6 +306,8 @@ void Primary::queueCreate(const QueuePtr& q) {
ReplicateLevel level = replicationTest.useLevel(*q);
q->addArgument(QPID_REPLICATE, printable(level).str());
if (level) {
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
@@ -312,8 +317,6 @@ void Primary::queueCreate(const QueuePtr& q) {
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueCreate(q);
}
- QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
- << " replication: " << printable(level));
checkReady(); // Outside lock
}
}
@@ -358,7 +361,7 @@ void Primary::exchangeDestroy(const ExchangePtr& ex) {
shared_ptr<RemoteBackup> Primary::backupConnect(
const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
{
- shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+ shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection, logPrefix));
queueLimits.addBackup(backup);
backups[info.getSystemId()] = backup;
return backup;
@@ -382,7 +385,15 @@ void Primary::opened(broker::Connection& connection) {
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
if (i == backups.end()) {
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
QPID_LOG(info, logPrefix << "New backup connection: " << info);
backup = backupConnect(info, connection, l);
}
@@ -397,13 +408,20 @@ void Primary::opened(broker::Connection& connection) {
i->second->setConnection(&connection);
backup = i->second;
}
- if (info.getStatus() == JOINING) {
- info.setStatus(CATCHUP);
- membership.add(info);
+ }
+ else {
+ const types::Variant::Map& properties = connection.getClientProperties();
+ std::ostringstream pinfo;
+ types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME);
+ // FIXME aconway 2014-08-13: Conditional on logging.
+ if (i != properties.end()) {
+ pinfo << " " << i->second;
+ i = properties.find(CLIENT_PID);
+ if (i != properties.end())
+ pinfo << "(" << i->second << ")";
}
+ QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str());
}
- else
- QPID_LOG(debug, logPrefix << "Accepted client connection " << connection.getMgmtId());
// Outside lock
if (backup) {
@@ -448,7 +466,7 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI
}
Role* Primary::promote() {
- QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ QPID_LOG(info, logPrefix << "Ignoring promotion, already primary");
return 0;
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 46cf990834..84d714fc01 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -25,6 +25,7 @@
#include "types.h"
#include "hash.h"
#include "BrokerInfo.h"
+#include "LogPrefix.h"
#include "PrimaryQueueLimits.h"
#include "ReplicationTest.h"
#include "Role.h"
@@ -81,7 +82,6 @@ class Primary : public Role
~Primary();
// Role implementation
- std::string getLogPrefix() const { return logPrefix; }
Role* promote();
void setBrokerUrl(const Url&) {}
@@ -142,7 +142,7 @@ class Primary : public Role
mutable sys::Mutex lock;
HaBroker& haBroker;
Membership& membership;
- std::string logPrefix;
+ const LogPrefix& logPrefix;
bool active;
ReplicationTest replicationTest;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
index d614a48099..6d0c55736a 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
@@ -36,6 +36,7 @@ class Queue;
}
namespace ha {
+class LogPrefix;
class RemoteBackup;
/**
@@ -48,7 +49,7 @@ class PrimaryQueueLimits
{
public:
// FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max
- PrimaryQueueLimits(const std::string& lp,
+ PrimaryQueueLimits(const LogPrefix& lp,
broker::QueueRegistry& qr,
const ReplicationTest& rt
) :
@@ -97,7 +98,7 @@ class PrimaryQueueLimits
void removeBackup(const boost::shared_ptr<RemoteBackup>&) {}
private:
- std::string logPrefix;
+ const LogPrefix& logPrefix;
uint64_t maxQueues;
uint64_t queues;
};
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 90046a8c5a..bbfcbba3a3 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -94,6 +94,7 @@ PrimaryTxObserver::PrimaryTxObserver(
Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
) :
state(SENDING),
+ logPrefix(hb.logPrefix),
primary(p), haBroker(hb), broker(hb.getBroker()),
replicationTest(hb.getSettings().replicateDefault.get()),
txBuffer(tx),
@@ -101,7 +102,7 @@ PrimaryTxObserver::PrimaryTxObserver(
exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
empty(true)
{
- logPrefix = "Primary transaction "+shortStr(id)+": ";
+ logPrefix = "Primary TX "+shortStr(id)+": ";
// The brokers known at this point are the ones that will be included
// in the transaction. Brokers that join later are not included.
@@ -115,8 +116,7 @@ PrimaryTxObserver::PrimaryTxObserver(
for (size_t i = 0; i < incomplete.size(); ++i)
txBuffer->startCompleter();
- QPID_LOG(debug, logPrefix << "Started TX " << id);
- QPID_LOG(debug, logPrefix << "Backups: " << backups);
+ QPID_LOG(debug, logPrefix << "Started, backups " << backups);
}
void PrimaryTxObserver::initialize() {
@@ -140,9 +140,7 @@ void PrimaryTxObserver::initialize() {
}
-PrimaryTxObserver::~PrimaryTxObserver() {
- QPID_LOG(debug, logPrefix << "Ended");
-}
+PrimaryTxObserver::~PrimaryTxObserver() {}
void PrimaryTxObserver::checkState(State expect, const std::string& msg) {
if (state != expect)
@@ -254,7 +252,7 @@ void PrimaryTxObserver::end(Mutex::ScopedLock&) {
try {
broker.getExchanges().destroy(getExchangeName());
} catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what());
+ QPID_LOG(error, logPrefix << "Deleting TX exchange: " << e.what());
}
}
@@ -266,11 +264,12 @@ bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) {
return false;
}
-bool PrimaryTxObserver::error(const Uuid& id, const char* msg, Mutex::ScopedLock& l)
+bool PrimaryTxObserver::error(const Uuid& id, const std::string& msg, Mutex::ScopedLock& l)
{
if (incomplete.find(id) != incomplete.end()) {
// Note: setError before completed since completed may trigger completion.
- txBuffer->setError(QPID_MSG(logPrefix << msg << id));
+ // Only use the TX part of the log prefix.
+ txBuffer->setError(Msg() << logPrefix.get() << msg << shortStr(id) << ".");
completed(id, l);
return true;
}
@@ -290,7 +289,7 @@ void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
- if (error(backup, "Prepare failed on backup: ", l)) {
+ if (error(backup, "Prepare failed on backup ", l)) {
QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup);
} else {
QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup);
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 6ea1ba185b..6f445ee212 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "ReplicationTest.h"
+#include "LogPrefix.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
@@ -105,11 +106,11 @@ class PrimaryTxObserver : public broker::TransactionObserver,
void txPrepareOkEvent(const std::string& data);
void txPrepareFailEvent(const std::string& data);
bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&);
- bool error(const types::Uuid& id, const char* msg, sys::Mutex::ScopedLock& l);
+ bool error(const types::Uuid& id, const std::string& msg, sys::Mutex::ScopedLock& l);
sys::Monitor lock;
State state;
- std::string logPrefix;
+ LogPrefix2 logPrefix;
Primary& primary;
HaBroker& haBroker;
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 94b7a53937..b6b6037b6f 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -47,8 +47,8 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
-QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : cancelled(false), queue(q)
+QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info, const LogPrefix& lp)
+ : cancelled(false), logPrefix(lp), queue(q)
{
std::ostringstream os;
os << "Guard of " << queue.getName() << " at ";
@@ -61,7 +61,9 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
QueuePosition front, back;
q.getRange(front, back, broker::REPLICATOR);
first = back + 1;
- QPID_LOG(debug, logPrefix << "First guarded position " << first);
+ QPID_LOG(debug, logPrefix << "Guarded: front " << front
+ << ", back " << back
+ << ", guarded " << first);
}
QueueGuard::~QueueGuard() { cancel(); }
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 33967970eb..9bf61f31da 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "hash.h"
+#include "LogPrefix.h"
#include "qpid/types/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/unordered_map.h"
@@ -59,7 +60,7 @@ class ReplicatingSubscription;
*/
class QueueGuard {
public:
- QueueGuard(broker::Queue& q, const BrokerInfo&);
+ QueueGuard(broker::Queue& q, const BrokerInfo&, const LogPrefix&);
~QueueGuard();
/** QueueObserver override. Delay completion of the message.
@@ -97,7 +98,7 @@ class QueueGuard {
sys::Mutex lock;
QueuePosition first;
bool cancelled;
- std::string logPrefix;
+ LogPrefix2 logPrefix;
broker::Queue& queue;
Delayed delayed;
boost::shared_ptr<QueueObserver> observer;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 4abbb3affb..7997bc6aa9 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -22,6 +22,7 @@
#include "Event.h"
#include "HaBroker.h"
#include "IdSetter.h"
+#include "LogPrefix.h"
#include "QueueReplicator.h"
#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
@@ -97,12 +98,11 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(error, logPrefix << "Incoming "
<< framing::createSessionException(code, msg).what());
}
- void detach() {
- QPID_LOG(debug, logPrefix << "Session detached");
- }
+ void detach() {}
+
private:
boost::weak_ptr<QueueReplicator> queueReplicator;
- std::string logPrefix;
+ const LogPrefix& logPrefix;
};
class QueueReplicator::QueueObserver : public broker::QueueObserver {
@@ -152,11 +152,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
link(l),
queue(q),
sessionHandler(0),
- logPrefix("Backup of "+q->getName()+": "),
+ logPrefix(hb.logPrefix, "Backup of "+q->getName()+": "),
subscribed(false),
settings(hb.getSettings()),
nextId(0), maxId(0)
{
+ QPID_LOG(debug, logPrefix << "Created");
// The QueueReplicator will take over setting replication IDs.
boost::shared_ptr<IdSetter> setter =
q->getMessageInterceptors().findType<IdSetter>();
@@ -181,7 +182,6 @@ QueueReplicator::~QueueReplicator() {}
void QueueReplicator::initialize() {
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "Created");
if (!queue) return; // Already destroyed
// Enable callback to route()
@@ -255,10 +255,13 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
- if (qs) arguments.set(ReplicatingSubscription::QPID_ID_SET,
- FieldTable::ValuePtr(
- new Var32Value(encodeStr(qs->getSnapshot()), TYPE_CODE_VBIN32)));
-
+ ReplicationIdSet snapshot;
+ if (qs) {
+ snapshot = qs->getSnapshot();
+ arguments.set(
+ ReplicatingSubscription::QPID_ID_SET,
+ FieldTable::ValuePtr(new Var32Value(encodeStr(snapshot), TYPE_CODE_VBIN32)));
+ }
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -268,12 +271,12 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
}
catch(const exception& e) {
- QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what()));
+ QPID_LOG(error, logPrefix << "Cannot connect to primary: " << e.what());
throw;
}
qpid::Address primary;
link->getRemoteAddress(primary);
- QPID_LOG(debug, logPrefix << "Connected to " << primary << "(" << bridgeName << ")");
+ QPID_LOG(debug, logPrefix << "Connected to " << primary << " snapshot=" << snapshot << " bridge=" << bridgeName);
QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
@@ -391,7 +394,7 @@ void QueueReplicator::promoted() {
// On primary QueueReplicator no longer sets IDs, start an IdSetter.
QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1)
queue->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), maxId+1)));
+ boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, queue->getName(), maxId+1)));
// Process auto-deletes
if (queue->isAutoDelete()) {
// Make a temporary shared_ptr to prevent premature deletion of queue.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 3d525db440..a4b31b6c9a 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -22,7 +22,10 @@
*
*/
+
+
#include "BrokerInfo.h"
+#include "LogPrefix.h"
#include "hash.h"
#include "qpid/broker/Exchange.h"
#include <boost/enable_shared_from_this.hpp>
@@ -134,7 +137,7 @@ class QueueReplicator : public broker::Exchange,
bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg);
- std::string logPrefix;
+ LogPrefix2 logPrefix;
std::string bridgeName;
bool subscribed;
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index c0a118d57f..c263d37e43 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -35,13 +35,13 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(
- const BrokerInfo& info, broker::Connection* c
-) : brokerInfo(info), replicationTest(NONE), started(false), connection(c), reportedReady(false)
+ const BrokerInfo& info, broker::Connection* c, const LogPrefix& lp
+) : logPrefix(lp), brokerInfo(info), replicationTest(NONE),
+ started(false), connection(c), reportedReady(false)
{
std::ostringstream oss;
oss << "Remote backup at " << info << ": ";
logPrefix = oss.str();
- QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected"));
}
RemoteBackup::~RemoteBackup() {
@@ -70,7 +70,7 @@ void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
QPID_LOG(debug, logPrefix << "Catch-up queue"
<< (createGuard ? " and guard" : "") << ": " << q->getName());
catchupQueues.insert(q);
- if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
+ if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix));
}
}
@@ -86,18 +86,12 @@ RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
void RemoteBackup::ready(const QueuePtr& q) {
catchupQueues.erase(q);
- if (catchupQueues.size()) {
- QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", "
- << catchupQueues.size() << " remain to catch up");
- }
- else
- QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() );
}
// Called via BrokerObserver::queueCreate and from catchupQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
if (replicationTest.getLevel(*q) == ALL)
- guards[q].reset(new QueueGuard(*q, brokerInfo));
+ guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix));
}
// Called via BrokerObserver
@@ -112,6 +106,7 @@ void RemoteBackup::queueDestroy(const QueuePtr& q) {
bool RemoteBackup::reportReady() {
if (!reportedReady && isReady()) {
+ if (catchupQueues.empty()) QPID_LOG(debug, logPrefix << "Caught up.");
reportedReady = true;
return true;
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index b9e2e1a496..77c493d27e 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "ReplicationTest.h"
#include "BrokerInfo.h"
#include "types.h"
@@ -56,7 +57,7 @@ class RemoteBackup
/** Note: isReady() can be true after construction
*@param connected true if the backup is already connected.
*/
- RemoteBackup(const BrokerInfo&, broker::Connection*);
+ RemoteBackup(const BrokerInfo&, broker::Connection*, const LogPrefix&);
~RemoteBackup();
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
@@ -102,7 +103,7 @@ class RemoteBackup
typedef std::set<QueuePtr> QueueSet;
- std::string logPrefix;
+ LogPrefix2 logPrefix;
BrokerInfo brokerInfo;
ReplicationTest replicationTest;
GuardMap guards;
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 67e1e77681..e08a34529e 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -107,7 +107,7 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
-
+ logPrefix(hb.logPrefix),
position(0), wasStopped(false), ready(false), cancelled(false),
haBroker(hb),
primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
@@ -121,7 +121,7 @@ void ReplicatingSubscription::initialize() {
FieldTable ft;
if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
throw InvalidArgumentException(
- logPrefix+"Can't subscribe, no broker info: "+getTag());
+ logPrefix.get()+"Can't subscribe, no broker info: "+getTag());
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
@@ -132,7 +132,7 @@ void ReplicatingSubscription::initialize() {
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
- if (!guard) guard.reset(new QueueGuard(*queue, info));
+ if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix));
// NOTE: Once the observer is attached we can have concurrent
// calls to dequeued so we need to lock use of this->dequeues.
@@ -147,7 +147,7 @@ void ReplicatingSubscription::initialize() {
if (!snapshot) {
queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
- throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
+ throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted");
}
ReplicationIdSet primaryIds = snapshot->getSnapshot();
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
@@ -166,10 +166,10 @@ void ReplicatingSubscription::initialize() {
// position >= front so if front is safe then position must be.
position = front;
- QPID_LOG(debug, logPrefix << "Subscribed: front " << front
- << ", back " << back
+ QPID_LOG(debug, logPrefix << "Subscribed: primary ["
+ << front << "," << back << "]=" << primaryIds
<< ", guarded " << guard->getFirst()
- << ", on backup " << skipEnqueue);
+ << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")");
checkReady(l);
}
@@ -242,7 +242,12 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
- QPID_LOG(debug, logPrefix << "Caught up");
+ if (position+1 >= guard->getFirst()) {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position);
+ } else {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position << "short of guard at " << guard->getFirst());
+ }
+
if (primary) primary->readyReplica(*this);
}
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 08c08b0ca3..d6d41dd2cf 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -23,6 +23,7 @@
*/
#include "BrokerInfo.h"
+#include "LogPrefix.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/QueueObserver.h"
@@ -144,7 +145,7 @@ class ReplicatingSubscription :
bool doDispatch();
private:
- std::string logPrefix;
+ LogPrefix2 logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.
diff --git a/qpid/cpp/src/qpid/ha/Role.h b/qpid/cpp/src/qpid/ha/Role.h
index 9986fde7e1..5392ce1fff 100644
--- a/qpid/cpp/src/qpid/ha/Role.h
+++ b/qpid/cpp/src/qpid/ha/Role.h
@@ -40,9 +40,6 @@ class Role
public:
virtual ~Role() {}
- /** Log prefix appropriate to the role */
- virtual std::string getLogPrefix() const = 0;
-
/** QMF promote method handler.
* @return The new role if promoted, 0 if not. Caller takes ownership.
*/
diff --git a/qpid/cpp/src/qpid/ha/StandAlone.h b/qpid/cpp/src/qpid/ha/StandAlone.h
index d052996d40..01bcf1a0b3 100644
--- a/qpid/cpp/src/qpid/ha/StandAlone.h
+++ b/qpid/cpp/src/qpid/ha/StandAlone.h
@@ -33,12 +33,8 @@ namespace ha {
class StandAlone : public Role
{
public:
- std::string getLogPrefix() const { return logPrefix; }
Role* promote() { return 0; }
void setBrokerUrl(const Url&) {}
-
- private:
- std::string logPrefix;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index ee8bd342b2..33adc9780d 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -39,10 +39,11 @@
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/framing/BufferTypes.h"
#include "qpid/log/Statement.h"
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/MessageTransferBody.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <sstream>
namespace qpid {
namespace ha {
@@ -57,15 +58,19 @@ namespace {
const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
} // namespace
-
-
bool TxReplicator::isTxQueue(const string& q) {
return startsWith(q, PREFIX);
}
-string TxReplicator::getTxId(const string& q) {
- assert(isTxQueue(q));
- return q.substr(PREFIX.size());
+Uuid TxReplicator::getTxId(const string& q) {
+ if (TxReplicator::isTxQueue(q)) {
+ std::istringstream is(q);
+ is.seekg(PREFIX.size());
+ Uuid id;
+ is >> id;
+ if (!is.fail()) return id;
+ }
+ throw Exception(QPID_MSG("Invalid tx queue: " << q));
}
string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
@@ -85,15 +90,14 @@ TxReplicator::TxReplicator(
const boost::shared_ptr<broker::Queue>& txQueue,
const boost::shared_ptr<broker::Link>& link) :
QueueReplicator(hb, txQueue, link),
+ logPrefix(hb.logPrefix),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
empty(true), ended(false),
dequeueState(hb.getBroker().getQueues())
{
- string id(getTxId(txQueue->getName()));
- string shortId = id.substr(0, 8);
- logPrefix = "Backup of transaction "+shortId+": ";
- QPID_LOG(debug, logPrefix << "Started TX " << id);
+ logPrefix = "Backup of TX "+shortStr(getTxId(txQueue->getName()))+": ";
+ QPID_LOG(debug, logPrefix << "Started");
if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
// Dispatch transaction events.
@@ -213,7 +217,7 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
QPID_LOG(debug, logPrefix << "Local prepare OK");
sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
} else {
- QPID_LOG(debug, logPrefix << "Local prepare failed");
+ QPID_LOG(error, logPrefix << "Local prepare failed");
sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
}
}
@@ -240,7 +244,7 @@ void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) {
TxBackupsEvent e;
decodeStr(data, e);
if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
- QPID_LOG(info, logPrefix << "Not participating in transaction");
+ QPID_LOG(info, logPrefix << "Not participating");
end(l);
} else {
QPID_LOG(debug, logPrefix << "Backups: " << e.backups);
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index fe25fbc78b..c7599d21b1 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -22,11 +22,13 @@
*
*/
+#include "LogPrefix.h"
#include "QueueReplicator.h"
#include "Event.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/types/Uuid.h"
namespace qpid {
@@ -56,7 +58,7 @@ class TxReplicator : public QueueReplicator {
typedef boost::shared_ptr<broker::Link> LinkPtr;
static bool isTxQueue(const std::string& queue);
- static std::string getTxId(const std::string& queue);
+ static types::Uuid getTxId(const std::string& queue);
static boost::shared_ptr<TxReplicator> create(
HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
@@ -90,7 +92,7 @@ class TxReplicator : public QueueReplicator {
void backups(const std::string& data, sys::Mutex::ScopedLock&);
void end(sys::Mutex::ScopedLock&);
- std::string logPrefix;
+ LogPrefix2 logPrefix;
TxEnqueueEvent enq; // Enqueue data for next deliver.
boost::intrusive_ptr<broker::TxBuffer> txBuffer;
broker::MessageStore* store;
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index 3d9941baee..d64f13d9c5 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -165,7 +165,7 @@ struct Client
session.close();
connection.close();
} catch(const std::exception& e) {
- std::cout << e.what() << std::endl;
+ std::cout << "Client shutdown: " << e.what() << std::endl;
}
}
};
@@ -350,7 +350,7 @@ int main(int argc, char** argv)
}
return 0;
} catch(const std::exception& e) {
- std::cout << e.what() << std::endl;
+ std::cout << argv[0] << ": " << e.what() << std::endl;
}
return 2;
}