summaryrefslogtreecommitdiff
path: root/MySQLdb
diff options
context:
space:
mode:
authorfarcepest <farcepest@gmail.com>2012-10-02 12:37:41 -0400
committerfarcepest <farcepest@gmail.com>2012-10-02 12:37:41 -0400
commitd0e96c715558db1b1a3717be1db3f4d325cc3c0a (patch)
tree81b6c7ff42759b284730d6c7aeed4abe6ca64b52 /MySQLdb
parent4af9b70b91903f788207e7b7bbb28676862c1b5a (diff)
downloadmysqldb1-d0e96c715558db1b1a3717be1db3f4d325cc3c0a.tar.gz
Unify test connection configuration
Diffstat (limited to 'MySQLdb')
-rw-r--r--MySQLdb/tests/capabilities.py561
-rw-r--r--MySQLdb/tests/configdb.py25
-rw-r--r--MySQLdb/tests/default.cnf10
-rw-r--r--MySQLdb/tests/test_MySQLdb_capabilities.py205
-rw-r--r--MySQLdb/tests/test_MySQLdb_dbapi20.py407
5 files changed, 620 insertions, 588 deletions
diff --git a/MySQLdb/tests/capabilities.py b/MySQLdb/tests/capabilities.py
index 6bb7436..076361c 100644
--- a/MySQLdb/tests/capabilities.py
+++ b/MySQLdb/tests/capabilities.py
@@ -1,280 +1,281 @@
-#!/usr/bin/env python -O
-""" Script to test database capabilities and the DB-API interface
- for functionality and memory leaks.
-
- Adapted from a script by M-A Lemburg.
-
-"""
-from time import time
-import array
-import unittest
-
-
-class DatabaseTest(unittest.TestCase):
-
- db_module = None
- connect_args = ()
- connect_kwargs = dict()
- create_table_extra = ''
- rows = 10
- debug = False
-
- def setUp(self):
- import gc
- db = self.db_module.connect(*self.connect_args, **self.connect_kwargs)
- self.connection = db
- self.cursor = db.cursor()
- # TODO: this needs to be re-evaluated for Python 3
- self.BLOBText = ''.join([chr(i) for i in range(256)] * 100);
- self.BLOBUText = u''.join([unichr(i) for i in range(16384)])
- self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16))
-
- leak_test = True
-
- def tearDown(self):
- if self.leak_test:
- import gc
- del self.cursor
- orphans = gc.collect()
- self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans)
-
- del self.connection
- orphans = gc.collect()
- self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans)
-
- def table_exists(self, name):
- try:
- self.cursor.execute('select * from %s where 1=0' % name)
- except:
- return False
- else:
- return True
-
- def quote_identifier(self, ident):
- return '"%s"' % ident
-
- def new_table_name(self):
- i = id(self.cursor)
- while True:
- name = self.quote_identifier('tb%08x' % i)
- if not self.table_exists(name):
- return name
- i = i + 1
-
- def create_table(self, columndefs):
-
- """ Create a table using a list of column definitions given in
- columndefs.
-
- generator must be a function taking arguments (row_number,
- col_number) returning a suitable data object for insertion
- into the table.
-
- """
- self.table = self.new_table_name()
- self.cursor.execute('CREATE TABLE %s (%s) %s' %
- (self.table,
- ',\n'.join(columndefs),
- self.create_table_extra))
-
- def check_data_integrity(self, columndefs, generator):
- # insert
- self.create_table(columndefs)
- insert_statement = ('INSERT INTO %s VALUES (%s)' %
- (self.table,
- ','.join(['%s'] * len(columndefs))))
- data = [ [ generator(i,j) for j in range(len(columndefs)) ]
- for i in range(self.rows) ]
- self.cursor.executemany(insert_statement, data)
- self.connection.commit()
- # verify
- self.cursor.execute('select * from %s' % self.table)
- l = self.cursor.fetchall()
- self.assertEquals(len(l), self.rows)
- try:
- for i in range(self.rows):
- for j in range(len(columndefs)):
- self.assertEquals(l[i][j], generator(i,j))
- finally:
- if not self.debug:
- self.cursor.execute('drop table %s' % (self.table))
-
- def test_transactions(self):
- columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
- def generator(row, col):
- if col == 0: return row
- else: return ('%i' % (row%10))*255
- self.create_table(columndefs)
- insert_statement = ('INSERT INTO %s VALUES (%s)' %
- (self.table,
- ','.join(['%s'] * len(columndefs))))
- data = [ [ generator(i,j) for j in range(len(columndefs)) ]
- for i in range(self.rows) ]
- self.cursor.executemany(insert_statement, data)
- # verify
- self.connection.commit()
- self.cursor.execute('select * from %s' % self.table)
- l = self.cursor.fetchall()
- self.assertEquals(len(l), self.rows)
- for i in range(self.rows):
- for j in range(len(columndefs)):
- self.assertEquals(l[i][j], generator(i,j))
- delete_statement = 'delete from %s where col1=%%s' % self.table
- self.cursor.execute(delete_statement, (0,))
- self.cursor.execute('select col1 from %s where col1=%s' % \
- (self.table, 0))
- l = self.cursor.fetchall()
- self.assertFalse(l, "DELETE didn't work")
- self.connection.rollback()
- self.cursor.execute('select col1 from %s where col1=%s' % \
- (self.table, 0))
- l = self.cursor.fetchall()
- self.assertTrue(len(l) == 1, "ROLLBACK didn't work")
- self.cursor.execute('drop table %s' % (self.table))
-
- def test_truncation(self):
- columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
- def generator(row, col):
- if col == 0: return row
- else: return ('%i' % (row%10))*((255-self.rows/2)+row)
- self.create_table(columndefs)
- insert_statement = ('INSERT INTO %s VALUES (%s)' %
- (self.table,
- ','.join(['%s'] * len(columndefs))))
-
- try:
- self.cursor.execute(insert_statement, (0, '0'*256))
- except self.connection.DataError:
- pass
- else:
- self.fail("Over-long column did not generate warnings/exception with single insert")
-
- self.connection.rollback()
-
- try:
- for i in range(self.rows):
- data = []
- for j in range(len(columndefs)):
- data.append(generator(i,j))
- self.cursor.execute(insert_statement,tuple(data))
- except self.connection.DataError:
- pass
- else:
- self.fail("Over-long columns did not generate warnings/exception with execute()")
-
- self.connection.rollback()
-
- try:
- data = [ [ generator(i,j) for j in range(len(columndefs)) ]
- for i in range(self.rows) ]
- self.cursor.executemany(insert_statement, data)
- except self.connection.DataError:
- pass
- else:
- self.fail("Over-long columns did not generate warnings/exception with executemany()")
-
- self.connection.rollback()
- self.cursor.execute('drop table %s' % (self.table))
-
- def test_CHAR(self):
- # Character data
- def generator(row,col):
- return ('%i' % ((row+col) % 10)) * 255
- self.check_data_integrity(
- ('col1 char(255)','col2 char(255)'),
- generator)
-
- def test_INT(self):
- # Number data
- def generator(row,col):
- return row*row
- self.check_data_integrity(
- ('col1 INT',),
- generator)
-
- def test_DECIMAL(self):
- # DECIMAL
- def generator(row,col):
- from decimal import Decimal
- return Decimal("%d.%02d" % (row, col))
- self.check_data_integrity(
- ('col1 DECIMAL(5,2)',),
- generator)
-
- def test_DATE(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.DateFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 DATE',),
- generator)
-
- def test_TIME(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimeFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 TIME',),
- generator)
-
- def test_DATETIME(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 DATETIME',),
- generator)
-
- def test_TIMESTAMP(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 TIMESTAMP',),
- generator)
-
- def test_fractional_TIMESTAMP(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0)
- self.check_data_integrity(
- ('col1 TIMESTAMP',),
- generator)
-
- def test_LONG(self):
- def generator(row,col):
- if col == 0:
- return row
- else:
- return self.BLOBUText # 'BLOB Text ' * 1024
- self.check_data_integrity(
- ('col1 INT','col2 LONG'),
- generator)
-
- def test_TEXT(self):
- def generator(row,col):
- return self.BLOBUText # 'BLOB Text ' * 1024
- self.check_data_integrity(
- ('col2 TEXT',),
- generator)
-
- def test_LONG_BYTE(self):
- def generator(row,col):
- if col == 0:
- return row
- else:
- return self.BLOBBinary # 'BLOB\000Binary ' * 1024
- self.check_data_integrity(
- ('col1 INT','col2 LONG BYTE'),
- generator)
-
- def test_BLOB(self):
- def generator(row,col):
- if col == 0:
- return row
- else:
- return self.BLOBBinary # 'BLOB\000Binary ' * 1024
- self.check_data_integrity(
- ('col1 INT','col2 BLOB'),
- generator)
-
+#!/usr/bin/env python -O
+""" Script to test database capabilities and the DB-API interface
+ for functionality and memory leaks.
+
+ Adapted from a script by M-A Lemburg.
+
+"""
+from time import time
+import array
+import unittest
+from configdb import connection_factory
+
+
+class DatabaseTest(unittest.TestCase):
+
+ db_module = None
+ connect_args = ()
+ connect_kwargs = dict()
+ create_table_extra = ''
+ rows = 10
+ debug = False
+
+ def setUp(self):
+ import gc
+ db = connection_factory(**self.connect_kwargs)
+ self.connection = db
+ self.cursor = db.cursor()
+ # TODO: this needs to be re-evaluated for Python 3
+ self.BLOBText = ''.join([chr(i) for i in range(256)] * 100);
+ self.BLOBUText = u''.join([unichr(i) for i in range(16384)])
+ self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16))
+
+ leak_test = True
+
+ def tearDown(self):
+ if self.leak_test:
+ import gc
+ del self.cursor
+ orphans = gc.collect()
+ self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans)
+
+ del self.connection
+ orphans = gc.collect()
+ self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans)
+
+ def table_exists(self, name):
+ try:
+ self.cursor.execute('select * from %s where 1=0' % name)
+ except:
+ return False
+ else:
+ return True
+
+ def quote_identifier(self, ident):
+ return '"%s"' % ident
+
+ def new_table_name(self):
+ i = id(self.cursor)
+ while True:
+ name = self.quote_identifier('tb%08x' % i)
+ if not self.table_exists(name):
+ return name
+ i = i + 1
+
+ def create_table(self, columndefs):
+
+ """ Create a table using a list of column definitions given in
+ columndefs.
+
+ generator must be a function taking arguments (row_number,
+ col_number) returning a suitable data object for insertion
+ into the table.
+
+ """
+ self.table = self.new_table_name()
+ self.cursor.execute('CREATE TABLE %s (%s) %s' %
+ (self.table,
+ ',\n'.join(columndefs),
+ self.create_table_extra))
+
+ def check_data_integrity(self, columndefs, generator):
+ # insert
+ self.create_table(columndefs)
+ insert_statement = ('INSERT INTO %s VALUES (%s)' %
+ (self.table,
+ ','.join(['%s'] * len(columndefs))))
+ data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+ for i in range(self.rows) ]
+ self.cursor.executemany(insert_statement, data)
+ self.connection.commit()
+ # verify
+ self.cursor.execute('select * from %s' % self.table)
+ l = self.cursor.fetchall()
+ self.assertEquals(len(l), self.rows)
+ try:
+ for i in range(self.rows):
+ for j in range(len(columndefs)):
+ self.assertEquals(l[i][j], generator(i,j))
+ finally:
+ if not self.debug:
+ self.cursor.execute('drop table %s' % (self.table))
+
+ def test_transactions(self):
+ columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
+ def generator(row, col):
+ if col == 0: return row
+ else: return ('%i' % (row%10))*255
+ self.create_table(columndefs)
+ insert_statement = ('INSERT INTO %s VALUES (%s)' %
+ (self.table,
+ ','.join(['%s'] * len(columndefs))))
+ data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+ for i in range(self.rows) ]
+ self.cursor.executemany(insert_statement, data)
+ # verify
+ self.connection.commit()
+ self.cursor.execute('select * from %s' % self.table)
+ l = self.cursor.fetchall()
+ self.assertEquals(len(l), self.rows)
+ for i in range(self.rows):
+ for j in range(len(columndefs)):
+ self.assertEquals(l[i][j], generator(i,j))
+ delete_statement = 'delete from %s where col1=%%s' % self.table
+ self.cursor.execute(delete_statement, (0,))
+ self.cursor.execute('select col1 from %s where col1=%s' % \
+ (self.table, 0))
+ l = self.cursor.fetchall()
+ self.assertFalse(l, "DELETE didn't work")
+ self.connection.rollback()
+ self.cursor.execute('select col1 from %s where col1=%s' % \
+ (self.table, 0))
+ l = self.cursor.fetchall()
+ self.assertTrue(len(l) == 1, "ROLLBACK didn't work")
+ self.cursor.execute('drop table %s' % (self.table))
+
+ def test_truncation(self):
+ columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
+ def generator(row, col):
+ if col == 0: return row
+ else: return ('%i' % (row%10))*((255-self.rows/2)+row)
+ self.create_table(columndefs)
+ insert_statement = ('INSERT INTO %s VALUES (%s)' %
+ (self.table,
+ ','.join(['%s'] * len(columndefs))))
+
+ try:
+ self.cursor.execute(insert_statement, (0, '0'*256))
+ except self.connection.DataError:
+ pass
+ else:
+ self.fail("Over-long column did not generate warnings/exception with single insert")
+
+ self.connection.rollback()
+
+ try:
+ for i in range(self.rows):
+ data = []
+ for j in range(len(columndefs)):
+ data.append(generator(i,j))
+ self.cursor.execute(insert_statement,tuple(data))
+ except self.connection.DataError:
+ pass
+ else:
+ self.fail("Over-long columns did not generate warnings/exception with execute()")
+
+ self.connection.rollback()
+
+ try:
+ data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+ for i in range(self.rows) ]
+ self.cursor.executemany(insert_statement, data)
+ except self.connection.DataError:
+ pass
+ else:
+ self.fail("Over-long columns did not generate warnings/exception with executemany()")
+
+ self.connection.rollback()
+ self.cursor.execute('drop table %s' % (self.table))
+
+ def test_CHAR(self):
+ # Character data
+ def generator(row,col):
+ return ('%i' % ((row+col) % 10)) * 255
+ self.check_data_integrity(
+ ('col1 char(255)','col2 char(255)'),
+ generator)
+
+ def test_INT(self):
+ # Number data
+ def generator(row,col):
+ return row*row
+ self.check_data_integrity(
+ ('col1 INT',),
+ generator)
+
+ def test_DECIMAL(self):
+ # DECIMAL
+ def generator(row,col):
+ from decimal import Decimal
+ return Decimal("%d.%02d" % (row, col))
+ self.check_data_integrity(
+ ('col1 DECIMAL(5,2)',),
+ generator)
+
+ def test_DATE(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.DateFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 DATE',),
+ generator)
+
+ def test_TIME(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimeFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 TIME',),
+ generator)
+
+ def test_DATETIME(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 DATETIME',),
+ generator)
+
+ def test_TIMESTAMP(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 TIMESTAMP',),
+ generator)
+
+ def test_fractional_TIMESTAMP(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0)
+ self.check_data_integrity(
+ ('col1 TIMESTAMP',),
+ generator)
+
+ def test_LONG(self):
+ def generator(row,col):
+ if col == 0:
+ return row
+ else:
+ return self.BLOBUText # 'BLOB Text ' * 1024
+ self.check_data_integrity(
+ ('col1 INT','col2 LONG'),
+ generator)
+
+ def test_TEXT(self):
+ def generator(row,col):
+ return self.BLOBUText # 'BLOB Text ' * 1024
+ self.check_data_integrity(
+ ('col2 TEXT',),
+ generator)
+
+ def test_LONG_BYTE(self):
+ def generator(row,col):
+ if col == 0:
+ return row
+ else:
+ return self.BLOBBinary # 'BLOB\000Binary ' * 1024
+ self.check_data_integrity(
+ ('col1 INT','col2 LONG BYTE'),
+ generator)
+
+ def test_BLOB(self):
+ def generator(row,col):
+ if col == 0:
+ return row
+ else:
+ return self.BLOBBinary # 'BLOB\000Binary ' * 1024
+ self.check_data_integrity(
+ ('col1 INT','col2 BLOB'),
+ generator)
+
diff --git a/MySQLdb/tests/configdb.py b/MySQLdb/tests/configdb.py
new file mode 100644
index 0000000..cd6d43d
--- /dev/null
+++ b/MySQLdb/tests/configdb.py
@@ -0,0 +1,25 @@
+"""Configure database connection for tests."""
+
+from os import environ, path
+
+tests_path = path.dirname(__file__)
+conf_file = environ.get('TESTDB', 'default.cnf')
+conf_path = path.join(tests_path, conf_file)
+connect_kwargs = dict(
+ read_default_file = conf_path,
+ read_default_group = "MySQLdb-tests",
+)
+
+def connection_kwargs(kwargs):
+ db_kwargs = connect_kwargs.copy()
+ db_kwargs.update(kwargs)
+ return db_kwargs
+
+def connection_factory(**kwargs):
+ import MySQLdb
+ db_kwargs = connection_kwargs(kwargs)
+ db = MySQLdb.connect(**db_kwargs)
+ return db
+
+
+
diff --git a/MySQLdb/tests/default.cnf b/MySQLdb/tests/default.cnf
new file mode 100644
index 0000000..2aeda7c
--- /dev/null
+++ b/MySQLdb/tests/default.cnf
@@ -0,0 +1,10 @@
+# To create your own custom version of this file, read
+# http://dev.mysql.com/doc/refman/5.1/en/option-files.html
+# and set TESTDB in your environment to the name of the file
+
+[MySQLdb-tests]
+host = 127.0.0.1
+user = test
+database = test
+#password =
+default-character-set = utf8
diff --git a/MySQLdb/tests/test_MySQLdb_capabilities.py b/MySQLdb/tests/test_MySQLdb_capabilities.py
index a510468..60bbfad 100644
--- a/MySQLdb/tests/test_MySQLdb_capabilities.py
+++ b/MySQLdb/tests/test_MySQLdb_capabilities.py
@@ -1,103 +1,102 @@
-#!/usr/bin/env python
-import capabilities
-import unittest
-import MySQLdb
-import warnings
-
-warnings.filterwarnings('error')
-
-class test_MySQLdb(capabilities.DatabaseTest):
-
- db_module = MySQLdb
- connect_args = ()
- connect_kwargs = dict(db='test', host="127.0.0.1", user="test", #read_default_file='~/.my.cnf',
- charset='utf8', sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")
- create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8"
- leak_test = False
-
- def quote_identifier(self, ident):
- return "`%s`" % ident
-
- def test_TIME(self):
- from datetime import timedelta
- def generator(row,col):
- return timedelta(0, row*8000)
- self.check_data_integrity(
- ('col1 TIME',),
- generator)
-
- def test_TINYINT(self):
- # Number data
- def generator(row,col):
- v = (row*row) % 256
- if v > 127:
- v = v-256
- return v
- self.check_data_integrity(
- ('col1 TINYINT',),
- generator)
-
- def test_stored_procedures(self):
- db = self.connection
- c = self.cursor
- self.create_table(('pos INT', 'tree CHAR(20)'))
- c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table,
- list(enumerate('ash birch cedar larch pine'.split())))
- db.commit()
-
- c.execute("""
- CREATE PROCEDURE test_sp(IN t VARCHAR(255))
- BEGIN
- SELECT pos FROM %s WHERE tree = t;
- END
- """ % self.table)
- db.commit()
-
- c.callproc('test_sp', ('larch',))
- rows = c.fetchall()
- self.assertEquals(len(rows), 1)
- self.assertEquals(rows[0][0], 3)
- c.nextset()
-
- c.execute("DROP PROCEDURE test_sp")
- c.execute('drop table %s' % (self.table))
-
- def test_small_CHAR(self):
- # Character data
- def generator(row,col):
- i = (row*col+62)%256
- if i == 62: return ''
- if i == 63: return None
- return chr(i)
- self.check_data_integrity(
- ('col1 char(1)','col2 char(1)'),
- generator)
-
- def test_bug_2671682(self):
- from MySQLdb.constants import ER
- try:
- self.cursor.execute("describe some_non_existent_table");
- except self.connection.ProgrammingError, msg:
- self.assertTrue(msg[0] == ER.NO_SUCH_TABLE)
-
- def test_bug_3514287(self):
- c = self.cursor
- try:
- c.execute("""create table bug_3541287 (
- c1 CHAR(10),
- t1 TIMESTAMP)""")
- c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())",
- ("blah",))
- finally:
- c.execute("drop table if exists bug_3541287")
-
- def test_ping(self):
- self.connection.ping()
-
-
-if __name__ == '__main__':
- if test_MySQLdb.leak_test:
- import gc
- gc.enable()
- gc.set_debug(gc.DEBUG_LEAK)
- unittest.main()
+#!/usr/bin/env python
+import capabilities
+import unittest
+import MySQLdb
+import warnings
+
+warnings.filterwarnings('error')
+
+class test_MySQLdb(capabilities.DatabaseTest):
+
+ db_module = MySQLdb
+ connect_args = ()
+ connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")
+ create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8"
+ leak_test = False
+
+ def quote_identifier(self, ident):
+ return "`%s`" % ident
+
+ def test_TIME(self):
+ from datetime import timedelta
+ def generator(row,col):
+ return timedelta(0, row*8000)
+ self.check_data_integrity(
+ ('col1 TIME',),
+ generator)
+
+ def test_TINYINT(self):
+ # Number data
+ def generator(row,col):
+ v = (row*row) % 256
+ if v > 127:
+ v = v-256
+ return v
+ self.check_data_integrity(
+ ('col1 TINYINT',),
+ generator)
+
+ def test_stored_procedures(self):
+ db = self.connection
+ c = self.cursor
+ self.create_table(('pos INT', 'tree CHAR(20)'))
+ c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table,
+ list(enumerate('ash birch cedar larch pine'.split())))
+ db.commit()
+
+ c.execute("""
+ CREATE PROCEDURE test_sp(IN t VARCHAR(255))
+ BEGIN
+ SELECT pos FROM %s WHERE tree = t;
+ END
+ """ % self.table)
+ db.commit()
+
+ c.callproc('test_sp', ('larch',))
+ rows = c.fetchall()
+ self.assertEquals(len(rows), 1)
+ self.assertEquals(rows[0][0], 3)
+ c.nextset()
+
+ c.execute("DROP PROCEDURE test_sp")
+ c.execute('drop table %s' % (self.table))
+
+ def test_small_CHAR(self):
+ # Character data
+ def generator(row,col):
+ i = (row*col+62)%256
+ if i == 62: return ''
+ if i == 63: return None
+ return chr(i)
+ self.check_data_integrity(
+ ('col1 char(1)','col2 char(1)'),
+ generator)
+
+ def test_bug_2671682(self):
+ from MySQLdb.constants import ER
+ try:
+ self.cursor.execute("describe some_non_existent_table");
+ except self.connection.ProgrammingError, msg:
+ self.assertTrue(msg[0] == ER.NO_SUCH_TABLE)
+
+ def test_bug_3514287(self):
+ c = self.cursor
+ try:
+ c.execute("""create table bug_3541287 (
+ c1 CHAR(10),
+ t1 TIMESTAMP)""")
+ c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())",
+ ("blah",))
+ finally:
+ c.execute("drop table if exists bug_3541287")
+
+ def test_ping(self):
+ self.connection.ping()
+
+
+if __name__ == '__main__':
+ if test_MySQLdb.leak_test:
+ import gc
+ gc.enable()
+ gc.set_debug(gc.DEBUG_LEAK)
+ unittest.main()
diff --git a/MySQLdb/tests/test_MySQLdb_dbapi20.py b/MySQLdb/tests/test_MySQLdb_dbapi20.py
index 4f4527a..44830e0 100644
--- a/MySQLdb/tests/test_MySQLdb_dbapi20.py
+++ b/MySQLdb/tests/test_MySQLdb_dbapi20.py
@@ -1,205 +1,202 @@
-#!/usr/bin/env python
-import dbapi20
-import unittest
-import MySQLdb
-
-class test_MySQLdb(dbapi20.DatabaseAPI20Test):
- driver = MySQLdb
- connect_args = ()
- connect_kw_args = dict(db='test',
- host="127.0.0.1",
- user="test", #read_default_file='~/.my.cnf',
- charset='utf8',
- sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")
-
- def test_setoutputsize(self): pass
- def test_setoutputsize_basic(self): pass
- def test_nextset(self): pass
-
- """The tests on fetchone and fetchall and rowcount bogusly
- test for an exception if the statement cannot return a
- result set. MySQL always returns a result set; it's just that
- some things return empty result sets."""
-
- def test_fetchall(self):
- con = self._connect()
- try:
- cur = con.cursor()
- # cursor.fetchall should raise an Error if called
- # without executing a query that may return rows (such
- # as a select)
- self.assertRaises(self.driver.Error, cur.fetchall)
-
- self.executeDDL1(cur)
- for sql in self._populate():
- cur.execute(sql)
-
- # cursor.fetchall should raise an Error if called
- # after executing a a statement that cannot return rows
-## self.assertRaises(self.driver.Error,cur.fetchall)
-
- cur.execute('select name from %sbooze' % self.table_prefix)
- rows = cur.fetchall()
- self.assertTrue(cur.rowcount in (-1,len(self.samples)))
- self.assertEqual(len(rows),len(self.samples),
- 'cursor.fetchall did not retrieve all rows'
- )
- rows = [r[0] for r in rows]
- rows.sort()
- for i in range(0,len(self.samples)):
- self.assertEqual(rows[i],self.samples[i],
- 'cursor.fetchall retrieved incorrect rows'
- )
- rows = cur.fetchall()
- self.assertEqual(
- len(rows),0,
- 'cursor.fetchall should return an empty list if called '
- 'after the whole result set has been fetched'
- )
- self.assertTrue(cur.rowcount in (-1,len(self.samples)))
-
- self.executeDDL2(cur)
- cur.execute('select name from %sbarflys' % self.table_prefix)
- rows = cur.fetchall()
- self.assertTrue(cur.rowcount in (-1,0))
- self.assertEqual(len(rows),0,
- 'cursor.fetchall should return an empty list if '
- 'a select query returns no rows'
- )
-
- finally:
- con.close()
-
- def test_fetchone(self):
- con = self._connect()
- try:
- cur = con.cursor()
-
- # cursor.fetchone should raise an Error if called before
- # executing a select-type query
- self.assertRaises(self.driver.Error,cur.fetchone)
-
- # cursor.fetchone should raise an Error if called after
- # executing a query that cannnot return rows
- self.executeDDL1(cur)
-## self.assertRaises(self.driver.Error,cur.fetchone)
-
- cur.execute('select name from %sbooze' % self.table_prefix)
- self.assertEqual(cur.fetchone(),None,
- 'cursor.fetchone should return None if a query retrieves '
- 'no rows'
- )
- self.assertTrue(cur.rowcount in (-1,0))
-
- # cursor.fetchone should raise an Error if called after
- # executing a query that cannnot return rows
- cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
- self.table_prefix
- ))
-## self.assertRaises(self.driver.Error,cur.fetchone)
-
- cur.execute('select name from %sbooze' % self.table_prefix)
- r = cur.fetchone()
- self.assertEqual(len(r),1,
- 'cursor.fetchone should have retrieved a single row'
- )
- self.assertEqual(r[0],'Victoria Bitter',
- 'cursor.fetchone retrieved incorrect data'
- )
-## self.assertEqual(cur.fetchone(),None,
-## 'cursor.fetchone should return None if no more rows available'
-## )
- self.assertTrue(cur.rowcount in (-1,1))
- finally:
- con.close()
-
- # Same complaint as for fetchall and fetchone
- def test_rowcount(self):
- con = self._connect()
- try:
- cur = con.cursor()
- self.executeDDL1(cur)
-## self.assertEqual(cur.rowcount,-1,
-## 'cursor.rowcount should be -1 after executing no-result '
-## 'statements'
-## )
- cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
- self.table_prefix
- ))
-## self.assertTrue(cur.rowcount in (-1,1),
-## 'cursor.rowcount should == number or rows inserted, or '
-## 'set to -1 after executing an insert statement'
-## )
- cur.execute("select name from %sbooze" % self.table_prefix)
- self.assertTrue(cur.rowcount in (-1,1),
- 'cursor.rowcount should == number of rows returned, or '
- 'set to -1 after executing a select statement'
- )
- self.executeDDL2(cur)
-## self.assertEqual(cur.rowcount,-1,
-## 'cursor.rowcount not being reset to -1 after executing '
-## 'no-result statements'
-## )
- finally:
- con.close()
-
- def test_callproc(self):
- pass # performed in test_MySQL_capabilities
-
- def help_nextset_setUp(self,cur):
- ''' Should create a procedure called deleteme
- that returns two result sets, first the
- number of rows in booze then "name from booze"
- '''
- sql="""
- create procedure deleteme()
- begin
- select count(*) from %(tp)sbooze;
- select name from %(tp)sbooze;
- end
- """ % dict(tp=self.table_prefix)
- cur.execute(sql)
-
- def help_nextset_tearDown(self,cur):
- 'If cleaning up is needed after nextSetTest'
- cur.execute("drop procedure deleteme")
-
- def test_nextset(self):
- from warnings import warn
- con = self._connect()
- try:
- cur = con.cursor()
- if not hasattr(cur,'nextset'):
- return
-
- try:
- self.executeDDL1(cur)
- sql=self._populate()
- for sql in self._populate():
- cur.execute(sql)
-
- self.help_nextset_setUp(cur)
-
- cur.callproc('deleteme')
- numberofrows=cur.fetchone()
- assert numberofrows[0]== len(self.samples)
- assert cur.nextset()
- names=cur.fetchall()
- assert len(names) == len(self.samples)
- s=cur.nextset()
- if s:
- empty = cur.fetchall()
- self.assertEquals(len(empty), 0,
- "non-empty result set after other result sets")
- #warn("Incompatibility: MySQL returns an empty result set for the CALL itself",
- # Warning)
- #assert s == None,'No more return sets, should return None'
- finally:
- self.help_nextset_tearDown(cur)
-
- finally:
- con.close()
-
-
-if __name__ == '__main__':
- unittest.main()
+#!/usr/bin/env python
+import dbapi20
+import unittest
+import MySQLdb
+from configdb import connection_kwargs
+
+class test_MySQLdb(dbapi20.DatabaseAPI20Test):
+ driver = MySQLdb
+ connect_args = ()
+ connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL"))
+
+ def test_setoutputsize(self): pass
+ def test_setoutputsize_basic(self): pass
+ def test_nextset(self): pass
+
+ """The tests on fetchone and fetchall and rowcount bogusly
+ test for an exception if the statement cannot return a
+ result set. MySQL always returns a result set; it's just that
+ some things return empty result sets."""
+
+ def test_fetchall(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ # cursor.fetchall should raise an Error if called
+ # without executing a query that may return rows (such
+ # as a select)
+ self.assertRaises(self.driver.Error, cur.fetchall)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ # cursor.fetchall should raise an Error if called
+ # after executing a a statement that cannot return rows
+## self.assertRaises(self.driver.Error,cur.fetchall)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchall()
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
+ self.assertEqual(len(rows),len(self.samples),
+ 'cursor.fetchall did not retrieve all rows'
+ )
+ rows = [r[0] for r in rows]
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'cursor.fetchall retrieved incorrect rows'
+ )
+ rows = cur.fetchall()
+ self.assertEqual(
+ len(rows),0,
+ 'cursor.fetchall should return an empty list if called '
+ 'after the whole result set has been fetched'
+ )
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ rows = cur.fetchall()
+ self.assertTrue(cur.rowcount in (-1,0))
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchall should return an empty list if '
+ 'a select query returns no rows'
+ )
+
+ finally:
+ con.close()
+
+ def test_fetchone(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchone should raise an Error if called before
+ # executing a select-type query
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ self.executeDDL1(cur)
+## self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if a query retrieves '
+ 'no rows'
+ )
+ self.assertTrue(cur.rowcount in (-1,0))
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+## self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchone()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchone should have retrieved a single row'
+ )
+ self.assertEqual(r[0],'Victoria Bitter',
+ 'cursor.fetchone retrieved incorrect data'
+ )
+## self.assertEqual(cur.fetchone(),None,
+## 'cursor.fetchone should return None if no more rows available'
+## )
+ self.assertTrue(cur.rowcount in (-1,1))
+ finally:
+ con.close()
+
+ # Same complaint as for fetchall and fetchone
+ def test_rowcount(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+## self.assertEqual(cur.rowcount,-1,
+## 'cursor.rowcount should be -1 after executing no-result '
+## 'statements'
+## )
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+## self.assertTrue(cur.rowcount in (-1,1),
+## 'cursor.rowcount should == number or rows inserted, or '
+## 'set to -1 after executing an insert statement'
+## )
+ cur.execute("select name from %sbooze" % self.table_prefix)
+ self.assertTrue(cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number of rows returned, or '
+ 'set to -1 after executing a select statement'
+ )
+ self.executeDDL2(cur)
+## self.assertEqual(cur.rowcount,-1,
+## 'cursor.rowcount not being reset to -1 after executing '
+## 'no-result statements'
+## )
+ finally:
+ con.close()
+
+ def test_callproc(self):
+ pass # performed in test_MySQL_capabilities
+
+ def help_nextset_setUp(self,cur):
+ ''' Should create a procedure called deleteme
+ that returns two result sets, first the
+ number of rows in booze then "name from booze"
+ '''
+ sql="""
+ create procedure deleteme()
+ begin
+ select count(*) from %(tp)sbooze;
+ select name from %(tp)sbooze;
+ end
+ """ % dict(tp=self.table_prefix)
+ cur.execute(sql)
+
+ def help_nextset_tearDown(self,cur):
+ 'If cleaning up is needed after nextSetTest'
+ cur.execute("drop procedure deleteme")
+
+ def test_nextset(self):
+ from warnings import warn
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if not hasattr(cur,'nextset'):
+ return
+
+ try:
+ self.executeDDL1(cur)
+ sql=self._populate()
+ for sql in self._populate():
+ cur.execute(sql)
+
+ self.help_nextset_setUp(cur)
+
+ cur.callproc('deleteme')
+ numberofrows=cur.fetchone()
+ assert numberofrows[0]== len(self.samples)
+ assert cur.nextset()
+ names=cur.fetchall()
+ assert len(names) == len(self.samples)
+ s=cur.nextset()
+ if s:
+ empty = cur.fetchall()
+ self.assertEquals(len(empty), 0,
+ "non-empty result set after other result sets")
+ #warn("Incompatibility: MySQL returns an empty result set for the CALL itself",
+ # Warning)
+ #assert s == None,'No more return sets, should return None'
+ finally:
+ self.help_nextset_tearDown(cur)
+
+ finally:
+ con.close()
+
+
+if __name__ == '__main__':
+ unittest.main()