summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-02-03 19:17:02 +0000
committerAlan Conway <aconway@apache.org>2014-02-03 19:17:02 +0000
commitc8bdc3035a0bd1a8527b648f555b4f8b4f413e98 (patch)
treed96894f9fa7005f9e84cd767321d9b49c1464cee /qpid/cpp/src/qpid/ha/Primary.cpp
parentfe915e0f5573921b28cbc2b7b6f7d15ce90a05a9 (diff)
downloadqpid-python-c8bdc3035a0bd1a8527b648f555b4f8b4f413e98.tar.gz
QPID-5528: HA Clean up error messages around rolled-back transactions.
A simple transaction test on a 3 node cluster generates a lot of errors and rollback messages in the broker logs even though the test code never rolls back a transaction. E.g. qpid-cluster-benchmark -b 20.0.20.200 -n1 -m 1000 -q3 -s2 -r2 --send-arg=--tx --send-arg=10 --receive-arg=--tx --receive-arg=10 The errors are caused by queues being deleted while backup brokers are using them. This happens a lot in the transaction test because a transactional session must create a new transaction when the previous one closes. When the session closes the open transaction is rolled back automatically. Thus there is almost always an empty transaction that is created then immediately rolled back at the end of the session. Backup brokers may still be in the process of subscribing to the transaction's replication queue at this point, causing (harmlesss) errors. This commit takes the following steps to clean up the unwanted error and rollback messages: HA TX messages cleaned up: - Remove log messages about rolling back/destroying empty transactions. - Remove misleading "backup disconnected" message for cancelled transactions. - Remove spurious warning about ignored unreplicated dequeues. - Include TxReplicator destroy in QueueReplicator mutex, idempotence check before destroy. Allow HA to suppress/modify broker exception logging: - Move broker exception logging into ErrorListener - Every SessionHandler has DefaultErrorListener that does the same logging as before. - Added SessionHandlerObserver to allow plugins to change the error listener. - HA plugin set ErrorListeners to log harmless exceptions as HA debug messages. Unrelated cleanup: - Broker now logs "incoming execution exceptions" as debug messages rather than ignoring. - Exception prefixes: don't add the prefix if already present. The exception test above should now pass without errors or rollback messages in the logs. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564010 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp46
1 files changed, 46 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 3bb51b1813..b4d50d1652 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary& primary;
};
+class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
+ public:
+ PrimarySessionHandlerObserver(const std::string& logPrefix)
+ : errorListener(new PrimaryErrorListener(logPrefix)) {}
+ void newSessionHandler(broker::SessionHandler& sh) {
+ BrokerInfo info;
+ // Suppress error logging for backup connections
+ // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
+ if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
+ sh.setErrorListener(errorListener);
+ }
+ }
+ private:
+ boost::shared_ptr<PrimaryErrorListener> errorListener;
+};
+
+
} // namespace
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
+ sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix)
{
// Note that at this point, we are still rejecting client connections.
@@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
brokerObserver.reset(new PrimaryBrokerObserver(*this));
haBroker.getBroker().getBrokerObservers().add(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
+
checkReady(); // Outside lock
// Allow client connections
@@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
Primary::~Primary() {
if (timerTask) timerTask->cancel();
haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
haBroker.getObserver()->reset();
}