summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp46
1 files changed, 46 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 3bb51b1813..b4d50d1652 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary& primary;
};
+class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
+ public:
+ PrimarySessionHandlerObserver(const std::string& logPrefix)
+ : errorListener(new PrimaryErrorListener(logPrefix)) {}
+ void newSessionHandler(broker::SessionHandler& sh) {
+ BrokerInfo info;
+ // Suppress error logging for backup connections
+ // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
+ if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
+ sh.setErrorListener(errorListener);
+ }
+ }
+ private:
+ boost::shared_ptr<PrimaryErrorListener> errorListener;
+};
+
+
} // namespace
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
+ sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix)
{
// Note that at this point, we are still rejecting client connections.
@@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
brokerObserver.reset(new PrimaryBrokerObserver(*this));
haBroker.getBroker().getBrokerObservers().add(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
+
checkReady(); // Outside lock
// Allow client connections
@@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
Primary::~Primary() {
if (timerTask) timerTask->cancel();
haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
haBroker.getObserver()->reset();
}