summaryrefslogtreecommitdiff
path: root/test/test_conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-08-17 20:05:13 -0700
committerDana Powers <dana.powers@rd.io>2014-08-19 21:27:43 -0700
commitdac157a6fb2c2607454a276f4cd6ca0911ce6d1b (patch)
tree2faac278364700b03175af32ed9f3db2b90ba7f7 /test/test_conn.py
parent19499c3de62aca699c7abc3f5639b43c96f130c6 (diff)
downloadkafka-python-dac157a6fb2c2607454a276f4cd6ca0911ce6d1b.tar.gz
Improve KafkaConnection testing: mock socket.create_connection; add tests for __init__ and send
Diffstat (limited to 'test/test_conn.py')
-rw-r--r--test/test_conn.py103
1 files changed, 95 insertions, 8 deletions
diff --git a/test/test_conn.py b/test/test_conn.py
index 4ab6d4f..bcc6500 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -1,13 +1,28 @@
import os
import random
+import socket
import struct
+
+import mock
import unittest2
-import kafka.conn
+
+from kafka.common import *
+from kafka.conn import *
class ConnTest(unittest2.TestCase):
+ def setUp(self):
+ # Mocking socket.create_connection will cause _sock to always be a
+ # MagicMock()
+ patcher = mock.patch('socket.create_connection', spec=True)
+ self.MockCreateConn = patcher.start()
+
+ # Also mock socket.sendall() to appear successful
+ self.MockCreateConn().sendall.return_value = None
+ self.addCleanup(patcher.stop)
+
def test_collect_hosts__happy_path(self):
hosts = "localhost:1234,localhost"
- results = kafka.conn.collect_hosts(hosts)
+ results = collect_hosts(hosts)
self.assertEqual(set(results), set([
('localhost', 1234),
@@ -20,7 +35,7 @@ class ConnTest(unittest2.TestCase):
'localhost',
]
- results = kafka.conn.collect_hosts(hosts)
+ results = collect_hosts(hosts)
self.assertEqual(set(results), set([
('localhost', 1234),
@@ -29,20 +44,92 @@ class ConnTest(unittest2.TestCase):
def test_collect_hosts__with_spaces(self):
hosts = "localhost:1234, localhost"
- results = kafka.conn.collect_hosts(hosts)
+ results = collect_hosts(hosts)
self.assertEqual(set(results), set([
('localhost', 1234),
('localhost', 9092),
]))
- @unittest2.skip("Not Implemented")
def test_send(self):
- pass
+ fake_config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ 'request_id': 0,
+ 'payload': 'test data'
+ }
+
+ def mock_reinit(obj):
+ obj._sock = mock.MagicMock()
+ obj._sock.sendall.return_value = None
+ obj._dirty = False
+
+ with mock.patch.object(KafkaConnection, 'reinit', new=mock_reinit):
+ conn = KafkaConnection(fake_config['host'], fake_config['port'])
+ conn.send(fake_config['request_id'], fake_config['payload'])
+ conn._sock.sendall.assert_called_with(fake_config['payload'])
+
+ def test_init_creates_socket_connection(self):
+ fake_config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ }
+
+ assert socket.create_connection is self.MockCreateConn
+ conn = KafkaConnection(fake_config['host'], fake_config['port'])
+ socket.create_connection.assert_called_with((fake_config['host'], fake_config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS)
+
+ def test_init_failure_raises_connection_error(self):
+ fake_config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ }
+
+ def raise_error(*args):
+ raise socket.error
+
+ with mock.patch.object(socket, 'create_connection', new=raise_error):
+ with self.assertRaises(ConnectionError):
+ KafkaConnection(fake_config['host'], fake_config['port'])
- @unittest2.skip("Not Implemented")
def test_send__reconnects_on_dirty_conn(self):
- pass
+ fake_config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ 'request_id': 0,
+ 'payload': 'test data'
+ }
+
+ # Get a connection (with socket mocked)
+ assert socket.create_connection is self.MockCreateConn
+ conn = KafkaConnection(fake_config['host'], fake_config['port'])
+
+ # Dirty it
+
+ try:
+ conn._raise_connection_error()
+ except ConnectionError:
+ pass
+
+ # Reset the socket call counts
+ socket.create_connection.reset_mock()
+ self.assertEqual(socket.create_connection.call_count, 0)
+
+ # Now test that sending attempts to reconnect
+ conn.send(fake_config['request_id'], fake_config['payload'])
+ self.assertEqual(socket.create_connection.call_count, 1)
+
+ # A second way to dirty it...
+ conn.close()
+
+ # Reset the socket call counts
+ socket.create_connection.reset_mock()
+ self.assertEqual(socket.create_connection.call_count, 0)
+
+ # Now test that sending attempts to reconnect
+ conn.send(fake_config['request_id'], fake_config['payload'])
+ self.assertEqual(socket.create_connection.call_count, 1)
+
@unittest2.skip("Not Implemented")
def test_send__failure_sets_dirty_connection(self):