summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-02-03 19:17:02 +0000
committerAlan Conway <aconway@apache.org>2014-02-03 19:17:02 +0000
commitc8bdc3035a0bd1a8527b648f555b4f8b4f413e98 (patch)
treed96894f9fa7005f9e84cd767321d9b49c1464cee
parentfe915e0f5573921b28cbc2b7b6f7d15ce90a05a9 (diff)
downloadqpid-python-c8bdc3035a0bd1a8527b648f555b4f8b4f413e98.tar.gz
QPID-5528: HA Clean up error messages around rolled-back transactions.
A simple transaction test on a 3 node cluster generates a lot of errors and rollback messages in the broker logs even though the test code never rolls back a transaction. E.g. qpid-cluster-benchmark -b 20.0.20.200 -n1 -m 1000 -q3 -s2 -r2 --send-arg=--tx --send-arg=10 --receive-arg=--tx --receive-arg=10 The errors are caused by queues being deleted while backup brokers are using them. This happens a lot in the transaction test because a transactional session must create a new transaction when the previous one closes. When the session closes the open transaction is rolled back automatically. Thus there is almost always an empty transaction that is created then immediately rolled back at the end of the session. Backup brokers may still be in the process of subscribing to the transaction's replication queue at this point, causing (harmlesss) errors. This commit takes the following steps to clean up the unwanted error and rollback messages: HA TX messages cleaned up: - Remove log messages about rolling back/destroying empty transactions. - Remove misleading "backup disconnected" message for cancelled transactions. - Remove spurious warning about ignored unreplicated dequeues. - Include TxReplicator destroy in QueueReplicator mutex, idempotence check before destroy. Allow HA to suppress/modify broker exception logging: - Move broker exception logging into ErrorListener - Every SessionHandler has DefaultErrorListener that does the same logging as before. - Added SessionHandlerObserver to allow plugins to change the error listener. - HA plugin set ErrorListeners to log harmless exceptions as HA debug messages. Unrelated cleanup: - Broker now logs "incoming execution exceptions" as debug messages rather than ignoring. - Exception prefixes: don't add the prefix if already present. The exception test above should now pass without errors or rollback messages in the logs. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564010 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/Exception.cpp12
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandlerObserver.h51
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/ErrorListener.h60
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp46
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h2
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h6
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp45
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h17
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h3
15 files changed, 279 insertions, 76 deletions
diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp
index a6696f06e1..999c2aeb52 100644
--- a/qpid/cpp/src/qpid/Exception.cpp
+++ b/qpid/cpp/src/qpid/Exception.cpp
@@ -45,16 +45,20 @@ Exception::Exception(const std::string& msg) throw() : message(msg) {
Exception::~Exception() throw() {}
-std::string Exception::getPrefix() const { return ""; }
+std::string Exception::getPrefix() const { return std::string(); }
std::string Exception::getMessage() const { return message; }
+namespace { const std::string COLON(": "); }
+
const char* Exception::what() const throw() {
// Construct the what string the first time it is needed.
if (whatStr.empty()) {
- whatStr = getPrefix();
- if (!whatStr.empty()) whatStr += ": ";
- whatStr += message;
+ if (message.compare(0, getPrefix().size(), getPrefix()) == 0 || // Already has prefix
+ getPrefix().empty()) // No prefix
+ whatStr = message;
+ else
+ whatStr = getPrefix() + COLON + message;
}
return whatStr.c_str();
}
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 5eedafc77b..43f39c2919 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -94,8 +94,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
catch(const SessionException& e) {
- QPID_LOG(error, "Execution exception: " << e.what());
- executionException(e.code, e.what()); // Let subclass handle this first.
+ executionException(e.code, e.what());
framing::AMQP_AllProxy::Execution execution(channel);
AMQMethodBody* m = f.getMethod();
SequenceNumber commandId;
@@ -105,16 +104,13 @@ void SessionHandler::handleIn(AMQFrame& f) {
sendDetach();
}
catch(const ChannelException& e){
- QPID_LOG(error, "Channel exception: " << e.what());
- channelException(e.code, e.what()); // Let subclass handle this first.
+ channelException(e.code, e.what());
peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
- QPID_LOG(error, "Connection exception: " << e.what());
connectionException(e.code, e.getMessage());
}
catch(const std::exception& e) {
- QPID_LOG(error, "Unexpected exception: " << e.what());
connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
}
@@ -186,13 +182,14 @@ void SessionHandler::detach(const std::string& name) {
}
void SessionHandler::detached(const std::string& /*name*/, uint8_t code) {
- // Special case for detached: Don't check if we are
- // attached. Checking can lead to an endless game of "detached
- // tennis" on federated brokers.
awaitingDetached = false;
+ // Special case for detached: Don't throw if we are not attached. Doing so
+ // can lead to an endless game of "detached tennis" on federated brokers.
+ if (!getState()) return; // Already detached.
if (code != session::DETACH_CODE_NORMAL) {
sendReady = receiveReady = false;
- channelException(convert(code), "session.detached from peer.");
+ channelException(convert(code), Msg() << "Channel " << channel.get()
+ << " received session.detached from peer");
} else {
handleDetach();
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 13679bd623..4bad8f2960 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -38,6 +38,7 @@
#include "qpid/broker/System.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/broker/BrokerObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/ConnectionCodec.h"
@@ -179,6 +180,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir dataDir;
DataDir pagingDir;
ConnectionObservers connectionObservers;
+ SessionHandlerObservers sessionHandlerObservers;
BrokerObservers brokerObservers;
QueueRegistry queues;
@@ -361,6 +363,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+ SessionHandlerObservers& getSessionHandlerObservers() { return sessionHandlerObservers; }
BrokerObservers& getBrokerObservers() { return brokerObservers; }
/** Properties to be set on outgoing link connections */
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index a8f5734af7..3d5faf2dab 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -33,11 +33,35 @@ using namespace framing;
using namespace std;
using namespace qpid::sys;
+namespace {
+class DefaultErrorListener : public SessionHandler::ErrorListener {
+ public:
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, "Connection exception: " << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, "Channel exception: " << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, "Execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, "Incoming execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {}
+
+ private:
+};
+}
+
SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch)
: qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
-{}
+ proxy(out),
+ errorListener(boost::shared_ptr<ErrorListener>(new DefaultErrorListener()))
+{
+ c.getBroker().getSessionHandlerObservers().newSessionHandler(*this);
+}
SessionHandler::~SessionHandler()
{
diff --git a/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h
new file mode 100644
index 0000000000..6d0ea16254
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h
@@ -0,0 +1,51 @@
+#ifndef QPID_BROKER_SESSIONHANDLEROBSERVER_H
+#define QPID_BROKER_SESSIONHANDLEROBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Observers.h"
+
+namespace qpid {
+namespace broker {
+class SessionHandler;
+
+/**
+ * Observer of session handler events.
+ */
+class SessionHandlerObserver
+{
+ public:
+ virtual ~SessionHandlerObserver() {}
+ virtual void newSessionHandler(SessionHandler&) {}
+};
+
+
+class SessionHandlerObservers : public Observers<SessionHandlerObserver> {
+ public:
+ void newSessionHandler(SessionHandler& sh) {
+ each(boost::bind(&SessionHandlerObserver::newSessionHandler, _1, boost::ref(sh)));
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SESSIONHANDLEROBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index b1faf19e52..7928b6ab71 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -172,34 +172,29 @@ Variant::Map asMapVoid(const Variant& value) {
}
} // namespace
-// Listens for errors on the bridge session.
-class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+// Report errors on the broker replication session.
+class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& lp, BrokerReplicator& br) :
- logPrefix(lp), brokerReplicator(br) {}
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
- void connectionException(framing::connection::CloseCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
}
- void channelException(framing::session::DetachCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
}
- void executionException(framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
}
-
- void incomingExecutionException(
- framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Incoming execution error: " << msg);
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
-
void detach() {
QPID_LOG(debug, logPrefix << "Session detached.");
}
private:
std::string logPrefix;
- BrokerReplicator& brokerReplicator;
};
/** Keep track of queues or exchanges during the update process to solve 2
@@ -328,8 +323,7 @@ void BrokerReplicator::initialize() {
boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
assert(result.second);
- result.first->setErrorListener(
- boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
+ result.first->setErrorListener(boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
broker.getConnectionObservers().add(shared_from_this());
}
diff --git a/qpid/cpp/src/qpid/ha/ErrorListener.h b/qpid/cpp/src/qpid/ha/ErrorListener.h
new file mode 100644
index 0000000000..1ae2078a11
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ErrorListener.h
@@ -0,0 +1,60 @@
+#ifndef QPID_HA_ERRORLISTENER_H
+#define QPID_HA_ERRORLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace ha {
+
+/** Default ErrorListener for HA module */
+class ErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(error, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ERRORLISTENER_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 3bb51b1813..b4d50d1652 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary& primary;
};
+class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
+ public:
+ PrimarySessionHandlerObserver(const std::string& logPrefix)
+ : errorListener(new PrimaryErrorListener(logPrefix)) {}
+ void newSessionHandler(broker::SessionHandler& sh) {
+ BrokerInfo info;
+ // Suppress error logging for backup connections
+ // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
+ if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
+ sh.setErrorListener(errorListener);
+ }
+ }
+ private:
+ boost::shared_ptr<PrimaryErrorListener> errorListener;
+};
+
+
} // namespace
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
+ sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix)
{
// Note that at this point, we are still rejecting client connections.
@@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
brokerObserver.reset(new PrimaryBrokerObserver(*this));
haBroker.getBroker().getBrokerObservers().add(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
+
checkReady(); // Outside lock
// Allow client connections
@@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
Primary::~Primary() {
if (timerTask) timerTask->cancel();
haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
haBroker.getObserver()->reset();
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 2e32515c9a..af368bca0f 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -42,6 +42,7 @@ class Queue;
class Connection;
class ConnectionObserver;
class BrokerObserver;
+class SessionHandlerObserver;
class TxBuffer;
class DtxBuffer;
}
@@ -152,6 +153,7 @@ class Primary : public Role
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
+ boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
ReplicaMap replicas;
TxMap txMap;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index dc5bf15911..c94ced7024 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -98,7 +98,8 @@ PrimaryTxObserver::PrimaryTxObserver(
replicationTest(hb.getSettings().replicateDefault.get()),
txBuffer(tx),
id(true),
- exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str())
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ empty(true)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -149,6 +150,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
checkState(SENDING, "Too late for enqueue");
+ empty = false;
enqueues[q] += m.getReplicationId();
txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
txQueue->deliver(m);
@@ -162,12 +164,9 @@ void PrimaryTxObserver::dequeue(
checkState(SENDING, "Too late for dequeue");
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+ empty = false;
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
}
- else {
- QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: "
- << LogMessageId(*q, pos, id));
- }
}
namespace {
@@ -221,8 +220,10 @@ void PrimaryTxObserver::commit() {
}
void PrimaryTxObserver::rollback() {
- QPID_LOG(debug, logPrefix << "Rollback");
Mutex::ScopedLock l(lock);
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (state != ENDED) {
txQueue->deliver(TxRollbackEvent().message());
end(l);
@@ -287,7 +288,6 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
- QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup);
// Normally the backup should be completed before it is cancelled.
if (completed(backup, l)) error(backup, "Unexpected disconnect:", l);
// Break the pointer cycle if backups have completed and we are done with txBuffer.
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 105fee4d40..5b7c2e3e93 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -55,8 +55,9 @@ class Primary;
* A TxReplicator on the backup replicates the tx-queue and creates
* a TxBuffer on the backup equivalent to the one on the primary.
*
- * Also observes the tx-queue for prepare-complete messages and
- * subscription cancellations.
+ * Creates an exchange to receive prepare-ok/prepare-fail messages from backups.
+ *
+ * Monitors for tx-queue subscription cancellations.
*
* THREAD SAFE: called in user connection thread for TX events,
* and in backup connection threads for prepare-completed events
@@ -122,6 +123,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueueIdsMap enqueues;
UuidSet backups; // All backups of transaction.
UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
+ bool empty; // True if the transaction is empty - no enqueues/dequeues.
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 507df6ea5a..59b2013f59 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
}
+// Debug log expected exceptions on queue replicator, check incoming execution
+// exceptions for "deleted on primary" conditions.
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
: queueReplicator(qr), logPrefix(qr->logPrefix) {}
- void connectionException(framing::connection::CloseCode, const std::string&) {}
- void channelException(framing::session::DetachCode, const std::string&) {}
- void executionException(framing::execution::ErrorCode, const std::string&) {}
-
- void incomingExecutionException(ErrorCode e, const std::string& msg) {
- queueReplicator->incomingExecutionException(e, msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(ErrorCode code, const std::string& msg) {
+ if (!queueReplicator->deletedOnPrimary(code, msg))
+ QPID_LOG(error, logPrefix << "Incoming "
+ << framing::createSessionException(code, msg).what());
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached");
@@ -197,20 +206,25 @@ void QueueReplicator::disconnect() {
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
+ QPID_LOG(debug, logPrefix << "Destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
- QPID_LOG(debug, logPrefix << "Destroyed");
bridge2 = bridge; // call close outside the lock.
- // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
- queue.reset();
- bridge.reset();
- getBroker()->getExchanges().destroy(getName());
+ destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
+void QueueReplicator::destroy(Mutex::ScopedLock&) {
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
+}
+
+
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
@@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
// get a not-found or resource-deleted exception before the
// BrokerReplicator gets the queue-delete event. Shut down the bridge by
// calling destroy(), we can let the BrokerReplicator delete the queue
// when the queue-delete arrives.
- QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+ QPID_LOG(debug, logPrefix << "Deleted on primary: "
+ << framing::createSessionException(e, msg).what());
destroy();
+ return true;
}
- else
- QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+ return false;
}
// Unused Exchange methods.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 22cd13a0a8..f94c6de116 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -46,11 +46,13 @@ class HaBroker;
class Settings;
/**
- * Exchange created on a backup broker to replicate a queue on the primary.
+ * Exchange created on a backup broker to receive replicated messages and
+ * replication events from a queue on the primary. It subscribes to the primary
+ * queue via a ReplicatingSubscription on the primary by passing special
+ * arguments to the subscribe command.
*
- * Puts replicated messages on the local queue, handles dequeue events.
- * Creates a ReplicatingSubscription on the primary by passing special
- * arguments to the consume command.
+ * It puts replicated messages on the local replica queue and handles dequeue
+ * events by removing local messages.
*
* THREAD SAFE: Called in different connection threads.
*/
@@ -74,7 +76,7 @@ class QueueReplicator : public broker::Exchange,
void disconnect(); // Called when we are disconnected from the primary.
- std::string getType() const;
+ virtual std::string getType() const;
void route(broker::Deliverable&);
@@ -101,7 +103,9 @@ class QueueReplicator : public broker::Exchange,
void initialize(); // Called as part of create()
virtual void deliver(const broker::Message&);
+
virtual void destroy(); // Called when the queue is destroyed.
+ virtual void destroy(sys::Mutex::ScopedLock&);
sys::Mutex lock;
HaBroker& haBroker;
@@ -124,8 +128,7 @@ class QueueReplicator : public broker::Exchange,
void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
- void incomingExecutionException(framing::execution::ErrorCode e,
- const std::string& msg);
+ bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg);
std::string logPrefix;
std::string bridgeName;
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 7ff03b5f92..d2a647ae8f 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -87,7 +87,7 @@ TxReplicator::TxReplicator(
QueueReplicator(hb, txQueue, link),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- ended(false),
+ empty(true), ended(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -151,6 +151,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Enqueue: " << e);
enq = e;
+ empty = false;
}
void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
@@ -163,6 +164,7 @@ void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
// prepared, then they are all receieved before the prepare event.
// We collect the events here so we can do a single scan of the queue in prepare.
dequeueState.add(e);
+ empty = false;
}
void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
@@ -227,7 +229,9 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
if (!txBuffer) return;
- QPID_LOG(debug, logPrefix << "Rollback");
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
end(l);
@@ -255,15 +259,12 @@ void TxReplicator::end(sys::Mutex::ScopedLock&) {
}
// Called when the tx queue is deleted.
-void TxReplicator::destroy() {
- {
- sys::Mutex::ScopedLock l(lock);
- if (!ended) {
- QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback.");
- rollback(string(), l);
- }
+void TxReplicator::destroy(sys::Mutex::ScopedLock& l) {
+ if (!ended) {
+ if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback");
+ rollback(string(), l);
}
- QueueReplicator::destroy();
+ QueueReplicator::destroy(l);
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index 7f1256699a..5c509d14a7 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -67,7 +67,8 @@ class TxReplicator : public QueueReplicator {
// QueueReplicator overrides
void route(broker::Deliverable& deliverable);
- void destroy();
+ using QueueReplicator::destroy;
+ void destroy(sys::Mutex::ScopedLock&);
protected: