summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--test/test_failover_integration.py18
1 files changed, 12 insertions, 6 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index c23ab14..631068a 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -92,7 +92,6 @@ class TestFailover(KafkaIntegrationTestCase):
# Should be equal to 100 before + 1 recovery + 100 after
self.assert_message_count(topic, 201, partitions=(partition,))
-
@kafka_versions("all")
def test_switch_leader_async(self):
topic = self.topic
@@ -129,7 +128,8 @@ class TestFailover(KafkaIntegrationTestCase):
# count number of messages
# Should be equal to 10 before + 1 recovery + 10 after
- self.assert_message_count(topic, 21, partitions=(partition,))
+ self.assert_message_count(topic, 21, partitions=(partition,),
+ at_least=True)
@kafka_versions("all")
def test_switch_leader_keyed_producer(self):
@@ -184,12 +184,12 @@ class TestFailover(KafkaIntegrationTestCase):
broker.close()
return broker
- def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
+ def assert_message_count(self, topic, check_count, timeout=10,
+ partitions=None, at_least=False):
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
for broker in self.brokers])
client = KafkaClient(hosts)
- group = random_string(10)
consumer = SimpleConsumer(client, None, topic,
partitions=partitions,
auto_commit=False,
@@ -199,10 +199,16 @@ class TestFailover(KafkaIntegrationTestCase):
pending = consumer.pending(partitions)
# Keep checking if it isn't immediately correct, subject to timeout
- while pending != check_count and (time.time() - started_at < timeout):
+ while pending < check_count and (time.time() - started_at < timeout):
pending = consumer.pending(partitions)
consumer.stop()
client.close()
- self.assertEqual(pending, check_count)
+ if pending < check_count:
+ self.fail('Too few pending messages: found %d, expected %d' %
+ (pending, check_count))
+ elif pending > check_count and not at_least:
+ self.fail('Too many pending messages: found %d, expected %d' %
+ (pending, check_count))
+ return True