summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-04 23:44:42 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 16:47:26 -0700
commit26abd24dde1d50d79f5c4e39776de8e89f586e0b (patch)
tree7efcce9ef03f62decaf6e858209df4db56cebe9c /test/test_failover_integration.py
parent9ac0f057b621c8706e8790b3c10295ef848121c3 (diff)
downloadkafka-python-26abd24dde1d50d79f5c4e39776de8e89f586e0b.tar.gz
add option to check for at-least-once message delivery in failover tests
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--test/test_failover_integration.py18
1 files changed, 12 insertions, 6 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index c23ab14..631068a 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -92,7 +92,6 @@ class TestFailover(KafkaIntegrationTestCase):
# Should be equal to 100 before + 1 recovery + 100 after
self.assert_message_count(topic, 201, partitions=(partition,))
-
@kafka_versions("all")
def test_switch_leader_async(self):
topic = self.topic
@@ -129,7 +128,8 @@ class TestFailover(KafkaIntegrationTestCase):
# count number of messages
# Should be equal to 10 before + 1 recovery + 10 after
- self.assert_message_count(topic, 21, partitions=(partition,))
+ self.assert_message_count(topic, 21, partitions=(partition,),
+ at_least=True)
@kafka_versions("all")
def test_switch_leader_keyed_producer(self):
@@ -184,12 +184,12 @@ class TestFailover(KafkaIntegrationTestCase):
broker.close()
return broker
- def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
+ def assert_message_count(self, topic, check_count, timeout=10,
+ partitions=None, at_least=False):
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
for broker in self.brokers])
client = KafkaClient(hosts)
- group = random_string(10)
consumer = SimpleConsumer(client, None, topic,
partitions=partitions,
auto_commit=False,
@@ -199,10 +199,16 @@ class TestFailover(KafkaIntegrationTestCase):
pending = consumer.pending(partitions)
# Keep checking if it isn't immediately correct, subject to timeout
- while pending != check_count and (time.time() - started_at < timeout):
+ while pending < check_count and (time.time() - started_at < timeout):
pending = consumer.pending(partitions)
consumer.stop()
client.close()
- self.assertEqual(pending, check_count)
+ if pending < check_count:
+ self.fail('Too few pending messages: found %d, expected %d' %
+ (pending, check_count))
+ elif pending > check_count and not at_least:
+ self.fail('Too many pending messages: found %d, expected %d' %
+ (pending, check_count))
+ return True