diff options
38 files changed, 368 insertions, 203 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index ebdff57961..568cec84b8 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -522,7 +522,6 @@ option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default}) if (BUILD_HA) set (ha_SOURCES - qpid/ha/QueueSnapshot.h qpid/ha/AlternateExchangeSetter.h qpid/ha/Backup.cpp qpid/ha/Backup.h @@ -533,24 +532,29 @@ if (BUILD_HA) qpid/ha/BrokerReplicator.h qpid/ha/ConnectionObserver.cpp qpid/ha/ConnectionObserver.h - qpid/ha/Event.cpp - qpid/ha/Event.h + qpid/ha/Event.cpp + qpid/ha/Event.h qpid/ha/FailoverExchange.cpp qpid/ha/FailoverExchange.h qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/HaPlugin.cpp - qpid/ha/IdSetter.h - qpid/ha/QueueSnapshot.h + qpid/ha/IdSetter.h + qpid/ha/LogPrefix.cpp + qpid/ha/LogPrefix.h qpid/ha/Membership.cpp qpid/ha/Membership.h qpid/ha/Primary.cpp qpid/ha/Primary.h qpid/ha/PrimaryQueueLimits.h + qpid/ha/PrimaryTxObserver.cpp + qpid/ha/PrimaryTxObserver.h qpid/ha/QueueGuard.cpp qpid/ha/QueueGuard.h qpid/ha/QueueReplicator.cpp qpid/ha/QueueReplicator.h + qpid/ha/QueueSnapshot.h + qpid/ha/QueueSnapshot.h qpid/ha/RemoteBackup.cpp qpid/ha/RemoteBackup.h qpid/ha/ReplicatingSubscription.cpp @@ -564,11 +568,9 @@ if (BUILD_HA) qpid/ha/StatusCheck.h qpid/ha/TxReplicatingSubscription.cpp qpid/ha/TxReplicatingSubscription.h - qpid/ha/PrimaryTxObserver.cpp - qpid/ha/PrimaryTxObserver.h + qpid/ha/TxReplicator.cpp + qpid/ha/TxReplicator.h qpid/ha/types.cpp - qpid/ha/TxReplicator.cpp - qpid/ha/TxReplicator.h qpid/ha/types.h ) diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp index 6dc2f5c2f4..f7552f16a3 100644 --- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp +++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp @@ -96,7 +96,8 @@ std::string TxBuffer::endCommit(TransactionalStore* const store) { void TxBuffer::setError(const std::string& e) { QPID_LOG(error, "Asynchronous transaction error: " << e); sys::Mutex::ScopedLock l(errorLock); - error = e; + if (!error.empty()) error += " "; + error += e; } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 9d50b1c665..f1b6eadd75 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -49,7 +49,7 @@ using std::string; using sys::Mutex; Backup::Backup(HaBroker& hb, const Settings& s) : - logPrefix("Backup: "), membership(hb.getMembership()), stopped(false), + logPrefix(hb.logPrefix), membership(hb.getMembership()), stopped(false), haBroker(hb), broker(hb.getBroker()), settings(s), statusCheck(new StatusCheck(hb)) {} @@ -60,7 +60,7 @@ void Backup::setBrokerUrl(const Url& brokers) { if (stopped) return; if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers); if (!link) { // Not yet initialized - QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); + QPID_LOG(info, logPrefix << "Connecting to cluster: " << brokers); string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; types::Uuid uuid(true); link = broker.getLinks().declare( @@ -78,7 +78,6 @@ void Backup::setBrokerUrl(const Url& brokers) { void Backup::stop(Mutex::ScopedLock&) { if (stopped) return; stopped = true; - QPID_LOG(debug, logPrefix << "Leaving backup role."); if (link) link->close(); if (replicator.get()) { replicator->shutdown(); @@ -106,8 +105,7 @@ Role* Backup::promote() { case JOINING: if (statusCheck->canPromote()) return recover(l); else { - QPID_LOG(error, - logPrefix << "Joining active cluster, cannot be promoted."); + QPID_LOG(error, logPrefix << "Joining active cluster, cannot be promoted."); throw Exception("Joining active cluster, cannot be promoted."); } break; diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 88194158ce..47c44aa59c 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -22,6 +22,7 @@ * */ +#include "LogPrefix.h" #include "Role.h" #include "Settings.h" #include "qpid/Url.h" @@ -53,8 +54,6 @@ class Backup : public Role Backup(HaBroker&, const Settings&); ~Backup(); - std::string getLogPrefix() const { return logPrefix; } - void setBrokerUrl(const Url&); Role* promote(); @@ -65,7 +64,7 @@ class Backup : public Role void stop(sys::Mutex::ScopedLock&); Role* recover(sys::Mutex::ScopedLock&); - std::string logPrefix; + const LogPrefix& logPrefix; Membership& membership; sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h index 5a67cde922..a58e666fa7 100644 --- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h @@ -22,6 +22,7 @@ * */ +#include "LogPrefix.h" #include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" @@ -35,12 +36,17 @@ namespace ha { class BackupConnectionExcluder : public broker::ConnectionObserver { public: + BackupConnectionExcluder(const LogPrefix& lp) : logPrefix(lp) {} + void opened(broker::Connection& connection) { - QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId()); + QPID_LOG(trace, logPrefix << "Rejected connection "+connection.getMgmtId()); connection.abort(); } void closed(broker::Connection&) {} + + private: + const LogPrefix& logPrefix; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp index a13451e179..c8a652a7ab 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -92,7 +92,7 @@ void BrokerInfo::assign(const Variant::Map& m) { } std::ostream& BrokerInfo::printId(std::ostream& o) const { - o << getSystemId().str().substr(0,8); + o << shortStr(getSystemId()); if (getAddress() != empty) o << "@" << getAddress(); return o; } diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h index 87d51f5113..92556a5c4b 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.h +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h @@ -63,9 +63,14 @@ class BrokerInfo void assign(const types::Variant::Map&); // So it can be put in a set. - bool operator<(const BrokerInfo x) const { return systemId < x.systemId; } + bool operator<(const BrokerInfo& x) const { return systemId < x.systemId; } - // Print just the identifying information, not the status. + bool operator==(const BrokerInfo& x) const + { return address == x.address && systemId == x.systemId && status == x.status; } + + bool operator!=(const BrokerInfo& x) const { return !(*this == x); } + + // Print just the identifying information (shortId@address), not the status. std::ostream& printId(std::ostream& o) const; private: diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 3957ef5a0c..a62080932d 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -175,7 +175,7 @@ Variant::Map asMapVoid(const Variant& value) { // Report errors on the broker replication session. class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener { public: - ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} + ErrorListener(const LogPrefix& lp) : logPrefix(lp) {} void connectionException(framing::connection::CloseCode code, const std::string& msg) { QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what()); @@ -189,12 +189,10 @@ class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorList void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } - void detach() { - QPID_LOG(debug, logPrefix << "Session detached."); - } + void detach() {} private: - std::string logPrefix; + const LogPrefix& logPrefix; }; /** Keep track of queues or exchanges during the update process to solve 2 @@ -213,8 +211,9 @@ class BrokerReplicator::UpdateTracker { typedef boost::function<void (const std::string&)> CleanFn; UpdateTracker(const std::string& type_, // "queue" or "exchange" - CleanFn f) - : type(type_), cleanFn(f) {} + CleanFn f, + const LogPrefix& lp) + : type(type_), cleanFn(f), logPrefix(lp) {} /** Destructor cleans up remaining initial queues. */ ~UpdateTracker() { @@ -224,7 +223,7 @@ class BrokerReplicator::UpdateTracker { boost::bind(&UpdateTracker::clean, this, _1)); } catch (const std::exception& e) { - QPID_LOG(error, "Error in cleanup of lost objects: " << e.what()); + QPID_LOG(error, logPrefix << "Error in cleanup of lost objects: " << e.what()); } } @@ -251,7 +250,7 @@ class BrokerReplicator::UpdateTracker { private: void clean(const std::string& name) { - QPID_LOG(debug, "Backup: Deleted " << type << " " << name << + QPID_LOG(debug, logPrefix << "Deleted " << type << " " << name << ": no longer exists on primary"); try { cleanFn(name); } catch (const framing::NotFoundException&) {} @@ -260,6 +259,7 @@ class BrokerReplicator::UpdateTracker { std::string type; Names initial, events; CleanFn cleanFn; + const LogPrefix& logPrefix; }; namespace { @@ -279,7 +279,7 @@ boost::shared_ptr<BrokerReplicator> BrokerReplicator::create( BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - logPrefix("Backup: "), replicationTest(NONE), + logPrefix(hb.logPrefix), replicationTest(NONE), haBroker(hb), broker(hb.getBroker()), exchanges(broker.getExchanges()), queues(broker.getQueues()), link(l), @@ -372,19 +372,20 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) link->getRemoteAddress(primary); string queueName = bridge.getQueueName(); - QPID_LOG(notice, logPrefix << (initialized ? "Failing over" : "Connecting") - << " to primary " << primary - << " status:" << printable(haBroker.getStatus())); + QPID_LOG(info, logPrefix << (initialized ? "Failing over" : "Connecting") + << " to primary " << primary); initialized = true; exchangeTracker.reset( new UpdateTracker("exchange", - boost::bind(&BrokerReplicator::deleteExchange, this, _1))); + boost::bind(&BrokerReplicator::deleteExchange, this, _1), + logPrefix)); exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1)); queueTracker.reset( new UpdateTracker("queue", - boost::bind(&BrokerReplicator::deleteQueue, this, _1, true))); + boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), + logPrefix)); queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1)); framing::AMQP_ServerProxy peer(sessionHandler.out); @@ -417,14 +418,14 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) // Called for each queue in existence when the backup connects to a primary. void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) { if (replicationTest.getLevel(*q)) { - QPID_LOG(debug, "Existing queue: " << q->getName()); + QPID_LOG(debug, logPrefix << "Existing queue: " << q->getName()); queueTracker->addQueue(q); } } void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) { if (replicationTest.getLevel(*ex)) { - QPID_LOG(debug, "Existing exchange: " << ex->getName()); + QPID_LOG(debug, logPrefix << "Existing exchange: " << ex->getName()); exchangeTracker->addExchange(ex); } } @@ -447,7 +448,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "Broker replicator event: " << map); + QPID_LOG(trace, logPrefix << "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); std::string key = (schema[PACKAGE_NAME].asString() + @@ -459,7 +460,7 @@ void BrokerReplicator::route(Deliverable& msg) { } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "Broker replicator response: " << map); + QPID_LOG(trace, logPrefix << "Broker replicator response: " << map); string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; @@ -691,8 +692,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { if (exchange && exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID)) { - QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: " - << name); + QPID_LOG(warning, logPrefix << "Exchange response replacing (UUID mismatch): " << name); deleteExchange(name); } CreateExchangeResult result = createExchange( @@ -793,21 +793,17 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { } void BrokerReplicator::deleteExchange(const std::string& name) { - try { - boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name); - if (!exchange) { - QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name); - return; - } - if (exchange->inUseAsAlternate()) { - QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name); - return; - } - broker.deleteExchange(name, userId, remoteHost); - QPID_LOG(debug, logPrefix << "Exchange deleted: " << name); - } catch (const framing::NotFoundException&) { - QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name); + boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name); + if (!exchange) { + QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name); + return; + } + if (exchange->inUseAsAlternate()) { + QPID_LOG(warning, logPrefix << "Cannot delete exchange, in use as alternate: " << name); + return; } + broker.deleteExchange(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Exchange deleted: " << name); } boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue( diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 1e051878ae..44e80263de 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -52,6 +52,7 @@ class FieldTable; } namespace ha { +class LogPrefix; class HaBroker; class QueueReplicator; @@ -155,7 +156,7 @@ class BrokerReplicator : public broker::Exchange, void setMembership(const types::Variant::List&); // Set membership from list. - std::string logPrefix; + const LogPrefix& logPrefix; ReplicationTest replicationTest; std::string userId, remoteHost; HaBroker& haBroker; diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp index c9c5c2e576..a824adb871 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -31,7 +31,7 @@ namespace qpid { namespace ha { ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) - : haBroker(hb), logPrefix("Backup: "), self(uuid) {} + : haBroker(hb), logPrefix(hb.logPrefix), self(uuid) {} bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) { qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG); @@ -55,11 +55,10 @@ bool ConnectionObserver::getAddress(const broker::Connection& connection, Addres return false; } -void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix) +void ConnectionObserver::setObserver(const ObserverPtr& o) { sys::Mutex::ScopedLock l(lock); observer = o; - logPrefix = newlogPrefix; } ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() { @@ -83,21 +82,21 @@ void ConnectionObserver::opened(broker::Connection& connection) { // Set my own address if there is an address header. Address addr; if (getAddress(connection, addr)) haBroker.setAddress(addr); - QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); + QPID_LOG(trace, logPrefix << "Rejected self connection "+connection.getMgmtId()); connection.abort(); return; } if (connection.isLink()) return; // Allow outgoing links. if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) { - QPID_LOG(debug, logPrefix << "Accepted admin connection: " - << connection.getMgmtId()); + QPID_LOG(trace, logPrefix << "Accepted admin connection: " << connection.getMgmtId()); return; // No need to call observer, always allow admins. } ObserverPtr o(getObserver()); if (o) o->opened(connection); } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Open error: " << e.what()); + QPID_LOG(error, logPrefix << "Error on incoming connection " << connection.getMgmtId() + << ": " << e.what()); throw; } } @@ -109,7 +108,8 @@ void ConnectionObserver::closed(broker::Connection& connection) { if (o) o->closed(connection); } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Close error: " << e.what()); + QPID_LOG(error, logPrefix << "Error closing incoming connection " << connection.getMgmtId() + << ": " << e.what()); throw; } } diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h index 079dc43be6..f447d479f0 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h @@ -34,6 +34,7 @@ struct Address; namespace ha { class BrokerInfo; class HaBroker; +class LogPrefix; /** * Observes connections, delegates to another ConnectionObserver for @@ -59,7 +60,7 @@ class ConnectionObserver : public broker::ConnectionObserver ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); - void setObserver(const ObserverPtr&, const std::string& logPrefix); + void setObserver(const ObserverPtr&); ObserverPtr getObserver(); void reset(); @@ -72,7 +73,7 @@ class ConnectionObserver : public broker::ConnectionObserver sys::Mutex lock; HaBroker& haBroker; - std::string logPrefix; + const LogPrefix& logPrefix; ObserverPtr observer; types::Uuid self; }; diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp index f1b87c63c8..9bda5ea5bf 100644 --- a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp +++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp @@ -63,7 +63,6 @@ ostream& operator<<(ostream& o, const OstreamUrls& urls) { FailoverExchange::FailoverExchange(management::Manageable& parent, Broker& b) : Exchange(typeName, &parent, &b) { - QPID_LOG(debug, typeName << " created."); if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } @@ -114,7 +113,7 @@ bool FailoverExchange::hasBindings() { } void FailoverExchange::route(Deliverable&) { - QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring"); + QPID_LOG(warning, typeName << " unexpected message, ignored."); } void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::ScopedLock&) { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index f154e45a22..7699b0e1d2 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -69,10 +69,16 @@ using boost::dynamic_pointer_cast; // class HaBroker::BrokerObserver : public broker::BrokerObserver { public: + BrokerObserver(const LogPrefix& lp) : logPrefix(lp) {} + void queueCreate(const boost::shared_ptr<broker::Queue>& q) { q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot)); - q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName()))); + q->getMessageInterceptors().add( + boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, q->getName()))); } + + private: + const LogPrefix& logPrefix; }; // Called in Plugin::earlyInitialize @@ -83,20 +89,19 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) broker(b), observer(new ConnectionObserver(*this, systemId)), role(new StandAlone), - membership(BrokerInfo(systemId, STANDALONE), *this), + membership(BrokerInfo(systemId, STANDALONE), *this), // Sets logPrefix failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)) { // If we are joining a cluster we must start excluding clients now, // otherwise there's a window for a client to connect before we get to // initialize() if (settings.cluster) { - QPID_LOG(debug, "Backup starting, rejecting client connections."); - shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder); - observer->setObserver(excluder, "Backup: "); + shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder(logPrefix)); + observer->setObserver(excluder); broker.getConnectionObservers().add(observer); broker.getExchanges().registerExchange(failoverExchange); } - broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver())); + broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver(logPrefix))); } namespace { @@ -107,8 +112,8 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; } // Called in Plugin::initialize void HaBroker::initialize() { if (settings.cluster) { + QPID_LOG(info, logPrefix << "Starting HA broker"); membership.setStatus(JOINING); - QPID_LOG(info, "Initializing HA broker: " << membership.getSelf()); } // Set up the management object. @@ -138,7 +143,6 @@ void HaBroker::initialize() { } HaBroker::~HaBroker() { - QPID_LOG(notice, role->getLogPrefix() << "Shut down"); broker.getConnectionObservers().remove(observer); } @@ -160,7 +164,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue " + QPID_LOG(debug, logPrefix << "Replicate individual queue " << bq_args.i_queue << " from " << bq_args.i_broker); shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); @@ -195,7 +199,7 @@ void HaBroker::setPublicUrl(const Url& url) { knownBrokers.push_back(url); vector<Url> urls(1, url); failoverExchange->updateUrls(urls); - QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url); + QPID_LOG(debug, logPrefix << "Public URL set to: " << url); } void HaBroker::setBrokerUrl(const Url& url) { @@ -203,7 +207,7 @@ void HaBroker::setBrokerUrl(const Url& url) { Mutex::ScopedLock l(lock); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); - QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url); + QPID_LOG(info, logPrefix << "Brokers URL set to: " << url); } role->setBrokerUrl(url); // Oustside lock } @@ -214,7 +218,7 @@ std::vector<Url> HaBroker::getKnownBrokers() const { } void HaBroker::shutdown(const std::string& message) { - QPID_LOG(critical, "Shutting down: " << message); + QPID_LOG(critical, logPrefix << "Shutting down: " << message); broker.shutdown(); throw Exception(message); } @@ -224,7 +228,7 @@ BrokerStatus HaBroker::getStatus() const { } void HaBroker::setAddress(const Address& a) { - QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a); + QPID_LOG(info, logPrefix << "Set self address to: " << a); membership.setSelfAddress(a); } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 9fadd4f35c..023706e7e3 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -25,6 +25,7 @@ #include "BrokerInfo.h" #include "Membership.h" #include "types.h" +#include "LogPrefix.h" #include "Settings.h" #include "qpid/Url.h" #include "FailoverExchange.h" @@ -101,6 +102,9 @@ class HaBroker : public management::Manageable /** Authenticated user ID for queue create/delete */ std::string getUserId() const { return userId; } + /** logPrefix is thread safe and used by other classes (Membership) */ + LogPrefix logPrefix; + private: class BrokerObserver; diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index a958b0d29c..913a9b5084 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -76,7 +76,7 @@ struct HaPlugin : public Plugin { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker && (settings.cluster || settings.queueReplication)) { if (!broker->getManagementAgent()) { - QPID_LOG(warning, "HA plugin disabled because management is disabled"); + QPID_LOG(warning, "Cannot start HA: management is disabled"); if (settings.cluster) throw Exception("Cannot start HA: management is disabled"); } else { diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h index 0350bf1519..f0629c99bb 100644 --- a/qpid/cpp/src/qpid/ha/IdSetter.h +++ b/qpid/cpp/src/qpid/ha/IdSetter.h @@ -33,6 +33,7 @@ namespace qpid { namespace ha { +class LogPrefix; /** * A MessageInterceptor that sets the ReplicationId on each message as it is @@ -43,16 +44,16 @@ namespace ha { class IdSetter : public broker::MessageInterceptor { public: - IdSetter(const std::string& q, ReplicationId firstId=1) : queue(q), nextId(firstId) { - QPID_LOG(debug, "Replication-ID will be set for " << queue << " from " << firstId); - } + IdSetter(const LogPrefix& lp, const std::string& q, ReplicationId firstId=1) : + logPrefix(lp), queue(q), nextId(firstId) + {} void record(broker::Message& m) { // Record is called when a message is first delivered to a queue, before it has // been enqueued or saved in a transaction buffer. This is when we normally want // to assign a replication-id. m.setReplicationId(nextId++); - QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m.getReplicationId())); + QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m.getReplicationId())); } void publish(broker::Message& m) { @@ -62,11 +63,12 @@ class IdSetter : public broker::MessageInterceptor // store record() is not called, so set the ID now if not already set. if (!m.hasReplicationId()) { m.setReplicationId(nextId++); - QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m)); + QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m)); } } private: + const LogPrefix& logPrefix; std::string queue; sys::AtomicValue<uint32_t> nextId; }; diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp new file mode 100644 index 0000000000..c1ccf050c1 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LogPrefix.h" +#include <iostream> + +namespace qpid { +namespace ha { + +std::ostream& operator<<(std::ostream& o, const LogPrefix& lp) { + return o << lp.get(); +} + +std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp) { + return o << lp.prePrefix.get() << lp.get(); +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/LogPrefix.h new file mode 100644 index 0000000000..3b6bb17d99 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/LogPrefix.h @@ -0,0 +1,75 @@ +#ifndef QPID_HA_LOGPREFIX_H +#define QPID_HA_LOGPREFIX_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/sys/Mutex.h> +#include <string> +#include <iosfwd> + +namespace qpid { +namespace ha { + +/** + * Thread safe string holder to hold a string that may be read and modified concurrently. + */ +class LogPrefix +{ + public: + explicit LogPrefix(const std::string& s=std::string()) : prefix(s) {} + void set(const std::string& s) { sys::RWlock::ScopedWlock l(lock); prefix = s; } + std::string get() const { sys::RWlock::ScopedRlock l(lock); return prefix; } + + LogPrefix& operator=(const std::string& s) { set(s); return *this; } + operator std::string() const { return get(); } + + private: + // Undefined, not copyable. + LogPrefix(const LogPrefix& lp); + LogPrefix& operator=(const LogPrefix&); + + mutable sys::RWlock lock; + std::string prefix; +}; +std::ostream& operator<<(std::ostream& o, const LogPrefix& lp); + +/** + * A two-part log prefix with a reference to a pre-prefix and a post-prefix. + * Operator << will print both parts, get/set just manage the post-prefix. + */ +class LogPrefix2 : public LogPrefix { + public: + const LogPrefix& prePrefix; + explicit LogPrefix2(const LogPrefix& lp, const std::string& s=std::string()) : LogPrefix(s), prePrefix(lp) {} + LogPrefix2& operator=(const std::string& s) { set(s); return *this; } + + private: + // Undefined, not copyable. + LogPrefix2(const LogPrefix2& lp); + LogPrefix2& operator=(const LogPrefix2&); +}; +std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp); + + +}} // namespace qpid::ha + +#endif /*!QPID_HA_LOGPREFIX_H*/ diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index fdb47014d9..92a0b7db70 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -43,9 +43,14 @@ Membership::Membership(const BrokerInfo& info, HaBroker& b) : haBroker(b), self(info.getSystemId()) { brokers[self] = info; + setPrefix(); oldStatus = info.getStatus(); } +void Membership::setPrefix() { + haBroker.logPrefix = Msg() << shortStr(brokers[self].getSystemId()) + << "(" << printable(brokers[self].getStatus()) << ") "; +} void Membership::clear() { Mutex::ScopedLock l(lock); BrokerInfo me = brokers[self]; @@ -57,7 +62,7 @@ void Membership::add(const BrokerInfo& b) { Mutex::ScopedLock l(lock); assert(b.getSystemId() != self); brokers[b.getSystemId()] = b; - update(l); + update(true, l); } @@ -67,7 +72,7 @@ void Membership::remove(const types::Uuid& id) { BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); - update(l); + update(true, l); } } @@ -83,7 +88,7 @@ void Membership::assign(const types::Variant::List& list) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } - update(l); + update(true, l); } types::Variant::List Membership::asList() const { @@ -144,8 +149,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { } } // namespace -void Membership::update(Mutex::ScopedLock& l) { - QPID_LOG(info, "Membership: " << brokers); +void Membership::update(bool log, Mutex::ScopedLock& l) { // Update managment and send update event. BrokerStatus newStatus = getStatus(l); Variant::List brokerList = asList(l); @@ -171,27 +175,30 @@ void Membership::update(Mutex::ScopedLock& l) { // Check status transitions if (oldStatus != newStatus) { - QPID_LOG(info, "Status change: " + QPID_LOG(info, haBroker.logPrefix << "Status change: " << printable(oldStatus) << " -> " << printable(newStatus)); if (!checkTransition(oldStatus, newStatus)) { haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus) << " -> " << printable(newStatus))); } oldStatus = newStatus; + setPrefix(); + if (newStatus == READY) QPID_LOG(notice, haBroker.logPrefix << "Backup is ready"); } + if (log) QPID_LOG(info, haBroker.logPrefix << "Membership update: " << brokers); } void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { Mutex::ScopedLock l(lock); mgmtObject = mo; - update(l); + update(false, l); } void Membership::setStatus(BrokerStatus newStatus) { Mutex::ScopedLock l(lock); brokers[self].setStatus(newStatus); - update(l); + update(false, l); } BrokerStatus Membership::getStatus() const { @@ -215,7 +222,7 @@ BrokerInfo Membership::getSelf() const { void Membership::setSelfAddress(const Address& a) { Mutex::ScopedLock l(lock); brokers[self].setAddress(a); - update(l); + update(false, l); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h index 5b2b72e2fc..5590d255fa 100644 --- a/qpid/cpp/src/qpid/ha/Membership.h +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -85,7 +85,8 @@ class Membership void setSelfAddress(const Address&); private: - void update(sys::Mutex::ScopedLock&); + void setPrefix(); + void update(bool log, sys::Mutex::ScopedLock&); BrokerStatus getStatus(sys::Mutex::ScopedLock&) const; types::Variant::List asList(sys::Mutex::ScopedLock&) const; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index dd41f74790..3790d14626 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -38,6 +38,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" #include "qpid/sys/Timer.h" #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> @@ -54,6 +55,10 @@ using namespace framing; namespace { +const std::string CLIENT_PROCESS_NAME("qpid.client_process"); +const std::string CLIENT_PID("qpid.client_pid"); +const std::string CLIENT_PPID("qpid.client_ppid"); + class PrimaryConnectionObserver : public broker::ConnectionObserver { public: @@ -90,7 +95,7 @@ class ExpectedBackupTimerTask : public sys::TimerTask { class PrimaryErrorListener : public broker::SessionHandler::ErrorListener { public: - PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} + PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {} void connectionException(framing::connection::CloseCode code, const std::string& msg) { QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); @@ -104,17 +109,15 @@ class PrimaryErrorListener : public broker::SessionHandler::ErrorListener { void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } - void detach() { - QPID_LOG(debug, logPrefix << "Session detached."); - } + void detach() {} private: - std::string logPrefix; + const LogPrefix& logPrefix; }; class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver { public: - PrimarySessionHandlerObserver(const std::string& logPrefix) + PrimarySessionHandlerObserver(const LogPrefix& logPrefix) : errorListener(new PrimaryErrorListener(logPrefix)) {} void newSessionHandler(broker::SessionHandler& sh) { BrokerInfo info; @@ -133,7 +136,7 @@ class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver { Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), - logPrefix("Primary: "), active(false), + logPrefix(hb.logPrefix), active(false), replicationTest(hb.getSettings().replicateDefault.get()), sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)), queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest) @@ -142,6 +145,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : // So we are safe from client interference while we set up the primary. hb.getMembership().setStatus(RECOVERING); + QPID_LOG(notice, logPrefix << "Promoted to primary"); // Process all QueueReplicators, handles auto-delete queues. QueueReplicator::Vector qrs; @@ -152,10 +156,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : // NOTE: RemoteBackups must be created before we set the BrokerObserver // or ConnectionObserver so that there is no client activity while // the QueueGuards are created. - QPID_LOG(notice, logPrefix << "Promoted and recovering, waiting for backups: " - << expect); + QPID_LOG(notice, logPrefix << "Recovering backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { - boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0)); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, logPrefix)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); setCatchupQueues(backup, true); // Create guards @@ -173,7 +176,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : // Allow client connections connectionObserver.reset(new PrimaryConnectionObserver(*this)); - haBroker.getObserver()->setObserver(connectionObserver, logPrefix); + haBroker.getObserver()->setObserver(connectionObserver); } Primary::~Primary() { @@ -191,8 +194,8 @@ void Primary::checkReady() { activate = active = true; } if (activate) { - QPID_LOG(notice, logPrefix << "Promoted and active."); membership.setStatus(ACTIVE); // Outside of lock. + QPID_LOG(notice, logPrefix << "All backups recovered."); } } @@ -205,7 +208,7 @@ void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) { info.setStatus(READY); membership.add(info); if (expectedBackups.erase(backup)) { - QPID_LOG(info, logPrefix << "Expected backup is ready: " << info); + QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info); ready = true; } else @@ -229,7 +232,7 @@ void Primary::timeoutExpectedBackups() { boost::shared_ptr<RemoteBackup> backup = *j; if (!backup->getConnection()) { BrokerInfo info = backup->getBrokerInfo(); - QPID_LOG(error, logPrefix << "Expected backup timed out: " << info); + QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info); backupDisconnect(backup, l); // Calls erase(j) // Keep broker in membership but downgrade status to CATCHUP. // The broker will get this status change when it eventually connects. @@ -303,6 +306,8 @@ void Primary::queueCreate(const QueuePtr& q) { ReplicateLevel level = replicationTest.useLevel(*q); q->addArgument(QPID_REPLICATE, printable(level).str()); if (level) { + QPID_LOG(debug, logPrefix << "Created queue " << q->getName() + << " replication: " << printable(level)); // Give each queue a unique id. Used by backups to avoid confusion of // same-named queues. q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); @@ -312,8 +317,6 @@ void Primary::queueCreate(const QueuePtr& q) { for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueCreate(q); } - QPID_LOG(debug, logPrefix << "Created queue " << q->getName() - << " replication: " << printable(level)); checkReady(); // Outside lock } } @@ -358,7 +361,7 @@ void Primary::exchangeDestroy(const ExchangePtr& ex) { shared_ptr<RemoteBackup> Primary::backupConnect( const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&) { - shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection)); + shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection, logPrefix)); queueLimits.addBackup(backup); backups[info.getSystemId()] = backup; return backup; @@ -382,7 +385,15 @@ void Primary::opened(broker::Connection& connection) { if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); + if (info.getStatus() == JOINING) { + info.setStatus(CATCHUP); + membership.add(info); + } if (i == backups.end()) { + if (info.getStatus() == JOINING) { + info.setStatus(CATCHUP); + membership.add(info); + } QPID_LOG(info, logPrefix << "New backup connection: " << info); backup = backupConnect(info, connection, l); } @@ -397,13 +408,20 @@ void Primary::opened(broker::Connection& connection) { i->second->setConnection(&connection); backup = i->second; } - if (info.getStatus() == JOINING) { - info.setStatus(CATCHUP); - membership.add(info); + } + else { + const types::Variant::Map& properties = connection.getClientProperties(); + std::ostringstream pinfo; + types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME); + // FIXME aconway 2014-08-13: Conditional on logging. + if (i != properties.end()) { + pinfo << " " << i->second; + i = properties.find(CLIENT_PID); + if (i != properties.end()) + pinfo << "(" << i->second << ")"; } + QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str()); } - else - QPID_LOG(debug, logPrefix << "Accepted client connection " << connection.getMgmtId()); // Outside lock if (backup) { @@ -448,7 +466,7 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI } Role* Primary::promote() { - QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo()); + QPID_LOG(info, logPrefix << "Ignoring promotion, already primary"); return 0; } diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 46cf990834..84d714fc01 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -25,6 +25,7 @@ #include "types.h" #include "hash.h" #include "BrokerInfo.h" +#include "LogPrefix.h" #include "PrimaryQueueLimits.h" #include "ReplicationTest.h" #include "Role.h" @@ -81,7 +82,6 @@ class Primary : public Role ~Primary(); // Role implementation - std::string getLogPrefix() const { return logPrefix; } Role* promote(); void setBrokerUrl(const Url&) {} @@ -142,7 +142,7 @@ class Primary : public Role mutable sys::Mutex lock; HaBroker& haBroker; Membership& membership; - std::string logPrefix; + const LogPrefix& logPrefix; bool active; ReplicationTest replicationTest; diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h index d614a48099..6d0c55736a 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h +++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h @@ -36,6 +36,7 @@ class Queue; } namespace ha { +class LogPrefix; class RemoteBackup; /** @@ -48,7 +49,7 @@ class PrimaryQueueLimits { public: // FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max - PrimaryQueueLimits(const std::string& lp, + PrimaryQueueLimits(const LogPrefix& lp, broker::QueueRegistry& qr, const ReplicationTest& rt ) : @@ -97,7 +98,7 @@ class PrimaryQueueLimits void removeBackup(const boost::shared_ptr<RemoteBackup>&) {} private: - std::string logPrefix; + const LogPrefix& logPrefix; uint64_t maxQueues; uint64_t queues; }; diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 90046a8c5a..bbfcbba3a3 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -94,6 +94,7 @@ PrimaryTxObserver::PrimaryTxObserver( Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx ) : state(SENDING), + logPrefix(hb.logPrefix), primary(p), haBroker(hb), broker(hb.getBroker()), replicationTest(hb.getSettings().replicateDefault.get()), txBuffer(tx), @@ -101,7 +102,7 @@ PrimaryTxObserver::PrimaryTxObserver( exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), empty(true) { - logPrefix = "Primary transaction "+shortStr(id)+": "; + logPrefix = "Primary TX "+shortStr(id)+": "; // The brokers known at this point are the ones that will be included // in the transaction. Brokers that join later are not included. @@ -115,8 +116,7 @@ PrimaryTxObserver::PrimaryTxObserver( for (size_t i = 0; i < incomplete.size(); ++i) txBuffer->startCompleter(); - QPID_LOG(debug, logPrefix << "Started TX " << id); - QPID_LOG(debug, logPrefix << "Backups: " << backups); + QPID_LOG(debug, logPrefix << "Started, backups " << backups); } void PrimaryTxObserver::initialize() { @@ -140,9 +140,7 @@ void PrimaryTxObserver::initialize() { } -PrimaryTxObserver::~PrimaryTxObserver() { - QPID_LOG(debug, logPrefix << "Ended"); -} +PrimaryTxObserver::~PrimaryTxObserver() {} void PrimaryTxObserver::checkState(State expect, const std::string& msg) { if (state != expect) @@ -254,7 +252,7 @@ void PrimaryTxObserver::end(Mutex::ScopedLock&) { try { broker.getExchanges().destroy(getExchangeName()); } catch (const std::exception& e) { - QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what()); + QPID_LOG(error, logPrefix << "Deleting TX exchange: " << e.what()); } } @@ -266,11 +264,12 @@ bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) { return false; } -bool PrimaryTxObserver::error(const Uuid& id, const char* msg, Mutex::ScopedLock& l) +bool PrimaryTxObserver::error(const Uuid& id, const std::string& msg, Mutex::ScopedLock& l) { if (incomplete.find(id) != incomplete.end()) { // Note: setError before completed since completed may trigger completion. - txBuffer->setError(QPID_MSG(logPrefix << msg << id)); + // Only use the TX part of the log prefix. + txBuffer->setError(Msg() << logPrefix.get() << msg << shortStr(id) << "."); completed(id, l); return true; } @@ -290,7 +289,7 @@ void PrimaryTxObserver::txPrepareOkEvent(const string& data) { void PrimaryTxObserver::txPrepareFailEvent(const string& data) { Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker; - if (error(backup, "Prepare failed on backup: ", l)) { + if (error(backup, "Prepare failed on backup ", l)) { QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup); } else { QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup); diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 6ea1ba185b..6f445ee212 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -24,6 +24,7 @@ #include "types.h" #include "ReplicationTest.h" +#include "LogPrefix.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/TransactionObserver.h" #include "qpid/log/Statement.h" @@ -105,11 +106,11 @@ class PrimaryTxObserver : public broker::TransactionObserver, void txPrepareOkEvent(const std::string& data); void txPrepareFailEvent(const std::string& data); bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&); - bool error(const types::Uuid& id, const char* msg, sys::Mutex::ScopedLock& l); + bool error(const types::Uuid& id, const std::string& msg, sys::Mutex::ScopedLock& l); sys::Monitor lock; State state; - std::string logPrefix; + LogPrefix2 logPrefix; Primary& primary; HaBroker& haBroker; broker::Broker& broker; diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 94b7a53937..b6b6037b6f 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -47,8 +47,8 @@ class QueueGuard::QueueObserver : public broker::QueueObserver -QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : cancelled(false), queue(q) +QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info, const LogPrefix& lp) + : cancelled(false), logPrefix(lp), queue(q) { std::ostringstream os; os << "Guard of " << queue.getName() << " at "; @@ -61,7 +61,9 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) QueuePosition front, back; q.getRange(front, back, broker::REPLICATOR); first = back + 1; - QPID_LOG(debug, logPrefix << "First guarded position " << first); + QPID_LOG(debug, logPrefix << "Guarded: front " << front + << ", back " << back + << ", guarded " << first); } QueueGuard::~QueueGuard() { cancel(); } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 33967970eb..9bf61f31da 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -24,6 +24,7 @@ #include "types.h" #include "hash.h" +#include "LogPrefix.h" #include "qpid/types/Uuid.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/unordered_map.h" @@ -59,7 +60,7 @@ class ReplicatingSubscription; */ class QueueGuard { public: - QueueGuard(broker::Queue& q, const BrokerInfo&); + QueueGuard(broker::Queue& q, const BrokerInfo&, const LogPrefix&); ~QueueGuard(); /** QueueObserver override. Delay completion of the message. @@ -97,7 +98,7 @@ class QueueGuard { sys::Mutex lock; QueuePosition first; bool cancelled; - std::string logPrefix; + LogPrefix2 logPrefix; broker::Queue& queue; Delayed delayed; boost::shared_ptr<QueueObserver> observer; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 4abbb3affb..7997bc6aa9 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -22,6 +22,7 @@ #include "Event.h" #include "HaBroker.h" #include "IdSetter.h" +#include "LogPrefix.h" #include "QueueReplicator.h" #include "QueueSnapshot.h" #include "ReplicatingSubscription.h" @@ -97,12 +98,11 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } - void detach() { - QPID_LOG(debug, logPrefix << "Session detached"); - } + void detach() {} + private: boost::weak_ptr<QueueReplicator> queueReplicator; - std::string logPrefix; + const LogPrefix& logPrefix; }; class QueueReplicator::QueueObserver : public broker::QueueObserver { @@ -152,11 +152,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb, link(l), queue(q), sessionHandler(0), - logPrefix("Backup of "+q->getName()+": "), + logPrefix(hb.logPrefix, "Backup of "+q->getName()+": "), subscribed(false), settings(hb.getSettings()), nextId(0), maxId(0) { + QPID_LOG(debug, logPrefix << "Created"); // The QueueReplicator will take over setting replication IDs. boost::shared_ptr<IdSetter> setter = q->getMessageInterceptors().findType<IdSetter>(); @@ -181,7 +182,6 @@ QueueReplicator::~QueueReplicator() {} void QueueReplicator::initialize() { Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "Created"); if (!queue) return; // Already destroyed // Enable callback to route() @@ -255,10 +255,13 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize? arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>(); - if (qs) arguments.set(ReplicatingSubscription::QPID_ID_SET, - FieldTable::ValuePtr( - new Var32Value(encodeStr(qs->getSnapshot()), TYPE_CODE_VBIN32))); - + ReplicationIdSet snapshot; + if (qs) { + snapshot = qs->getSnapshot(); + arguments.set( + ReplicatingSubscription::QPID_ID_SET, + FieldTable::ValuePtr(new Var32Value(encodeStr(snapshot), TYPE_CODE_VBIN32))); + } try { peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, @@ -268,12 +271,12 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa peer.getMessage().flow(getName(), 1, settings.getFlowBytes()); } catch(const exception& e) { - QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what())); + QPID_LOG(error, logPrefix << "Cannot connect to primary: " << e.what()); throw; } qpid::Address primary; link->getRemoteAddress(primary); - QPID_LOG(debug, logPrefix << "Connected to " << primary << "(" << bridgeName << ")"); + QPID_LOG(debug, logPrefix << "Connected to " << primary << " snapshot=" << snapshot << " bridge=" << bridgeName); QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments); } @@ -391,7 +394,7 @@ void QueueReplicator::promoted() { // On primary QueueReplicator no longer sets IDs, start an IdSetter. QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1) queue->getMessageInterceptors().add( - boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), maxId+1))); + boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, queue->getName(), maxId+1))); // Process auto-deletes if (queue->isAutoDelete()) { // Make a temporary shared_ptr to prevent premature deletion of queue. diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 3d525db440..a4b31b6c9a 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -22,7 +22,10 @@ * */ + + #include "BrokerInfo.h" +#include "LogPrefix.h" #include "hash.h" #include "qpid/broker/Exchange.h" #include <boost/enable_shared_from_this.hpp> @@ -134,7 +137,7 @@ class QueueReplicator : public broker::Exchange, bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg); - std::string logPrefix; + LogPrefix2 logPrefix; std::string bridgeName; bool subscribed; diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index c0a118d57f..c263d37e43 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -35,13 +35,13 @@ using sys::Mutex; using boost::bind; RemoteBackup::RemoteBackup( - const BrokerInfo& info, broker::Connection* c -) : brokerInfo(info), replicationTest(NONE), started(false), connection(c), reportedReady(false) + const BrokerInfo& info, broker::Connection* c, const LogPrefix& lp +) : logPrefix(lp), brokerInfo(info), replicationTest(NONE), + started(false), connection(c), reportedReady(false) { std::ostringstream oss; oss << "Remote backup at " << info << ": "; logPrefix = oss.str(); - QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected")); } RemoteBackup::~RemoteBackup() { @@ -70,7 +70,7 @@ void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { QPID_LOG(debug, logPrefix << "Catch-up queue" << (createGuard ? " and guard" : "") << ": " << q->getName()); catchupQueues.insert(q); - if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo)); + if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix)); } } @@ -86,18 +86,12 @@ RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) { void RemoteBackup::ready(const QueuePtr& q) { catchupQueues.erase(q); - if (catchupQueues.size()) { - QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", " - << catchupQueues.size() << " remain to catch up"); - } - else - QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() ); } // Called via BrokerObserver::queueCreate and from catchupQueue void RemoteBackup::queueCreate(const QueuePtr& q) { if (replicationTest.getLevel(*q) == ALL) - guards[q].reset(new QueueGuard(*q, brokerInfo)); + guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix)); } // Called via BrokerObserver @@ -112,6 +106,7 @@ void RemoteBackup::queueDestroy(const QueuePtr& q) { bool RemoteBackup::reportReady() { if (!reportedReady && isReady()) { + if (catchupQueues.empty()) QPID_LOG(debug, logPrefix << "Caught up."); reportedReady = true; return true; } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index b9e2e1a496..77c493d27e 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -22,6 +22,7 @@ * */ +#include "LogPrefix.h" #include "ReplicationTest.h" #include "BrokerInfo.h" #include "types.h" @@ -56,7 +57,7 @@ class RemoteBackup /** Note: isReady() can be true after construction *@param connected true if the backup is already connected. */ - RemoteBackup(const BrokerInfo&, broker::Connection*); + RemoteBackup(const BrokerInfo&, broker::Connection*, const LogPrefix&); ~RemoteBackup(); /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ @@ -102,7 +103,7 @@ class RemoteBackup typedef std::set<QueuePtr> QueueSet; - std::string logPrefix; + LogPrefix2 logPrefix; BrokerInfo brokerInfo; ReplicationTest replicationTest; GuardMap guards; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 67e1e77681..e08a34529e 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -107,7 +107,7 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - + logPrefix(hb.logPrefix), position(0), wasStopped(false), ready(false), cancelled(false), haBroker(hb), primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())) @@ -121,7 +121,7 @@ void ReplicatingSubscription::initialize() { FieldTable ft; if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) throw InvalidArgumentException( - logPrefix+"Can't subscribe, no broker info: "+getTag()); + logPrefix.get()+"Can't subscribe, no broker info: "+getTag()); info.assign(ft); // Set a log prefix message that identifies the remote broker. @@ -132,7 +132,7 @@ void ReplicatingSubscription::initialize() { // If there's already a guard (we are in failover) use it, else create one. if (primary) guard = primary->getGuard(queue, info); - if (!guard) guard.reset(new QueueGuard(*queue, info)); + if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix)); // NOTE: Once the observer is attached we can have concurrent // calls to dequeued so we need to lock use of this->dequeues. @@ -147,7 +147,7 @@ void ReplicatingSubscription::initialize() { if (!snapshot) { queue->getObservers().remove( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); - throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); + throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted"); } ReplicationIdSet primaryIds = snapshot->getSnapshot(); std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET); @@ -166,10 +166,10 @@ void ReplicatingSubscription::initialize() { // position >= front so if front is safe then position must be. position = front; - QPID_LOG(debug, logPrefix << "Subscribed: front " << front - << ", back " << back + QPID_LOG(debug, logPrefix << "Subscribed: primary [" + << front << "," << back << "]=" << primaryIds << ", guarded " << guard->getFirst() - << ", on backup " << skipEnqueue); + << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")"); checkReady(l); } @@ -242,7 +242,12 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { ready = true; sys::Mutex::ScopedUnlock u(lock); // Notify Primary that a subscription is ready. - QPID_LOG(debug, logPrefix << "Caught up"); + if (position+1 >= guard->getFirst()) { + QPID_LOG(debug, logPrefix << "Caught up at " << position); + } else { + QPID_LOG(debug, logPrefix << "Caught up at " << position << "short of guard at " << guard->getFirst()); + } + if (primary) primary->readyReplica(*this); } } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 08c08b0ca3..d6d41dd2cf 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -23,6 +23,7 @@ */ #include "BrokerInfo.h" +#include "LogPrefix.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/QueueObserver.h" @@ -144,7 +145,7 @@ class ReplicatingSubscription : bool doDispatch(); private: - std::string logPrefix; + LogPrefix2 logPrefix; QueuePosition position; ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues. diff --git a/qpid/cpp/src/qpid/ha/Role.h b/qpid/cpp/src/qpid/ha/Role.h index 9986fde7e1..5392ce1fff 100644 --- a/qpid/cpp/src/qpid/ha/Role.h +++ b/qpid/cpp/src/qpid/ha/Role.h @@ -40,9 +40,6 @@ class Role public: virtual ~Role() {} - /** Log prefix appropriate to the role */ - virtual std::string getLogPrefix() const = 0; - /** QMF promote method handler. * @return The new role if promoted, 0 if not. Caller takes ownership. */ diff --git a/qpid/cpp/src/qpid/ha/StandAlone.h b/qpid/cpp/src/qpid/ha/StandAlone.h index d052996d40..01bcf1a0b3 100644 --- a/qpid/cpp/src/qpid/ha/StandAlone.h +++ b/qpid/cpp/src/qpid/ha/StandAlone.h @@ -33,12 +33,8 @@ namespace ha { class StandAlone : public Role { public: - std::string getLogPrefix() const { return logPrefix; } Role* promote() { return 0; } void setBrokerUrl(const Url&) {} - - private: - std::string logPrefix; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index ee8bd342b2..33adc9780d 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -39,10 +39,11 @@ #include "qpid/broker/DeliverableMessage.h" #include "qpid/framing/BufferTypes.h" #include "qpid/log/Statement.h" -#include <boost/shared_ptr.hpp> -#include <boost/bind.hpp> #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/MessageTransferBody.h" +#include <boost/shared_ptr.hpp> +#include <boost/bind.hpp> +#include <sstream> namespace qpid { namespace ha { @@ -57,15 +58,19 @@ namespace { const string PREFIX(TRANSACTION_REPLICATOR_PREFIX); } // namespace - - bool TxReplicator::isTxQueue(const string& q) { return startsWith(q, PREFIX); } -string TxReplicator::getTxId(const string& q) { - assert(isTxQueue(q)); - return q.substr(PREFIX.size()); +Uuid TxReplicator::getTxId(const string& q) { + if (TxReplicator::isTxQueue(q)) { + std::istringstream is(q); + is.seekg(PREFIX.size()); + Uuid id; + is >> id; + if (!is.fail()) return id; + } + throw Exception(QPID_MSG("Invalid tx queue: " << q)); } string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; } @@ -85,15 +90,14 @@ TxReplicator::TxReplicator( const boost::shared_ptr<broker::Queue>& txQueue, const boost::shared_ptr<broker::Link>& link) : QueueReplicator(hb, txQueue, link), + logPrefix(hb.logPrefix), store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), channel(link->nextChannel()), empty(true), ended(false), dequeueState(hb.getBroker().getQueues()) { - string id(getTxId(txQueue->getName())); - string shortId = id.substr(0, 8); - logPrefix = "Backup of transaction "+shortId+": "; - QPID_LOG(debug, logPrefix << "Started TX " << id); + logPrefix = "Backup of TX "+shortStr(getTxId(txQueue->getName()))+": "; + QPID_LOG(debug, logPrefix << "Started"); if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded.")); // Dispatch transaction events. @@ -213,7 +217,7 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) { QPID_LOG(debug, logPrefix << "Local prepare OK"); sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l); } else { - QPID_LOG(debug, logPrefix << "Local prepare failed"); + QPID_LOG(error, logPrefix << "Local prepare failed"); sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l); } } @@ -240,7 +244,7 @@ void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) { TxBackupsEvent e; decodeStr(data, e); if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) { - QPID_LOG(info, logPrefix << "Not participating in transaction"); + QPID_LOG(info, logPrefix << "Not participating"); end(l); } else { QPID_LOG(debug, logPrefix << "Backups: " << e.backups); diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index fe25fbc78b..c7599d21b1 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -22,11 +22,13 @@ * */ +#include "LogPrefix.h" #include "QueueReplicator.h" #include "Event.h" #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/sys/Mutex.h" +#include "qpid/types/Uuid.h" namespace qpid { @@ -56,7 +58,7 @@ class TxReplicator : public QueueReplicator { typedef boost::shared_ptr<broker::Link> LinkPtr; static bool isTxQueue(const std::string& queue); - static std::string getTxId(const std::string& queue); + static types::Uuid getTxId(const std::string& queue); static boost::shared_ptr<TxReplicator> create( HaBroker&, const QueuePtr& txQueue, const LinkPtr& link); @@ -90,7 +92,7 @@ class TxReplicator : public QueueReplicator { void backups(const std::string& data, sys::Mutex::ScopedLock&); void end(sys::Mutex::ScopedLock&); - std::string logPrefix; + LogPrefix2 logPrefix; TxEnqueueEvent enq; // Enqueue data for next deliver. boost::intrusive_ptr<broker::TxBuffer> txBuffer; broker::MessageStore* store; diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index 3d9941baee..d64f13d9c5 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -165,7 +165,7 @@ struct Client session.close(); connection.close(); } catch(const std::exception& e) { - std::cout << e.what() << std::endl; + std::cout << "Client shutdown: " << e.what() << std::endl; } } }; @@ -350,7 +350,7 @@ int main(int argc, char** argv) } return 0; } catch(const std::exception& e) { - std::cout << e.what() << std::endl; + std::cout << argv[0] << ": " << e.what() << std::endl; } return 2; } |