From 7a846cf5e35ebcad852309aebaa0bac8a7244744 Mon Sep 17 00:00:00 2001 From: Evax Software Date: Thu, 4 Oct 2012 13:34:39 -0400 Subject: Allow pip install from git Merged with modifications from https://github.com/evax/MySQLdb1/commit/a8152690101733904b16a32ff8467220ca07242c --- tests/capabilities.py | 281 ++++++++++++ tests/configdb.py | 25 ++ tests/dbapi20.py | 853 +++++++++++++++++++++++++++++++++++++ tests/default.cnf | 10 + tests/test_MySQLdb_capabilities.py | 102 +++++ tests/test_MySQLdb_dbapi20.py | 202 +++++++++ tests/test_MySQLdb_nonstandard.py | 86 ++++ tests/travis.cnf | 10 + 8 files changed, 1569 insertions(+) create mode 100644 tests/capabilities.py create mode 100644 tests/configdb.py create mode 100644 tests/dbapi20.py create mode 100644 tests/default.cnf create mode 100644 tests/test_MySQLdb_capabilities.py create mode 100644 tests/test_MySQLdb_dbapi20.py create mode 100644 tests/test_MySQLdb_nonstandard.py create mode 100644 tests/travis.cnf (limited to 'tests') diff --git a/tests/capabilities.py b/tests/capabilities.py new file mode 100644 index 0000000..076361c --- /dev/null +++ b/tests/capabilities.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python -O +""" Script to test database capabilities and the DB-API interface + for functionality and memory leaks. + + Adapted from a script by M-A Lemburg. + +""" +from time import time +import array +import unittest +from configdb import connection_factory + + +class DatabaseTest(unittest.TestCase): + + db_module = None + connect_args = () + connect_kwargs = dict() + create_table_extra = '' + rows = 10 + debug = False + + def setUp(self): + import gc + db = connection_factory(**self.connect_kwargs) + self.connection = db + self.cursor = db.cursor() + # TODO: this needs to be re-evaluated for Python 3 + self.BLOBText = ''.join([chr(i) for i in range(256)] * 100); + self.BLOBUText = u''.join([unichr(i) for i in range(16384)]) + self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16)) + + leak_test = True + + def tearDown(self): + if self.leak_test: + import gc + del self.cursor + orphans = gc.collect() + self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans) + + del self.connection + orphans = gc.collect() + self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans) + + def table_exists(self, name): + try: + self.cursor.execute('select * from %s where 1=0' % name) + except: + return False + else: + return True + + def quote_identifier(self, ident): + return '"%s"' % ident + + def new_table_name(self): + i = id(self.cursor) + while True: + name = self.quote_identifier('tb%08x' % i) + if not self.table_exists(name): + return name + i = i + 1 + + def create_table(self, columndefs): + + """ Create a table using a list of column definitions given in + columndefs. + + generator must be a function taking arguments (row_number, + col_number) returning a suitable data object for insertion + into the table. + + """ + self.table = self.new_table_name() + self.cursor.execute('CREATE TABLE %s (%s) %s' % + (self.table, + ',\n'.join(columndefs), + self.create_table_extra)) + + def check_data_integrity(self, columndefs, generator): + # insert + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + self.connection.commit() + # verify + self.cursor.execute('select * from %s' % self.table) + l = self.cursor.fetchall() + self.assertEquals(len(l), self.rows) + try: + for i in range(self.rows): + for j in range(len(columndefs)): + self.assertEquals(l[i][j], generator(i,j)) + finally: + if not self.debug: + self.cursor.execute('drop table %s' % (self.table)) + + def test_transactions(self): + columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') + def generator(row, col): + if col == 0: return row + else: return ('%i' % (row%10))*255 + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + # verify + self.connection.commit() + self.cursor.execute('select * from %s' % self.table) + l = self.cursor.fetchall() + self.assertEquals(len(l), self.rows) + for i in range(self.rows): + for j in range(len(columndefs)): + self.assertEquals(l[i][j], generator(i,j)) + delete_statement = 'delete from %s where col1=%%s' % self.table + self.cursor.execute(delete_statement, (0,)) + self.cursor.execute('select col1 from %s where col1=%s' % \ + (self.table, 0)) + l = self.cursor.fetchall() + self.assertFalse(l, "DELETE didn't work") + self.connection.rollback() + self.cursor.execute('select col1 from %s where col1=%s' % \ + (self.table, 0)) + l = self.cursor.fetchall() + self.assertTrue(len(l) == 1, "ROLLBACK didn't work") + self.cursor.execute('drop table %s' % (self.table)) + + def test_truncation(self): + columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') + def generator(row, col): + if col == 0: return row + else: return ('%i' % (row%10))*((255-self.rows/2)+row) + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + + try: + self.cursor.execute(insert_statement, (0, '0'*256)) + except self.connection.DataError: + pass + else: + self.fail("Over-long column did not generate warnings/exception with single insert") + + self.connection.rollback() + + try: + for i in range(self.rows): + data = [] + for j in range(len(columndefs)): + data.append(generator(i,j)) + self.cursor.execute(insert_statement,tuple(data)) + except self.connection.DataError: + pass + else: + self.fail("Over-long columns did not generate warnings/exception with execute()") + + self.connection.rollback() + + try: + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + except self.connection.DataError: + pass + else: + self.fail("Over-long columns did not generate warnings/exception with executemany()") + + self.connection.rollback() + self.cursor.execute('drop table %s' % (self.table)) + + def test_CHAR(self): + # Character data + def generator(row,col): + return ('%i' % ((row+col) % 10)) * 255 + self.check_data_integrity( + ('col1 char(255)','col2 char(255)'), + generator) + + def test_INT(self): + # Number data + def generator(row,col): + return row*row + self.check_data_integrity( + ('col1 INT',), + generator) + + def test_DECIMAL(self): + # DECIMAL + def generator(row,col): + from decimal import Decimal + return Decimal("%d.%02d" % (row, col)) + self.check_data_integrity( + ('col1 DECIMAL(5,2)',), + generator) + + def test_DATE(self): + ticks = time() + def generator(row,col): + return self.db_module.DateFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 DATE',), + generator) + + def test_TIME(self): + ticks = time() + def generator(row,col): + return self.db_module.TimeFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 TIME',), + generator) + + def test_DATETIME(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 DATETIME',), + generator) + + def test_TIMESTAMP(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 TIMESTAMP',), + generator) + + def test_fractional_TIMESTAMP(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0) + self.check_data_integrity( + ('col1 TIMESTAMP',), + generator) + + def test_LONG(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBUText # 'BLOB Text ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 LONG'), + generator) + + def test_TEXT(self): + def generator(row,col): + return self.BLOBUText # 'BLOB Text ' * 1024 + self.check_data_integrity( + ('col2 TEXT',), + generator) + + def test_LONG_BYTE(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBBinary # 'BLOB\000Binary ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 LONG BYTE'), + generator) + + def test_BLOB(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBBinary # 'BLOB\000Binary ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 BLOB'), + generator) + diff --git a/tests/configdb.py b/tests/configdb.py new file mode 100644 index 0000000..cd6d43d --- /dev/null +++ b/tests/configdb.py @@ -0,0 +1,25 @@ +"""Configure database connection for tests.""" + +from os import environ, path + +tests_path = path.dirname(__file__) +conf_file = environ.get('TESTDB', 'default.cnf') +conf_path = path.join(tests_path, conf_file) +connect_kwargs = dict( + read_default_file = conf_path, + read_default_group = "MySQLdb-tests", +) + +def connection_kwargs(kwargs): + db_kwargs = connect_kwargs.copy() + db_kwargs.update(kwargs) + return db_kwargs + +def connection_factory(**kwargs): + import MySQLdb + db_kwargs = connection_kwargs(kwargs) + db = MySQLdb.connect(**db_kwargs) + return db + + + diff --git a/tests/dbapi20.py b/tests/dbapi20.py new file mode 100644 index 0000000..ad292ae --- /dev/null +++ b/tests/dbapi20.py @@ -0,0 +1,853 @@ +#!/usr/bin/env python +''' Python DB API 2.0 driver compliance unit test suite. + + This software is Public Domain and may be used without restrictions. + + "Now we have booze and barflies entering the discussion, plus rumours of + DBAs on drugs... and I won't tell you what flashes through my mind each + time I read the subject line with 'Anal Compliance' in it. All around + this is turning out to be a thoroughly unwholesome unit test." + + -- Ian Bicking +''' + +__rcs_id__ = '$Id$' +__version__ = '$Revision$'[11:-2] +__author__ = 'Stuart Bishop ' + +import unittest +import time + +# $Log$ +# Revision 1.1.2.1 2006/02/25 03:44:32 adustman +# Generic DB-API unit test module +# +# Revision 1.10 2003/10/09 03:14:14 zenzen +# Add test for DB API 2.0 optional extension, where database exceptions +# are exposed as attributes on the Connection object. +# +# Revision 1.9 2003/08/13 01:16:36 zenzen +# Minor tweak from Stefan Fleiter +# +# Revision 1.8 2003/04/10 00:13:25 zenzen +# Changes, as per suggestions by M.-A. Lemburg +# - Add a table prefix, to ensure namespace collisions can always be avoided +# +# Revision 1.7 2003/02/26 23:33:37 zenzen +# Break out DDL into helper functions, as per request by David Rushby +# +# Revision 1.6 2003/02/21 03:04:33 zenzen +# Stuff from Henrik Ekelund: +# added test_None +# added test_nextset & hooks +# +# Revision 1.5 2003/02/17 22:08:43 zenzen +# Implement suggestions and code from Henrik Eklund - test that cursor.arraysize +# defaults to 1 & generic cursor.callproc test added +# +# Revision 1.4 2003/02/15 00:16:33 zenzen +# Changes, as per suggestions and bug reports by M.-A. Lemburg, +# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar +# - Class renamed +# - Now a subclass of TestCase, to avoid requiring the driver stub +# to use multiple inheritance +# - Reversed the polarity of buggy test in test_description +# - Test exception heirarchy correctly +# - self.populate is now self._populate(), so if a driver stub +# overrides self.ddl1 this change propogates +# - VARCHAR columns now have a width, which will hopefully make the +# DDL even more portible (this will be reversed if it causes more problems) +# - cursor.rowcount being checked after various execute and fetchXXX methods +# - Check for fetchall and fetchmany returning empty lists after results +# are exhausted (already checking for empty lists if select retrieved +# nothing +# - Fix bugs in test_setoutputsize_basic and test_setinputsizes +# + +class DatabaseAPI20Test(unittest.TestCase): + ''' Test a database self.driver for DB API 2.0 compatibility. + This implementation tests Gadfly, but the TestCase + is structured so that other self.drivers can subclass this + test case to ensure compiliance with the DB-API. It is + expected that this TestCase may be expanded in the future + if ambiguities or edge conditions are discovered. + + The 'Optional Extensions' are not yet being tested. + + self.drivers should subclass this test, overriding setUp, tearDown, + self.driver, connect_args and connect_kw_args. Class specification + should be as follows: + + import dbapi20 + class mytest(dbapi20.DatabaseAPI20Test): + [...] + + Don't 'import DatabaseAPI20Test from dbapi20', or you will + confuse the unit tester - just 'import dbapi20'. + ''' + + # The self.driver module. This should be the module where the 'connect' + # method is to be found + driver = None + connect_args = () # List of arguments to pass to connect + connect_kw_args = {} # Keyword arguments for connect + table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables + + ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix + ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix + xddl1 = 'drop table %sbooze' % table_prefix + xddl2 = 'drop table %sbarflys' % table_prefix + + lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase + + # Some drivers may need to override these helpers, for example adding + # a 'commit' after the execute. + def executeDDL1(self,cursor): + cursor.execute(self.ddl1) + + def executeDDL2(self,cursor): + cursor.execute(self.ddl2) + + def setUp(self): + ''' self.drivers should override this method to perform required setup + if any is necessary, such as creating the database. + ''' + pass + + def tearDown(self): + ''' self.drivers should override this method to perform required cleanup + if any is necessary, such as deleting the test database. + The default drops the tables that may be created. + ''' + con = self._connect() + try: + cur = con.cursor() + for ddl in (self.xddl1,self.xddl2): + try: + cur.execute(ddl) + con.commit() + except self.driver.Error: + # Assume table didn't exist. Other tests will check if + # execute is busted. + pass + finally: + con.close() + + def _connect(self): + try: + return self.driver.connect( + *self.connect_args,**self.connect_kw_args + ) + except AttributeError: + self.fail("No connect method found in self.driver module") + + def test_connect(self): + con = self._connect() + con.close() + + def test_apilevel(self): + try: + # Must exist + apilevel = self.driver.apilevel + # Must equal 2.0 + self.assertEqual(apilevel,'2.0') + except AttributeError: + self.fail("Driver doesn't define apilevel") + + def test_threadsafety(self): + try: + # Must exist + threadsafety = self.driver.threadsafety + # Must be a valid value + self.assertTrue(threadsafety in (0,1,2,3)) + except AttributeError: + self.fail("Driver doesn't define threadsafety") + + def test_paramstyle(self): + try: + # Must exist + paramstyle = self.driver.paramstyle + # Must be a valid value + self.assertTrue(paramstyle in ( + 'qmark','numeric','named','format','pyformat' + )) + except AttributeError: + self.fail("Driver doesn't define paramstyle") + + def test_Exceptions(self): + # Make sure required exceptions exist, and are in the + # defined heirarchy. + self.assertTrue(issubclass(self.driver.Warning,StandardError)) + self.assertTrue(issubclass(self.driver.Error,StandardError)) + self.assertTrue( + issubclass(self.driver.InterfaceError,self.driver.Error) + ) + self.assertTrue( + issubclass(self.driver.DatabaseError,self.driver.Error) + ) + self.assertTrue( + issubclass(self.driver.OperationalError,self.driver.Error) + ) + self.assertTrue( + issubclass(self.driver.IntegrityError,self.driver.Error) + ) + self.assertTrue( + issubclass(self.driver.InternalError,self.driver.Error) + ) + self.assertTrue( + issubclass(self.driver.ProgrammingError,self.driver.Error) + ) + self.assertTrue( + issubclass(self.driver.NotSupportedError,self.driver.Error) + ) + + def test_ExceptionsAsConnectionAttributes(self): + # OPTIONAL EXTENSION + # Test for the optional DB API 2.0 extension, where the exceptions + # are exposed as attributes on the Connection object + # I figure this optional extension will be implemented by any + # driver author who is using this test suite, so it is enabled + # by default. + con = self._connect() + drv = self.driver + self.assertTrue(con.Warning is drv.Warning) + self.assertTrue(con.Error is drv.Error) + self.assertTrue(con.InterfaceError is drv.InterfaceError) + self.assertTrue(con.DatabaseError is drv.DatabaseError) + self.assertTrue(con.OperationalError is drv.OperationalError) + self.assertTrue(con.IntegrityError is drv.IntegrityError) + self.assertTrue(con.InternalError is drv.InternalError) + self.assertTrue(con.ProgrammingError is drv.ProgrammingError) + self.assertTrue(con.NotSupportedError is drv.NotSupportedError) + + + def test_commit(self): + con = self._connect() + try: + # Commit must work, even if it doesn't do anything + con.commit() + finally: + con.close() + + def test_rollback(self): + con = self._connect() + # If rollback is defined, it should either work or throw + # the documented exception + if hasattr(con,'rollback'): + try: + con.rollback() + except self.driver.NotSupportedError: + pass + + def test_cursor(self): + con = self._connect() + try: + cur = con.cursor() + finally: + con.close() + + def test_cursor_isolation(self): + con = self._connect() + try: + # Make sure cursors created from the same connection have + # the documented transaction isolation level + cur1 = con.cursor() + cur2 = con.cursor() + self.executeDDL1(cur1) + cur1.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) + cur2.execute("select name from %sbooze" % self.table_prefix) + booze = cur2.fetchall() + self.assertEqual(len(booze),1) + self.assertEqual(len(booze[0]),1) + self.assertEqual(booze[0][0],'Victoria Bitter') + finally: + con.close() + + def test_description(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + self.assertEqual(cur.description,None, + 'cursor.description should be none after executing a ' + 'statement that can return no rows (such as DDL)' + ) + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertEqual(len(cur.description),1, + 'cursor.description describes too many columns' + ) + self.assertEqual(len(cur.description[0]),7, + 'cursor.description[x] tuples must have 7 elements' + ) + self.assertEqual(cur.description[0][0].lower(),'name', + 'cursor.description[x][0] must return column name' + ) + self.assertEqual(cur.description[0][1],self.driver.STRING, + 'cursor.description[x][1] must return column type. Got %r' + % cur.description[0][1] + ) + + # Make sure self.description gets reset + self.executeDDL2(cur) + self.assertEqual(cur.description,None, + 'cursor.description not being set to None when executing ' + 'no-result statements (eg. DDL)' + ) + finally: + con.close() + + def test_rowcount(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + self.assertEqual(cur.rowcount,-1, + 'cursor.rowcount should be -1 after executing no-result ' + 'statements' + ) + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) + self.assertTrue(cur.rowcount in (-1,1), + 'cursor.rowcount should == number or rows inserted, or ' + 'set to -1 after executing an insert statement' + ) + cur.execute("select name from %sbooze" % self.table_prefix) + self.assertTrue(cur.rowcount in (-1,1), + 'cursor.rowcount should == number of rows returned, or ' + 'set to -1 after executing a select statement' + ) + self.executeDDL2(cur) + self.assertEqual(cur.rowcount,-1, + 'cursor.rowcount not being reset to -1 after executing ' + 'no-result statements' + ) + finally: + con.close() + + lower_func = 'lower' + def test_callproc(self): + con = self._connect() + try: + cur = con.cursor() + if self.lower_func and hasattr(cur,'callproc'): + r = cur.callproc(self.lower_func,('FOO',)) + self.assertEqual(len(r),1) + self.assertEqual(r[0],'FOO') + r = cur.fetchall() + self.assertEqual(len(r),1,'callproc produced no result set') + self.assertEqual(len(r[0]),1, + 'callproc produced invalid result set' + ) + self.assertEqual(r[0][0],'foo', + 'callproc produced invalid results' + ) + finally: + con.close() + + def test_close(self): + con = self._connect() + try: + cur = con.cursor() + finally: + con.close() + + # cursor.execute should raise an Error if called after connection + # closed + self.assertRaises(self.driver.Error,self.executeDDL1,cur) + + # connection.commit should raise an Error if called after connection' + # closed.' + self.assertRaises(self.driver.Error,con.commit) + + # connection.close should raise an Error if called more than once + self.assertRaises(self.driver.Error,con.close) + + def test_execute(self): + con = self._connect() + try: + cur = con.cursor() + self._paraminsert(cur) + finally: + con.close() + + def _paraminsert(self,cur): + self.executeDDL1(cur) + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) + self.assertTrue(cur.rowcount in (-1,1)) + + if self.driver.paramstyle == 'qmark': + cur.execute( + 'insert into %sbooze values (?)' % self.table_prefix, + ("Cooper's",) + ) + elif self.driver.paramstyle == 'numeric': + cur.execute( + 'insert into %sbooze values (:1)' % self.table_prefix, + ("Cooper's",) + ) + elif self.driver.paramstyle == 'named': + cur.execute( + 'insert into %sbooze values (:beer)' % self.table_prefix, + {'beer':"Cooper's"} + ) + elif self.driver.paramstyle == 'format': + cur.execute( + 'insert into %sbooze values (%%s)' % self.table_prefix, + ("Cooper's",) + ) + elif self.driver.paramstyle == 'pyformat': + cur.execute( + 'insert into %sbooze values (%%(beer)s)' % self.table_prefix, + {'beer':"Cooper's"} + ) + else: + self.fail('Invalid paramstyle') + self.assertTrue(cur.rowcount in (-1,1)) + + cur.execute('select name from %sbooze' % self.table_prefix) + res = cur.fetchall() + self.assertEqual(len(res),2,'cursor.fetchall returned too few rows') + beers = [res[0][0],res[1][0]] + beers.sort() + self.assertEqual(beers[0],"Cooper's", + 'cursor.fetchall retrieved incorrect data, or data inserted ' + 'incorrectly' + ) + self.assertEqual(beers[1],"Victoria Bitter", + 'cursor.fetchall retrieved incorrect data, or data inserted ' + 'incorrectly' + ) + + def test_executemany(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + largs = [ ("Cooper's",) , ("Boag's",) ] + margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ] + if self.driver.paramstyle == 'qmark': + cur.executemany( + 'insert into %sbooze values (?)' % self.table_prefix, + largs + ) + elif self.driver.paramstyle == 'numeric': + cur.executemany( + 'insert into %sbooze values (:1)' % self.table_prefix, + largs + ) + elif self.driver.paramstyle == 'named': + cur.executemany( + 'insert into %sbooze values (:beer)' % self.table_prefix, + margs + ) + elif self.driver.paramstyle == 'format': + cur.executemany( + 'insert into %sbooze values (%%s)' % self.table_prefix, + largs + ) + elif self.driver.paramstyle == 'pyformat': + cur.executemany( + 'insert into %sbooze values (%%(beer)s)' % ( + self.table_prefix + ), + margs + ) + else: + self.fail('Unknown paramstyle') + self.assertTrue(cur.rowcount in (-1,2), + 'insert using cursor.executemany set cursor.rowcount to ' + 'incorrect value %r' % cur.rowcount + ) + cur.execute('select name from %sbooze' % self.table_prefix) + res = cur.fetchall() + self.assertEqual(len(res),2, + 'cursor.fetchall retrieved incorrect number of rows' + ) + beers = [res[0][0],res[1][0]] + beers.sort() + self.assertEqual(beers[0],"Boag's",'incorrect data retrieved') + self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved') + finally: + con.close() + + def test_fetchone(self): + con = self._connect() + try: + cur = con.cursor() + + # cursor.fetchone should raise an Error if called before + # executing a select-type query + self.assertRaises(self.driver.Error,cur.fetchone) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + self.executeDDL1(cur) + self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertEqual(cur.fetchone(),None, + 'cursor.fetchone should return None if a query retrieves ' + 'no rows' + ) + self.assertTrue(cur.rowcount in (-1,0)) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) + self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchone() + self.assertEqual(len(r),1, + 'cursor.fetchone should have retrieved a single row' + ) + self.assertEqual(r[0],'Victoria Bitter', + 'cursor.fetchone retrieved incorrect data' + ) + self.assertEqual(cur.fetchone(),None, + 'cursor.fetchone should return None if no more rows available' + ) + self.assertTrue(cur.rowcount in (-1,1)) + finally: + con.close() + + samples = [ + 'Carlton Cold', + 'Carlton Draft', + 'Mountain Goat', + 'Redback', + 'Victoria Bitter', + 'XXXX' + ] + + def _populate(self): + ''' Return a list of sql commands to setup the DB for the fetch + tests. + ''' + populate = [ + "insert into %sbooze values ('%s')" % (self.table_prefix,s) + for s in self.samples + ] + return populate + + def test_fetchmany(self): + con = self._connect() + try: + cur = con.cursor() + + # cursor.fetchmany should raise an Error if called without + #issuing a query + self.assertRaises(self.driver.Error,cur.fetchmany,4) + + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchmany() + self.assertEqual(len(r),1, + 'cursor.fetchmany retrieved incorrect number of rows, ' + 'default of arraysize is one.' + ) + cur.arraysize=10 + r = cur.fetchmany(3) # Should get 3 rows + self.assertEqual(len(r),3, + 'cursor.fetchmany retrieved incorrect number of rows' + ) + r = cur.fetchmany(4) # Should get 2 more + self.assertEqual(len(r),2, + 'cursor.fetchmany retrieved incorrect number of rows' + ) + r = cur.fetchmany(4) # Should be an empty sequence + self.assertEqual(len(r),0, + 'cursor.fetchmany should return an empty sequence after ' + 'results are exhausted' + ) + self.assertTrue(cur.rowcount in (-1,6)) + + # Same as above, using cursor.arraysize + cur.arraysize=4 + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchmany() # Should get 4 rows + self.assertEqual(len(r),4, + 'cursor.arraysize not being honoured by fetchmany' + ) + r = cur.fetchmany() # Should get 2 more + self.assertEqual(len(r),2) + r = cur.fetchmany() # Should be an empty sequence + self.assertEqual(len(r),0) + self.assertTrue(cur.rowcount in (-1,6)) + + cur.arraysize=6 + cur.execute('select name from %sbooze' % self.table_prefix) + rows = cur.fetchmany() # Should get all rows + self.assertTrue(cur.rowcount in (-1,6)) + self.assertEqual(len(rows),6) + self.assertEqual(len(rows),6) + rows = [r[0] for r in rows] + rows.sort() + + # Make sure we get the right data back out + for i in range(0,6): + self.assertEqual(rows[i],self.samples[i], + 'incorrect data retrieved by cursor.fetchmany' + ) + + rows = cur.fetchmany() # Should return an empty list + self.assertEqual(len(rows),0, + 'cursor.fetchmany should return an empty sequence if ' + 'called after the whole result set has been fetched' + ) + self.assertTrue(cur.rowcount in (-1,6)) + + self.executeDDL2(cur) + cur.execute('select name from %sbarflys' % self.table_prefix) + r = cur.fetchmany() # Should get empty sequence + self.assertEqual(len(r),0, + 'cursor.fetchmany should return an empty sequence if ' + 'query retrieved no rows' + ) + self.assertTrue(cur.rowcount in (-1,0)) + + finally: + con.close() + + def test_fetchall(self): + con = self._connect() + try: + cur = con.cursor() + # cursor.fetchall should raise an Error if called + # without executing a query that may return rows (such + # as a select) + self.assertRaises(self.driver.Error, cur.fetchall) + + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + # cursor.fetchall should raise an Error if called + # after executing a a statement that cannot return rows + self.assertRaises(self.driver.Error,cur.fetchall) + + cur.execute('select name from %sbooze' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + self.assertEqual(len(rows),len(self.samples), + 'cursor.fetchall did not retrieve all rows' + ) + rows = [r[0] for r in rows] + rows.sort() + for i in range(0,len(self.samples)): + self.assertEqual(rows[i],self.samples[i], + 'cursor.fetchall retrieved incorrect rows' + ) + rows = cur.fetchall() + self.assertEqual( + len(rows),0, + 'cursor.fetchall should return an empty list if called ' + 'after the whole result set has been fetched' + ) + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + + self.executeDDL2(cur) + cur.execute('select name from %sbarflys' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,0)) + self.assertEqual(len(rows),0, + 'cursor.fetchall should return an empty list if ' + 'a select query returns no rows' + ) + + finally: + con.close() + + def test_mixedfetch(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + cur.execute('select name from %sbooze' % self.table_prefix) + rows1 = cur.fetchone() + rows23 = cur.fetchmany(2) + rows4 = cur.fetchone() + rows56 = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,6)) + self.assertEqual(len(rows23),2, + 'fetchmany returned incorrect number of rows' + ) + self.assertEqual(len(rows56),2, + 'fetchall returned incorrect number of rows' + ) + + rows = [rows1[0]] + rows.extend([rows23[0][0],rows23[1][0]]) + rows.append(rows4[0]) + rows.extend([rows56[0][0],rows56[1][0]]) + rows.sort() + for i in range(0,len(self.samples)): + self.assertEqual(rows[i],self.samples[i], + 'incorrect data retrieved or inserted' + ) + finally: + con.close() + + def help_nextset_setUp(self,cur): + ''' Should create a procedure called deleteme + that returns two result sets, first the + number of rows in booze then "name from booze" + ''' + raise NotImplementedError('Helper not implemented') + #sql=""" + # create procedure deleteme as + # begin + # select count(*) from booze + # select name from booze + # end + #""" + #cur.execute(sql) + + def help_nextset_tearDown(self,cur): + 'If cleaning up is needed after nextSetTest' + raise NotImplementedError('Helper not implemented') + #cur.execute("drop procedure deleteme") + + def test_nextset(self): + con = self._connect() + try: + cur = con.cursor() + if not hasattr(cur,'nextset'): + return + + try: + self.executeDDL1(cur) + sql=self._populate() + for sql in self._populate(): + cur.execute(sql) + + self.help_nextset_setUp(cur) + + cur.callproc('deleteme') + numberofrows=cur.fetchone() + assert numberofrows[0]== len(self.samples) + assert cur.nextset() + names=cur.fetchall() + assert len(names) == len(self.samples) + s=cur.nextset() + assert s == None,'No more return sets, should return None' + finally: + self.help_nextset_tearDown(cur) + + finally: + con.close() + + def test_nextset(self): + raise NotImplementedError('Drivers need to override this test') + + def test_arraysize(self): + # Not much here - rest of the tests for this are in test_fetchmany + con = self._connect() + try: + cur = con.cursor() + self.assertTrue(hasattr(cur,'arraysize'), + 'cursor.arraysize must be defined' + ) + finally: + con.close() + + def test_setinputsizes(self): + con = self._connect() + try: + cur = con.cursor() + cur.setinputsizes( (25,) ) + self._paraminsert(cur) # Make sure cursor still works + finally: + con.close() + + def test_setoutputsize_basic(self): + # Basic test is to make sure setoutputsize doesn't blow up + con = self._connect() + try: + cur = con.cursor() + cur.setoutputsize(1000) + cur.setoutputsize(2000,0) + self._paraminsert(cur) # Make sure the cursor still works + finally: + con.close() + + def test_setoutputsize(self): + # Real test for setoutputsize is driver dependant + raise NotImplementedError('Driver need to override this test') + + def test_None(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) + cur.execute('insert into %sbooze values (NULL)' % self.table_prefix) + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchall() + self.assertEqual(len(r),1) + self.assertEqual(len(r[0]),1) + self.assertEqual(r[0][0],None,'NULL value not returned as None') + finally: + con.close() + + def test_Date(self): + d1 = self.driver.Date(2002,12,25) + d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0))) + # Can we assume this? API doesn't specify, but it seems implied + # self.assertEqual(str(d1),str(d2)) + + def test_Time(self): + t1 = self.driver.Time(13,45,30) + t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0))) + # Can we assume this? API doesn't specify, but it seems implied + # self.assertEqual(str(t1),str(t2)) + + def test_Timestamp(self): + t1 = self.driver.Timestamp(2002,12,25,13,45,30) + t2 = self.driver.TimestampFromTicks( + time.mktime((2002,12,25,13,45,30,0,0,0)) + ) + # Can we assume this? API doesn't specify, but it seems implied + # self.assertEqual(str(t1),str(t2)) + + def test_Binary(self): + b = self.driver.Binary('Something') + b = self.driver.Binary('') + + def test_STRING(self): + self.assertTrue(hasattr(self.driver,'STRING'), + 'module.STRING must be defined' + ) + + def test_BINARY(self): + self.assertTrue(hasattr(self.driver,'BINARY'), + 'module.BINARY must be defined.' + ) + + def test_NUMBER(self): + self.assertTrue(hasattr(self.driver,'NUMBER'), + 'module.NUMBER must be defined.' + ) + + def test_DATETIME(self): + self.assertTrue(hasattr(self.driver,'DATETIME'), + 'module.DATETIME must be defined.' + ) + + def test_ROWID(self): + self.assertTrue(hasattr(self.driver,'ROWID'), + 'module.ROWID must be defined.' + ) + diff --git a/tests/default.cnf b/tests/default.cnf new file mode 100644 index 0000000..2aeda7c --- /dev/null +++ b/tests/default.cnf @@ -0,0 +1,10 @@ +# To create your own custom version of this file, read +# http://dev.mysql.com/doc/refman/5.1/en/option-files.html +# and set TESTDB in your environment to the name of the file + +[MySQLdb-tests] +host = 127.0.0.1 +user = test +database = test +#password = +default-character-set = utf8 diff --git a/tests/test_MySQLdb_capabilities.py b/tests/test_MySQLdb_capabilities.py new file mode 100644 index 0000000..60bbfad --- /dev/null +++ b/tests/test_MySQLdb_capabilities.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +import capabilities +import unittest +import MySQLdb +import warnings + +warnings.filterwarnings('error') + +class test_MySQLdb(capabilities.DatabaseTest): + + db_module = MySQLdb + connect_args = () + connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL") + create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8" + leak_test = False + + def quote_identifier(self, ident): + return "`%s`" % ident + + def test_TIME(self): + from datetime import timedelta + def generator(row,col): + return timedelta(0, row*8000) + self.check_data_integrity( + ('col1 TIME',), + generator) + + def test_TINYINT(self): + # Number data + def generator(row,col): + v = (row*row) % 256 + if v > 127: + v = v-256 + return v + self.check_data_integrity( + ('col1 TINYINT',), + generator) + + def test_stored_procedures(self): + db = self.connection + c = self.cursor + self.create_table(('pos INT', 'tree CHAR(20)')) + c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table, + list(enumerate('ash birch cedar larch pine'.split()))) + db.commit() + + c.execute(""" + CREATE PROCEDURE test_sp(IN t VARCHAR(255)) + BEGIN + SELECT pos FROM %s WHERE tree = t; + END + """ % self.table) + db.commit() + + c.callproc('test_sp', ('larch',)) + rows = c.fetchall() + self.assertEquals(len(rows), 1) + self.assertEquals(rows[0][0], 3) + c.nextset() + + c.execute("DROP PROCEDURE test_sp") + c.execute('drop table %s' % (self.table)) + + def test_small_CHAR(self): + # Character data + def generator(row,col): + i = (row*col+62)%256 + if i == 62: return '' + if i == 63: return None + return chr(i) + self.check_data_integrity( + ('col1 char(1)','col2 char(1)'), + generator) + + def test_bug_2671682(self): + from MySQLdb.constants import ER + try: + self.cursor.execute("describe some_non_existent_table"); + except self.connection.ProgrammingError, msg: + self.assertTrue(msg[0] == ER.NO_SUCH_TABLE) + + def test_bug_3514287(self): + c = self.cursor + try: + c.execute("""create table bug_3541287 ( + c1 CHAR(10), + t1 TIMESTAMP)""") + c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())", + ("blah",)) + finally: + c.execute("drop table if exists bug_3541287") + + def test_ping(self): + self.connection.ping() + + +if __name__ == '__main__': + if test_MySQLdb.leak_test: + import gc + gc.enable() + gc.set_debug(gc.DEBUG_LEAK) + unittest.main() diff --git a/tests/test_MySQLdb_dbapi20.py b/tests/test_MySQLdb_dbapi20.py new file mode 100644 index 0000000..44830e0 --- /dev/null +++ b/tests/test_MySQLdb_dbapi20.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python +import dbapi20 +import unittest +import MySQLdb +from configdb import connection_kwargs + +class test_MySQLdb(dbapi20.DatabaseAPI20Test): + driver = MySQLdb + connect_args = () + connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")) + + def test_setoutputsize(self): pass + def test_setoutputsize_basic(self): pass + def test_nextset(self): pass + + """The tests on fetchone and fetchall and rowcount bogusly + test for an exception if the statement cannot return a + result set. MySQL always returns a result set; it's just that + some things return empty result sets.""" + + def test_fetchall(self): + con = self._connect() + try: + cur = con.cursor() + # cursor.fetchall should raise an Error if called + # without executing a query that may return rows (such + # as a select) + self.assertRaises(self.driver.Error, cur.fetchall) + + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + # cursor.fetchall should raise an Error if called + # after executing a a statement that cannot return rows +## self.assertRaises(self.driver.Error,cur.fetchall) + + cur.execute('select name from %sbooze' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + self.assertEqual(len(rows),len(self.samples), + 'cursor.fetchall did not retrieve all rows' + ) + rows = [r[0] for r in rows] + rows.sort() + for i in range(0,len(self.samples)): + self.assertEqual(rows[i],self.samples[i], + 'cursor.fetchall retrieved incorrect rows' + ) + rows = cur.fetchall() + self.assertEqual( + len(rows),0, + 'cursor.fetchall should return an empty list if called ' + 'after the whole result set has been fetched' + ) + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + + self.executeDDL2(cur) + cur.execute('select name from %sbarflys' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,0)) + self.assertEqual(len(rows),0, + 'cursor.fetchall should return an empty list if ' + 'a select query returns no rows' + ) + + finally: + con.close() + + def test_fetchone(self): + con = self._connect() + try: + cur = con.cursor() + + # cursor.fetchone should raise an Error if called before + # executing a select-type query + self.assertRaises(self.driver.Error,cur.fetchone) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + self.executeDDL1(cur) +## self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertEqual(cur.fetchone(),None, + 'cursor.fetchone should return None if a query retrieves ' + 'no rows' + ) + self.assertTrue(cur.rowcount in (-1,0)) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) +## self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchone() + self.assertEqual(len(r),1, + 'cursor.fetchone should have retrieved a single row' + ) + self.assertEqual(r[0],'Victoria Bitter', + 'cursor.fetchone retrieved incorrect data' + ) +## self.assertEqual(cur.fetchone(),None, +## 'cursor.fetchone should return None if no more rows available' +## ) + self.assertTrue(cur.rowcount in (-1,1)) + finally: + con.close() + + # Same complaint as for fetchall and fetchone + def test_rowcount(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) +## self.assertEqual(cur.rowcount,-1, +## 'cursor.rowcount should be -1 after executing no-result ' +## 'statements' +## ) + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) +## self.assertTrue(cur.rowcount in (-1,1), +## 'cursor.rowcount should == number or rows inserted, or ' +## 'set to -1 after executing an insert statement' +## ) + cur.execute("select name from %sbooze" % self.table_prefix) + self.assertTrue(cur.rowcount in (-1,1), + 'cursor.rowcount should == number of rows returned, or ' + 'set to -1 after executing a select statement' + ) + self.executeDDL2(cur) +## self.assertEqual(cur.rowcount,-1, +## 'cursor.rowcount not being reset to -1 after executing ' +## 'no-result statements' +## ) + finally: + con.close() + + def test_callproc(self): + pass # performed in test_MySQL_capabilities + + def help_nextset_setUp(self,cur): + ''' Should create a procedure called deleteme + that returns two result sets, first the + number of rows in booze then "name from booze" + ''' + sql=""" + create procedure deleteme() + begin + select count(*) from %(tp)sbooze; + select name from %(tp)sbooze; + end + """ % dict(tp=self.table_prefix) + cur.execute(sql) + + def help_nextset_tearDown(self,cur): + 'If cleaning up is needed after nextSetTest' + cur.execute("drop procedure deleteme") + + def test_nextset(self): + from warnings import warn + con = self._connect() + try: + cur = con.cursor() + if not hasattr(cur,'nextset'): + return + + try: + self.executeDDL1(cur) + sql=self._populate() + for sql in self._populate(): + cur.execute(sql) + + self.help_nextset_setUp(cur) + + cur.callproc('deleteme') + numberofrows=cur.fetchone() + assert numberofrows[0]== len(self.samples) + assert cur.nextset() + names=cur.fetchall() + assert len(names) == len(self.samples) + s=cur.nextset() + if s: + empty = cur.fetchall() + self.assertEquals(len(empty), 0, + "non-empty result set after other result sets") + #warn("Incompatibility: MySQL returns an empty result set for the CALL itself", + # Warning) + #assert s == None,'No more return sets, should return None' + finally: + self.help_nextset_tearDown(cur) + + finally: + con.close() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_MySQLdb_nonstandard.py b/tests/test_MySQLdb_nonstandard.py new file mode 100644 index 0000000..92fcbdc --- /dev/null +++ b/tests/test_MySQLdb_nonstandard.py @@ -0,0 +1,86 @@ +import unittest + +import _mysql +import MySQLdb +from MySQLdb.constants import FIELD_TYPE +from configdb import connection_factory + + +class TestDBAPISet(unittest.TestCase): + def test_set_equality(self): + self.assertTrue(MySQLdb.STRING == MySQLdb.STRING) + + def test_set_inequality(self): + self.assertTrue(MySQLdb.STRING != MySQLdb.NUMBER) + + def test_set_equality_membership(self): + self.assertTrue(FIELD_TYPE.VAR_STRING == MySQLdb.STRING) + + def test_set_inequality_membership(self): + self.assertTrue(FIELD_TYPE.DATE != MySQLdb.STRING) + + +class CoreModule(unittest.TestCase): + """Core _mysql module features.""" + + def test_NULL(self): + """Should have a NULL constant.""" + self.assertEqual(_mysql.NULL, 'NULL') + + def test_version(self): + """Version information sanity.""" + self.assertTrue(isinstance(_mysql.__version__, str)) + + self.assertTrue(isinstance(_mysql.version_info, tuple)) + self.assertEqual(len(_mysql.version_info), 5) + + def test_client_info(self): + self.assertTrue(isinstance(_mysql.get_client_info(), str)) + + def test_thread_safe(self): + self.assertTrue(isinstance(_mysql.thread_safe(), int)) + + +class CoreAPI(unittest.TestCase): + """Test _mysql interaction internals.""" + + def setUp(self): + self.conn = connection_factory(use_unicode=True) + + def tearDown(self): + self.conn.close() + + def test_thread_id(self): + tid = self.conn.thread_id() + self.assertTrue(isinstance(tid, int), + "thread_id didn't return an int.") + + self.assertRaises(TypeError, self.conn.thread_id, ('evil',), + "thread_id shouldn't accept arguments.") + + def test_affected_rows(self): + self.assertEquals(self.conn.affected_rows(), 0, + "Should return 0 before we do anything.") + + + #def test_debug(self): + ## FIXME Only actually tests if you lack SUPER + #self.assertRaises(MySQLdb.OperationalError, + #self.conn.dump_debug_info) + + def test_charset_name(self): + self.assertTrue(isinstance(self.conn.character_set_name(), str), + "Should return a string.") + + def test_host_info(self): + self.assertTrue(isinstance(self.conn.get_host_info(), str), + "Should return a string.") + + def test_proto_info(self): + self.assertTrue(isinstance(self.conn.get_proto_info(), int), + "Should return an int.") + + def test_server_info(self): + self.assertTrue(isinstance(self.conn.get_server_info(), str), + "Should return an str.") + diff --git a/tests/travis.cnf b/tests/travis.cnf new file mode 100644 index 0000000..e78f52d --- /dev/null +++ b/tests/travis.cnf @@ -0,0 +1,10 @@ +# To create your own custom version of this file, read +# http://dev.mysql.com/doc/refman/5.1/en/option-files.html +# and set TESTDB in your environment to the name of the file + +[MySQLdb-tests] +host = 127.0.0.1 +user = root +database = mysqldb_test +#password = +default-character-set = utf8 -- cgit v1.2.1