diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-04 23:44:42 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 16:47:26 -0700 |
commit | 26abd24dde1d50d79f5c4e39776de8e89f586e0b (patch) | |
tree | 7efcce9ef03f62decaf6e858209df4db56cebe9c /test/test_failover_integration.py | |
parent | 9ac0f057b621c8706e8790b3c10295ef848121c3 (diff) | |
download | kafka-python-26abd24dde1d50d79f5c4e39776de8e89f586e0b.tar.gz |
add option to check for at-least-once message delivery in failover tests
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index c23ab14..631068a 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -92,7 +92,6 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 100 before + 1 recovery + 100 after self.assert_message_count(topic, 201, partitions=(partition,)) - @kafka_versions("all") def test_switch_leader_async(self): topic = self.topic @@ -129,7 +128,8 @@ class TestFailover(KafkaIntegrationTestCase): # count number of messages # Should be equal to 10 before + 1 recovery + 10 after - self.assert_message_count(topic, 21, partitions=(partition,)) + self.assert_message_count(topic, 21, partitions=(partition,), + at_least=True) @kafka_versions("all") def test_switch_leader_keyed_producer(self): @@ -184,12 +184,12 @@ class TestFailover(KafkaIntegrationTestCase): broker.close() return broker - def assert_message_count(self, topic, check_count, timeout=10, partitions=None): + def assert_message_count(self, topic, check_count, timeout=10, + partitions=None, at_least=False): hosts = ','.join(['%s:%d' % (broker.host, broker.port) for broker in self.brokers]) client = KafkaClient(hosts) - group = random_string(10) consumer = SimpleConsumer(client, None, topic, partitions=partitions, auto_commit=False, @@ -199,10 +199,16 @@ class TestFailover(KafkaIntegrationTestCase): pending = consumer.pending(partitions) # Keep checking if it isn't immediately correct, subject to timeout - while pending != check_count and (time.time() - started_at < timeout): + while pending < check_count and (time.time() - started_at < timeout): pending = consumer.pending(partitions) consumer.stop() client.close() - self.assertEqual(pending, check_count) + if pending < check_count: + self.fail('Too few pending messages: found %d, expected %d' % + (pending, check_count)) + elif pending > check_count and not at_least: + self.fail('Too many pending messages: found %d, expected %d' % + (pending, check_count)) + return True |