summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-07-04 15:30:11 +0000
committerAlan Conway <aconway@apache.org>2013-07-04 15:30:11 +0000
commita87428ea534cd63025bbc480f4c0f53f5fb58f3a (patch)
treee30b59b1a03fbefb1a2eb70861feaa01d7842707
parentff4ab4da4ea40107a593fdc1b87de0977bb49a3f (diff)
downloadqpid-python-a87428ea534cd63025bbc480f4c0f53f5fb58f3a.tar.gz
QPID-4944: HA Sporadic failure - logging improvements used to investigate.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1499788 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/IdSetter.h7
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp5
-rw-r--r--qpid/cpp/src/tests/brokertest.py1
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py3
7 files changed, 14 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index f43119ea4c..668b500570 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -316,8 +316,8 @@ bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& )
getImplPtr(i->second)->setSession(connection.newSession(i->first));
}
return true;
- } catch (const qpid::TransportFailure&) {
- QPID_LOG(debug, "Connection failed while re-initialising sessions");
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what());
return false;
} catch (const qpid::framing::ResourceLimitExceededException& e) {
if (reconnectOnLimitExceeded) {
diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h
index a56dbc810a..dc9fa90af9 100644
--- a/qpid/cpp/src/qpid/ha/IdSetter.h
+++ b/qpid/cpp/src/qpid/ha/IdSetter.h
@@ -44,13 +44,10 @@ class IdSetter : public broker::MessageInterceptor
{
public:
IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) {
- QPID_LOG(trace, "Initial replication ID for " << name << " is " << nextId.get());
+ QPID_LOG(trace, "Initial replication ID for " << name << " =" << nextId.get());
}
- void record(broker::Message& m) {
- m.setReplicationId(nextId++);
- QPID_LOG(trace, "Recorded replication ID " << m.getReplicationId() << " on " << name);
- }
+ void record(broker::Message& m) { m.setReplicationId(nextId++); }
private:
sys::AtomicValue<uint32_t> nextId;
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 30a0ac5da9..beabab4e32 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -329,7 +329,7 @@ void Primary::closed(broker::Connection& connection) {
// are a backup, but closed() is called after we have become primary.
// Checking isConnected() lets us ignore such spurious closes.
if (i == backups.end()) {
- QPID_LOG(info, "Disconnect from unknown backup " << info);
+ QPID_LOG(info, logPrefix << "Disconnect from unknown backup " << info);
}
else if (i->second->getConnection() != &connection) {
QPID_LOG(info, logPrefix << "Late disconnect from backup " << info);
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 2108bc4077..d2792e5e17 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -70,9 +70,9 @@ QueueGuard::~QueueGuard() { cancel(); }
void QueueGuard::enqueued(const Message& m) {
// Delay completion
ReplicationId id = m.getReplicationId();
- QPID_LOG(trace, logPrefix << "Delayed completion of " << LogMessageId(queue, m));
Mutex::ScopedLock l(lock);
if (cancelled) return; // Don't record enqueues after we are cancelled.
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << LogMessageId(queue, m));
delayed[id] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
}
@@ -89,6 +89,7 @@ void QueueGuard::cancel() {
queue.removeObserver(observer);
Mutex::ScopedLock l(lock);
if (cancelled) return;
+ QPID_LOG(debug, logPrefix << "Cancelled");
cancelled = true;
while (!delayed.empty()) complete(delayed.begin(), l);
}
@@ -111,7 +112,7 @@ bool QueueGuard::complete(ReplicationId id, Mutex::ScopedLock& l) {
}
void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
- QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ QPID_LOG(trace, logPrefix << "Completed " << queue.getName() << " =" << i->first);
i->second->finishCompleter();
delayed.erase(i);
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 976619397e..4ccdfdfb74 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -168,7 +168,6 @@ ReplicatingSubscription::ReplicatingSubscription(
QPID_LOG(debug, logPrefix << "Subscribed: front " << front
<< ", back " << back
- << ", start " << position
<< ", guarded " << guard->getFirst()
<< ", on backup " << skip);
checkReady(l);
@@ -218,6 +217,8 @@ bool ReplicatingSubscription::deliver(
bool result = false;
if (skip.contains(id)) {
skip -= id;
+ QPID_LOG(trace, logPrefix << "On backup, skip " <<
+ LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
result = false;
}
@@ -269,7 +270,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
ReplicationId id = r.getReplicationId();
QPID_LOG(trace, logPrefix << "Acknowledged " <<
- LogMessageId(*getQueue(), r.getMessageId(), r.getReplicationId()));
+ LogMessageId(*getQueue(), r.getMessageId(), id));
guard->complete(id);
{
Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index fbbaadc350..220c5f4367 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -534,6 +534,7 @@ class NumberedSender(Thread):
self.received = 0
self.stopped = False
self.error = None
+ self.queue = queue
def write_message(self, n):
self.sender.stdin.write(str(n)+"\n")
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index a42810dcc0..2235e87dc5 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -949,7 +949,8 @@ class LongTests(HaBrokerTest):
def check():
r.check() # Verify no exceptions
return r.received > n + 100
- assert retry(check), "Stalled %s at %s"%(r.queue, n)
+ assert retry(check), "Stalled %s waiting for %s, sent %s"%(
+ r.queue, n, [s for s in senders if s.queue==r.queue][0].sent)
for r in receivers: wait_passed(r, 0)