summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp45
1 files changed, 30 insertions, 15 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 507df6ea5a..59b2013f59 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
}
+// Debug log expected exceptions on queue replicator, check incoming execution
+// exceptions for "deleted on primary" conditions.
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
: queueReplicator(qr), logPrefix(qr->logPrefix) {}
- void connectionException(framing::connection::CloseCode, const std::string&) {}
- void channelException(framing::session::DetachCode, const std::string&) {}
- void executionException(framing::execution::ErrorCode, const std::string&) {}
-
- void incomingExecutionException(ErrorCode e, const std::string& msg) {
- queueReplicator->incomingExecutionException(e, msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(ErrorCode code, const std::string& msg) {
+ if (!queueReplicator->deletedOnPrimary(code, msg))
+ QPID_LOG(error, logPrefix << "Incoming "
+ << framing::createSessionException(code, msg).what());
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached");
@@ -197,20 +206,25 @@ void QueueReplicator::disconnect() {
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
+ QPID_LOG(debug, logPrefix << "Destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
- QPID_LOG(debug, logPrefix << "Destroyed");
bridge2 = bridge; // call close outside the lock.
- // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
- queue.reset();
- bridge.reset();
- getBroker()->getExchanges().destroy(getName());
+ destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
+void QueueReplicator::destroy(Mutex::ScopedLock&) {
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
+}
+
+
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
@@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
// get a not-found or resource-deleted exception before the
// BrokerReplicator gets the queue-delete event. Shut down the bridge by
// calling destroy(), we can let the BrokerReplicator delete the queue
// when the queue-delete arrives.
- QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+ QPID_LOG(debug, logPrefix << "Deleted on primary: "
+ << framing::createSessionException(e, msg).what());
destroy();
+ return true;
}
- else
- QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+ return false;
}
// Unused Exchange methods.