diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 3bb51b1813..b4d50d1652 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/BrokerObserver.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/SessionHandlerObserver.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" @@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary& primary; }; +class PrimaryErrorListener : public broker::SessionHandler::ErrorListener { + public: + PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} + + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; +}; + +class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver { + public: + PrimarySessionHandlerObserver(const std::string& logPrefix) + : errorListener(new PrimaryErrorListener(logPrefix)) {} + void newSessionHandler(broker::SessionHandler& sh) { + BrokerInfo info; + // Suppress error logging for backup connections + // TODO aconway 2014-01-31: Be more selective, suppress only expected errors? + if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) { + sh.setErrorListener(errorListener); + } + } + private: + boost::shared_ptr<PrimaryErrorListener> errorListener; +}; + + } // namespace Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), logPrefix("Primary: "), active(false), replicationTest(hb.getSettings().replicateDefault.get()), + sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)), queueLimits(logPrefix) { // Note that at this point, we are still rejecting client connections. @@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : } brokerObserver.reset(new PrimaryBrokerObserver(*this)); haBroker.getBroker().getBrokerObservers().add(brokerObserver); + haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver); + checkReady(); // Outside lock // Allow client connections @@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : Primary::~Primary() { if (timerTask) timerTask->cancel(); haBroker.getBroker().getBrokerObservers().remove(brokerObserver); + haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver); haBroker.getObserver()->reset(); } |