summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorWouter Bolsterlee <uws@xs4all.nl>2013-05-20 21:24:49 +0200
committerWouter Bolsterlee <uws@xs4all.nl>2013-05-20 21:24:49 +0200
commit87b9f5aa3f9f80dfa3d02bd186e127f1f622a96c (patch)
treea86707c31d49568c45ae54161b9e2ee487b7c63b /tests
parent08bdce56f99e40fbc7f9561b3d7665d1246dd66f (diff)
downloadhappybase-87b9f5aa3f9f80dfa3d02bd186e127f1f622a96c.tar.gz
Add tests for the connection pool
Diffstat (limited to 'tests')
-rw-r--r--tests/test_api.py119
1 files changed, 104 insertions, 15 deletions
diff --git a/tests/test_api.py b/tests/test_api.py
index b1bc941..9e77d52 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -1,9 +1,11 @@
"""
-HappyBase API tests.
+HappyBase tests.
"""
-import os
import collections
+import os
+import random
+import threading
from nose.tools import (
assert_dict_equal,
@@ -17,7 +19,7 @@ from nose.tools import (
assert_true,
)
-import happybase
+from happybase import Connection, ConnectionPool, NoConnectionsAvailable
HAPPYBASE_HOST = os.environ.get('HAPPYBASE_HOST')
HAPPYBASE_PORT = os.environ.get('HAPPYBASE_PORT')
@@ -28,18 +30,22 @@ KEEP_TABLE = ('HAPPYBASE_NO_CLEANUP' in os.environ)
TABLE_PREFIX = 'happybase_tests_tmp'
TEST_TABLE_NAME = 'test1'
+connection_kwargs = dict(
+ host=HAPPYBASE_HOST,
+ port=HAPPYBASE_PORT,
+ table_prefix=TABLE_PREFIX,
+ compat=HAPPYBASE_COMPAT,
+ transport=HAPPYBASE_TRANSPORT,
+)
+
+
+# Yuck, globals
connection = table = None
def setup_module():
global connection, table
- connection = happybase.Connection(
- host=HAPPYBASE_HOST,
- port=HAPPYBASE_PORT,
- table_prefix=TABLE_PREFIX,
- compat=HAPPYBASE_COMPAT,
- transport=HAPPYBASE_TRANSPORT,
- )
+ connection = Connection(**connection_kwargs)
assert_is_not_none(connection)
@@ -62,11 +68,11 @@ def teardown_module():
def test_connection_compat():
with assert_raises(ValueError):
- happybase.Connection(compat='0.1.invalid.version')
+ Connection(compat='0.1.invalid.version')
def test_timeout_arg():
- happybase.Connection(
+ Connection(
timeout=5000,
autoconnect=False)
@@ -91,14 +97,14 @@ def test_prefix():
assert_equal(connection.table('foobar').name, TABLE_PREFIX + '_foobar')
assert_equal(connection.table('foobar', use_prefix=False).name, 'foobar')
- c = happybase.Connection(autoconnect=False)
+ c = Connection(autoconnect=False)
assert_equal('foo', c._table_name('foo'))
with assert_raises(TypeError):
- happybase.Connection(autoconnect=False, table_prefix=123)
+ Connection(autoconnect=False, table_prefix=123)
with assert_raises(TypeError):
- happybase.Connection(autoconnect=False, table_prefix_separator=2.1)
+ Connection(autoconnect=False, table_prefix_separator=2.1)
def test_stringify():
@@ -436,3 +442,86 @@ def test_delete():
table.delete(row_key)
assert_dict_equal({}, table.row(row_key))
+
+
+def test_connection_pool_construction():
+ with assert_raises(TypeError):
+ ConnectionPool(size='abc')
+
+ with assert_raises(ValueError):
+ ConnectionPool(size=0)
+
+
+def test_connection_pool():
+
+ def run():
+ name = threading.current_thread().name
+ print "Thread %s starting" % name
+
+ def inner_function():
+ # Nested connection requests must return the same connection
+ with pool.connection() as another_connection:
+ assert connection is another_connection
+
+ for i in xrange(100):
+ with pool.connection() as connection:
+ connection.tables()
+
+ # Fake an exception once in a while
+ if random.random() < .001:
+ connection._tainted = True
+
+ inner_function()
+
+ print "Thread %s done" % name
+
+ N_THREADS = 50
+
+ pool = ConnectionPool(size=3, **connection_kwargs)
+ threads = [threading.Thread(target=run) for i in xrange(N_THREADS)]
+
+ for t in threads:
+ t.start()
+
+ while threads:
+ for t in threads:
+ t.join(timeout=.1)
+
+ # filter out finished threads
+ threads = [t for t in threads if t.is_alive()]
+ print "%d threads still alive" % len(threads)
+
+
+def test_pool_exhaustion():
+ pool = ConnectionPool(size=1, **connection_kwargs)
+
+ def run():
+ with assert_raises(NoConnectionsAvailable):
+ with pool.connection(timeout=.1) as connection:
+ connection.tables()
+
+ with pool.connection():
+ # At this point the only connection is assigned to this thread,
+ # so another thread cannot obtain a connection at this point.
+
+ t = threading.Thread(target=run)
+ t.start()
+ t.join()
+
+
+if __name__ == '__main__':
+ import sys
+
+ # Dump stacktraces using 'kill -USR1', useful for debugging hanging
+ # programs and multi threading issues.
+ try:
+ import faulthandler
+ except ImportError:
+ pass
+ else:
+ import signal
+ faulthandler.register(signal.SIGUSR1)
+
+ method_name = 'test_%s' % sys.argv[1]
+ method = globals()[method_name]
+ method()