summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--test/test_failover_integration.py240
1 files changed, 0 insertions, 240 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
deleted file mode 100644
index ad7dcb9..0000000
--- a/test/test_failover_integration.py
+++ /dev/null
@@ -1,240 +0,0 @@
-import logging
-import os
-import time
-
-from kafka import SimpleClient, SimpleConsumer, KeyedProducer
-from kafka.errors import (
- FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError)
-from kafka.producer.base import Producer
-from kafka.structs import TopicPartition
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, random_string
-
-
-log = logging.getLogger(__name__)
-
-
-class TestFailover(KafkaIntegrationTestCase):
- create_client = False
-
- def setUp(self):
- if not os.environ.get('KAFKA_VERSION'):
- self.skipTest('integration test requires KAFKA_VERSION')
-
- zk_chroot = random_string(10)
- replicas = 3
- partitions = 3
-
- # mini zookeeper, 3 kafka brokers
- self.zk = ZookeeperFixture.instance()
- kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
- 'partitions': partitions}
- self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs)
- for i in range(replicas)]
-
- hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
- self.client = SimpleClient(hosts, timeout=2)
- super(TestFailover, self).setUp()
-
- def tearDown(self):
- super(TestFailover, self).tearDown()
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- self.client.close()
- for broker in self.brokers:
- broker.close()
- self.zk.close()
-
- def test_switch_leader(self):
- topic = self.topic
- partition = 0
-
- # Testing the base Producer class here so that we can easily send
- # messages to a specific partition, kill the leader for that partition
- # and check that after another broker takes leadership the producer
- # is able to resume sending messages
-
- # require that the server commit messages to all in-sync replicas
- # so that failover doesn't lose any messages on server-side
- # and we can assert that server-side message count equals client-side
- producer = Producer(self.client, async_send=False,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
-
- # Send 100 random messages to a specific partition
- self._send_random_messages(producer, topic, partition, 100)
-
- # kill leader for partition
- self._kill_leader(topic, partition)
-
- # expect failure, but don't wait more than 60 secs to recover
- recovered = False
- started = time.time()
- timeout = 60
- while not recovered and (time.time() - started) < timeout:
- try:
- log.debug("attempting to send 'success' message after leader killed")
- producer.send_messages(topic, partition, b'success')
- log.debug("success!")
- recovered = True
- except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError):
- log.debug("caught exception sending message -- will retry")
- continue
-
- # Verify we successfully sent the message
- self.assertTrue(recovered)
-
- # send some more messages to new leader
- self._send_random_messages(producer, topic, partition, 100)
-
- # count number of messages
- # Should be equal to 100 before + 1 recovery + 100 after
- # at_least=True because exactly once delivery isn't really a thing
- self.assert_message_count(topic, 201, partitions=(partition,),
- at_least=True)
-
- def test_switch_leader_async(self):
- topic = self.topic
- partition = 0
-
- # Test the base class Producer -- send_messages to a specific partition
- producer = Producer(self.client, async_send=True,
- batch_send_every_n=15,
- batch_send_every_t=3,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
- async_log_messages_on_error=False)
-
- # Send 10 random messages
- self._send_random_messages(producer, topic, partition, 10)
- self._send_random_messages(producer, topic, partition + 1, 10)
-
- # kill leader for partition
- self._kill_leader(topic, partition)
-
- log.debug("attempting to send 'success' message after leader killed")
-
- # in async mode, this should return immediately
- producer.send_messages(topic, partition, b'success')
- producer.send_messages(topic, partition + 1, b'success')
-
- # send to new leader
- self._send_random_messages(producer, topic, partition, 10)
- self._send_random_messages(producer, topic, partition + 1, 10)
-
- # Stop the producer and wait for it to shutdown
- producer.stop()
- started = time.time()
- timeout = 60
- while (time.time() - started) < timeout:
- if not producer.thread.is_alive():
- break
- time.sleep(0.1)
- else:
- self.fail('timeout waiting for producer queue to empty')
-
- # count number of messages
- # Should be equal to 10 before + 1 recovery + 10 after
- # at_least=True because exactly once delivery isn't really a thing
- self.assert_message_count(topic, 21, partitions=(partition,),
- at_least=True)
- self.assert_message_count(topic, 21, partitions=(partition + 1,),
- at_least=True)
-
- def test_switch_leader_keyed_producer(self):
- topic = self.topic
-
- producer = KeyedProducer(self.client, async_send=False)
-
- # Send 10 random messages
- for _ in range(10):
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
-
- # kill leader for partition 0
- self._kill_leader(topic, 0)
-
- recovered = False
- started = time.time()
- timeout = 60
- while not recovered and (time.time() - started) < timeout:
- try:
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
- if producer.partitioners[topic].partition(key) == 0:
- recovered = True
- except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError):
- log.debug("caught exception sending message -- will retry")
- continue
-
- # Verify we successfully sent the message
- self.assertTrue(recovered)
-
- # send some more messages just to make sure no more exceptions
- for _ in range(10):
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
-
- def test_switch_leader_simple_consumer(self):
- producer = Producer(self.client, async_send=False)
- consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
- self._send_random_messages(producer, self.topic, 0, 2)
- consumer.get_messages()
- self._kill_leader(self.topic, 0)
- consumer.get_messages()
-
- def _send_random_messages(self, producer, topic, partition, n):
- for j in range(n):
- msg = 'msg {0}: {1}'.format(j, random_string(10))
- log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
- while True:
- try:
- producer.send_messages(topic, partition, msg.encode('utf-8'))
- except Exception:
- log.exception('failure in _send_random_messages - retrying')
- continue
- else:
- break
-
- def _kill_leader(self, topic, partition):
- leader = self.client.topics_to_brokers[TopicPartition(topic, partition)]
- broker = self.brokers[leader.nodeId]
- broker.close()
- return broker
-
- def assert_message_count(self, topic, check_count, timeout=10,
- partitions=None, at_least=False):
- hosts = ','.join(['%s:%d' % (broker.host, broker.port)
- for broker in self.brokers])
-
- client = SimpleClient(hosts, timeout=2)
- consumer = SimpleConsumer(client, None, topic,
- partitions=partitions,
- auto_commit=False,
- iter_timeout=timeout)
-
- started_at = time.time()
- pending = -1
- while pending < check_count and (time.time() - started_at < timeout):
- try:
- pending = consumer.pending(partitions)
- except FailedPayloadsError:
- pass
- time.sleep(0.5)
-
- consumer.stop()
- client.close()
-
- if pending < check_count:
- self.fail('Too few pending messages: found %d, expected %d' %
- (pending, check_count))
- elif pending > check_count and not at_least:
- self.fail('Too many pending messages: found %d, expected %d' %
- (pending, check_count))
- return True