diff options
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index c23ab14..631068a 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -92,7 +92,6 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 100 before + 1 recovery + 100 after self.assert_message_count(topic, 201, partitions=(partition,)) - @kafka_versions("all") def test_switch_leader_async(self): topic = self.topic @@ -129,7 +128,8 @@ class TestFailover(KafkaIntegrationTestCase): # count number of messages # Should be equal to 10 before + 1 recovery + 10 after - self.assert_message_count(topic, 21, partitions=(partition,)) + self.assert_message_count(topic, 21, partitions=(partition,), + at_least=True) @kafka_versions("all") def test_switch_leader_keyed_producer(self): @@ -184,12 +184,12 @@ class TestFailover(KafkaIntegrationTestCase): broker.close() return broker - def assert_message_count(self, topic, check_count, timeout=10, partitions=None): + def assert_message_count(self, topic, check_count, timeout=10, + partitions=None, at_least=False): hosts = ','.join(['%s:%d' % (broker.host, broker.port) for broker in self.brokers]) client = KafkaClient(hosts) - group = random_string(10) consumer = SimpleConsumer(client, None, topic, partitions=partitions, auto_commit=False, @@ -199,10 +199,16 @@ class TestFailover(KafkaIntegrationTestCase): pending = consumer.pending(partitions) # Keep checking if it isn't immediately correct, subject to timeout - while pending != check_count and (time.time() - started_at < timeout): + while pending < check_count and (time.time() - started_at < timeout): pending = consumer.pending(partitions) consumer.stop() client.close() - self.assertEqual(pending, check_count) + if pending < check_count: + self.fail('Too few pending messages: found %d, expected %d' % + (pending, check_count)) + elif pending > check_count and not at_least: + self.fail('Too many pending messages: found %d, expected %d' % + (pending, check_count)) + return True |