summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-04 13:33:07 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 16:47:26 -0700
commit9ac0f057b621c8706e8790b3c10295ef848121c3 (patch)
tree27bedfac212136502e548953ede6c89a818d020f /test/test_failover_integration.py
parente16541e3cbfb9501099cf02dc237237e7519d637 (diff)
downloadkafka-python-9ac0f057b621c8706e8790b3c10295ef848121c3.tar.gz
Reenable test_switch_leader_async
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--test/test_failover_integration.py20
1 files changed, 13 insertions, 7 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 11e255d..c23ab14 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -93,14 +93,14 @@ class TestFailover(KafkaIntegrationTestCase):
self.assert_message_count(topic, 201, partitions=(partition,))
- #@kafka_versions("all")
- @unittest.skip("async producer does not support reliable failover yet")
+ @kafka_versions("all")
def test_switch_leader_async(self):
topic = self.topic
partition = 0
# Test the base class Producer -- send_messages to a specific partition
- producer = Producer(self.client, async=True)
+ producer = Producer(self.client, async=True,
+ req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
# Send 10 random messages
self._send_random_messages(producer, topic, partition, 10)
@@ -111,15 +111,21 @@ class TestFailover(KafkaIntegrationTestCase):
logging.debug("attempting to send 'success' message after leader killed")
# in async mode, this should return immediately
- producer.send_messages(topic, partition, 'success')
+ producer.send_messages(topic, partition, b'success')
# send to new leader
self._send_random_messages(producer, topic, partition, 10)
- # wait until producer queue is empty
- while not producer.queue.empty():
- time.sleep(0.1)
+ # Stop the producer and wait for it to shutdown
producer.stop()
+ started = time.time()
+ timeout = 60
+ while (time.time() - started) < timeout:
+ if not producer.thread.is_alive():
+ break
+ time.sleep(0.1)
+ else:
+ self.fail('timeout waiting for producer queue to empty')
# count number of messages
# Should be equal to 10 before + 1 recovery + 10 after