From d0e96c715558db1b1a3717be1db3f4d325cc3c0a Mon Sep 17 00:00:00 2001 From: farcepest Date: Tue, 2 Oct 2012 12:37:41 -0400 Subject: Unify test connection configuration --- MySQLdb/tests/capabilities.py | 561 +++++++++++++++-------------- MySQLdb/tests/configdb.py | 25 ++ MySQLdb/tests/default.cnf | 10 + MySQLdb/tests/test_MySQLdb_capabilities.py | 205 ++++++----- MySQLdb/tests/test_MySQLdb_dbapi20.py | 407 +++++++++++---------- 5 files changed, 620 insertions(+), 588 deletions(-) create mode 100644 MySQLdb/tests/configdb.py create mode 100644 MySQLdb/tests/default.cnf (limited to 'MySQLdb') diff --git a/MySQLdb/tests/capabilities.py b/MySQLdb/tests/capabilities.py index 6bb7436..076361c 100644 --- a/MySQLdb/tests/capabilities.py +++ b/MySQLdb/tests/capabilities.py @@ -1,280 +1,281 @@ -#!/usr/bin/env python -O -""" Script to test database capabilities and the DB-API interface - for functionality and memory leaks. - - Adapted from a script by M-A Lemburg. - -""" -from time import time -import array -import unittest - - -class DatabaseTest(unittest.TestCase): - - db_module = None - connect_args = () - connect_kwargs = dict() - create_table_extra = '' - rows = 10 - debug = False - - def setUp(self): - import gc - db = self.db_module.connect(*self.connect_args, **self.connect_kwargs) - self.connection = db - self.cursor = db.cursor() - # TODO: this needs to be re-evaluated for Python 3 - self.BLOBText = ''.join([chr(i) for i in range(256)] * 100); - self.BLOBUText = u''.join([unichr(i) for i in range(16384)]) - self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16)) - - leak_test = True - - def tearDown(self): - if self.leak_test: - import gc - del self.cursor - orphans = gc.collect() - self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans) - - del self.connection - orphans = gc.collect() - self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans) - - def table_exists(self, name): - try: - self.cursor.execute('select * from %s where 1=0' % name) - except: - return False - else: - return True - - def quote_identifier(self, ident): - return '"%s"' % ident - - def new_table_name(self): - i = id(self.cursor) - while True: - name = self.quote_identifier('tb%08x' % i) - if not self.table_exists(name): - return name - i = i + 1 - - def create_table(self, columndefs): - - """ Create a table using a list of column definitions given in - columndefs. - - generator must be a function taking arguments (row_number, - col_number) returning a suitable data object for insertion - into the table. - - """ - self.table = self.new_table_name() - self.cursor.execute('CREATE TABLE %s (%s) %s' % - (self.table, - ',\n'.join(columndefs), - self.create_table_extra)) - - def check_data_integrity(self, columndefs, generator): - # insert - self.create_table(columndefs) - insert_statement = ('INSERT INTO %s VALUES (%s)' % - (self.table, - ','.join(['%s'] * len(columndefs)))) - data = [ [ generator(i,j) for j in range(len(columndefs)) ] - for i in range(self.rows) ] - self.cursor.executemany(insert_statement, data) - self.connection.commit() - # verify - self.cursor.execute('select * from %s' % self.table) - l = self.cursor.fetchall() - self.assertEquals(len(l), self.rows) - try: - for i in range(self.rows): - for j in range(len(columndefs)): - self.assertEquals(l[i][j], generator(i,j)) - finally: - if not self.debug: - self.cursor.execute('drop table %s' % (self.table)) - - def test_transactions(self): - columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') - def generator(row, col): - if col == 0: return row - else: return ('%i' % (row%10))*255 - self.create_table(columndefs) - insert_statement = ('INSERT INTO %s VALUES (%s)' % - (self.table, - ','.join(['%s'] * len(columndefs)))) - data = [ [ generator(i,j) for j in range(len(columndefs)) ] - for i in range(self.rows) ] - self.cursor.executemany(insert_statement, data) - # verify - self.connection.commit() - self.cursor.execute('select * from %s' % self.table) - l = self.cursor.fetchall() - self.assertEquals(len(l), self.rows) - for i in range(self.rows): - for j in range(len(columndefs)): - self.assertEquals(l[i][j], generator(i,j)) - delete_statement = 'delete from %s where col1=%%s' % self.table - self.cursor.execute(delete_statement, (0,)) - self.cursor.execute('select col1 from %s where col1=%s' % \ - (self.table, 0)) - l = self.cursor.fetchall() - self.assertFalse(l, "DELETE didn't work") - self.connection.rollback() - self.cursor.execute('select col1 from %s where col1=%s' % \ - (self.table, 0)) - l = self.cursor.fetchall() - self.assertTrue(len(l) == 1, "ROLLBACK didn't work") - self.cursor.execute('drop table %s' % (self.table)) - - def test_truncation(self): - columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') - def generator(row, col): - if col == 0: return row - else: return ('%i' % (row%10))*((255-self.rows/2)+row) - self.create_table(columndefs) - insert_statement = ('INSERT INTO %s VALUES (%s)' % - (self.table, - ','.join(['%s'] * len(columndefs)))) - - try: - self.cursor.execute(insert_statement, (0, '0'*256)) - except self.connection.DataError: - pass - else: - self.fail("Over-long column did not generate warnings/exception with single insert") - - self.connection.rollback() - - try: - for i in range(self.rows): - data = [] - for j in range(len(columndefs)): - data.append(generator(i,j)) - self.cursor.execute(insert_statement,tuple(data)) - except self.connection.DataError: - pass - else: - self.fail("Over-long columns did not generate warnings/exception with execute()") - - self.connection.rollback() - - try: - data = [ [ generator(i,j) for j in range(len(columndefs)) ] - for i in range(self.rows) ] - self.cursor.executemany(insert_statement, data) - except self.connection.DataError: - pass - else: - self.fail("Over-long columns did not generate warnings/exception with executemany()") - - self.connection.rollback() - self.cursor.execute('drop table %s' % (self.table)) - - def test_CHAR(self): - # Character data - def generator(row,col): - return ('%i' % ((row+col) % 10)) * 255 - self.check_data_integrity( - ('col1 char(255)','col2 char(255)'), - generator) - - def test_INT(self): - # Number data - def generator(row,col): - return row*row - self.check_data_integrity( - ('col1 INT',), - generator) - - def test_DECIMAL(self): - # DECIMAL - def generator(row,col): - from decimal import Decimal - return Decimal("%d.%02d" % (row, col)) - self.check_data_integrity( - ('col1 DECIMAL(5,2)',), - generator) - - def test_DATE(self): - ticks = time() - def generator(row,col): - return self.db_module.DateFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 DATE',), - generator) - - def test_TIME(self): - ticks = time() - def generator(row,col): - return self.db_module.TimeFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 TIME',), - generator) - - def test_DATETIME(self): - ticks = time() - def generator(row,col): - return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 DATETIME',), - generator) - - def test_TIMESTAMP(self): - ticks = time() - def generator(row,col): - return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) - self.check_data_integrity( - ('col1 TIMESTAMP',), - generator) - - def test_fractional_TIMESTAMP(self): - ticks = time() - def generator(row,col): - return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0) - self.check_data_integrity( - ('col1 TIMESTAMP',), - generator) - - def test_LONG(self): - def generator(row,col): - if col == 0: - return row - else: - return self.BLOBUText # 'BLOB Text ' * 1024 - self.check_data_integrity( - ('col1 INT','col2 LONG'), - generator) - - def test_TEXT(self): - def generator(row,col): - return self.BLOBUText # 'BLOB Text ' * 1024 - self.check_data_integrity( - ('col2 TEXT',), - generator) - - def test_LONG_BYTE(self): - def generator(row,col): - if col == 0: - return row - else: - return self.BLOBBinary # 'BLOB\000Binary ' * 1024 - self.check_data_integrity( - ('col1 INT','col2 LONG BYTE'), - generator) - - def test_BLOB(self): - def generator(row,col): - if col == 0: - return row - else: - return self.BLOBBinary # 'BLOB\000Binary ' * 1024 - self.check_data_integrity( - ('col1 INT','col2 BLOB'), - generator) - +#!/usr/bin/env python -O +""" Script to test database capabilities and the DB-API interface + for functionality and memory leaks. + + Adapted from a script by M-A Lemburg. + +""" +from time import time +import array +import unittest +from configdb import connection_factory + + +class DatabaseTest(unittest.TestCase): + + db_module = None + connect_args = () + connect_kwargs = dict() + create_table_extra = '' + rows = 10 + debug = False + + def setUp(self): + import gc + db = connection_factory(**self.connect_kwargs) + self.connection = db + self.cursor = db.cursor() + # TODO: this needs to be re-evaluated for Python 3 + self.BLOBText = ''.join([chr(i) for i in range(256)] * 100); + self.BLOBUText = u''.join([unichr(i) for i in range(16384)]) + self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16)) + + leak_test = True + + def tearDown(self): + if self.leak_test: + import gc + del self.cursor + orphans = gc.collect() + self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans) + + del self.connection + orphans = gc.collect() + self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans) + + def table_exists(self, name): + try: + self.cursor.execute('select * from %s where 1=0' % name) + except: + return False + else: + return True + + def quote_identifier(self, ident): + return '"%s"' % ident + + def new_table_name(self): + i = id(self.cursor) + while True: + name = self.quote_identifier('tb%08x' % i) + if not self.table_exists(name): + return name + i = i + 1 + + def create_table(self, columndefs): + + """ Create a table using a list of column definitions given in + columndefs. + + generator must be a function taking arguments (row_number, + col_number) returning a suitable data object for insertion + into the table. + + """ + self.table = self.new_table_name() + self.cursor.execute('CREATE TABLE %s (%s) %s' % + (self.table, + ',\n'.join(columndefs), + self.create_table_extra)) + + def check_data_integrity(self, columndefs, generator): + # insert + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + self.connection.commit() + # verify + self.cursor.execute('select * from %s' % self.table) + l = self.cursor.fetchall() + self.assertEquals(len(l), self.rows) + try: + for i in range(self.rows): + for j in range(len(columndefs)): + self.assertEquals(l[i][j], generator(i,j)) + finally: + if not self.debug: + self.cursor.execute('drop table %s' % (self.table)) + + def test_transactions(self): + columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') + def generator(row, col): + if col == 0: return row + else: return ('%i' % (row%10))*255 + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + # verify + self.connection.commit() + self.cursor.execute('select * from %s' % self.table) + l = self.cursor.fetchall() + self.assertEquals(len(l), self.rows) + for i in range(self.rows): + for j in range(len(columndefs)): + self.assertEquals(l[i][j], generator(i,j)) + delete_statement = 'delete from %s where col1=%%s' % self.table + self.cursor.execute(delete_statement, (0,)) + self.cursor.execute('select col1 from %s where col1=%s' % \ + (self.table, 0)) + l = self.cursor.fetchall() + self.assertFalse(l, "DELETE didn't work") + self.connection.rollback() + self.cursor.execute('select col1 from %s where col1=%s' % \ + (self.table, 0)) + l = self.cursor.fetchall() + self.assertTrue(len(l) == 1, "ROLLBACK didn't work") + self.cursor.execute('drop table %s' % (self.table)) + + def test_truncation(self): + columndefs = ( 'col1 INT', 'col2 VARCHAR(255)') + def generator(row, col): + if col == 0: return row + else: return ('%i' % (row%10))*((255-self.rows/2)+row) + self.create_table(columndefs) + insert_statement = ('INSERT INTO %s VALUES (%s)' % + (self.table, + ','.join(['%s'] * len(columndefs)))) + + try: + self.cursor.execute(insert_statement, (0, '0'*256)) + except self.connection.DataError: + pass + else: + self.fail("Over-long column did not generate warnings/exception with single insert") + + self.connection.rollback() + + try: + for i in range(self.rows): + data = [] + for j in range(len(columndefs)): + data.append(generator(i,j)) + self.cursor.execute(insert_statement,tuple(data)) + except self.connection.DataError: + pass + else: + self.fail("Over-long columns did not generate warnings/exception with execute()") + + self.connection.rollback() + + try: + data = [ [ generator(i,j) for j in range(len(columndefs)) ] + for i in range(self.rows) ] + self.cursor.executemany(insert_statement, data) + except self.connection.DataError: + pass + else: + self.fail("Over-long columns did not generate warnings/exception with executemany()") + + self.connection.rollback() + self.cursor.execute('drop table %s' % (self.table)) + + def test_CHAR(self): + # Character data + def generator(row,col): + return ('%i' % ((row+col) % 10)) * 255 + self.check_data_integrity( + ('col1 char(255)','col2 char(255)'), + generator) + + def test_INT(self): + # Number data + def generator(row,col): + return row*row + self.check_data_integrity( + ('col1 INT',), + generator) + + def test_DECIMAL(self): + # DECIMAL + def generator(row,col): + from decimal import Decimal + return Decimal("%d.%02d" % (row, col)) + self.check_data_integrity( + ('col1 DECIMAL(5,2)',), + generator) + + def test_DATE(self): + ticks = time() + def generator(row,col): + return self.db_module.DateFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 DATE',), + generator) + + def test_TIME(self): + ticks = time() + def generator(row,col): + return self.db_module.TimeFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 TIME',), + generator) + + def test_DATETIME(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 DATETIME',), + generator) + + def test_TIMESTAMP(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313) + self.check_data_integrity( + ('col1 TIMESTAMP',), + generator) + + def test_fractional_TIMESTAMP(self): + ticks = time() + def generator(row,col): + return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0) + self.check_data_integrity( + ('col1 TIMESTAMP',), + generator) + + def test_LONG(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBUText # 'BLOB Text ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 LONG'), + generator) + + def test_TEXT(self): + def generator(row,col): + return self.BLOBUText # 'BLOB Text ' * 1024 + self.check_data_integrity( + ('col2 TEXT',), + generator) + + def test_LONG_BYTE(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBBinary # 'BLOB\000Binary ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 LONG BYTE'), + generator) + + def test_BLOB(self): + def generator(row,col): + if col == 0: + return row + else: + return self.BLOBBinary # 'BLOB\000Binary ' * 1024 + self.check_data_integrity( + ('col1 INT','col2 BLOB'), + generator) + diff --git a/MySQLdb/tests/configdb.py b/MySQLdb/tests/configdb.py new file mode 100644 index 0000000..cd6d43d --- /dev/null +++ b/MySQLdb/tests/configdb.py @@ -0,0 +1,25 @@ +"""Configure database connection for tests.""" + +from os import environ, path + +tests_path = path.dirname(__file__) +conf_file = environ.get('TESTDB', 'default.cnf') +conf_path = path.join(tests_path, conf_file) +connect_kwargs = dict( + read_default_file = conf_path, + read_default_group = "MySQLdb-tests", +) + +def connection_kwargs(kwargs): + db_kwargs = connect_kwargs.copy() + db_kwargs.update(kwargs) + return db_kwargs + +def connection_factory(**kwargs): + import MySQLdb + db_kwargs = connection_kwargs(kwargs) + db = MySQLdb.connect(**db_kwargs) + return db + + + diff --git a/MySQLdb/tests/default.cnf b/MySQLdb/tests/default.cnf new file mode 100644 index 0000000..2aeda7c --- /dev/null +++ b/MySQLdb/tests/default.cnf @@ -0,0 +1,10 @@ +# To create your own custom version of this file, read +# http://dev.mysql.com/doc/refman/5.1/en/option-files.html +# and set TESTDB in your environment to the name of the file + +[MySQLdb-tests] +host = 127.0.0.1 +user = test +database = test +#password = +default-character-set = utf8 diff --git a/MySQLdb/tests/test_MySQLdb_capabilities.py b/MySQLdb/tests/test_MySQLdb_capabilities.py index a510468..60bbfad 100644 --- a/MySQLdb/tests/test_MySQLdb_capabilities.py +++ b/MySQLdb/tests/test_MySQLdb_capabilities.py @@ -1,103 +1,102 @@ -#!/usr/bin/env python -import capabilities -import unittest -import MySQLdb -import warnings - -warnings.filterwarnings('error') - -class test_MySQLdb(capabilities.DatabaseTest): - - db_module = MySQLdb - connect_args = () - connect_kwargs = dict(db='test', host="127.0.0.1", user="test", #read_default_file='~/.my.cnf', - charset='utf8', sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL") - create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8" - leak_test = False - - def quote_identifier(self, ident): - return "`%s`" % ident - - def test_TIME(self): - from datetime import timedelta - def generator(row,col): - return timedelta(0, row*8000) - self.check_data_integrity( - ('col1 TIME',), - generator) - - def test_TINYINT(self): - # Number data - def generator(row,col): - v = (row*row) % 256 - if v > 127: - v = v-256 - return v - self.check_data_integrity( - ('col1 TINYINT',), - generator) - - def test_stored_procedures(self): - db = self.connection - c = self.cursor - self.create_table(('pos INT', 'tree CHAR(20)')) - c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table, - list(enumerate('ash birch cedar larch pine'.split()))) - db.commit() - - c.execute(""" - CREATE PROCEDURE test_sp(IN t VARCHAR(255)) - BEGIN - SELECT pos FROM %s WHERE tree = t; - END - """ % self.table) - db.commit() - - c.callproc('test_sp', ('larch',)) - rows = c.fetchall() - self.assertEquals(len(rows), 1) - self.assertEquals(rows[0][0], 3) - c.nextset() - - c.execute("DROP PROCEDURE test_sp") - c.execute('drop table %s' % (self.table)) - - def test_small_CHAR(self): - # Character data - def generator(row,col): - i = (row*col+62)%256 - if i == 62: return '' - if i == 63: return None - return chr(i) - self.check_data_integrity( - ('col1 char(1)','col2 char(1)'), - generator) - - def test_bug_2671682(self): - from MySQLdb.constants import ER - try: - self.cursor.execute("describe some_non_existent_table"); - except self.connection.ProgrammingError, msg: - self.assertTrue(msg[0] == ER.NO_SUCH_TABLE) - - def test_bug_3514287(self): - c = self.cursor - try: - c.execute("""create table bug_3541287 ( - c1 CHAR(10), - t1 TIMESTAMP)""") - c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())", - ("blah",)) - finally: - c.execute("drop table if exists bug_3541287") - - def test_ping(self): - self.connection.ping() - - -if __name__ == '__main__': - if test_MySQLdb.leak_test: - import gc - gc.enable() - gc.set_debug(gc.DEBUG_LEAK) - unittest.main() +#!/usr/bin/env python +import capabilities +import unittest +import MySQLdb +import warnings + +warnings.filterwarnings('error') + +class test_MySQLdb(capabilities.DatabaseTest): + + db_module = MySQLdb + connect_args = () + connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL") + create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8" + leak_test = False + + def quote_identifier(self, ident): + return "`%s`" % ident + + def test_TIME(self): + from datetime import timedelta + def generator(row,col): + return timedelta(0, row*8000) + self.check_data_integrity( + ('col1 TIME',), + generator) + + def test_TINYINT(self): + # Number data + def generator(row,col): + v = (row*row) % 256 + if v > 127: + v = v-256 + return v + self.check_data_integrity( + ('col1 TINYINT',), + generator) + + def test_stored_procedures(self): + db = self.connection + c = self.cursor + self.create_table(('pos INT', 'tree CHAR(20)')) + c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table, + list(enumerate('ash birch cedar larch pine'.split()))) + db.commit() + + c.execute(""" + CREATE PROCEDURE test_sp(IN t VARCHAR(255)) + BEGIN + SELECT pos FROM %s WHERE tree = t; + END + """ % self.table) + db.commit() + + c.callproc('test_sp', ('larch',)) + rows = c.fetchall() + self.assertEquals(len(rows), 1) + self.assertEquals(rows[0][0], 3) + c.nextset() + + c.execute("DROP PROCEDURE test_sp") + c.execute('drop table %s' % (self.table)) + + def test_small_CHAR(self): + # Character data + def generator(row,col): + i = (row*col+62)%256 + if i == 62: return '' + if i == 63: return None + return chr(i) + self.check_data_integrity( + ('col1 char(1)','col2 char(1)'), + generator) + + def test_bug_2671682(self): + from MySQLdb.constants import ER + try: + self.cursor.execute("describe some_non_existent_table"); + except self.connection.ProgrammingError, msg: + self.assertTrue(msg[0] == ER.NO_SUCH_TABLE) + + def test_bug_3514287(self): + c = self.cursor + try: + c.execute("""create table bug_3541287 ( + c1 CHAR(10), + t1 TIMESTAMP)""") + c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())", + ("blah",)) + finally: + c.execute("drop table if exists bug_3541287") + + def test_ping(self): + self.connection.ping() + + +if __name__ == '__main__': + if test_MySQLdb.leak_test: + import gc + gc.enable() + gc.set_debug(gc.DEBUG_LEAK) + unittest.main() diff --git a/MySQLdb/tests/test_MySQLdb_dbapi20.py b/MySQLdb/tests/test_MySQLdb_dbapi20.py index 4f4527a..44830e0 100644 --- a/MySQLdb/tests/test_MySQLdb_dbapi20.py +++ b/MySQLdb/tests/test_MySQLdb_dbapi20.py @@ -1,205 +1,202 @@ -#!/usr/bin/env python -import dbapi20 -import unittest -import MySQLdb - -class test_MySQLdb(dbapi20.DatabaseAPI20Test): - driver = MySQLdb - connect_args = () - connect_kw_args = dict(db='test', - host="127.0.0.1", - user="test", #read_default_file='~/.my.cnf', - charset='utf8', - sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL") - - def test_setoutputsize(self): pass - def test_setoutputsize_basic(self): pass - def test_nextset(self): pass - - """The tests on fetchone and fetchall and rowcount bogusly - test for an exception if the statement cannot return a - result set. MySQL always returns a result set; it's just that - some things return empty result sets.""" - - def test_fetchall(self): - con = self._connect() - try: - cur = con.cursor() - # cursor.fetchall should raise an Error if called - # without executing a query that may return rows (such - # as a select) - self.assertRaises(self.driver.Error, cur.fetchall) - - self.executeDDL1(cur) - for sql in self._populate(): - cur.execute(sql) - - # cursor.fetchall should raise an Error if called - # after executing a a statement that cannot return rows -## self.assertRaises(self.driver.Error,cur.fetchall) - - cur.execute('select name from %sbooze' % self.table_prefix) - rows = cur.fetchall() - self.assertTrue(cur.rowcount in (-1,len(self.samples))) - self.assertEqual(len(rows),len(self.samples), - 'cursor.fetchall did not retrieve all rows' - ) - rows = [r[0] for r in rows] - rows.sort() - for i in range(0,len(self.samples)): - self.assertEqual(rows[i],self.samples[i], - 'cursor.fetchall retrieved incorrect rows' - ) - rows = cur.fetchall() - self.assertEqual( - len(rows),0, - 'cursor.fetchall should return an empty list if called ' - 'after the whole result set has been fetched' - ) - self.assertTrue(cur.rowcount in (-1,len(self.samples))) - - self.executeDDL2(cur) - cur.execute('select name from %sbarflys' % self.table_prefix) - rows = cur.fetchall() - self.assertTrue(cur.rowcount in (-1,0)) - self.assertEqual(len(rows),0, - 'cursor.fetchall should return an empty list if ' - 'a select query returns no rows' - ) - - finally: - con.close() - - def test_fetchone(self): - con = self._connect() - try: - cur = con.cursor() - - # cursor.fetchone should raise an Error if called before - # executing a select-type query - self.assertRaises(self.driver.Error,cur.fetchone) - - # cursor.fetchone should raise an Error if called after - # executing a query that cannnot return rows - self.executeDDL1(cur) -## self.assertRaises(self.driver.Error,cur.fetchone) - - cur.execute('select name from %sbooze' % self.table_prefix) - self.assertEqual(cur.fetchone(),None, - 'cursor.fetchone should return None if a query retrieves ' - 'no rows' - ) - self.assertTrue(cur.rowcount in (-1,0)) - - # cursor.fetchone should raise an Error if called after - # executing a query that cannnot return rows - cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( - self.table_prefix - )) -## self.assertRaises(self.driver.Error,cur.fetchone) - - cur.execute('select name from %sbooze' % self.table_prefix) - r = cur.fetchone() - self.assertEqual(len(r),1, - 'cursor.fetchone should have retrieved a single row' - ) - self.assertEqual(r[0],'Victoria Bitter', - 'cursor.fetchone retrieved incorrect data' - ) -## self.assertEqual(cur.fetchone(),None, -## 'cursor.fetchone should return None if no more rows available' -## ) - self.assertTrue(cur.rowcount in (-1,1)) - finally: - con.close() - - # Same complaint as for fetchall and fetchone - def test_rowcount(self): - con = self._connect() - try: - cur = con.cursor() - self.executeDDL1(cur) -## self.assertEqual(cur.rowcount,-1, -## 'cursor.rowcount should be -1 after executing no-result ' -## 'statements' -## ) - cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( - self.table_prefix - )) -## self.assertTrue(cur.rowcount in (-1,1), -## 'cursor.rowcount should == number or rows inserted, or ' -## 'set to -1 after executing an insert statement' -## ) - cur.execute("select name from %sbooze" % self.table_prefix) - self.assertTrue(cur.rowcount in (-1,1), - 'cursor.rowcount should == number of rows returned, or ' - 'set to -1 after executing a select statement' - ) - self.executeDDL2(cur) -## self.assertEqual(cur.rowcount,-1, -## 'cursor.rowcount not being reset to -1 after executing ' -## 'no-result statements' -## ) - finally: - con.close() - - def test_callproc(self): - pass # performed in test_MySQL_capabilities - - def help_nextset_setUp(self,cur): - ''' Should create a procedure called deleteme - that returns two result sets, first the - number of rows in booze then "name from booze" - ''' - sql=""" - create procedure deleteme() - begin - select count(*) from %(tp)sbooze; - select name from %(tp)sbooze; - end - """ % dict(tp=self.table_prefix) - cur.execute(sql) - - def help_nextset_tearDown(self,cur): - 'If cleaning up is needed after nextSetTest' - cur.execute("drop procedure deleteme") - - def test_nextset(self): - from warnings import warn - con = self._connect() - try: - cur = con.cursor() - if not hasattr(cur,'nextset'): - return - - try: - self.executeDDL1(cur) - sql=self._populate() - for sql in self._populate(): - cur.execute(sql) - - self.help_nextset_setUp(cur) - - cur.callproc('deleteme') - numberofrows=cur.fetchone() - assert numberofrows[0]== len(self.samples) - assert cur.nextset() - names=cur.fetchall() - assert len(names) == len(self.samples) - s=cur.nextset() - if s: - empty = cur.fetchall() - self.assertEquals(len(empty), 0, - "non-empty result set after other result sets") - #warn("Incompatibility: MySQL returns an empty result set for the CALL itself", - # Warning) - #assert s == None,'No more return sets, should return None' - finally: - self.help_nextset_tearDown(cur) - - finally: - con.close() - - -if __name__ == '__main__': - unittest.main() +#!/usr/bin/env python +import dbapi20 +import unittest +import MySQLdb +from configdb import connection_kwargs + +class test_MySQLdb(dbapi20.DatabaseAPI20Test): + driver = MySQLdb + connect_args = () + connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")) + + def test_setoutputsize(self): pass + def test_setoutputsize_basic(self): pass + def test_nextset(self): pass + + """The tests on fetchone and fetchall and rowcount bogusly + test for an exception if the statement cannot return a + result set. MySQL always returns a result set; it's just that + some things return empty result sets.""" + + def test_fetchall(self): + con = self._connect() + try: + cur = con.cursor() + # cursor.fetchall should raise an Error if called + # without executing a query that may return rows (such + # as a select) + self.assertRaises(self.driver.Error, cur.fetchall) + + self.executeDDL1(cur) + for sql in self._populate(): + cur.execute(sql) + + # cursor.fetchall should raise an Error if called + # after executing a a statement that cannot return rows +## self.assertRaises(self.driver.Error,cur.fetchall) + + cur.execute('select name from %sbooze' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + self.assertEqual(len(rows),len(self.samples), + 'cursor.fetchall did not retrieve all rows' + ) + rows = [r[0] for r in rows] + rows.sort() + for i in range(0,len(self.samples)): + self.assertEqual(rows[i],self.samples[i], + 'cursor.fetchall retrieved incorrect rows' + ) + rows = cur.fetchall() + self.assertEqual( + len(rows),0, + 'cursor.fetchall should return an empty list if called ' + 'after the whole result set has been fetched' + ) + self.assertTrue(cur.rowcount in (-1,len(self.samples))) + + self.executeDDL2(cur) + cur.execute('select name from %sbarflys' % self.table_prefix) + rows = cur.fetchall() + self.assertTrue(cur.rowcount in (-1,0)) + self.assertEqual(len(rows),0, + 'cursor.fetchall should return an empty list if ' + 'a select query returns no rows' + ) + + finally: + con.close() + + def test_fetchone(self): + con = self._connect() + try: + cur = con.cursor() + + # cursor.fetchone should raise an Error if called before + # executing a select-type query + self.assertRaises(self.driver.Error,cur.fetchone) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + self.executeDDL1(cur) +## self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + self.assertEqual(cur.fetchone(),None, + 'cursor.fetchone should return None if a query retrieves ' + 'no rows' + ) + self.assertTrue(cur.rowcount in (-1,0)) + + # cursor.fetchone should raise an Error if called after + # executing a query that cannnot return rows + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) +## self.assertRaises(self.driver.Error,cur.fetchone) + + cur.execute('select name from %sbooze' % self.table_prefix) + r = cur.fetchone() + self.assertEqual(len(r),1, + 'cursor.fetchone should have retrieved a single row' + ) + self.assertEqual(r[0],'Victoria Bitter', + 'cursor.fetchone retrieved incorrect data' + ) +## self.assertEqual(cur.fetchone(),None, +## 'cursor.fetchone should return None if no more rows available' +## ) + self.assertTrue(cur.rowcount in (-1,1)) + finally: + con.close() + + # Same complaint as for fetchall and fetchone + def test_rowcount(self): + con = self._connect() + try: + cur = con.cursor() + self.executeDDL1(cur) +## self.assertEqual(cur.rowcount,-1, +## 'cursor.rowcount should be -1 after executing no-result ' +## 'statements' +## ) + cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( + self.table_prefix + )) +## self.assertTrue(cur.rowcount in (-1,1), +## 'cursor.rowcount should == number or rows inserted, or ' +## 'set to -1 after executing an insert statement' +## ) + cur.execute("select name from %sbooze" % self.table_prefix) + self.assertTrue(cur.rowcount in (-1,1), + 'cursor.rowcount should == number of rows returned, or ' + 'set to -1 after executing a select statement' + ) + self.executeDDL2(cur) +## self.assertEqual(cur.rowcount,-1, +## 'cursor.rowcount not being reset to -1 after executing ' +## 'no-result statements' +## ) + finally: + con.close() + + def test_callproc(self): + pass # performed in test_MySQL_capabilities + + def help_nextset_setUp(self,cur): + ''' Should create a procedure called deleteme + that returns two result sets, first the + number of rows in booze then "name from booze" + ''' + sql=""" + create procedure deleteme() + begin + select count(*) from %(tp)sbooze; + select name from %(tp)sbooze; + end + """ % dict(tp=self.table_prefix) + cur.execute(sql) + + def help_nextset_tearDown(self,cur): + 'If cleaning up is needed after nextSetTest' + cur.execute("drop procedure deleteme") + + def test_nextset(self): + from warnings import warn + con = self._connect() + try: + cur = con.cursor() + if not hasattr(cur,'nextset'): + return + + try: + self.executeDDL1(cur) + sql=self._populate() + for sql in self._populate(): + cur.execute(sql) + + self.help_nextset_setUp(cur) + + cur.callproc('deleteme') + numberofrows=cur.fetchone() + assert numberofrows[0]== len(self.samples) + assert cur.nextset() + names=cur.fetchall() + assert len(names) == len(self.samples) + s=cur.nextset() + if s: + empty = cur.fetchall() + self.assertEquals(len(empty), 0, + "non-empty result set after other result sets") + #warn("Incompatibility: MySQL returns an empty result set for the CALL itself", + # Warning) + #assert s == None,'No more return sets, should return None' + finally: + self.help_nextset_tearDown(cur) + + finally: + con.close() + + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.1