diff options
author | Alan Conway <aconway@apache.org> | 2013-07-04 15:30:19 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-07-04 15:30:19 +0000 |
commit | 913c1dff25da868988b337e04f5cf5067bea3590 (patch) | |
tree | 3ceaf1ce7807502e276414e6ac73d3afb92b6d12 | |
parent | a87428ea534cd63025bbc480f4c0f53f5fb58f3a (diff) | |
download | qpid-python-913c1dff25da868988b337e04f5cf5067bea3590.tar.gz |
QPID-4944: HA Sporadic failure: test_failover_send_receive
Test failing if run as: ha_tests.py -DDURATION=2
AssertionError: Stalled test0 waiting for 248, sent 1228
The problem was a missing call to notify() when a ReplicatingSubscription
skipped a message. That resulted in very long (>1s) delays between skipped
messages, which caused the test to time out.
Changes:
- ReplicatingSubscription::deliver calls notify() to keep the consumer active.
- Re-enable test_failover_send_receive.
- Increase default credit for replicating subscription to match qpid-send.
- Rename ReplicatingSubscription::unacked to unready for clearer meaning.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1499789 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 10 |
6 files changed, 14 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 4ccdfdfb74..1ede47ed60 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -220,12 +220,12 @@ bool ReplicatingSubscription::deliver( QPID_LOG(trace, logPrefix << "On backup, skip " << LogMessageId(*getQueue(), m)); guard->complete(id); // This will never be acknowledged. - result = false; + notify(); + result = true; } else { QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m)); - // Only consider unguarded messages for ready status. - if (!ready && !isGuarded(l)) unacked += id; + if (!ready && !isGuarded(l)) unready += id; sendIdEvent(id, l); result = ConsumerImpl::deliver(c, m); } @@ -242,7 +242,7 @@ bool ReplicatingSubscription::deliver( *@param position: must be <= last position seen by subscription. */ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { - if (!ready && isGuarded(l) && unacked.empty()) { + if (!ready && isGuarded(l) && unready.empty()) { ready = true; sys::Mutex::ScopedUnlock u(lock); // Notify Primary that a subscription is ready. @@ -274,7 +274,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { guard->complete(id); { Mutex::ScopedLock l(lock); - unacked -= id; + unready -= id; checkReady(l); } ConsumerImpl::acknowledged(r); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 4d572c7f17..c202089e91 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -129,7 +129,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl QueuePosition position; ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. ReplicationIdSet skip; // Messages already on backup will be skipped. 
- ReplicationIdSet unacked; // Replicated but un-acknowledged. + ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. bool ready; bool cancelled; BrokerInfo info; diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index be735cfe0d..9d5a5e6ae0 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -38,7 +38,7 @@ class Settings public: Settings() : cluster(false), queueReplication(false), replicateDefault(NONE), backupTimeout(10*sys::TIME_SEC), - flowMessages(100), flowBytes(0) + flowMessages(1000), flowBytes(0) {} bool cluster; // True if we are a cluster member. diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h index 924018c50e..65ad3cefcf 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.h +++ b/qpid/cpp/src/qpid/ha/StatusCheck.h @@ -33,7 +33,7 @@ namespace qpid { namespace ha { -// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect +// TODO aconway 2012-12-21: This solution is incomplete. It will only protect // against bad promotion if there are READY brokers when this broker starts. // It will not help the situation where brokers became READY after this one starts. // diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 220c5f4367..15372b312d 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -578,7 +578,7 @@ class NumberedReceiver(Thread): """ def __init__(self, broker, sender=None, queue="test-queue", connection_options=RECONNECT_OPTIONS, - failover_updates=True, url=None): + failover_updates=True, url=None, args=[]): """ sender: enable flow control. Call sender.received(n) for each message received. 
""" @@ -591,6 +591,7 @@ class NumberedReceiver(Thread): "--forever" ] if failover_updates: cmd += [ "--failover-updates" ] + cmd += args self.receiver = self.test.popen( cmd, expect=EXPECT_RUNNING, stdout=PIPE) self.lock = Lock() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2235e87dc5..f3360658d3 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -927,9 +927,7 @@ class LongTests(HaBrokerTest): if d: return float(d)*60 else: return 3 # Default is to be quick - # FIXME aconway 2013-06-27: skip this test pending a fix for - # https://issues.apache.org/jira/browse/QPID-4944 - def skip_test_failover_send_receive(self): + def test_failover_send_receive(self): """Test failover with continuous send-receive""" brokers = HaCluster(self, 3) @@ -937,7 +935,7 @@ class LongTests(HaBrokerTest): n = 10 senders = [NumberedSender(brokers[0], url=brokers.url, max_depth=1024, failover_updates=False, - queue="test%s"%(i)) for i in xrange(n)] + queue="test%s"%(i)) for i in xrange(n)] receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i], failover_updates=False, queue="test%s"%(i)) for i in xrange(n)] @@ -966,7 +964,7 @@ class LongTests(HaBrokerTest): # one or two backups are running, for s in senders: s.sender.assert_running() for r in receivers: r.receiver.assert_running() - checkpoint = [ r.received+100 for r in receivers ] + checkpoint = [ r.received+10 for r in receivers ] victim = random.choice([0,1,2,primary]) # Give the primary a better chance. if victim == primary: # Don't kill primary till it is active and the next @@ -982,7 +980,7 @@ class LongTests(HaBrokerTest): # Make sure we are not stalled map(wait_passed, receivers, checkpoint) # Run another checkpoint to ensure things work in this configuration - checkpoint = [ r.received+100 for r in receivers ] + checkpoint = [ r.received+10 for r in receivers ] map(wait_passed, receivers, checkpoint) i += 1 except: |