summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py26
1 files changed, 26 insertions, 0 deletions
diff --git a/test/integration.py b/test/integration.py
index 7680682..e11b33b 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -12,6 +12,7 @@ import time
import unittest
from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
+from kafka.queue import KafkaQueue
def get_open_port():
sock = socket.socket()
@@ -231,5 +232,30 @@ class IntegrationTest(unittest.TestCase):
#self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
#self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
+ def test_queue(self):
+ # Send 1000 messages
+ q = KafkaQueue(self.kafka, "test-queue", [0,1])
+ t1 = time.time()
+ for i in range(1000):
+ q.put("test %d" % i)
+ t2 = time.time()
+
+ # Wait for the producer to fully flush
+ time.sleep(2)
+
+ # Copy all the messages into a list
+ t1 = time.time()
+ consumed = []
+ for i in range(1000):
+ consumed.append(q.get())
+ t2 = time.time()
+
+ # Verify everything is there
+ for i in range(1000):
+ self.assertTrue("test %d" % i in consumed)
+
+ # Shutdown the queue
+ q.close()
+
if __name__ == "__main__":
unittest.main()