summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2013-07-04 15:30:19 +0000
committer: Alan Conway <aconway@apache.org> 2013-07-04 15:30:19 +0000
commit: 913c1dff25da868988b337e04f5cf5067bea3590 (patch)
tree: 3ceaf1ce7807502e276414e6ac73d3afb92b6d12
parent: a87428ea534cd63025bbc480f4c0f53f5fb58f3a (diff)
download: qpid-python-913c1dff25da868988b337e04f5cf5067bea3590.tar.gz
QPID-4944: HA Sporadic failure: test_failover_send_receive
Test failing if run as: ha_tests.py -DDURATION=2
AssertionError: Stalled test0 waiting for 248, sent 1228

The problem was a missing call to notify() when a ReplicatingSubscription skipped a message. That resulted in very long (>1s) delays between skipped messages, which caused the test to time out.

Changes:
- ReplicatingSubscription::deliver calls notify() to keep the consumer active.
- Re-enable test_failover_send_receive.
- Increase default credit for replicating subscription to match qpid-send.
- Rename ReplicatingSubscription::unacked as unready, clearer meaning.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1499789 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 10
-rw-r--r-- qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 2
-rw-r--r-- qpid/cpp/src/qpid/ha/Settings.h 2
-rw-r--r-- qpid/cpp/src/qpid/ha/StatusCheck.h 2
-rw-r--r-- qpid/cpp/src/tests/brokertest.py 3
-rwxr-xr-x qpid/cpp/src/tests/ha_tests.py 10
6 files changed, 14 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 4ccdfdfb74..1ede47ed60 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -220,12 +220,12 @@ bool ReplicatingSubscription::deliver(
QPID_LOG(trace, logPrefix << "On backup, skip " <<
LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
- result = false;
+ notify();
+ result = true;
}
else {
QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
- // Only consider unguarded messages for ready status.
- if (!ready && !isGuarded(l)) unacked += id;
+ if (!ready && !isGuarded(l)) unready += id;
sendIdEvent(id, l);
result = ConsumerImpl::deliver(c, m);
}
@@ -242,7 +242,7 @@ bool ReplicatingSubscription::deliver(
*@param position: must be <= last position seen by subscription.
*/
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
- if (!ready && isGuarded(l) && unacked.empty()) {
+ if (!ready && isGuarded(l) && unready.empty()) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
@@ -274,7 +274,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
guard->complete(id);
{
Mutex::ScopedLock l(lock);
- unacked -= id;
+ unready -= id;
checkReady(l);
}
ConsumerImpl::acknowledged(r);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 4d572c7f17..c202089e91 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -129,7 +129,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
ReplicationIdSet skip; // Messages already on backup will be skipped.
- ReplicationIdSet unacked; // Replicated but un-acknowledged.
+ ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool ready;
bool cancelled;
BrokerInfo info;
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index be735cfe0d..9d5a5e6ae0 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -38,7 +38,7 @@ class Settings
public:
Settings() : cluster(false), queueReplication(false),
replicateDefault(NONE), backupTimeout(10*sys::TIME_SEC),
- flowMessages(100), flowBytes(0)
+ flowMessages(1000), flowBytes(0)
{}
bool cluster; // True if we are a cluster member.
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h
index 924018c50e..65ad3cefcf 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.h
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.h
@@ -33,7 +33,7 @@
namespace qpid {
namespace ha {
-// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// TODO aconway 2012-12-21: This solution is incomplete. It will only protect
// against bad promotion if there are READY brokers when this broker starts.
// It will not help the situation where brokers became READY after this one starts.
//
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 220c5f4367..15372b312d 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -578,7 +578,7 @@ class NumberedReceiver(Thread):
"""
def __init__(self, broker, sender=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None):
+ failover_updates=True, url=None, args=[]):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -591,6 +591,7 @@ class NumberedReceiver(Thread):
"--forever"
]
if failover_updates: cmd += [ "--failover-updates" ]
+ cmd += args
self.receiver = self.test.popen(
cmd, expect=EXPECT_RUNNING, stdout=PIPE)
self.lock = Lock()
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 2235e87dc5..f3360658d3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -927,9 +927,7 @@ class LongTests(HaBrokerTest):
if d: return float(d)*60
else: return 3 # Default is to be quick
- # FIXME aconway 2013-06-27: skip this test pending a fix for
- # https://issues.apache.org/jira/browse/QPID-4944
- def skip_test_failover_send_receive(self):
+ def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
brokers = HaCluster(self, 3)
@@ -937,7 +935,7 @@ class LongTests(HaBrokerTest):
n = 10
senders = [NumberedSender(brokers[0], url=brokers.url,
max_depth=1024, failover_updates=False,
- queue="test%s"%(i)) for i in xrange(n)]
+ queue="test%s"%(i)) for i in xrange(n)]
receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
@@ -966,7 +964,7 @@ class LongTests(HaBrokerTest):
# one or two backups are running,
for s in senders: s.sender.assert_running()
for r in receivers: r.receiver.assert_running()
- checkpoint = [ r.received+100 for r in receivers ]
+ checkpoint = [ r.received+10 for r in receivers ]
victim = random.choice([0,1,2,primary]) # Give the primary a better chance.
if victim == primary:
# Don't kill primary till it is active and the next
@@ -982,7 +980,7 @@ class LongTests(HaBrokerTest):
# Make sure we are not stalled
map(wait_passed, receivers, checkpoint)
# Run another checkpoint to ensure things work in this configuration
- checkpoint = [ r.received+100 for r in receivers ]
+ checkpoint = [ r.received+10 for r in receivers ]
map(wait_passed, receivers, checkpoint)
i += 1
except: