summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py64
1 files changed, 48 insertions, 16 deletions
diff --git a/test/integration.py b/test/integration.py
index 2779898..fccf28f 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -1,3 +1,4 @@
+import glob
import os
import select
import shlex
@@ -19,6 +20,16 @@ def get_open_port():
sock.close()
return port
+def build_kafka_classpath():
+ baseDir = "./kafka-src"
+ jars = []
+ jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar"))
+ return ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+
class KafkaFixture(Thread):
def __init__(self, port):
Thread.__init__(self)
@@ -26,6 +37,7 @@ class KafkaFixture(Thread):
self.capture = ""
self.shouldDie = Event()
self.tmpDir = tempfile.mkdtemp()
+ print("tmp dir: %s" % self.tmpDir)
def run(self):
# Create the log directory
@@ -33,6 +45,7 @@ class KafkaFixture(Thread):
os.mkdir(logDir)
# Create the config file
+ logConfig = "test/resources/log4j.properties"
configFile = os.path.join(self.tmpDir, 'server.properties')
f = open('test/resources/server.properties', 'r')
props = f.read()
@@ -41,7 +54,7 @@ class KafkaFixture(Thread):
f.close()
# Start Kafka
- args = shlex.split("./kafka-src/bin/kafka-server-start.sh %s" % configFile)
+ args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile))
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
killed = False
@@ -57,7 +70,7 @@ class KafkaFixture(Thread):
killed = True
if proc.poll() is not None:
- shutil.rmtree(self.tmpDir)
+ #shutil.rmtree(self.tmpDir)
if killed:
break
else:
@@ -71,45 +84,55 @@ class KafkaFixture(Thread):
return False
if target in self.capture:
return True
- time.sleep(1)
+ time.sleep(0.100)
class IntegrationTest(unittest.TestCase):
- def setUp(self):
+ @classmethod
+ def setUpClass(cls):
port = get_open_port()
- self.server = KafkaFixture(port)
- self.server.start()
- self.server.wait_for("Kafka server started")
- self.kafka = KafkaClient("localhost", port)
+ cls.server = KafkaFixture(port)
+ cls.server.start()
+ cls.server.wait_for("Kafka server started")
+ cls.kafka = KafkaClient("localhost", port)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.kafka.close()
+ cls.server.shouldDie.set()
def test_produce(self):
+ # Produce a message, check that the log got created
req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
+ # Same thing, different partition
req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
def test_produce_consume(self):
+ # Send two messages and consume them
message1 = KafkaClient.create_message("testing 1")
message2 = KafkaClient.create_message("testing 2")
req = ProduceRequest("test-produce-consume", 0, [message1, message2])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
- time.sleep(1)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-0'"))
req = FetchRequest("test-produce-consume", 0, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
self.assertEquals(messages[0], message1)
self.assertEquals(messages[1], message2)
+ # Do the same, but for a different partition
message3 = KafkaClient.create_message("testing 3")
message4 = KafkaClient.create_message("testing 4")
req = ProduceRequest("test-produce-consume", 1, [message3, message4])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
- time.sleep(1)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-produce-consume-1'"))
req = FetchRequest("test-produce-consume", 1, 0, 1024)
(messages, req) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 2)
@@ -117,20 +140,29 @@ class IntegrationTest(unittest.TestCase):
self.assertEquals(messages[1], message4)
def test_check_offset(self):
+ # Produce/consume a message, check that the next offset looks correct
message1 = KafkaClient.create_message("testing 1")
req = ProduceRequest("test-check-offset", 0, [message1])
self.kafka.send_message_set(req)
self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
- time.sleep(1)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
req = FetchRequest("test-check-offset", 0, 0, 1024)
- (messages, req) = self.kafka.get_message_set(req)
+ (messages, nextReq) = self.kafka.get_message_set(req)
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0], message1)
- assertEquals(req.offset, len(KafkaClient.encode_message(message1)))
+ self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)))
- def tearDown(self):
- self.kafka.close()
- self.server.shouldDie.set()
+ # Produce another message, consume with the last offset
+ message2 = KafkaClient.create_message("test 2")
+ req = ProduceRequest("test-check-offset", 0, [message2])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
+
+ # Verify
+ (messages, nextReq) = self.kafka.get_message_set(nextReq)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0], message2)
+ self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
if __name__ == "__main__":