summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorEvax Software <contact@evax.fr>2012-10-04 13:34:39 -0400
committerfarcepest <farcepest@gmail.com>2012-10-04 13:34:39 -0400
commit7a846cf5e35ebcad852309aebaa0bac8a7244744 (patch)
tree4fbd759ae4894cabcd590e15e61b535766a13ee2 /tests
parent0164ffeb2534f050148348e50a51641491b6a65d (diff)
downloadmysqldb1-7a846cf5e35ebcad852309aebaa0bac8a7244744.tar.gz
Allow pip install from git
Merged with modifications from https://github.com/evax/MySQLdb1/commit/a8152690101733904b16a32ff8467220ca07242c
Diffstat (limited to 'tests')
-rw-r--r--tests/capabilities.py281
-rw-r--r--tests/configdb.py25
-rw-r--r--tests/dbapi20.py853
-rw-r--r--tests/default.cnf10
-rw-r--r--tests/test_MySQLdb_capabilities.py102
-rw-r--r--tests/test_MySQLdb_dbapi20.py202
-rw-r--r--tests/test_MySQLdb_nonstandard.py86
-rw-r--r--tests/travis.cnf10
8 files changed, 1569 insertions, 0 deletions
diff --git a/tests/capabilities.py b/tests/capabilities.py
new file mode 100644
index 0000000..076361c
--- /dev/null
+++ b/tests/capabilities.py
@@ -0,0 +1,281 @@
+#!/usr/bin/env python -O
+""" Script to test database capabilities and the DB-API interface
+ for functionality and memory leaks.
+
+ Adapted from a script by M-A Lemburg.
+
+"""
+from time import time
+import array
+import unittest
+from configdb import connection_factory
+
+
+class DatabaseTest(unittest.TestCase):
+
+ db_module = None
+ connect_args = ()
+ connect_kwargs = dict()
+ create_table_extra = ''
+ rows = 10
+ debug = False
+
+ def setUp(self):
+ import gc
+ db = connection_factory(**self.connect_kwargs)
+ self.connection = db
+ self.cursor = db.cursor()
+ # TODO: this needs to be re-evaluated for Python 3
+ self.BLOBText = ''.join([chr(i) for i in range(256)] * 100);
+ self.BLOBUText = u''.join([unichr(i) for i in range(16384)])
+ self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16))
+
+ leak_test = True
+
+ def tearDown(self):
+ if self.leak_test:
+ import gc
+ del self.cursor
+ orphans = gc.collect()
+ self.failIf(orphans, "%d orphaned objects found after deleting cursor" % orphans)
+
+ del self.connection
+ orphans = gc.collect()
+ self.failIf(orphans, "%d orphaned objects found after deleting connection" % orphans)
+
+ def table_exists(self, name):
+ try:
+ self.cursor.execute('select * from %s where 1=0' % name)
+ except:
+ return False
+ else:
+ return True
+
+ def quote_identifier(self, ident):
+ return '"%s"' % ident
+
+ def new_table_name(self):
+ i = id(self.cursor)
+ while True:
+ name = self.quote_identifier('tb%08x' % i)
+ if not self.table_exists(name):
+ return name
+ i = i + 1
+
+ def create_table(self, columndefs):
+
+ """ Create a table using a list of column definitions given in
+ columndefs.
+
+ generator must be a function taking arguments (row_number,
+ col_number) returning a suitable data object for insertion
+ into the table.
+
+ """
+ self.table = self.new_table_name()
+ self.cursor.execute('CREATE TABLE %s (%s) %s' %
+ (self.table,
+ ',\n'.join(columndefs),
+ self.create_table_extra))
+
+ def check_data_integrity(self, columndefs, generator):
+ # insert
+ self.create_table(columndefs)
+ insert_statement = ('INSERT INTO %s VALUES (%s)' %
+ (self.table,
+ ','.join(['%s'] * len(columndefs))))
+ data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+ for i in range(self.rows) ]
+ self.cursor.executemany(insert_statement, data)
+ self.connection.commit()
+ # verify
+ self.cursor.execute('select * from %s' % self.table)
+ l = self.cursor.fetchall()
+ self.assertEquals(len(l), self.rows)
+ try:
+ for i in range(self.rows):
+ for j in range(len(columndefs)):
+ self.assertEquals(l[i][j], generator(i,j))
+ finally:
+ if not self.debug:
+ self.cursor.execute('drop table %s' % (self.table))
+
+ def test_transactions(self):
+ columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
+ def generator(row, col):
+ if col == 0: return row
+ else: return ('%i' % (row%10))*255
+ self.create_table(columndefs)
+ insert_statement = ('INSERT INTO %s VALUES (%s)' %
+ (self.table,
+ ','.join(['%s'] * len(columndefs))))
+ data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+ for i in range(self.rows) ]
+ self.cursor.executemany(insert_statement, data)
+ # verify
+ self.connection.commit()
+ self.cursor.execute('select * from %s' % self.table)
+ l = self.cursor.fetchall()
+ self.assertEquals(len(l), self.rows)
+ for i in range(self.rows):
+ for j in range(len(columndefs)):
+ self.assertEquals(l[i][j], generator(i,j))
+ delete_statement = 'delete from %s where col1=%%s' % self.table
+ self.cursor.execute(delete_statement, (0,))
+ self.cursor.execute('select col1 from %s where col1=%s' % \
+ (self.table, 0))
+ l = self.cursor.fetchall()
+ self.assertFalse(l, "DELETE didn't work")
+ self.connection.rollback()
+ self.cursor.execute('select col1 from %s where col1=%s' % \
+ (self.table, 0))
+ l = self.cursor.fetchall()
+ self.assertTrue(len(l) == 1, "ROLLBACK didn't work")
+ self.cursor.execute('drop table %s' % (self.table))
+
+ def test_truncation(self):
+ columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
+ def generator(row, col):
+ if col == 0: return row
+ else: return ('%i' % (row%10))*((255-self.rows/2)+row)
+ self.create_table(columndefs)
+ insert_statement = ('INSERT INTO %s VALUES (%s)' %
+ (self.table,
+ ','.join(['%s'] * len(columndefs))))
+
+ try:
+ self.cursor.execute(insert_statement, (0, '0'*256))
+ except self.connection.DataError:
+ pass
+ else:
+ self.fail("Over-long column did not generate warnings/exception with single insert")
+
+ self.connection.rollback()
+
+ try:
+ for i in range(self.rows):
+ data = []
+ for j in range(len(columndefs)):
+ data.append(generator(i,j))
+ self.cursor.execute(insert_statement,tuple(data))
+ except self.connection.DataError:
+ pass
+ else:
+ self.fail("Over-long columns did not generate warnings/exception with execute()")
+
+ self.connection.rollback()
+
+ try:
+ data = [ [ generator(i,j) for j in range(len(columndefs)) ]
+ for i in range(self.rows) ]
+ self.cursor.executemany(insert_statement, data)
+ except self.connection.DataError:
+ pass
+ else:
+ self.fail("Over-long columns did not generate warnings/exception with executemany()")
+
+ self.connection.rollback()
+ self.cursor.execute('drop table %s' % (self.table))
+
+ def test_CHAR(self):
+ # Character data
+ def generator(row,col):
+ return ('%i' % ((row+col) % 10)) * 255
+ self.check_data_integrity(
+ ('col1 char(255)','col2 char(255)'),
+ generator)
+
+ def test_INT(self):
+ # Number data
+ def generator(row,col):
+ return row*row
+ self.check_data_integrity(
+ ('col1 INT',),
+ generator)
+
+ def test_DECIMAL(self):
+ # DECIMAL
+ def generator(row,col):
+ from decimal import Decimal
+ return Decimal("%d.%02d" % (row, col))
+ self.check_data_integrity(
+ ('col1 DECIMAL(5,2)',),
+ generator)
+
+ def test_DATE(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.DateFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 DATE',),
+ generator)
+
+ def test_TIME(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimeFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 TIME',),
+ generator)
+
+ def test_DATETIME(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 DATETIME',),
+ generator)
+
+ def test_TIMESTAMP(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
+ self.check_data_integrity(
+ ('col1 TIMESTAMP',),
+ generator)
+
+ def test_fractional_TIMESTAMP(self):
+ ticks = time()
+ def generator(row,col):
+ return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0)
+ self.check_data_integrity(
+ ('col1 TIMESTAMP',),
+ generator)
+
+ def test_LONG(self):
+ def generator(row,col):
+ if col == 0:
+ return row
+ else:
+ return self.BLOBUText # 'BLOB Text ' * 1024
+ self.check_data_integrity(
+ ('col1 INT','col2 LONG'),
+ generator)
+
+ def test_TEXT(self):
+ def generator(row,col):
+ return self.BLOBUText # 'BLOB Text ' * 1024
+ self.check_data_integrity(
+ ('col2 TEXT',),
+ generator)
+
+ def test_LONG_BYTE(self):
+ def generator(row,col):
+ if col == 0:
+ return row
+ else:
+ return self.BLOBBinary # 'BLOB\000Binary ' * 1024
+ self.check_data_integrity(
+ ('col1 INT','col2 LONG BYTE'),
+ generator)
+
+ def test_BLOB(self):
+ def generator(row,col):
+ if col == 0:
+ return row
+ else:
+ return self.BLOBBinary # 'BLOB\000Binary ' * 1024
+ self.check_data_integrity(
+ ('col1 INT','col2 BLOB'),
+ generator)
+
diff --git a/tests/configdb.py b/tests/configdb.py
new file mode 100644
index 0000000..cd6d43d
--- /dev/null
+++ b/tests/configdb.py
@@ -0,0 +1,25 @@
+"""Configure database connection for tests."""
+
+from os import environ, path
+
+tests_path = path.dirname(__file__)
+conf_file = environ.get('TESTDB', 'default.cnf')
+conf_path = path.join(tests_path, conf_file)
+connect_kwargs = dict(
+ read_default_file = conf_path,
+ read_default_group = "MySQLdb-tests",
+)
+
+def connection_kwargs(kwargs):
+ db_kwargs = connect_kwargs.copy()
+ db_kwargs.update(kwargs)
+ return db_kwargs
+
+def connection_factory(**kwargs):
+ import MySQLdb
+ db_kwargs = connection_kwargs(kwargs)
+ db = MySQLdb.connect(**db_kwargs)
+ return db
+
+
+
diff --git a/tests/dbapi20.py b/tests/dbapi20.py
new file mode 100644
index 0000000..ad292ae
--- /dev/null
+++ b/tests/dbapi20.py
@@ -0,0 +1,853 @@
+#!/usr/bin/env python
+''' Python DB API 2.0 driver compliance unit test suite.
+
+ This software is Public Domain and may be used without restrictions.
+
+ "Now we have booze and barflies entering the discussion, plus rumours of
+ DBAs on drugs... and I won't tell you what flashes through my mind each
+ time I read the subject line with 'Anal Compliance' in it. All around
+ this is turning out to be a thoroughly unwholesome unit test."
+
+ -- Ian Bicking
+'''
+
+__rcs_id__ = '$Id$'
+__version__ = '$Revision$'[11:-2]
+__author__ = 'Stuart Bishop <zen@shangri-la.dropbear.id.au>'
+
+import unittest
+import time
+
+# $Log$
+# Revision 1.1.2.1 2006/02/25 03:44:32 adustman
+# Generic DB-API unit test module
+#
+# Revision 1.10 2003/10/09 03:14:14 zenzen
+# Add test for DB API 2.0 optional extension, where database exceptions
+# are exposed as attributes on the Connection object.
+#
+# Revision 1.9 2003/08/13 01:16:36 zenzen
+# Minor tweak from Stefan Fleiter
+#
+# Revision 1.8 2003/04/10 00:13:25 zenzen
+# Changes, as per suggestions by M.-A. Lemburg
+# - Add a table prefix, to ensure namespace collisions can always be avoided
+#
+# Revision 1.7 2003/02/26 23:33:37 zenzen
+# Break out DDL into helper functions, as per request by David Rushby
+#
+# Revision 1.6 2003/02/21 03:04:33 zenzen
+# Stuff from Henrik Ekelund:
+# added test_None
+# added test_nextset & hooks
+#
+# Revision 1.5 2003/02/17 22:08:43 zenzen
+# Implement suggestions and code from Henrik Eklund - test that cursor.arraysize
+# defaults to 1 & generic cursor.callproc test added
+#
+# Revision 1.4 2003/02/15 00:16:33 zenzen
+# Changes, as per suggestions and bug reports by M.-A. Lemburg,
+# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar
+# - Class renamed
+# - Now a subclass of TestCase, to avoid requiring the driver stub
+# to use multiple inheritance
+# - Reversed the polarity of buggy test in test_description
+# - Test exception heirarchy correctly
+# - self.populate is now self._populate(), so if a driver stub
+# overrides self.ddl1 this change propogates
+# - VARCHAR columns now have a width, which will hopefully make the
+# DDL even more portible (this will be reversed if it causes more problems)
+# - cursor.rowcount being checked after various execute and fetchXXX methods
+# - Check for fetchall and fetchmany returning empty lists after results
+# are exhausted (already checking for empty lists if select retrieved
+# nothing
+# - Fix bugs in test_setoutputsize_basic and test_setinputsizes
+#
+
+class DatabaseAPI20Test(unittest.TestCase):
+ ''' Test a database self.driver for DB API 2.0 compatibility.
+ This implementation tests Gadfly, but the TestCase
+ is structured so that other self.drivers can subclass this
+ test case to ensure compiliance with the DB-API. It is
+ expected that this TestCase may be expanded in the future
+ if ambiguities or edge conditions are discovered.
+
+ The 'Optional Extensions' are not yet being tested.
+
+ self.drivers should subclass this test, overriding setUp, tearDown,
+ self.driver, connect_args and connect_kw_args. Class specification
+ should be as follows:
+
+ import dbapi20
+ class mytest(dbapi20.DatabaseAPI20Test):
+ [...]
+
+ Don't 'import DatabaseAPI20Test from dbapi20', or you will
+ confuse the unit tester - just 'import dbapi20'.
+ '''
+
+ # The self.driver module. This should be the module where the 'connect'
+ # method is to be found
+ driver = None
+ connect_args = () # List of arguments to pass to connect
+ connect_kw_args = {} # Keyword arguments for connect
+ table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
+
+ ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
+ ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
+ xddl1 = 'drop table %sbooze' % table_prefix
+ xddl2 = 'drop table %sbarflys' % table_prefix
+
+ lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase
+
+ # Some drivers may need to override these helpers, for example adding
+ # a 'commit' after the execute.
+ def executeDDL1(self,cursor):
+ cursor.execute(self.ddl1)
+
+ def executeDDL2(self,cursor):
+ cursor.execute(self.ddl2)
+
+ def setUp(self):
+ ''' self.drivers should override this method to perform required setup
+ if any is necessary, such as creating the database.
+ '''
+ pass
+
+ def tearDown(self):
+ ''' self.drivers should override this method to perform required cleanup
+ if any is necessary, such as deleting the test database.
+ The default drops the tables that may be created.
+ '''
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ for ddl in (self.xddl1,self.xddl2):
+ try:
+ cur.execute(ddl)
+ con.commit()
+ except self.driver.Error:
+ # Assume table didn't exist. Other tests will check if
+ # execute is busted.
+ pass
+ finally:
+ con.close()
+
+ def _connect(self):
+ try:
+ return self.driver.connect(
+ *self.connect_args,**self.connect_kw_args
+ )
+ except AttributeError:
+ self.fail("No connect method found in self.driver module")
+
+ def test_connect(self):
+ con = self._connect()
+ con.close()
+
+ def test_apilevel(self):
+ try:
+ # Must exist
+ apilevel = self.driver.apilevel
+ # Must equal 2.0
+ self.assertEqual(apilevel,'2.0')
+ except AttributeError:
+ self.fail("Driver doesn't define apilevel")
+
+ def test_threadsafety(self):
+ try:
+ # Must exist
+ threadsafety = self.driver.threadsafety
+ # Must be a valid value
+ self.assertTrue(threadsafety in (0,1,2,3))
+ except AttributeError:
+ self.fail("Driver doesn't define threadsafety")
+
+ def test_paramstyle(self):
+ try:
+ # Must exist
+ paramstyle = self.driver.paramstyle
+ # Must be a valid value
+ self.assertTrue(paramstyle in (
+ 'qmark','numeric','named','format','pyformat'
+ ))
+ except AttributeError:
+ self.fail("Driver doesn't define paramstyle")
+
+ def test_Exceptions(self):
+ # Make sure required exceptions exist, and are in the
+ # defined heirarchy.
+ self.assertTrue(issubclass(self.driver.Warning,StandardError))
+ self.assertTrue(issubclass(self.driver.Error,StandardError))
+ self.assertTrue(
+ issubclass(self.driver.InterfaceError,self.driver.Error)
+ )
+ self.assertTrue(
+ issubclass(self.driver.DatabaseError,self.driver.Error)
+ )
+ self.assertTrue(
+ issubclass(self.driver.OperationalError,self.driver.Error)
+ )
+ self.assertTrue(
+ issubclass(self.driver.IntegrityError,self.driver.Error)
+ )
+ self.assertTrue(
+ issubclass(self.driver.InternalError,self.driver.Error)
+ )
+ self.assertTrue(
+ issubclass(self.driver.ProgrammingError,self.driver.Error)
+ )
+ self.assertTrue(
+ issubclass(self.driver.NotSupportedError,self.driver.Error)
+ )
+
+ def test_ExceptionsAsConnectionAttributes(self):
+ # OPTIONAL EXTENSION
+ # Test for the optional DB API 2.0 extension, where the exceptions
+ # are exposed as attributes on the Connection object
+ # I figure this optional extension will be implemented by any
+ # driver author who is using this test suite, so it is enabled
+ # by default.
+ con = self._connect()
+ drv = self.driver
+ self.assertTrue(con.Warning is drv.Warning)
+ self.assertTrue(con.Error is drv.Error)
+ self.assertTrue(con.InterfaceError is drv.InterfaceError)
+ self.assertTrue(con.DatabaseError is drv.DatabaseError)
+ self.assertTrue(con.OperationalError is drv.OperationalError)
+ self.assertTrue(con.IntegrityError is drv.IntegrityError)
+ self.assertTrue(con.InternalError is drv.InternalError)
+ self.assertTrue(con.ProgrammingError is drv.ProgrammingError)
+ self.assertTrue(con.NotSupportedError is drv.NotSupportedError)
+
+
+ def test_commit(self):
+ con = self._connect()
+ try:
+ # Commit must work, even if it doesn't do anything
+ con.commit()
+ finally:
+ con.close()
+
+ def test_rollback(self):
+ con = self._connect()
+ # If rollback is defined, it should either work or throw
+ # the documented exception
+ if hasattr(con,'rollback'):
+ try:
+ con.rollback()
+ except self.driver.NotSupportedError:
+ pass
+
+ def test_cursor(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ finally:
+ con.close()
+
+ def test_cursor_isolation(self):
+ con = self._connect()
+ try:
+ # Make sure cursors created from the same connection have
+ # the documented transaction isolation level
+ cur1 = con.cursor()
+ cur2 = con.cursor()
+ self.executeDDL1(cur1)
+ cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ cur2.execute("select name from %sbooze" % self.table_prefix)
+ booze = cur2.fetchall()
+ self.assertEqual(len(booze),1)
+ self.assertEqual(len(booze[0]),1)
+ self.assertEqual(booze[0][0],'Victoria Bitter')
+ finally:
+ con.close()
+
+ def test_description(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ self.assertEqual(cur.description,None,
+ 'cursor.description should be none after executing a '
+ 'statement that can return no rows (such as DDL)'
+ )
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(len(cur.description),1,
+ 'cursor.description describes too many columns'
+ )
+ self.assertEqual(len(cur.description[0]),7,
+ 'cursor.description[x] tuples must have 7 elements'
+ )
+ self.assertEqual(cur.description[0][0].lower(),'name',
+ 'cursor.description[x][0] must return column name'
+ )
+ self.assertEqual(cur.description[0][1],self.driver.STRING,
+ 'cursor.description[x][1] must return column type. Got %r'
+ % cur.description[0][1]
+ )
+
+ # Make sure self.description gets reset
+ self.executeDDL2(cur)
+ self.assertEqual(cur.description,None,
+ 'cursor.description not being set to None when executing '
+ 'no-result statements (eg. DDL)'
+ )
+ finally:
+ con.close()
+
+ def test_rowcount(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ self.assertEqual(cur.rowcount,-1,
+ 'cursor.rowcount should be -1 after executing no-result '
+ 'statements'
+ )
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ self.assertTrue(cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number or rows inserted, or '
+ 'set to -1 after executing an insert statement'
+ )
+ cur.execute("select name from %sbooze" % self.table_prefix)
+ self.assertTrue(cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number of rows returned, or '
+ 'set to -1 after executing a select statement'
+ )
+ self.executeDDL2(cur)
+ self.assertEqual(cur.rowcount,-1,
+ 'cursor.rowcount not being reset to -1 after executing '
+ 'no-result statements'
+ )
+ finally:
+ con.close()
+
+ lower_func = 'lower'
+ def test_callproc(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if self.lower_func and hasattr(cur,'callproc'):
+ r = cur.callproc(self.lower_func,('FOO',))
+ self.assertEqual(len(r),1)
+ self.assertEqual(r[0],'FOO')
+ r = cur.fetchall()
+ self.assertEqual(len(r),1,'callproc produced no result set')
+ self.assertEqual(len(r[0]),1,
+ 'callproc produced invalid result set'
+ )
+ self.assertEqual(r[0][0],'foo',
+ 'callproc produced invalid results'
+ )
+ finally:
+ con.close()
+
+ def test_close(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ finally:
+ con.close()
+
+ # cursor.execute should raise an Error if called after connection
+ # closed
+ self.assertRaises(self.driver.Error,self.executeDDL1,cur)
+
+ # connection.commit should raise an Error if called after connection'
+ # closed.'
+ self.assertRaises(self.driver.Error,con.commit)
+
+ # connection.close should raise an Error if called more than once
+ self.assertRaises(self.driver.Error,con.close)
+
+ def test_execute(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self._paraminsert(cur)
+ finally:
+ con.close()
+
+ def _paraminsert(self,cur):
+ self.executeDDL1(cur)
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ self.assertTrue(cur.rowcount in (-1,1))
+
+ if self.driver.paramstyle == 'qmark':
+ cur.execute(
+ 'insert into %sbooze values (?)' % self.table_prefix,
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'numeric':
+ cur.execute(
+ 'insert into %sbooze values (:1)' % self.table_prefix,
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'named':
+ cur.execute(
+ 'insert into %sbooze values (:beer)' % self.table_prefix,
+ {'beer':"Cooper's"}
+ )
+ elif self.driver.paramstyle == 'format':
+ cur.execute(
+ 'insert into %sbooze values (%%s)' % self.table_prefix,
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'pyformat':
+ cur.execute(
+ 'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
+ {'beer':"Cooper's"}
+ )
+ else:
+ self.fail('Invalid paramstyle')
+ self.assertTrue(cur.rowcount in (-1,1))
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ res = cur.fetchall()
+ self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
+ beers = [res[0][0],res[1][0]]
+ beers.sort()
+ self.assertEqual(beers[0],"Cooper's",
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly'
+ )
+ self.assertEqual(beers[1],"Victoria Bitter",
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly'
+ )
+
+ def test_executemany(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ largs = [ ("Cooper's",) , ("Boag's",) ]
+ margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
+ if self.driver.paramstyle == 'qmark':
+ cur.executemany(
+ 'insert into %sbooze values (?)' % self.table_prefix,
+ largs
+ )
+ elif self.driver.paramstyle == 'numeric':
+ cur.executemany(
+ 'insert into %sbooze values (:1)' % self.table_prefix,
+ largs
+ )
+ elif self.driver.paramstyle == 'named':
+ cur.executemany(
+ 'insert into %sbooze values (:beer)' % self.table_prefix,
+ margs
+ )
+ elif self.driver.paramstyle == 'format':
+ cur.executemany(
+ 'insert into %sbooze values (%%s)' % self.table_prefix,
+ largs
+ )
+ elif self.driver.paramstyle == 'pyformat':
+ cur.executemany(
+ 'insert into %sbooze values (%%(beer)s)' % (
+ self.table_prefix
+ ),
+ margs
+ )
+ else:
+ self.fail('Unknown paramstyle')
+ self.assertTrue(cur.rowcount in (-1,2),
+ 'insert using cursor.executemany set cursor.rowcount to '
+ 'incorrect value %r' % cur.rowcount
+ )
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ res = cur.fetchall()
+ self.assertEqual(len(res),2,
+ 'cursor.fetchall retrieved incorrect number of rows'
+ )
+ beers = [res[0][0],res[1][0]]
+ beers.sort()
+ self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
+ self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
+ finally:
+ con.close()
+
+ def test_fetchone(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchone should raise an Error if called before
+ # executing a select-type query
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ self.executeDDL1(cur)
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if a query retrieves '
+ 'no rows'
+ )
+ self.assertTrue(cur.rowcount in (-1,0))
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchone()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchone should have retrieved a single row'
+ )
+ self.assertEqual(r[0],'Victoria Bitter',
+ 'cursor.fetchone retrieved incorrect data'
+ )
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if no more rows available'
+ )
+ self.assertTrue(cur.rowcount in (-1,1))
+ finally:
+ con.close()
+
+ samples = [
+ 'Carlton Cold',
+ 'Carlton Draft',
+ 'Mountain Goat',
+ 'Redback',
+ 'Victoria Bitter',
+ 'XXXX'
+ ]
+
+ def _populate(self):
+ ''' Return a list of sql commands to setup the DB for the fetch
+ tests.
+ '''
+ populate = [
+ "insert into %sbooze values ('%s')" % (self.table_prefix,s)
+ for s in self.samples
+ ]
+ return populate
+
+ def test_fetchmany(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchmany should raise an Error if called without
+ #issuing a query
+ self.assertRaises(self.driver.Error,cur.fetchmany,4)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchmany()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchmany retrieved incorrect number of rows, '
+ 'default of arraysize is one.'
+ )
+ cur.arraysize=10
+ r = cur.fetchmany(3) # Should get 3 rows
+ self.assertEqual(len(r),3,
+ 'cursor.fetchmany retrieved incorrect number of rows'
+ )
+ r = cur.fetchmany(4) # Should get 2 more
+ self.assertEqual(len(r),2,
+ 'cursor.fetchmany retrieved incorrect number of rows'
+ )
+ r = cur.fetchmany(4) # Should be an empty sequence
+ self.assertEqual(len(r),0,
+ 'cursor.fetchmany should return an empty sequence after '
+ 'results are exhausted'
+ )
+ self.assertTrue(cur.rowcount in (-1,6))
+
+ # Same as above, using cursor.arraysize
+ cur.arraysize=4
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchmany() # Should get 4 rows
+ self.assertEqual(len(r),4,
+ 'cursor.arraysize not being honoured by fetchmany'
+ )
+ r = cur.fetchmany() # Should get 2 more
+ self.assertEqual(len(r),2)
+ r = cur.fetchmany() # Should be an empty sequence
+ self.assertEqual(len(r),0)
+ self.assertTrue(cur.rowcount in (-1,6))
+
+ cur.arraysize=6
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchmany() # Should get all rows
+ self.assertTrue(cur.rowcount in (-1,6))
+ self.assertEqual(len(rows),6)
+ self.assertEqual(len(rows),6)
+ rows = [r[0] for r in rows]
+ rows.sort()
+
+ # Make sure we get the right data back out
+ for i in range(0,6):
+ self.assertEqual(rows[i],self.samples[i],
+ 'incorrect data retrieved by cursor.fetchmany'
+ )
+
+ rows = cur.fetchmany() # Should return an empty list
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchmany should return an empty sequence if '
+ 'called after the whole result set has been fetched'
+ )
+ self.assertTrue(cur.rowcount in (-1,6))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ r = cur.fetchmany() # Should get empty sequence
+ self.assertEqual(len(r),0,
+ 'cursor.fetchmany should return an empty sequence if '
+ 'query retrieved no rows'
+ )
+ self.assertTrue(cur.rowcount in (-1,0))
+
+ finally:
+ con.close()
+
+ def test_fetchall(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ # cursor.fetchall should raise an Error if called
+ # without executing a query that may return rows (such
+ # as a select)
+ self.assertRaises(self.driver.Error, cur.fetchall)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ # cursor.fetchall should raise an Error if called
+ # after executing a a statement that cannot return rows
+ self.assertRaises(self.driver.Error,cur.fetchall)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchall()
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
+ self.assertEqual(len(rows),len(self.samples),
+ 'cursor.fetchall did not retrieve all rows'
+ )
+ rows = [r[0] for r in rows]
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'cursor.fetchall retrieved incorrect rows'
+ )
+ rows = cur.fetchall()
+ self.assertEqual(
+ len(rows),0,
+ 'cursor.fetchall should return an empty list if called '
+ 'after the whole result set has been fetched'
+ )
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ rows = cur.fetchall()
+ self.assertTrue(cur.rowcount in (-1,0))
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchall should return an empty list if '
+ 'a select query returns no rows'
+ )
+
+ finally:
+ con.close()
+
+ def test_mixedfetch(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows1 = cur.fetchone()
+ rows23 = cur.fetchmany(2)
+ rows4 = cur.fetchone()
+ rows56 = cur.fetchall()
+ self.assertTrue(cur.rowcount in (-1,6))
+ self.assertEqual(len(rows23),2,
+ 'fetchmany returned incorrect number of rows'
+ )
+ self.assertEqual(len(rows56),2,
+ 'fetchall returned incorrect number of rows'
+ )
+
+ rows = [rows1[0]]
+ rows.extend([rows23[0][0],rows23[1][0]])
+ rows.append(rows4[0])
+ rows.extend([rows56[0][0],rows56[1][0]])
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'incorrect data retrieved or inserted'
+ )
+ finally:
+ con.close()
+
+ def help_nextset_setUp(self,cur):
+ ''' Should create a procedure called deleteme
+ that returns two result sets, first the
+ number of rows in booze then "name from booze"
+ '''
+ raise NotImplementedError('Helper not implemented')
+ #sql="""
+ # create procedure deleteme as
+ # begin
+ # select count(*) from booze
+ # select name from booze
+ # end
+ #"""
+ #cur.execute(sql)
+
+ def help_nextset_tearDown(self,cur):
+ 'If cleaning up is needed after nextSetTest'
+ raise NotImplementedError('Helper not implemented')
+ #cur.execute("drop procedure deleteme")
+
+ def test_nextset(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if not hasattr(cur,'nextset'):
+ return
+
+ try:
+ self.executeDDL1(cur)
+ sql=self._populate()
+ for sql in self._populate():
+ cur.execute(sql)
+
+ self.help_nextset_setUp(cur)
+
+ cur.callproc('deleteme')
+ numberofrows=cur.fetchone()
+ assert numberofrows[0]== len(self.samples)
+ assert cur.nextset()
+ names=cur.fetchall()
+ assert len(names) == len(self.samples)
+ s=cur.nextset()
+ assert s == None,'No more return sets, should return None'
+ finally:
+ self.help_nextset_tearDown(cur)
+
+ finally:
+ con.close()
+
+ def test_nextset(self):
+ raise NotImplementedError('Drivers need to override this test')
+
+ def test_arraysize(self):
+ # Not much here - rest of the tests for this are in test_fetchmany
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.assertTrue(hasattr(cur,'arraysize'),
+ 'cursor.arraysize must be defined'
+ )
+ finally:
+ con.close()
+
+ def test_setinputsizes(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ cur.setinputsizes( (25,) )
+ self._paraminsert(cur) # Make sure cursor still works
+ finally:
+ con.close()
+
+ def test_setoutputsize_basic(self):
+ # Basic test is to make sure setoutputsize doesn't blow up
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ cur.setoutputsize(1000)
+ cur.setoutputsize(2000,0)
+ self._paraminsert(cur) # Make sure the cursor still works
+ finally:
+ con.close()
+
+ def test_setoutputsize(self):
+ # Real test for setoutputsize is driver dependant
+ raise NotImplementedError('Driver need to override this test')
+
+ def test_None(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchall()
+ self.assertEqual(len(r),1)
+ self.assertEqual(len(r[0]),1)
+ self.assertEqual(r[0][0],None,'NULL value not returned as None')
+ finally:
+ con.close()
+
+ def test_Date(self):
+ d1 = self.driver.Date(2002,12,25)
+ d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(d1),str(d2))
+
+ def test_Time(self):
+ t1 = self.driver.Time(13,45,30)
+ t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(t1),str(t2))
+
+ def test_Timestamp(self):
+ t1 = self.driver.Timestamp(2002,12,25,13,45,30)
+ t2 = self.driver.TimestampFromTicks(
+ time.mktime((2002,12,25,13,45,30,0,0,0))
+ )
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(t1),str(t2))
+
+ def test_Binary(self):
+ b = self.driver.Binary('Something')
+ b = self.driver.Binary('')
+
+ def test_STRING(self):
+ self.assertTrue(hasattr(self.driver,'STRING'),
+ 'module.STRING must be defined'
+ )
+
+ def test_BINARY(self):
+ self.assertTrue(hasattr(self.driver,'BINARY'),
+ 'module.BINARY must be defined.'
+ )
+
+ def test_NUMBER(self):
+ self.assertTrue(hasattr(self.driver,'NUMBER'),
+ 'module.NUMBER must be defined.'
+ )
+
+ def test_DATETIME(self):
+ self.assertTrue(hasattr(self.driver,'DATETIME'),
+ 'module.DATETIME must be defined.'
+ )
+
+ def test_ROWID(self):
+ self.assertTrue(hasattr(self.driver,'ROWID'),
+ 'module.ROWID must be defined.'
+ )
+
diff --git a/tests/default.cnf b/tests/default.cnf
new file mode 100644
index 0000000..2aeda7c
--- /dev/null
+++ b/tests/default.cnf
@@ -0,0 +1,10 @@
+# To create your own custom version of this file, read
+# http://dev.mysql.com/doc/refman/5.1/en/option-files.html
+# and set TESTDB in your environment to the name of the file
+
+[MySQLdb-tests]
+host = 127.0.0.1
+user = test
+database = test
+#password =
+default-character-set = utf8
diff --git a/tests/test_MySQLdb_capabilities.py b/tests/test_MySQLdb_capabilities.py
new file mode 100644
index 0000000..60bbfad
--- /dev/null
+++ b/tests/test_MySQLdb_capabilities.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+import capabilities
+import unittest
+import MySQLdb
+import warnings
+
+warnings.filterwarnings('error')
+
+class test_MySQLdb(capabilities.DatabaseTest):
+
+ db_module = MySQLdb
+ connect_args = ()
+ connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")
+ create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8"
+ leak_test = False
+
+ def quote_identifier(self, ident):
+ return "`%s`" % ident
+
+ def test_TIME(self):
+ from datetime import timedelta
+ def generator(row,col):
+ return timedelta(0, row*8000)
+ self.check_data_integrity(
+ ('col1 TIME',),
+ generator)
+
+ def test_TINYINT(self):
+ # Number data
+ def generator(row,col):
+ v = (row*row) % 256
+ if v > 127:
+ v = v-256
+ return v
+ self.check_data_integrity(
+ ('col1 TINYINT',),
+ generator)
+
+ def test_stored_procedures(self):
+ db = self.connection
+ c = self.cursor
+ self.create_table(('pos INT', 'tree CHAR(20)'))
+ c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table,
+ list(enumerate('ash birch cedar larch pine'.split())))
+ db.commit()
+
+ c.execute("""
+ CREATE PROCEDURE test_sp(IN t VARCHAR(255))
+ BEGIN
+ SELECT pos FROM %s WHERE tree = t;
+ END
+ """ % self.table)
+ db.commit()
+
+ c.callproc('test_sp', ('larch',))
+ rows = c.fetchall()
+ self.assertEquals(len(rows), 1)
+ self.assertEquals(rows[0][0], 3)
+ c.nextset()
+
+ c.execute("DROP PROCEDURE test_sp")
+ c.execute('drop table %s' % (self.table))
+
+ def test_small_CHAR(self):
+ # Character data
+ def generator(row,col):
+ i = (row*col+62)%256
+ if i == 62: return ''
+ if i == 63: return None
+ return chr(i)
+ self.check_data_integrity(
+ ('col1 char(1)','col2 char(1)'),
+ generator)
+
+ def test_bug_2671682(self):
+ from MySQLdb.constants import ER
+ try:
+ self.cursor.execute("describe some_non_existent_table");
+ except self.connection.ProgrammingError, msg:
+ self.assertTrue(msg[0] == ER.NO_SUCH_TABLE)
+
+ def test_bug_3514287(self):
+ c = self.cursor
+ try:
+ c.execute("""create table bug_3541287 (
+ c1 CHAR(10),
+ t1 TIMESTAMP)""")
+ c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())",
+ ("blah",))
+ finally:
+ c.execute("drop table if exists bug_3541287")
+
+ def test_ping(self):
+ self.connection.ping()
+
+
+if __name__ == '__main__':
+ if test_MySQLdb.leak_test:
+ import gc
+ gc.enable()
+ gc.set_debug(gc.DEBUG_LEAK)
+ unittest.main()
diff --git a/tests/test_MySQLdb_dbapi20.py b/tests/test_MySQLdb_dbapi20.py
new file mode 100644
index 0000000..44830e0
--- /dev/null
+++ b/tests/test_MySQLdb_dbapi20.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python
+import dbapi20
+import unittest
+import MySQLdb
+from configdb import connection_kwargs
+
+class test_MySQLdb(dbapi20.DatabaseAPI20Test):
+ driver = MySQLdb
+ connect_args = ()
+ connect_kw_args = connection_kwargs(dict(sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL"))
+
+ def test_setoutputsize(self): pass
+ def test_setoutputsize_basic(self): pass
+ def test_nextset(self): pass
+
+ """The tests on fetchone and fetchall and rowcount bogusly
+ test for an exception if the statement cannot return a
+ result set. MySQL always returns a result set; it's just that
+ some things return empty result sets."""
+
+ def test_fetchall(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ # cursor.fetchall should raise an Error if called
+ # without executing a query that may return rows (such
+ # as a select)
+ self.assertRaises(self.driver.Error, cur.fetchall)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ # cursor.fetchall should raise an Error if called
+ # after executing a a statement that cannot return rows
+## self.assertRaises(self.driver.Error,cur.fetchall)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchall()
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
+ self.assertEqual(len(rows),len(self.samples),
+ 'cursor.fetchall did not retrieve all rows'
+ )
+ rows = [r[0] for r in rows]
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'cursor.fetchall retrieved incorrect rows'
+ )
+ rows = cur.fetchall()
+ self.assertEqual(
+ len(rows),0,
+ 'cursor.fetchall should return an empty list if called '
+ 'after the whole result set has been fetched'
+ )
+ self.assertTrue(cur.rowcount in (-1,len(self.samples)))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ rows = cur.fetchall()
+ self.assertTrue(cur.rowcount in (-1,0))
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchall should return an empty list if '
+ 'a select query returns no rows'
+ )
+
+ finally:
+ con.close()
+
+ def test_fetchone(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchone should raise an Error if called before
+ # executing a select-type query
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ self.executeDDL1(cur)
+## self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if a query retrieves '
+ 'no rows'
+ )
+ self.assertTrue(cur.rowcount in (-1,0))
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+## self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchone()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchone should have retrieved a single row'
+ )
+ self.assertEqual(r[0],'Victoria Bitter',
+ 'cursor.fetchone retrieved incorrect data'
+ )
+## self.assertEqual(cur.fetchone(),None,
+## 'cursor.fetchone should return None if no more rows available'
+## )
+ self.assertTrue(cur.rowcount in (-1,1))
+ finally:
+ con.close()
+
+ # Same complaint as for fetchall and fetchone
+ def test_rowcount(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+## self.assertEqual(cur.rowcount,-1,
+## 'cursor.rowcount should be -1 after executing no-result '
+## 'statements'
+## )
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+## self.assertTrue(cur.rowcount in (-1,1),
+## 'cursor.rowcount should == number or rows inserted, or '
+## 'set to -1 after executing an insert statement'
+## )
+ cur.execute("select name from %sbooze" % self.table_prefix)
+ self.assertTrue(cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number of rows returned, or '
+ 'set to -1 after executing a select statement'
+ )
+ self.executeDDL2(cur)
+## self.assertEqual(cur.rowcount,-1,
+## 'cursor.rowcount not being reset to -1 after executing '
+## 'no-result statements'
+## )
+ finally:
+ con.close()
+
+ def test_callproc(self):
+ pass # performed in test_MySQL_capabilities
+
+ def help_nextset_setUp(self,cur):
+ ''' Should create a procedure called deleteme
+ that returns two result sets, first the
+ number of rows in booze then "name from booze"
+ '''
+ sql="""
+ create procedure deleteme()
+ begin
+ select count(*) from %(tp)sbooze;
+ select name from %(tp)sbooze;
+ end
+ """ % dict(tp=self.table_prefix)
+ cur.execute(sql)
+
+ def help_nextset_tearDown(self,cur):
+ 'If cleaning up is needed after nextSetTest'
+ cur.execute("drop procedure deleteme")
+
+ def test_nextset(self):
+ from warnings import warn
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if not hasattr(cur,'nextset'):
+ return
+
+ try:
+ self.executeDDL1(cur)
+ sql=self._populate()
+ for sql in self._populate():
+ cur.execute(sql)
+
+ self.help_nextset_setUp(cur)
+
+ cur.callproc('deleteme')
+ numberofrows=cur.fetchone()
+ assert numberofrows[0]== len(self.samples)
+ assert cur.nextset()
+ names=cur.fetchall()
+ assert len(names) == len(self.samples)
+ s=cur.nextset()
+ if s:
+ empty = cur.fetchall()
+ self.assertEquals(len(empty), 0,
+ "non-empty result set after other result sets")
+ #warn("Incompatibility: MySQL returns an empty result set for the CALL itself",
+ # Warning)
+ #assert s == None,'No more return sets, should return None'
+ finally:
+ self.help_nextset_tearDown(cur)
+
+ finally:
+ con.close()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/test_MySQLdb_nonstandard.py b/tests/test_MySQLdb_nonstandard.py
new file mode 100644
index 0000000..92fcbdc
--- /dev/null
+++ b/tests/test_MySQLdb_nonstandard.py
@@ -0,0 +1,86 @@
+import unittest
+
+import _mysql
+import MySQLdb
+from MySQLdb.constants import FIELD_TYPE
+from configdb import connection_factory
+
+
+class TestDBAPISet(unittest.TestCase):
+ def test_set_equality(self):
+ self.assertTrue(MySQLdb.STRING == MySQLdb.STRING)
+
+ def test_set_inequality(self):
+ self.assertTrue(MySQLdb.STRING != MySQLdb.NUMBER)
+
+ def test_set_equality_membership(self):
+ self.assertTrue(FIELD_TYPE.VAR_STRING == MySQLdb.STRING)
+
+ def test_set_inequality_membership(self):
+ self.assertTrue(FIELD_TYPE.DATE != MySQLdb.STRING)
+
+
+class CoreModule(unittest.TestCase):
+ """Core _mysql module features."""
+
+ def test_NULL(self):
+ """Should have a NULL constant."""
+ self.assertEqual(_mysql.NULL, 'NULL')
+
+ def test_version(self):
+ """Version information sanity."""
+ self.assertTrue(isinstance(_mysql.__version__, str))
+
+ self.assertTrue(isinstance(_mysql.version_info, tuple))
+ self.assertEqual(len(_mysql.version_info), 5)
+
+ def test_client_info(self):
+ self.assertTrue(isinstance(_mysql.get_client_info(), str))
+
+ def test_thread_safe(self):
+ self.assertTrue(isinstance(_mysql.thread_safe(), int))
+
+
+class CoreAPI(unittest.TestCase):
+ """Test _mysql interaction internals."""
+
+ def setUp(self):
+ self.conn = connection_factory(use_unicode=True)
+
+ def tearDown(self):
+ self.conn.close()
+
+ def test_thread_id(self):
+ tid = self.conn.thread_id()
+ self.assertTrue(isinstance(tid, int),
+ "thread_id didn't return an int.")
+
+ self.assertRaises(TypeError, self.conn.thread_id, ('evil',),
+ "thread_id shouldn't accept arguments.")
+
+ def test_affected_rows(self):
+ self.assertEquals(self.conn.affected_rows(), 0,
+ "Should return 0 before we do anything.")
+
+
+ #def test_debug(self):
+ ## FIXME Only actually tests if you lack SUPER
+ #self.assertRaises(MySQLdb.OperationalError,
+ #self.conn.dump_debug_info)
+
+ def test_charset_name(self):
+ self.assertTrue(isinstance(self.conn.character_set_name(), str),
+ "Should return a string.")
+
+ def test_host_info(self):
+ self.assertTrue(isinstance(self.conn.get_host_info(), str),
+ "Should return a string.")
+
+ def test_proto_info(self):
+ self.assertTrue(isinstance(self.conn.get_proto_info(), int),
+ "Should return an int.")
+
+ def test_server_info(self):
+ self.assertTrue(isinstance(self.conn.get_server_info(), str),
+ "Should return an str.")
+
diff --git a/tests/travis.cnf b/tests/travis.cnf
new file mode 100644
index 0000000..e78f52d
--- /dev/null
+++ b/tests/travis.cnf
@@ -0,0 +1,10 @@
+# To create your own custom version of this file, read
+# http://dev.mysql.com/doc/refman/5.1/en/option-files.html
+# and set TESTDB in your environment to the name of the file
+
+[MySQLdb-tests]
+host = 127.0.0.1
+user = root
+database = mysqldb_test
+#password =
+default-character-set = utf8