diff options
author | Wouter Bolsterlee <uws@xs4all.nl> | 2013-05-20 21:24:49 +0200 |
---|---|---|
committer | Wouter Bolsterlee <uws@xs4all.nl> | 2013-05-20 21:24:49 +0200 |
commit | 87b9f5aa3f9f80dfa3d02bd186e127f1f622a96c (patch) | |
tree | a86707c31d49568c45ae54161b9e2ee487b7c63b /tests | |
parent | 08bdce56f99e40fbc7f9561b3d7665d1246dd66f (diff) | |
download | happybase-87b9f5aa3f9f80dfa3d02bd186e127f1f622a96c.tar.gz |
Add tests for the connection pool
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_api.py | 119 |
1 files changed, 104 insertions, 15 deletions
diff --git a/tests/test_api.py b/tests/test_api.py index b1bc941..9e77d52 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,9 +1,11 @@ """ -HappyBase API tests. +HappyBase tests. """ -import os import collections +import os +import random +import threading from nose.tools import ( assert_dict_equal, @@ -17,7 +19,7 @@ from nose.tools import ( assert_true, ) -import happybase +from happybase import Connection, ConnectionPool, NoConnectionsAvailable HAPPYBASE_HOST = os.environ.get('HAPPYBASE_HOST') HAPPYBASE_PORT = os.environ.get('HAPPYBASE_PORT') @@ -28,18 +30,22 @@ KEEP_TABLE = ('HAPPYBASE_NO_CLEANUP' in os.environ) TABLE_PREFIX = 'happybase_tests_tmp' TEST_TABLE_NAME = 'test1' +connection_kwargs = dict( + host=HAPPYBASE_HOST, + port=HAPPYBASE_PORT, + table_prefix=TABLE_PREFIX, + compat=HAPPYBASE_COMPAT, + transport=HAPPYBASE_TRANSPORT, +) + + +# Yuck, globals connection = table = None def setup_module(): global connection, table - connection = happybase.Connection( - host=HAPPYBASE_HOST, - port=HAPPYBASE_PORT, - table_prefix=TABLE_PREFIX, - compat=HAPPYBASE_COMPAT, - transport=HAPPYBASE_TRANSPORT, - ) + connection = Connection(**connection_kwargs) assert_is_not_none(connection) @@ -62,11 +68,11 @@ def teardown_module(): def test_connection_compat(): with assert_raises(ValueError): - happybase.Connection(compat='0.1.invalid.version') + Connection(compat='0.1.invalid.version') def test_timeout_arg(): - happybase.Connection( + Connection( timeout=5000, autoconnect=False) @@ -91,14 +97,14 @@ def test_prefix(): assert_equal(connection.table('foobar').name, TABLE_PREFIX + '_foobar') assert_equal(connection.table('foobar', use_prefix=False).name, 'foobar') - c = happybase.Connection(autoconnect=False) + c = Connection(autoconnect=False) assert_equal('foo', c._table_name('foo')) with assert_raises(TypeError): - happybase.Connection(autoconnect=False, table_prefix=123) + Connection(autoconnect=False, table_prefix=123) with assert_raises(TypeError): - happybase.Connection(autoconnect=False, table_prefix_separator=2.1) + Connection(autoconnect=False, table_prefix_separator=2.1) def test_stringify(): @@ -436,3 +442,86 @@ def test_delete(): table.delete(row_key) assert_dict_equal({}, table.row(row_key)) + + +def test_connection_pool_construction(): + with assert_raises(TypeError): + ConnectionPool(size='abc') + + with assert_raises(ValueError): + ConnectionPool(size=0) + + +def test_connection_pool(): + + def run(): + name = threading.current_thread().name + print "Thread %s starting" % name + + def inner_function(): + # Nested connection requests must return the same connection + with pool.connection() as another_connection: + assert connection is another_connection + + for i in xrange(100): + with pool.connection() as connection: + connection.tables() + + # Fake an exception once in a while + if random.random() < .001: + connection._tainted = True + + inner_function() + + print "Thread %s done" % name + + N_THREADS = 50 + + pool = ConnectionPool(size=3, **connection_kwargs) + threads = [threading.Thread(target=run) for i in xrange(N_THREADS)] + + for t in threads: + t.start() + + while threads: + for t in threads: + t.join(timeout=.1) + + # filter out finished threads + threads = [t for t in threads if t.is_alive()] + print "%d threads still alive" % len(threads) + + +def test_pool_exhaustion(): + pool = ConnectionPool(size=1, **connection_kwargs) + + def run(): + with assert_raises(NoConnectionsAvailable): + with pool.connection(timeout=.1) as connection: + connection.tables() + + with pool.connection(): + # At this point the only connection is assigned to this thread, + # so another thread cannot obtain a connection at this point. + + t = threading.Thread(target=run) + t.start() + t.join() + + +if __name__ == '__main__': + import sys + + # Dump stacktraces using 'kill -USR1', useful for debugging hanging + # programs and multi threading issues. + try: + import faulthandler + except ImportError: + pass + else: + import signal + faulthandler.register(signal.SIGUSR1) + + method_name = 'test_%s' % sys.argv[1] + method = globals()[method_name] + method() |