summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/HaBroker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/HaBroker.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp30
1 files changed, 17 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index f154e45a22..7699b0e1d2 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -69,10 +69,16 @@ using boost::dynamic_pointer_cast;
//
class HaBroker::BrokerObserver : public broker::BrokerObserver {
public:
+ BrokerObserver(const LogPrefix& lp) : logPrefix(lp) {}
+
void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
- q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName())));
+ q->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, q->getName())));
}
+
+ private:
+ const LogPrefix& logPrefix;
};
// Called in Plugin::earlyInitialize
@@ -83,20 +89,19 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
- membership(BrokerInfo(systemId, STANDALONE), *this),
+ membership(BrokerInfo(systemId, STANDALONE), *this), // Sets logPrefix
failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, "Backup starting, rejecting client connections.");
- shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
- observer->setObserver(excluder, "Backup: ");
+ shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder(logPrefix));
+ observer->setObserver(excluder);
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
- broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
+ broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver(logPrefix)));
}
namespace {
@@ -107,8 +112,8 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; }
// Called in Plugin::initialize
void HaBroker::initialize() {
if (settings.cluster) {
+ QPID_LOG(info, logPrefix << "Starting HA broker");
membership.setStatus(JOINING);
- QPID_LOG(info, "Initializing HA broker: " << membership.getSelf());
}
// Set up the management object.
@@ -138,7 +143,6 @@ void HaBroker::initialize() {
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, role->getLogPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
@@ -160,7 +164,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
@@ -195,7 +199,7 @@ void HaBroker::setPublicUrl(const Url& url) {
knownBrokers.push_back(url);
vector<Url> urls(1, url);
failoverExchange->updateUrls(urls);
- QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url);
+ QPID_LOG(debug, logPrefix << "Public URL set to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
@@ -203,7 +207,7 @@ void HaBroker::setBrokerUrl(const Url& url) {
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+ QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
}
role->setBrokerUrl(url); // Oustside lock
}
@@ -214,7 +218,7 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
}
void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, "Shutting down: " << message);
+ QPID_LOG(critical, logPrefix << "Shutting down: " << message);
broker.shutdown();
throw Exception(message);
}
@@ -224,7 +228,7 @@ BrokerStatus HaBroker::getStatus() const {
}
void HaBroker::setAddress(const Address& a) {
- QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
+ QPID_LOG(info, logPrefix << "Set self address to: " << a);
membership.setSelfAddress(a);
}