diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
commit | 1c23741b8e045d266d0ecbed975952547444a5fa (patch) | |
tree | 366b9619c81a271bb3f05a37867ddb2124467c1d /test/dialect/test_mysql.py | |
parent | 83f3dbc83d1066216084a01b32cddcc090f697d5 (diff) | |
download | sqlalchemy-1c23741b8e045d266d0ecbed975952547444a5fa.tar.gz |
refactor test suites for postgresql, mssql, mysql into packages.
Diffstat (limited to 'test/dialect/test_mysql.py')
-rw-r--r-- | test/dialect/test_mysql.py | 1602 |
1 files changed, 0 insertions, 1602 deletions
diff --git a/test/dialect/test_mysql.py b/test/dialect/test_mysql.py deleted file mode 100644 index ec6b1c193..000000000 --- a/test/dialect/test_mysql.py +++ /dev/null @@ -1,1602 +0,0 @@ -# coding: utf-8 - -from sqlalchemy.testing import eq_, assert_raises, assert_raises_message - - -from sqlalchemy import * -from sqlalchemy import sql, exc, schema, types as sqltypes -from sqlalchemy.util import u -from sqlalchemy.dialects.mysql import base as mysql -from sqlalchemy.engine.url import make_url -from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults -from sqlalchemy import testing -from sqlalchemy.testing import engines -from sqlalchemy.testing.engines import utf8_engine -import datetime - -class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - - __dialect__ = mysql.dialect() - - def test_reserved_words(self): - table = Table("mysql_table", MetaData(), - Column("col1", Integer), - Column("master_ssl_verify_server_cert", Integer)) - x = select([table.c.col1, table.c.master_ssl_verify_server_cert]) - - self.assert_compile(x, - "SELECT mysql_table.col1, " - "mysql_table.`master_ssl_verify_server_cert` FROM mysql_table") - - def test_create_index_simple(self): - m = MetaData() - tbl = Table('testtbl', m, Column('data', String(255))) - idx = Index('test_idx1', tbl.c.data) - - self.assert_compile(schema.CreateIndex(idx), - 'CREATE INDEX test_idx1 ON testtbl (data)') - - def test_create_index_with_length(self): - m = MetaData() - tbl = Table('testtbl', m, Column('data', String(255))) - idx1 = Index('test_idx1', tbl.c.data, mysql_length=10) - idx2 = Index('test_idx2', tbl.c.data, mysql_length=5) - - self.assert_compile(schema.CreateIndex(idx1), - 'CREATE INDEX test_idx1 ON testtbl (data(10))') - self.assert_compile(schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl (data(5))') - - def test_create_composite_index_with_length(self): - m = MetaData() - tbl = Table('testtbl', m, - Column('a', String(255)), - Column('b', String(255))) - - idx1 = Index('test_idx1', tbl.c.a, tbl.c.b, - mysql_length={'a': 10, 'b': 20}) - idx2 = Index('test_idx2', tbl.c.a, tbl.c.b, - mysql_length={'a': 15}) - idx3 = Index('test_idx3', tbl.c.a, tbl.c.b, - mysql_length=30) - - self.assert_compile( - schema.CreateIndex(idx1), - 'CREATE INDEX test_idx1 ON testtbl (a(10), b(20))' - ) - self.assert_compile( - schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl (a(15), b)' - ) - self.assert_compile( - schema.CreateIndex(idx3), - 'CREATE INDEX test_idx3 ON testtbl (a(30), b(30))' - ) - - def test_create_index_with_using(self): - m = MetaData() - tbl = Table('testtbl', m, Column('data', String(255))) - idx1 = Index('test_idx1', tbl.c.data, mysql_using='btree') - idx2 = Index('test_idx2', tbl.c.data, mysql_using='hash') - - self.assert_compile(schema.CreateIndex(idx1), - 'CREATE INDEX test_idx1 ON testtbl (data) USING btree') - self.assert_compile(schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl (data) USING hash') - - def test_create_pk_plain(self): - m = MetaData() - tbl = Table('testtbl', m, Column('data', String(255)), - PrimaryKeyConstraint('data')) - - self.assert_compile(schema.CreateTable(tbl), - "CREATE TABLE testtbl (data VARCHAR(255), PRIMARY KEY (data))") - - def test_create_pk_with_using(self): - m = MetaData() - tbl = Table('testtbl', m, Column('data', String(255)), - PrimaryKeyConstraint('data', mysql_using='btree')) - - self.assert_compile(schema.CreateTable(tbl), - "CREATE TABLE testtbl (data VARCHAR(255), " - "PRIMARY KEY (data) USING btree)") - - def test_skip_deferrable_kw(self): - m = MetaData() - t1 = Table('t1', m, Column('id', Integer, primary_key=True)) - t2 = Table('t2', m, Column('id', Integer, - ForeignKey('t1.id', deferrable=True), - primary_key=True)) - - self.assert_compile( - schema.CreateTable(t2), - "CREATE TABLE t2 (id INTEGER NOT NULL, " - "PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id))" - ) - -class DialectTest(fixtures.TestBase): - __only_on__ = 'mysql' - - @testing.only_on(['mysql+mysqldb', 'mysql+oursql'], - 'requires particular SSL arguments') - def test_ssl_arguments(self): - dialect = testing.db.dialect - kwarg = dialect.create_connect_args( - make_url("mysql://scott:tiger@localhost:3306/test" - "?ssl_ca=/ca.pem&ssl_cert=/cert.pem&ssl_key=/key.pem") - )[1] - # args that differ among mysqldb and oursql - for k in ('use_unicode', 'found_rows', 'client_flag'): - kwarg.pop(k, None) - eq_( - kwarg, - { - 'passwd': 'tiger', 'db': 'test', - 'ssl': {'ca': '/ca.pem', 'cert': '/cert.pem', - 'key': '/key.pem'}, - 'host': 'localhost', 'user': 'scott', - 'port': 3306 - } - ) - - -class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): - "Test MySQL column types" - - __dialect__ = mysql.dialect() - - def test_numeric(self): - "Exercise type specification and options for numeric types." - - columns = [ - # column type, args, kwargs, expected ddl - # e.g. Column(Integer(10, unsigned=True)) == - # 'INTEGER(10) UNSIGNED' - (mysql.MSNumeric, [], {}, - 'NUMERIC'), - (mysql.MSNumeric, [None], {}, - 'NUMERIC'), - (mysql.MSNumeric, [12], {}, - 'NUMERIC(12)'), - (mysql.MSNumeric, [12, 4], {'unsigned':True}, - 'NUMERIC(12, 4) UNSIGNED'), - (mysql.MSNumeric, [12, 4], {'zerofill':True}, - 'NUMERIC(12, 4) ZEROFILL'), - (mysql.MSNumeric, [12, 4], {'zerofill':True, 'unsigned':True}, - 'NUMERIC(12, 4) UNSIGNED ZEROFILL'), - - (mysql.MSDecimal, [], {}, - 'DECIMAL'), - (mysql.MSDecimal, [None], {}, - 'DECIMAL'), - (mysql.MSDecimal, [12], {}, - 'DECIMAL(12)'), - (mysql.MSDecimal, [12, None], {}, - 'DECIMAL(12)'), - (mysql.MSDecimal, [12, 4], {'unsigned':True}, - 'DECIMAL(12, 4) UNSIGNED'), - (mysql.MSDecimal, [12, 4], {'zerofill':True}, - 'DECIMAL(12, 4) ZEROFILL'), - (mysql.MSDecimal, [12, 4], {'zerofill':True, 'unsigned':True}, - 'DECIMAL(12, 4) UNSIGNED ZEROFILL'), - - (mysql.MSDouble, [None, None], {}, - 'DOUBLE'), - (mysql.MSDouble, [12, 4], {'unsigned':True}, - 'DOUBLE(12, 4) UNSIGNED'), - (mysql.MSDouble, [12, 4], {'zerofill':True}, - 'DOUBLE(12, 4) ZEROFILL'), - (mysql.MSDouble, [12, 4], {'zerofill':True, 'unsigned':True}, - 'DOUBLE(12, 4) UNSIGNED ZEROFILL'), - - (mysql.MSReal, [None, None], {}, - 'REAL'), - (mysql.MSReal, [12, 4], {'unsigned':True}, - 'REAL(12, 4) UNSIGNED'), - (mysql.MSReal, [12, 4], {'zerofill':True}, - 'REAL(12, 4) ZEROFILL'), - (mysql.MSReal, [12, 4], {'zerofill':True, 'unsigned':True}, - 'REAL(12, 4) UNSIGNED ZEROFILL'), - - (mysql.MSFloat, [], {}, - 'FLOAT'), - (mysql.MSFloat, [None], {}, - 'FLOAT'), - (mysql.MSFloat, [12], {}, - 'FLOAT(12)'), - (mysql.MSFloat, [12, 4], {}, - 'FLOAT(12, 4)'), - (mysql.MSFloat, [12, 4], {'unsigned':True}, - 'FLOAT(12, 4) UNSIGNED'), - (mysql.MSFloat, [12, 4], {'zerofill':True}, - 'FLOAT(12, 4) ZEROFILL'), - (mysql.MSFloat, [12, 4], {'zerofill':True, 'unsigned':True}, - 'FLOAT(12, 4) UNSIGNED ZEROFILL'), - - (mysql.MSInteger, [], {}, - 'INTEGER'), - (mysql.MSInteger, [4], {}, - 'INTEGER(4)'), - (mysql.MSInteger, [4], {'unsigned':True}, - 'INTEGER(4) UNSIGNED'), - (mysql.MSInteger, [4], {'zerofill':True}, - 'INTEGER(4) ZEROFILL'), - (mysql.MSInteger, [4], {'zerofill':True, 'unsigned':True}, - 'INTEGER(4) UNSIGNED ZEROFILL'), - - (mysql.MSBigInteger, [], {}, - 'BIGINT'), - (mysql.MSBigInteger, [4], {}, - 'BIGINT(4)'), - (mysql.MSBigInteger, [4], {'unsigned':True}, - 'BIGINT(4) UNSIGNED'), - (mysql.MSBigInteger, [4], {'zerofill':True}, - 'BIGINT(4) ZEROFILL'), - (mysql.MSBigInteger, [4], {'zerofill':True, 'unsigned':True}, - 'BIGINT(4) UNSIGNED ZEROFILL'), - - (mysql.MSMediumInteger, [], {}, - 'MEDIUMINT'), - (mysql.MSMediumInteger, [4], {}, - 'MEDIUMINT(4)'), - (mysql.MSMediumInteger, [4], {'unsigned':True}, - 'MEDIUMINT(4) UNSIGNED'), - (mysql.MSMediumInteger, [4], {'zerofill':True}, - 'MEDIUMINT(4) ZEROFILL'), - (mysql.MSMediumInteger, [4], {'zerofill':True, 'unsigned':True}, - 'MEDIUMINT(4) UNSIGNED ZEROFILL'), - - (mysql.MSTinyInteger, [], {}, - 'TINYINT'), - (mysql.MSTinyInteger, [1], {}, - 'TINYINT(1)'), - (mysql.MSTinyInteger, [1], {'unsigned':True}, - 'TINYINT(1) UNSIGNED'), - (mysql.MSTinyInteger, [1], {'zerofill':True}, - 'TINYINT(1) ZEROFILL'), - (mysql.MSTinyInteger, [1], {'zerofill':True, 'unsigned':True}, - 'TINYINT(1) UNSIGNED ZEROFILL'), - - (mysql.MSSmallInteger, [], {}, - 'SMALLINT'), - (mysql.MSSmallInteger, [4], {}, - 'SMALLINT(4)'), - (mysql.MSSmallInteger, [4], {'unsigned':True}, - 'SMALLINT(4) UNSIGNED'), - (mysql.MSSmallInteger, [4], {'zerofill':True}, - 'SMALLINT(4) ZEROFILL'), - (mysql.MSSmallInteger, [4], {'zerofill':True, 'unsigned':True}, - 'SMALLINT(4) UNSIGNED ZEROFILL'), - ] - - for type_, args, kw, res in columns: - self.assert_compile( - type_(*args, **kw), - res - ) - - @testing.exclude('mysql', '<', (4, 1, 1), 'no charset support') - def test_charset(self): - """Exercise CHARACTER SET and COLLATE-ish options on string types.""" - - columns = [ - (mysql.MSChar, [1], {}, - 'CHAR(1)'), - (mysql.NCHAR, [1], {}, - 'NATIONAL CHAR(1)'), - (mysql.MSChar, [1], {'binary':True}, - 'CHAR(1) BINARY'), - (mysql.MSChar, [1], {'ascii':True}, - 'CHAR(1) ASCII'), - (mysql.MSChar, [1], {'unicode':True}, - 'CHAR(1) UNICODE'), - (mysql.MSChar, [1], {'ascii':True, 'binary':True}, - 'CHAR(1) ASCII BINARY'), - (mysql.MSChar, [1], {'unicode':True, 'binary':True}, - 'CHAR(1) UNICODE BINARY'), - (mysql.MSChar, [1], {'charset':'utf8'}, - 'CHAR(1) CHARACTER SET utf8'), - (mysql.MSChar, [1], {'charset':'utf8', 'binary':True}, - 'CHAR(1) CHARACTER SET utf8 BINARY'), - (mysql.MSChar, [1], {'charset':'utf8', 'unicode':True}, - 'CHAR(1) CHARACTER SET utf8'), - (mysql.MSChar, [1], {'charset':'utf8', 'ascii':True}, - 'CHAR(1) CHARACTER SET utf8'), - (mysql.MSChar, [1], {'collation': 'utf8_bin'}, - 'CHAR(1) COLLATE utf8_bin'), - (mysql.MSChar, [1], {'charset': 'utf8', 'collation': 'utf8_bin'}, - 'CHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'), - (mysql.MSChar, [1], {'charset': 'utf8', 'binary': True}, - 'CHAR(1) CHARACTER SET utf8 BINARY'), - (mysql.MSChar, [1], {'charset': 'utf8', 'collation': 'utf8_bin', - 'binary': True}, - 'CHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'), - (mysql.MSChar, [1], {'national':True}, - 'NATIONAL CHAR(1)'), - (mysql.MSChar, [1], {'national':True, 'charset':'utf8'}, - 'NATIONAL CHAR(1)'), - (mysql.MSChar, [1], {'national':True, 'charset':'utf8', - 'binary':True}, - 'NATIONAL CHAR(1) BINARY'), - (mysql.MSChar, [1], {'national':True, 'binary':True, - 'unicode':True}, - 'NATIONAL CHAR(1) BINARY'), - (mysql.MSChar, [1], {'national':True, 'collation':'utf8_bin'}, - 'NATIONAL CHAR(1) COLLATE utf8_bin'), - - (mysql.MSString, [1], {'charset':'utf8', 'collation':'utf8_bin'}, - 'VARCHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'), - (mysql.MSString, [1], {'national':True, 'collation':'utf8_bin'}, - 'NATIONAL VARCHAR(1) COLLATE utf8_bin'), - - (mysql.MSTinyText, [], {'charset':'utf8', 'collation':'utf8_bin'}, - 'TINYTEXT CHARACTER SET utf8 COLLATE utf8_bin'), - - (mysql.MSMediumText, [], {'charset':'utf8', 'binary':True}, - 'MEDIUMTEXT CHARACTER SET utf8 BINARY'), - - (mysql.MSLongText, [], {'ascii':True}, - 'LONGTEXT ASCII'), - - (mysql.ENUM, ["foo", "bar"], {'unicode':True}, - '''ENUM('foo','bar') UNICODE'''), - - (String, [20], {"collation":"utf8"}, 'VARCHAR(20) COLLATE utf8') - - - ] - - for type_, args, kw, res in columns: - self.assert_compile( - type_(*args, **kw), - res - ) - - @testing.only_if('mysql') - @testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature') - @testing.provide_metadata - def test_charset_collate_table(self): - t = Table('foo', self.metadata, - Column('id', Integer), - mysql_default_charset='utf8', - mysql_collate='utf8_unicode_ci' - ) - t.create() - m2 = MetaData(testing.db) - t2 = Table('foo', m2, autoload=True) - eq_(t2.kwargs['mysql_collate'], 'utf8_unicode_ci') - eq_(t2.kwargs['mysql_default charset'], 'utf8') - - def test_bit_50(self): - """Exercise BIT types on 5.0+ (not valid for all engine types)""" - - for type_, expected in [ - (mysql.MSBit(), "BIT"), - (mysql.MSBit(1), "BIT(1)"), - (mysql.MSBit(63), "BIT(63)"), - ]: - self.assert_compile(type_, expected) - - @testing.only_if('mysql') - @testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature') - @testing.fails_on('mysql+oursql', 'some round trips fail, oursql bug ?') - @testing.provide_metadata - def test_bit_50_roundtrip(self): - bit_table = Table('mysql_bits', self.metadata, - Column('b1', mysql.MSBit), - Column('b2', mysql.MSBit()), - Column('b3', mysql.MSBit(), nullable=False), - Column('b4', mysql.MSBit(1)), - Column('b5', mysql.MSBit(8)), - Column('b6', mysql.MSBit(32)), - Column('b7', mysql.MSBit(63)), - Column('b8', mysql.MSBit(64))) - self.metadata.create_all() - - meta2 = MetaData(testing.db) - reflected = Table('mysql_bits', meta2, autoload=True) - - for table in bit_table, reflected: - - def roundtrip(store, expected=None): - expected = expected or store - table.insert(store).execute() - row = table.select().execute().first() - try: - self.assert_(list(row) == expected) - except: - print("Storing %s" % store) - print("Expected %s" % expected) - print("Found %s" % list(row)) - raise - table.delete().execute().close() - - roundtrip([0] * 8) - roundtrip([None, None, 0, None, None, None, None, None]) - roundtrip([1] * 8) - roundtrip([sql.text("b'1'")] * 8, [1] * 8) - - i = 255 - roundtrip([0, 0, 0, 0, i, i, i, i]) - i = 2 ** 32 - 1 - roundtrip([0, 0, 0, 0, 0, i, i, i]) - i = 2 ** 63 - 1 - roundtrip([0, 0, 0, 0, 0, 0, i, i]) - i = 2 ** 64 - 1 - roundtrip([0, 0, 0, 0, 0, 0, 0, i]) - - def test_boolean(self): - for type_, expected in [ - (BOOLEAN(), "BOOL"), - (Boolean(), "BOOL"), - (mysql.TINYINT(1), "TINYINT(1)"), - (mysql.TINYINT(1, unsigned=True), "TINYINT(1) UNSIGNED") - ]: - self.assert_compile(type_, expected) - - @testing.only_if('mysql') - @testing.provide_metadata - def test_boolean_roundtrip(self): - bool_table = Table( - 'mysql_bool', - self.metadata, - Column('b1', BOOLEAN), - Column('b2', Boolean), - Column('b3', mysql.MSTinyInteger(1)), - Column('b4', mysql.MSTinyInteger(1, unsigned=True)), - Column('b5', mysql.MSTinyInteger), - ) - self.metadata.create_all() - table = bool_table - - def roundtrip(store, expected=None): - expected = expected or store - table.insert(store).execute() - row = table.select().execute().first() - self.assert_(list(row) == expected) - for i, val in enumerate(expected): - if isinstance(val, bool): - self.assert_(val is row[i]) - table.delete().execute() - - roundtrip([None, None, None, None, None]) - roundtrip([True, True, 1, 1, 1]) - roundtrip([False, False, 0, 0, 0]) - roundtrip([True, True, True, True, True], [True, True, 1, - 1, 1]) - roundtrip([False, False, 0, 0, 0], [False, False, 0, 0, 0]) - - meta2 = MetaData(testing.db) - table = Table('mysql_bool', meta2, autoload=True) - eq_(colspec(table.c.b3), 'b3 TINYINT(1)') - eq_(colspec(table.c.b4), 'b4 TINYINT(1) UNSIGNED') - meta2 = MetaData(testing.db) - table = Table( - 'mysql_bool', - meta2, - Column('b1', BOOLEAN), - Column('b2', Boolean), - Column('b3', BOOLEAN), - Column('b4', BOOLEAN), - autoload=True, - ) - eq_(colspec(table.c.b3), 'b3 BOOL') - eq_(colspec(table.c.b4), 'b4 BOOL') - roundtrip([None, None, None, None, None]) - roundtrip([True, True, 1, 1, 1], [True, True, True, True, - 1]) - roundtrip([False, False, 0, 0, 0], [False, False, False, - False, 0]) - roundtrip([True, True, True, True, True], [True, True, - True, True, 1]) - roundtrip([False, False, 0, 0, 0], [False, False, False, - False, 0]) - - def test_timestamp(self): - """Exercise funky TIMESTAMP default syntax.""" - - columns = [ - ([TIMESTAMP], - 'TIMESTAMP NULL'), - ([mysql.MSTimeStamp], - 'TIMESTAMP NULL'), - ([mysql.MSTimeStamp, - DefaultClause(sql.text('CURRENT_TIMESTAMP'))], - "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"), - ([mysql.MSTimeStamp, - DefaultClause(sql.text("'1999-09-09 09:09:09'"))], - "TIMESTAMP DEFAULT '1999-09-09 09:09:09'"), - ([mysql.MSTimeStamp, - DefaultClause(sql.text("'1999-09-09 09:09:09' " - "ON UPDATE CURRENT_TIMESTAMP"))], - "TIMESTAMP DEFAULT '1999-09-09 09:09:09' " - "ON UPDATE CURRENT_TIMESTAMP"), - ([mysql.MSTimeStamp, - DefaultClause(sql.text("CURRENT_TIMESTAMP " - "ON UPDATE CURRENT_TIMESTAMP"))], - "TIMESTAMP DEFAULT CURRENT_TIMESTAMP " - "ON UPDATE CURRENT_TIMESTAMP"), - ] - for spec, expected in columns: - c = Column('t', *spec) - Table('t', MetaData(), c) - self.assert_compile( - schema.CreateColumn(c), - "t %s" % expected - - ) - - @testing.only_if('mysql') - @testing.provide_metadata - def test_timestamp_nullable(self): - ts_table = Table('mysql_timestamp', self.metadata, - Column('t1', TIMESTAMP), - Column('t2', TIMESTAMP, nullable=False), - ) - self.metadata.create_all() - - now = testing.db.execute("select now()").scalar() - - # TIMESTAMP without NULL inserts current time when passed - # NULL. when not passed, generates 0000-00-00 quite - # annoyingly. - ts_table.insert().execute({'t1': now, 't2': None}) - ts_table.insert().execute({'t1': None, 't2': None}) - - # normalize dates that are over the second boundary - def normalize(dt): - if dt is None: - return None - elif (dt - now).seconds < 5: - return now - else: - return dt - eq_( - [tuple([normalize(dt) for dt in row]) - for row in ts_table.select().execute()], - [(now, now), (None, now)] - ) - - def test_time(self): - """"Exercise TIME.""" - - self.assert_compile( - mysql.TIME(), - "TIME" - ) - - self.assert_compile( - mysql.TIME(fsp=5), - "TIME(5)" - ) - - eq_( - mysql.TIME().result_processor(None, None)( - datetime.timedelta(seconds=35, minutes=517, - microseconds=450 - )), - datetime.time(8, 37, 35, 450) - ) - - @testing.only_if('mysql') - @testing.provide_metadata - def test_year(self): - """Exercise YEAR.""" - - year_table = Table('mysql_year', self.metadata, - Column('y1', mysql.MSYear), - Column('y2', mysql.MSYear), - Column('y3', mysql.MSYear), - Column('y5', mysql.MSYear(4))) - - for col in year_table.c: - self.assert_(repr(col)) - year_table.create() - reflected = Table('mysql_year', MetaData(testing.db), - autoload=True) - - for table in year_table, reflected: - table.insert(['1950', '50', None, 1950]).execute() - row = table.select().execute().first() - eq_(list(row), [1950, 2050, None, 1950]) - table.delete().execute() - self.assert_(colspec(table.c.y1).startswith('y1 YEAR')) - eq_(colspec(table.c.y5), 'y5 YEAR(4)') - - @testing.only_if('mysql') - @testing.provide_metadata - def test_set(self): - """Exercise the SET type.""" - - set_table = Table('mysql_set', self.metadata, - Column('s1', - mysql.MSSet("'dq'", "'sq'")), Column('s2', - mysql.MSSet("'a'")), Column('s3', - mysql.MSSet("'5'", "'7'", "'9'"))) - eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')") - eq_(colspec(set_table.c.s2), "s2 SET('a')") - eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')") - set_table.create() - reflected = Table('mysql_set', MetaData(testing.db), - autoload=True) - for table in set_table, reflected: - - def roundtrip(store, expected=None): - expected = expected or store - table.insert(store).execute() - row = table.select().execute().first() - self.assert_(list(row) == expected) - table.delete().execute() - - roundtrip([None, None, None], [None] * 3) - roundtrip(['', '', ''], [set([''])] * 3) - roundtrip([set(['dq']), set(['a']), set(['5'])]) - roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']), - set(['5'])]) - roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5' - ])]) - roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7' - ])]) - set_table.insert().execute({'s3': set(['5'])}, - {'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])}, - {'s3': set(['7', '9'])}) - rows = select([set_table.c.s3], set_table.c.s3.in_([set(['5' - ]), set(['5', '7']), set(['7', '5' - ])])).execute().fetchall() - found = set([frozenset(row[0]) for row in rows]) - eq_(found, set([frozenset(['5']), frozenset(['5', '7'])])) - -class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): - - __only_on__ = 'mysql' - __dialect__ = mysql.dialect() - - - @testing.uses_deprecated('Manually quoting ENUM value literals') - @testing.provide_metadata - def test_enum(self): - """Exercise the ENUM type.""" - - enum_table = Table('mysql_enum', self.metadata, - Column('e1', mysql.ENUM("'a'", "'b'")), - Column('e2', mysql.ENUM("'a'", "'b'"), - nullable=False), - Column('e2generic', Enum("a", "b"), - nullable=False), - Column('e3', mysql.ENUM("'a'", "'b'", strict=True)), - Column('e4', mysql.ENUM("'a'", "'b'", strict=True), - nullable=False), - Column('e5', mysql.ENUM("a", "b")), - Column('e5generic', Enum("a", "b")), - Column('e6', mysql.ENUM("'a'", "b")), - ) - - eq_(colspec(enum_table.c.e1), - "e1 ENUM('a','b')") - eq_(colspec(enum_table.c.e2), - "e2 ENUM('a','b') NOT NULL") - eq_(colspec(enum_table.c.e2generic), - "e2generic ENUM('a','b') NOT NULL") - eq_(colspec(enum_table.c.e3), - "e3 ENUM('a','b')") - eq_(colspec(enum_table.c.e4), - "e4 ENUM('a','b') NOT NULL") - eq_(colspec(enum_table.c.e5), - "e5 ENUM('a','b')") - eq_(colspec(enum_table.c.e5generic), - "e5generic ENUM('a','b')") - eq_(colspec(enum_table.c.e6), - "e6 ENUM('''a''','b')") - enum_table.create() - - assert_raises(exc.DBAPIError, enum_table.insert().execute, - e1=None, e2=None, e3=None, e4=None) - - assert_raises(exc.StatementError, enum_table.insert().execute, - e1='c', e2='c', e2generic='c', e3='c', - e4='c', e5='c', e5generic='c', e6='c') - - enum_table.insert().execute() - enum_table.insert().execute(e1='a', e2='a', e2generic='a', e3='a', - e4='a', e5='a', e5generic='a', e6="'a'") - enum_table.insert().execute(e1='b', e2='b', e2generic='b', e3='b', - e4='b', e5='b', e5generic='b', e6='b') - - res = enum_table.select().execute().fetchall() - - expected = [(None, 'a', 'a', None, 'a', None, None, None), - ('a', 'a', 'a', 'a', 'a', 'a', 'a', "'a'"), - ('b', 'b', 'b', 'b', 'b', 'b', 'b', 'b')] - - eq_(res, expected) - - def test_unicode_enum(self): - unicode_engine = utf8_engine() - metadata = MetaData(unicode_engine) - t1 = Table('table', metadata, - Column('id', Integer, primary_key=True), - Column('value', Enum(u('réveillé'), u('drôle'), u('S’il'))), - Column('value2', mysql.ENUM(u('réveillé'), u('drôle'), u('S’il'))) - ) - metadata.create_all() - try: - t1.insert().execute(value=u('drôle'), value2=u('drôle')) - t1.insert().execute(value=u('réveillé'), value2=u('réveillé')) - t1.insert().execute(value=u('S’il'), value2=u('S’il')) - eq_(t1.select().order_by(t1.c.id).execute().fetchall(), - [(1, u('drôle'), u('drôle')), (2, u('réveillé'), u('réveillé')), - (3, u('S’il'), u('S’il'))] - ) - - # test reflection of the enum labels - - m2 = MetaData(testing.db) - t2 = Table('table', m2, autoload=True) - - # TODO: what's wrong with the last element ? is there - # latin-1 stuff forcing its way in ? - - assert t2.c.value.type.enums[0:2] == \ - (u('réveillé'), u('drôle')) # u'S’il') # eh ? - - assert t2.c.value2.type.enums[0:2] == \ - (u('réveillé'), u('drôle')) # u'S’il') # eh ? - finally: - metadata.drop_all() - - def test_enum_compile(self): - e1 = Enum('x', 'y', 'z', name='somename') - t1 = Table('sometable', MetaData(), Column('somecolumn', e1)) - self.assert_compile(schema.CreateTable(t1), - "CREATE TABLE sometable (somecolumn " - "ENUM('x','y','z'))") - t1 = Table('sometable', MetaData(), Column('somecolumn', - Enum('x', 'y', 'z', native_enum=False))) - self.assert_compile(schema.CreateTable(t1), - "CREATE TABLE sometable (somecolumn " - "VARCHAR(1), CHECK (somecolumn IN ('x', " - "'y', 'z')))") - - @testing.exclude('mysql', '<', (4,), "3.23 can't handle an ENUM of ''") - @testing.uses_deprecated('Manually quoting ENUM value literals') - def test_enum_parse(self): - """More exercises for the ENUM type.""" - - # MySQL 3.23 can't handle an ENUM of ''.... - - enum_table = Table('mysql_enum', MetaData(testing.db), - Column('e1', mysql.ENUM("'a'")), - Column('e2', mysql.ENUM("''")), - Column('e3', mysql.ENUM('a')), - Column('e4', mysql.ENUM('')), - Column('e5', mysql.ENUM("'a'", "''")), - Column('e6', mysql.ENUM("''", "'a'")), - Column('e7', mysql.ENUM("''", "'''a'''", "'b''b'", "''''"))) - - for col in enum_table.c: - self.assert_(repr(col)) - try: - enum_table.create() - reflected = Table('mysql_enum', MetaData(testing.db), - autoload=True) - for t in enum_table, reflected: - eq_(t.c.e1.type.enums, ("a",)) - eq_(t.c.e2.type.enums, ("",)) - eq_(t.c.e3.type.enums, ("a",)) - eq_(t.c.e4.type.enums, ("",)) - eq_(t.c.e5.type.enums, ("a", "")) - eq_(t.c.e6.type.enums, ("", "a")) - eq_(t.c.e7.type.enums, ("", "'a'", "b'b", "'")) - finally: - enum_table.drop() - -class ReflectionTest(fixtures.TestBase, AssertsExecutionResults): - - __only_on__ = 'mysql' - - def test_default_reflection(self): - """Test reflection of column defaults.""" - - from sqlalchemy.dialects.mysql import VARCHAR - def_table = Table( - 'mysql_def', - MetaData(testing.db), - Column('c1', VARCHAR(10, collation='utf8_unicode_ci'), - DefaultClause(''), nullable=False), - Column('c2', String(10), DefaultClause('0')), - Column('c3', String(10), DefaultClause('abc')), - Column('c4', TIMESTAMP, DefaultClause('2009-04-05 12:00:00' - )), - Column('c5', TIMESTAMP), - Column('c6', TIMESTAMP, - DefaultClause(sql.text("CURRENT_TIMESTAMP " - "ON UPDATE CURRENT_TIMESTAMP"))), - ) - def_table.create() - try: - reflected = Table('mysql_def', MetaData(testing.db), - autoload=True) - finally: - def_table.drop() - assert def_table.c.c1.server_default.arg == '' - assert def_table.c.c2.server_default.arg == '0' - assert def_table.c.c3.server_default.arg == 'abc' - assert def_table.c.c4.server_default.arg \ - == '2009-04-05 12:00:00' - assert str(reflected.c.c1.server_default.arg) == "''" - assert str(reflected.c.c2.server_default.arg) == "'0'" - assert str(reflected.c.c3.server_default.arg) == "'abc'" - assert str(reflected.c.c4.server_default.arg) \ - == "'2009-04-05 12:00:00'" - assert reflected.c.c5.default is None - assert reflected.c.c5.server_default is None - assert reflected.c.c6.default is None - eq_( - str(reflected.c.c6.server_default.arg).upper(), - "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" - ) - reflected.create() - try: - reflected2 = Table('mysql_def', MetaData(testing.db), - autoload=True) - finally: - reflected.drop() - assert str(reflected2.c.c1.server_default.arg) == "''" - assert str(reflected2.c.c2.server_default.arg) == "'0'" - assert str(reflected2.c.c3.server_default.arg) == "'abc'" - assert str(reflected2.c.c4.server_default.arg) \ - == "'2009-04-05 12:00:00'" - assert reflected.c.c5.default is None - assert reflected.c.c5.server_default is None - assert reflected.c.c6.default is None - eq_( - str(reflected.c.c6.server_default.arg).upper(), - "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" - ) - - def test_reflection_with_table_options(self): - comment = r"""Comment types type speedily ' " \ '' Fun!""" - - def_table = Table('mysql_def', MetaData(testing.db), - Column('c1', Integer()), - mysql_engine='MEMORY', - mysql_comment=comment, - mysql_default_charset='utf8', - mysql_auto_increment='5', - mysql_avg_row_length='3', - mysql_password='secret', - mysql_connection='fish', - ) - - def_table.create() - try: - reflected = Table('mysql_def', MetaData(testing.db), - autoload=True) - finally: - def_table.drop() - - assert def_table.kwargs['mysql_engine'] == 'MEMORY' - assert def_table.kwargs['mysql_comment'] == comment - assert def_table.kwargs['mysql_default_charset'] == 'utf8' - assert def_table.kwargs['mysql_auto_increment'] == '5' - assert def_table.kwargs['mysql_avg_row_length'] == '3' - assert def_table.kwargs['mysql_password'] == 'secret' - assert def_table.kwargs['mysql_connection'] == 'fish' - - assert reflected.kwargs['mysql_engine'] == 'MEMORY' - assert reflected.kwargs['mysql_comment'] == comment - assert reflected.kwargs['mysql_default charset'] == 'utf8' - assert reflected.kwargs['mysql_avg_row_length'] == '3' - assert reflected.kwargs['mysql_connection'] == 'fish' - - # This field doesn't seem to be returned by mysql itself. - #assert reflected.kwargs['mysql_password'] == 'secret' - - # This is explicitly ignored when reflecting schema. - #assert reflected.kwargs['mysql_auto_increment'] == '5' - - def test_reflection_on_include_columns(self): - """Test reflection of include_columns to be sure they respect case.""" - - case_table = Table('mysql_case', MetaData(testing.db), - Column('c1', String(10)), - Column('C2', String(10)), - Column('C3', String(10))) - - try: - case_table.create() - reflected = Table('mysql_case', MetaData(testing.db), - autoload=True, include_columns=['c1', 'C2']) - for t in case_table, reflected: - assert 'c1' in t.c.keys() - assert 'C2' in t.c.keys() - reflected2 = Table('mysql_case', MetaData(testing.db), - autoload=True, include_columns=['c1', 'c2']) - assert 'c1' in reflected2.c.keys() - for c in ['c2', 'C2', 'C3']: - assert c not in reflected2.c.keys() - finally: - case_table.drop() - - @testing.exclude('mysql', '<', (5, 0, 0), 'early types are squirrely') - @testing.uses_deprecated('Using String type with no length') - @testing.uses_deprecated('Manually quoting ENUM value literals') - def test_type_reflection(self): - # (ask_for, roundtripped_as_if_different) - specs = [( String(1), mysql.MSString(1), ), - ( String(3), mysql.MSString(3), ), - ( Text(), mysql.MSText(), ), - ( Unicode(1), mysql.MSString(1), ), - ( Unicode(3), mysql.MSString(3), ), - ( UnicodeText(), mysql.MSText(), ), - ( mysql.MSChar(1), ), - ( mysql.MSChar(3), ), - ( NCHAR(2), mysql.MSChar(2), ), - ( mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only - ( mysql.MSNVarChar(22), mysql.MSString(22), ), - ( SmallInteger(), mysql.MSSmallInteger(), ), - ( SmallInteger(), mysql.MSSmallInteger(4), ), - ( mysql.MSSmallInteger(), ), - ( mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ), - ( mysql.MSMediumInteger(), mysql.MSMediumInteger(), ), - ( mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ), - ( LargeBinary(3), mysql.TINYBLOB(), ), - ( LargeBinary(), mysql.BLOB() ), - ( mysql.MSBinary(3), mysql.MSBinary(3), ), - ( mysql.MSVarBinary(3),), - ( mysql.MSTinyBlob(),), - ( mysql.MSBlob(),), - ( mysql.MSBlob(1234), mysql.MSBlob()), - ( mysql.MSMediumBlob(),), - ( mysql.MSLongBlob(),), - ( mysql.ENUM("''","'fleem'"), ), - ] - - columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)] - - db = testing.db - m = MetaData(db) - t_table = Table('mysql_types', m, *columns) - try: - m.create_all() - - m2 = MetaData(db) - rt = Table('mysql_types', m2, autoload=True) - try: - db.execute('CREATE OR REPLACE VIEW mysql_types_v ' - 'AS SELECT * from mysql_types') - rv = Table('mysql_types_v', m2, autoload=True) - - expected = [len(c) > 1 and c[1] or c[0] for c in specs] - - # Early 5.0 releases seem to report more "general" for columns - # in a view, e.g. char -> varchar, tinyblob -> mediumblob - # - # Not sure exactly which point version has the fix. - if db.dialect.server_version_info < (5, 0, 11): - tables = rt, - else: - tables = rt, rv - - for table in tables: - for i, reflected in enumerate(table.c): - assert isinstance(reflected.type, - type(expected[i])), \ - 'element %d: %r not instance of %r' % (i, - reflected.type, type(expected[i])) - finally: - db.execute('DROP VIEW mysql_types_v') - finally: - m.drop_all() - - def test_autoincrement(self): - meta = MetaData(testing.db) - try: - Table('ai_1', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True), - mysql_engine='MyISAM') - Table('ai_2', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True), - mysql_engine='MyISAM') - Table('ai_3', meta, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_y', Integer, primary_key=True), - mysql_engine='MyISAM') - Table('ai_4', meta, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_n2', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - mysql_engine='MyISAM') - Table('ai_5', meta, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - mysql_engine='MyISAM') - Table('ai_6', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True), - mysql_engine='MyISAM') - Table('ai_7', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True), - mysql_engine='MyISAM') - Table('ai_8', meta, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True), - mysql_engine='MyISAM') - meta.create_all() - - table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', - 'ai_5', 'ai_6', 'ai_7', 'ai_8'] - mr = MetaData(testing.db) - mr.reflect(only=table_names) - - for tbl in [mr.tables[name] for name in table_names]: - for c in tbl.c: - if c.name.startswith('int_y'): - assert c.autoincrement - elif c.name.startswith('int_n'): - assert not c.autoincrement - tbl.insert().execute() - if 'int_y' in tbl.c: - assert select([tbl.c.int_y]).scalar() == 1 - assert list(tbl.select().execute().first()).count(1) == 1 - else: - assert 1 not in list(tbl.select().execute().first()) - finally: - meta.drop_all() - - @testing.exclude('mysql', '<', (5, 0, 0), 'no information_schema support') - def test_system_views(self): - dialect = testing.db.dialect - connection = testing.db.connect() - view_names = dialect.get_view_names(connection, "information_schema") - self.assert_('TABLES' in view_names) - - -class SQLTest(fixtures.TestBase, AssertsCompiledSQL): - """Tests MySQL-dialect specific compilation.""" - - __dialect__ = mysql.dialect() - - def test_precolumns(self): - dialect = self.__dialect__ - - def gen(distinct=None, prefixes=None): - kw = {} - if distinct is not None: - kw['distinct'] = distinct - if prefixes is not None: - kw['prefixes'] = prefixes - return str(select(['q'], **kw).compile(dialect=dialect)) - - eq_(gen(None), 'SELECT q') - eq_(gen(True), 'SELECT DISTINCT q') - - eq_(gen(prefixes=['ALL']), 'SELECT ALL q') - eq_(gen(prefixes=['DISTINCTROW']), - 'SELECT DISTINCTROW q') - - # Interaction with MySQL prefix extensions - eq_( - gen(None, ['straight_join']), - 'SELECT straight_join q') - eq_( - gen(False, ['HIGH_PRIORITY', 'SQL_SMALL_RESULT', 'ALL']), - 'SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q') - eq_( - gen(True, ['high_priority', sql.text('sql_cache')]), - 'SELECT high_priority sql_cache DISTINCT q') - - @testing.uses_deprecated - def test_deprecated_distinct(self): - dialect = self.__dialect__ - - self.assert_compile( - select(['q'], distinct='ALL'), - 'SELECT ALL q', - ) - - self.assert_compile( - select(['q'], distinct='distinctROW'), - 'SELECT DISTINCTROW q', - ) - - self.assert_compile( - select(['q'], distinct='ALL', - prefixes=['HIGH_PRIORITY', 'SQL_SMALL_RESULT']), - 'SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q' - ) - - def test_backslash_escaping(self): - self.assert_compile( - sql.column('foo').like('bar', escape='\\'), - "foo LIKE %s ESCAPE '\\\\'" - ) - - dialect = mysql.dialect() - dialect._backslash_escapes=False - self.assert_compile( - sql.column('foo').like('bar', escape='\\'), - "foo LIKE %s ESCAPE '\\'", - dialect=dialect - ) - - def test_limit(self): - t = sql.table('t', sql.column('col1'), sql.column('col2')) - - self.assert_compile( - select([t]).limit(10).offset(20), - "SELECT t.col1, t.col2 FROM t LIMIT %s, %s", - {'param_1':20, 'param_2':10} - ) - self.assert_compile( - select([t]).limit(10), - "SELECT t.col1, t.col2 FROM t LIMIT %s", - {'param_1':10}) - - self.assert_compile( - select([t]).offset(10), - "SELECT t.col1, t.col2 FROM t LIMIT %s, 18446744073709551615", - {'param_1':10} - ) - - def test_varchar_raise(self): - for type_ in ( - String, - VARCHAR, - String(), - VARCHAR(), - NVARCHAR(), - Unicode, - Unicode(), - ): - type_ = sqltypes.to_instance(type_) - assert_raises_message( - exc.CompileError, - "VARCHAR requires a length on dialect mysql", - type_.compile, - dialect=mysql.dialect() - ) - - t1 = Table('sometable', MetaData(), - Column('somecolumn', type_) - ) - assert_raises_message( - exc.CompileError, - r"\(in table 'sometable', column 'somecolumn'\)\: " - r"(?:N)?VARCHAR requires a length on dialect mysql", - schema.CreateTable(t1).compile, - dialect=mysql.dialect() - ) - - def test_update_limit(self): - t = sql.table('t', sql.column('col1'), sql.column('col2')) - - self.assert_compile( - t.update(values={'col1':123}), - "UPDATE t SET col1=%s" - ) - self.assert_compile( - t.update(values={'col1':123}, mysql_limit=5), - "UPDATE t SET col1=%s LIMIT 5" - ) - self.assert_compile( - t.update(values={'col1':123}, mysql_limit=None), - "UPDATE t SET col1=%s" - ) - self.assert_compile( - t.update(t.c.col2==456, values={'col1':123}, mysql_limit=1), - "UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1" - ) - - def test_utc_timestamp(self): - self.assert_compile(func.utc_timestamp(), "UTC_TIMESTAMP") - - def test_sysdate(self): - self.assert_compile(func.sysdate(), "SYSDATE()") - - def test_cast(self): - t = sql.table('t', sql.column('col')) - m = mysql - - specs = [ - (Integer, "CAST(t.col AS SIGNED INTEGER)"), - (INT, "CAST(t.col AS SIGNED INTEGER)"), - (m.MSInteger, "CAST(t.col AS SIGNED INTEGER)"), - (m.MSInteger(unsigned=True), "CAST(t.col AS UNSIGNED INTEGER)"), - (SmallInteger, "CAST(t.col AS SIGNED INTEGER)"), - (m.MSSmallInteger, "CAST(t.col AS SIGNED INTEGER)"), - (m.MSTinyInteger, "CAST(t.col AS SIGNED INTEGER)"), - # 'SIGNED INTEGER' is a bigint, so this is ok. - (m.MSBigInteger, "CAST(t.col AS SIGNED INTEGER)"), - (m.MSBigInteger(unsigned=False), "CAST(t.col AS SIGNED INTEGER)"), - (m.MSBigInteger(unsigned=True), - "CAST(t.col AS UNSIGNED INTEGER)"), - (m.MSBit, "t.col"), - - # this is kind of sucky. thank you default arguments! - (NUMERIC, "CAST(t.col AS DECIMAL)"), - (DECIMAL, "CAST(t.col AS DECIMAL)"), - (Numeric, "CAST(t.col AS DECIMAL)"), - (m.MSNumeric, "CAST(t.col AS DECIMAL)"), - (m.MSDecimal, "CAST(t.col AS DECIMAL)"), - - (FLOAT, "t.col"), - (Float, "t.col"), - (m.MSFloat, "t.col"), - (m.MSDouble, "t.col"), - (m.MSReal, "t.col"), - - (TIMESTAMP, "CAST(t.col AS DATETIME)"), - (DATETIME, "CAST(t.col AS DATETIME)"), - (DATE, "CAST(t.col AS DATE)"), - (TIME, "CAST(t.col AS TIME)"), - (DateTime, "CAST(t.col AS DATETIME)"), - (Date, "CAST(t.col AS DATE)"), - (Time, "CAST(t.col AS TIME)"), - (DateTime, "CAST(t.col AS DATETIME)"), - (Date, "CAST(t.col AS DATE)"), - (m.MSTime, "CAST(t.col AS TIME)"), - (m.MSTimeStamp, "CAST(t.col AS DATETIME)"), - (m.MSYear, "t.col"), - (m.MSYear(2), "t.col"), - (Interval, "t.col"), - - (String, "CAST(t.col AS CHAR)"), - (Unicode, "CAST(t.col AS CHAR)"), - (UnicodeText, "CAST(t.col AS CHAR)"), - (VARCHAR, "CAST(t.col AS CHAR)"), - (NCHAR, "CAST(t.col AS CHAR)"), - (CHAR, "CAST(t.col AS CHAR)"), - (CLOB, "CAST(t.col AS CHAR)"), - (TEXT, "CAST(t.col AS CHAR)"), - (String(32), "CAST(t.col AS CHAR(32))"), - (Unicode(32), "CAST(t.col AS CHAR(32))"), - (CHAR(32), "CAST(t.col AS CHAR(32))"), - (m.MSString, "CAST(t.col AS CHAR)"), - (m.MSText, "CAST(t.col AS CHAR)"), - (m.MSTinyText, "CAST(t.col AS CHAR)"), - (m.MSMediumText, "CAST(t.col AS CHAR)"), - (m.MSLongText, "CAST(t.col AS CHAR)"), - (m.MSNChar, "CAST(t.col AS CHAR)"), - (m.MSNVarChar, "CAST(t.col AS CHAR)"), - - (LargeBinary, "CAST(t.col AS BINARY)"), - (BLOB, "CAST(t.col AS BINARY)"), - (m.MSBlob, "CAST(t.col AS BINARY)"), - (m.MSBlob(32), "CAST(t.col AS BINARY)"), - (m.MSTinyBlob, "CAST(t.col AS BINARY)"), - (m.MSMediumBlob, "CAST(t.col AS BINARY)"), - (m.MSLongBlob, "CAST(t.col AS BINARY)"), - (m.MSBinary, "CAST(t.col AS BINARY)"), - (m.MSBinary(32), "CAST(t.col AS BINARY)"), - (m.MSVarBinary, "CAST(t.col AS BINARY)"), - (m.MSVarBinary(32), "CAST(t.col AS BINARY)"), - - # maybe this could be changed to something more DWIM, needs - # testing - (Boolean, "t.col"), - (BOOLEAN, "t.col"), - - (m.MSEnum, "t.col"), - (m.MSEnum("1", "2"), "t.col"), - (m.MSSet, "t.col"), - (m.MSSet("1", "2"), "t.col"), - ] - - for type_, expected in specs: - self.assert_compile(cast(t.c.col, type_), expected) - - def test_no_cast_pre_4(self): - self.assert_compile( - cast(Column('foo', Integer), String), - "CAST(foo AS CHAR)", - ) - dialect = mysql.dialect() - dialect.server_version_info = (3, 2, 3) - self.assert_compile( - cast(Column('foo', Integer), String), - "foo", - dialect=dialect - ) - - def test_cast_grouped_expression_non_castable(self): - self.assert_compile( - cast(sql.column('x') + sql.column('y'), Float), - "(x + y)" - ) - - def test_cast_grouped_expression_pre_4(self): - dialect = mysql.dialect() - dialect.server_version_info = (3, 2, 3) - self.assert_compile( - cast(sql.column('x') + sql.column('y'), Integer), - "(x + y)", - dialect=dialect - ) - - def test_extract(self): - t = sql.table('t', sql.column('col1')) - - for field in 'year', 'month', 'day': - self.assert_compile( - select([extract(field, t.c.col1)]), - "SELECT EXTRACT(%s FROM t.col1) AS anon_1 FROM t" % field) - - # millsecondS to millisecond - self.assert_compile( - select([extract('milliseconds', t.c.col1)]), - "SELECT EXTRACT(millisecond FROM t.col1) AS anon_1 FROM t") - - def test_too_long_index(self): - exp = 'ix_zyrenian_zyme_zyzzogeton_zyzzogeton_zyrenian_zyme_zyz_5cd2' - tname = 'zyrenian_zyme_zyzzogeton_zyzzogeton' - cname = 'zyrenian_zyme_zyzzogeton_zo' - - t1 = Table(tname, MetaData(), - Column(cname, Integer, index=True), - ) - ix1 = list(t1.indexes)[0] - - self.assert_compile( - schema.CreateIndex(ix1), - "CREATE INDEX %s " - "ON %s (%s)" % (exp, tname, cname) - ) - - def test_innodb_autoincrement(self): - t1 = Table('sometable', MetaData(), Column('assigned_id', - Integer(), primary_key=True, autoincrement=False), - Column('id', Integer(), primary_key=True, - autoincrement=True), mysql_engine='InnoDB') - self.assert_compile(schema.CreateTable(t1), - 'CREATE TABLE sometable (assigned_id ' - 'INTEGER NOT NULL, id INTEGER NOT NULL ' - 'AUTO_INCREMENT, PRIMARY KEY (assigned_id, ' - 'id), KEY idx_autoinc_id (id))ENGINE=Inn' - 'oDB') - - t1 = Table('sometable', MetaData(), Column('assigned_id', - Integer(), primary_key=True, autoincrement=True), - Column('id', Integer(), primary_key=True, - autoincrement=False), mysql_engine='InnoDB') - self.assert_compile(schema.CreateTable(t1), - 'CREATE TABLE sometable (assigned_id ' - 'INTEGER NOT NULL AUTO_INCREMENT, id ' - 'INTEGER NOT NULL, PRIMARY KEY ' - '(assigned_id, id))ENGINE=InnoDB') - - def test_innodb_autoincrement_reserved_word_column_name(self): - t1 = Table( - 'sometable', MetaData(), - Column('id', Integer(), primary_key=True, autoincrement=False), - Column('order', Integer(), primary_key=True, autoincrement=True), - mysql_engine='InnoDB') - self.assert_compile( - schema.CreateTable(t1), - 'CREATE TABLE sometable (' - 'id INTEGER NOT NULL, ' - '`order` INTEGER NOT NULL AUTO_INCREMENT, ' - 'PRIMARY KEY (id, `order`), ' - 'KEY idx_autoinc_order (`order`)' - ')ENGINE=InnoDB') - - -class SQLModeDetectionTest(fixtures.TestBase): - __only_on__ = 'mysql' - - def _options(self, modes): - def connect(con, record): - cursor = con.cursor() - print("DOING THiS:", "set sql_mode='%s'" % (",".join(modes))) - cursor.execute("set sql_mode='%s'" % (",".join(modes))) - e = engines.testing_engine(options={ - 'pool_events':[ - (connect, 'first_connect'), - (connect, 'connect') - ] - }) - return e - - def test_backslash_escapes(self): - engine = self._options(['NO_BACKSLASH_ESCAPES']) - c = engine.connect() - assert not engine.dialect._backslash_escapes - c.close() - engine.dispose() - - engine = self._options([]) - c = engine.connect() - assert engine.dialect._backslash_escapes - c.close() - engine.dispose() - - def test_ansi_quotes(self): - engine = self._options(['ANSI_QUOTES']) - c = engine.connect() - assert engine.dialect._server_ansiquotes - c.close() - engine.dispose() - - def test_combination(self): - engine = self._options(['ANSI_QUOTES,NO_BACKSLASH_ESCAPES']) - c = engine.connect() - assert engine.dialect._server_ansiquotes - assert not engine.dialect._backslash_escapes - c.close() - engine.dispose() - -class RawReflectionTest(fixtures.TestBase): - def setup(self): - dialect = mysql.dialect() - self.parser = mysql.MySQLTableDefinitionParser(dialect, dialect.identifier_preparer) - - def test_key_reflection(self): - regex = self.parser._re_key - - assert regex.match(' PRIMARY KEY (`id`),') - assert regex.match(' PRIMARY KEY USING BTREE (`id`),') - assert regex.match(' PRIMARY KEY (`id`) USING BTREE,') - assert regex.match(' PRIMARY KEY (`id`)') - assert regex.match(' PRIMARY KEY USING BTREE (`id`)') - assert regex.match(' PRIMARY KEY (`id`) USING BTREE') - - -class ExecutionTest(fixtures.TestBase): - """Various MySQL execution special cases.""" - - __only_on__ = 'mysql' - - def test_charset_caching(self): - engine = engines.testing_engine() - - cx = engine.connect() - meta = MetaData() - charset = engine.dialect._detect_charset(cx) - - meta.reflect(cx) - eq_(cx.dialect._connection_charset, charset) - cx.close() - - def test_sysdate(self): - d = testing.db.scalar(func.sysdate()) - assert isinstance(d, datetime.datetime) - -class MatchTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'mysql' - - @classmethod - def setup_class(cls): - global metadata, cattable, matchtable - metadata = MetaData(testing.db) - - cattable = Table('cattable', metadata, - Column('id', Integer, primary_key=True), - Column('description', String(50)), - mysql_engine='MyISAM' - ) - matchtable = Table('matchtable', metadata, - Column('id', Integer, primary_key=True), - Column('title', String(200)), - Column('category_id', Integer, ForeignKey('cattable.id')), - mysql_engine='MyISAM' - ) - metadata.create_all() - - cattable.insert().execute([ - {'id': 1, 'description': 'Python'}, - {'id': 2, 'description': 'Ruby'}, - ]) - matchtable.insert().execute([ - {'id': 1, - 'title': 'Agile Web Development with Rails', - 'category_id': 2}, - {'id': 2, - 'title': 'Dive Into Python', - 'category_id': 1}, - {'id': 3, - 'title': "Programming Matz's Ruby", - 'category_id': 2}, - {'id': 4, - 'title': 'The Definitive Guide to Django', - 'category_id': 1}, - {'id': 5, - 'title': 'Python in a Nutshell', - 'category_id': 1} - ]) - - @classmethod - def teardown_class(cls): - metadata.drop_all() - - @testing.fails_on('mysql+mysqlconnector', 'uses pyformat') - def test_expression(self): - format = testing.db.dialect.paramstyle == 'format' and '%s' or '?' - self.assert_compile( - matchtable.c.title.match('somstr'), - "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)" % format) - - @testing.fails_on('mysql+mysqldb', 'uses format') - @testing.fails_on('mysql+pymysql', 'uses format') - @testing.fails_on('mysql+cymysql', 'uses format') - @testing.fails_on('mysql+oursql', 'uses format') - @testing.fails_on('mysql+pyodbc', 'uses format') - @testing.fails_on('mysql+zxjdbc', 'uses format') - def test_expression(self): - format = '%(title_1)s' - self.assert_compile( - matchtable.c.title.match('somstr'), - "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)" % format) - - def test_simple_match(self): - results = (matchtable.select(). - where(matchtable.c.title.match('python')). - order_by(matchtable.c.id). - execute(). - fetchall()) - eq_([2, 5], [r.id for r in results]) - - def test_simple_match_with_apostrophe(self): - results = (matchtable.select(). - where(matchtable.c.title.match("Matz's")). - execute(). - fetchall()) - eq_([3], [r.id for r in results]) - - def test_or_match(self): - results1 = (matchtable.select(). - where(or_(matchtable.c.title.match('nutshell'), - matchtable.c.title.match('ruby'))). - order_by(matchtable.c.id). - execute(). - fetchall()) - eq_([3, 5], [r.id for r in results1]) - results2 = (matchtable.select(). - where(matchtable.c.title.match('nutshell ruby')). - order_by(matchtable.c.id). - execute(). - fetchall()) - eq_([3, 5], [r.id for r in results2]) - - - def test_and_match(self): - results1 = (matchtable.select(). - where(and_(matchtable.c.title.match('python'), - matchtable.c.title.match('nutshell'))). - execute(). - fetchall()) - eq_([5], [r.id for r in results1]) - results2 = (matchtable.select(). - where(matchtable.c.title.match('+python +nutshell')). - execute(). - fetchall()) - eq_([5], [r.id for r in results2]) - - def test_match_across_joins(self): - results = (matchtable.select(). - where(and_(cattable.c.id==matchtable.c.category_id, - or_(cattable.c.description.match('Ruby'), - matchtable.c.title.match('nutshell')))). - order_by(matchtable.c.id). - execute(). - fetchall()) - eq_([1, 3, 5], [r.id for r in results]) - - -def colspec(c): - return testing.db.dialect.ddl_compiler( - testing.db.dialect, None).get_column_specification(c) - |