diff options
-rw-r--r-- | CHANGES | 6 | ||||
-rw-r--r-- | lib/sqlalchemy/databases/mysql.py | 100 | ||||
-rw-r--r-- | test/dialect/mysql.py | 45 | ||||
-rw-r--r-- | test/sql/select.py | 274 |
4 files changed, 258 insertions, 167 deletions
@@ -145,6 +145,12 @@ CHANGES - Added test coverage for unknown type reflection. Fixed sqlite/mysql handling of type reflection for unknown types. + - Added REAL for mysql dialect (for folks exploiting the + REAL_AS_FLOAT sql mode). + + - mysql Float, MSFloat and MSDouble constructed without arguments + now produce no-argument DDL, e.g.'FLOAT'. + - misc - Removed unused util.hash(). diff --git a/lib/sqlalchemy/databases/mysql.py b/lib/sqlalchemy/databases/mysql.py index f8c52b2e1..eb8dede05 100644 --- a/lib/sqlalchemy/databases/mysql.py +++ b/lib/sqlalchemy/databases/mysql.py @@ -365,10 +365,10 @@ class MSDecimal(MSNumeric): return self._extend("DECIMAL(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}) -class MSDouble(MSNumeric): +class MSDouble(sqltypes.Float, _NumericType): """MySQL DOUBLE type.""" - def __init__(self, precision=10, length=2, asdecimal=True, **kw): + def __init__(self, precision=None, length=None, asdecimal=True, **kw): """Construct a DOUBLE. precision @@ -389,8 +389,14 @@ class MSDouble(MSNumeric): if ((precision is None and length is not None) or (precision is not None and length is None)): - raise exceptions.ArgumentError("You must specify both precision and length or omit both altogether.") - super(MSDouble, self).__init__(precision, length, asdecimal=asdecimal, **kw) + raise exceptions.ArgumentError( + "You must specify both precision and length or omit " + "both altogether.") + + _NumericType.__init__(self, **kw) + sqltypes.Float.__init__(self, asdecimal=asdecimal) + self.length = length + self.precision = precision def get_col_spec(self): if self.precision is not None and self.length is not None: @@ -401,10 +407,42 @@ class MSDouble(MSNumeric): return self._extend('DOUBLE') +class MSReal(MSDouble): + """MySQL REAL type.""" + + def __init__(self, precision=None, length=None, asdecimal=True, **kw): + """Construct a REAL. + + precision + Total digits in this number. If length and precision are both None, + values are stored to limits allowed by the server. + + length + The number of digits after the decimal point. + + unsigned + Optional. + + zerofill + Optional. If true, values will be stored as strings left-padded with + zeros. Note that this does not effect the values returned by the + underlying database API, which continue to be numeric. + """ + MSDouble.__init__(self, precision, length, asdecimal, **kw) + + def get_col_spec(self): + if self.precision is not None and self.length is not None: + return self._extend("REAL(%(precision)s, %(length)s)" % + {'precision': self.precision, + 'length' : self.length}) + else: + return self._extend('REAL') + + class MSFloat(sqltypes.Float, _NumericType): """MySQL FLOAT type.""" - def __init__(self, precision=10, length=None, asdecimal=False, **kw): + def __init__(self, precision=None, length=None, asdecimal=False, **kw): """Construct a FLOAT. precision @@ -423,12 +461,13 @@ class MSFloat(sqltypes.Float, _NumericType): underlying database API, which continue to be numeric. """ - self.length = length _NumericType.__init__(self, **kw) - sqltypes.Float.__init__(self, precision, asdecimal=asdecimal) + sqltypes.Float.__init__(self, asdecimal=asdecimal) + self.length = length + self.precision = precision def get_col_spec(self): - if hasattr(self, 'length') and self.length is not None: + if self.length is not None and self.precision is not None: return self._extend("FLOAT(%s, %s)" % (self.precision, self.length)) elif self.precision is not None: return self._extend("FLOAT(%s)" % (self.precision,)) @@ -1284,6 +1323,8 @@ colspecs = { sqltypes.NCHAR: MSNChar, sqltypes.TIMESTAMP: MSTimeStamp, sqltypes.BLOB: MSBlob, + MSDouble: MSDouble, + MSReal: MSReal, _BinaryType: _BinaryType, } @@ -1798,15 +1839,46 @@ class MySQLCompiler(compiler.DefaultCompiler): sql_operators.mod: '%%' }) - def visit_cast(self, cast, **kwargs): - if isinstance(cast.type, (sqltypes.Date, sqltypes.Time, - sqltypes.DateTime)): - return super(MySQLCompiler, self).visit_cast(cast, **kwargs) + def visit_typeclause(self, typeclause): + type_ = typeclause.type.dialect_impl(self.dialect) + if isinstance(type_, MSInteger): + if getattr(type_, 'unsigned', False): + return 'UNSIGNED INTEGER' + else: + return 'SIGNED INTEGER' + elif isinstance(type_, (MSChar, MSDecimal, MSDateTime, + MSDate, MSTime)): + return type_.get_col_spec() + elif isinstance(type_, (MSText, MSNChar, MSNVarChar)): + return 'CHAR' + elif type(type_) is MSString: + if getattr(type_, 'length'): + return 'CHAR(%s)' % type_.length + else: + return 'CHAR' + elif isinstance(type_, _BinaryType): + return 'BINARY' + elif isinstance(type_, MSNumeric): + return type_.get_col_spec().replace('NUMERIC', 'DECIMAL') + elif isinstance(type_, MSTimeStamp): + return 'DATETIME' + elif isinstance(type_, (MSDateTime, MSDate, MSTime)): + return type_.get_col_spec() else: - # so just skip the CAST altogether for now. - # TODO: put whatever MySQL does for CAST here. + return None + + def visit_cast(self, cast, **kwargs): + # No cast until 4, no decimals until 5. + type_ = self.process(cast.typeclause) + if type_ is None: return self.process(cast.clause) + if self.stack and self.stack[-1].get('select'): + # not sure if we want to set the typemap here... + self.typemap.setdefault("CAST", cast.type) + return 'CAST(%s AS %s)' % (self.process(cast.clause), type_) + + def get_select_precolumns(self, select): if isinstance(select._distinct, basestring): return select._distinct.upper() + " " diff --git a/test/dialect/mysql.py b/test/dialect/mysql.py index d43f5cd16..afe035462 100644 --- a/test/dialect/mysql.py +++ b/test/dialect/mysql.py @@ -46,7 +46,7 @@ class TypesTest(AssertMixin): @testing.supported('mysql') def test_numeric(self): "Exercise type specification and options for numeric types." - + columns = [ # column type, args, kwargs, expected ddl # e.g. Column(Integer(10, unsigned=True)) == 'INTEGER(10) UNSIGNED' @@ -80,8 +80,6 @@ class TypesTest(AssertMixin): (mysql.MSDouble, [None, None], {}, 'DOUBLE'), - (mysql.MSDouble, [12], {}, - 'DOUBLE(12, 2)'), (mysql.MSDouble, [12, 4], {'unsigned':True}, 'DOUBLE(12, 4) UNSIGNED'), (mysql.MSDouble, [12, 4], {'zerofill':True}, @@ -89,8 +87,17 @@ class TypesTest(AssertMixin): (mysql.MSDouble, [12, 4], {'zerofill':True, 'unsigned':True}, 'DOUBLE(12, 4) UNSIGNED ZEROFILL'), + (mysql.MSReal, [None, None], {}, + 'REAL'), + (mysql.MSReal, [12, 4], {'unsigned':True}, + 'REAL(12, 4) UNSIGNED'), + (mysql.MSReal, [12, 4], {'zerofill':True}, + 'REAL(12, 4) ZEROFILL'), + (mysql.MSReal, [12, 4], {'zerofill':True, 'unsigned':True}, + 'REAL(12, 4) UNSIGNED ZEROFILL'), + (mysql.MSFloat, [], {}, - 'FLOAT(10)'), + 'FLOAT'), (mysql.MSFloat, [None], {}, 'FLOAT'), (mysql.MSFloat, [12], {}, @@ -156,7 +163,7 @@ class TypesTest(AssertMixin): numeric_table = Table(*table_args) gen = testbase.db.dialect.schemagenerator(testbase.db.dialect, testbase.db, None, None) - + for col in numeric_table.c: index = int(col.name[1:]) self.assert_eq(gen.get_column_specification(col), @@ -169,7 +176,7 @@ class TypesTest(AssertMixin): except: raise numeric_table.drop() - + @testing.supported('mysql') @testing.exclude('mysql', '<', (4, 1, 1)) def test_charset(self): @@ -225,7 +232,7 @@ class TypesTest(AssertMixin): 'TINYTEXT CHARACTER SET utf8 COLLATE utf8_bin'), (mysql.MSMediumText, [], {'charset':'utf8', 'binary':True}, - 'MEDIUMTEXT CHARACTER SET utf8 BINARY'), + 'MEDIUMTEXT CHARACTER SET utf8 BINARY'), (mysql.MSLongText, [], {'ascii':True}, 'LONGTEXT ASCII'), @@ -241,7 +248,7 @@ class TypesTest(AssertMixin): charset_table = Table(*table_args) gen = testbase.db.dialect.schemagenerator(testbase.db.dialect, testbase.db, None, None) - + for col in charset_table.c: index = int(col.name[1:]) self.assert_eq(gen.get_column_specification(col), @@ -259,7 +266,7 @@ class TypesTest(AssertMixin): @testing.exclude('mysql', '<', (5, 0, 5)) def test_bit_50(self): """Exercise BIT types on 5.0+ (not valid for all engine types)""" - + meta = MetaData(testbase.db) bit_table = Table('mysql_bits', meta, Column('b1', mysql.MSBit), @@ -381,7 +388,7 @@ class TypesTest(AssertMixin): @testing.exclude('mysql', '<', (4, 1, 0)) def test_timestamp(self): """Exercise funky TIMESTAMP default syntax.""" - + meta = MetaData(testbase.db) try: @@ -450,7 +457,7 @@ class TypesTest(AssertMixin): self.assert_eq(colspec(table.c.y5), 'y5 YEAR(4)') finally: meta.drop_all() - + @testing.supported('mysql') def test_set(self): @@ -486,7 +493,7 @@ class TypesTest(AssertMixin): print "Found %s" % list(row) raise table.delete().execute() - + roundtrip([None, None, None],[None] * 3) roundtrip(['', '', ''], [set([''])] * 3) @@ -513,7 +520,7 @@ class TypesTest(AssertMixin): @testing.supported('mysql') def test_enum(self): """Exercise the ENUM type.""" - + db = testbase.db enum_table = Table('mysql_enum', MetaData(testbase.db), Column('e1', mysql.MSEnum("'a'", "'b'")), @@ -562,7 +569,7 @@ class TypesTest(AssertMixin): if testbase.db.dialect.dbapi.version_info < (1, 2, 2, 'beta', 3) and \ testbase.db.dialect.dbapi.version_info >= (1, 2, 2): # these mysqldb seem to always uses 'sets', even on later pythons - import sets + import sets def convert(value): if value is None: return value @@ -570,7 +577,7 @@ class TypesTest(AssertMixin): return sets.Set([]) else: return sets.Set([value]) - + e = [] for row in expected: e.append(tuple([convert(c) for c in row])) @@ -645,14 +652,14 @@ class TypesTest(AssertMixin): t_table = Table('mysql_types', m, *columns) try: m.create_all() - + m2 = MetaData(db) rt = Table('mysql_types', m2, autoload=True) try: db.execute('CREATE OR REPLACE VIEW mysql_types_v ' 'AS SELECT * from mysql_types') rv = Table('mysql_types_v', m2, autoload=True) - + expected = [len(c) > 1 and c[1] or c[0] for c in specs] # Early 5.0 releases seem to report more "general" for columns @@ -781,7 +788,7 @@ class SQLTest(SQLCompileTest): @testing.supported('mysql') def test_limit(self): t = sql.table('t', sql.column('col1'), sql.column('col2')) - + self.assert_compile( select([t]).limit(10).offset(20), "SELECT t.col1, t.col2 FROM t LIMIT 20, 10" @@ -816,7 +823,7 @@ class SQLTest(SQLCompileTest): ) def colspec(c): - return testbase.db.dialect.schemagenerator(testbase.db.dialect, + return testbase.db.dialect.schemagenerator(testbase.db.dialect, testbase.db, None, None).get_column_specification(c) if __name__ == "__main__": diff --git a/test/sql/select.py b/test/sql/select.py index 1315b47ba..699d05faa 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -8,23 +8,23 @@ from testlib import * # the select test now tests almost completely with TableClause/ColumnClause objects, -# which are free-roaming table/column objects not attached to any database. +# which are free-roaming table/column objects not attached to any database. # so SQLAlchemy's SQL construction engine can be used with no database dependencies at all. -table1 = table('mytable', +table1 = table('mytable', column('myid', Integer), column('name', String), column('description', String), ) table2 = table( - 'myothertable', + 'myothertable', column('otherid', Integer), column('othername', String), ) table3 = table( - 'thirdtable', + 'thirdtable', column('userid', Integer), column('otherstuff', String), ) @@ -38,13 +38,13 @@ table4 = Table( schema = 'remote_owner' ) -users = table('users', +users = table('users', column('user_id'), column('user_name'), column('password'), ) -addresses = table('addresses', +addresses = table('addresses', column('address_id'), column('user_id'), column('street'), @@ -54,7 +54,7 @@ addresses = table('addresses', ) class SelectTest(SQLCompileTest): - + def test_attribute_sanity(self): assert hasattr(table1, 'c') assert hasattr(table1.select(), 'c') @@ -67,7 +67,7 @@ class SelectTest(SQLCompileTest): assert not hasattr(table1.select().c.myid, 'columns') assert not hasattr(table1.alias().c.myid, 'columns') assert not hasattr(table1.alias().c.myid, 'c') - + def testtableselect(self): self.assert_compile(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable") @@ -85,29 +85,29 @@ myothertable.othername FROM mytable, myothertable") ) , "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid") - + sq = select([table1]) self.assert_compile( sq.select(), "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)" ) - + sq = select( [table1], ).alias('sq') self.assert_compile( - sq.select(sq.c.myid == 7), + sq.select(sq.c.myid == 7), "SELECT sq.myid, sq.name, sq.description FROM \ (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid" ) - + sq = select( [table1, table2], and_(table1.c.myid ==7, table2.c.otherid==table1.c.myid), use_labels = True ).alias('sq') - + sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \ mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \ myothertable.othername AS myothertable_othername FROM mytable, myothertable \ @@ -126,7 +126,7 @@ sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \ (SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \ sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") AS sq) AS sq2") - + def testmssql_noorderbyinsubquery(self): """test that the ms-sql dialect removes ORDER BY clauses from subqueries""" dialect = mssql.dialect() @@ -143,17 +143,17 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A # TODO: this is probably incorrect; no "AS <foo>" is being applied to the table self.assert_compile(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM mytable JOIN remote_owner.remotetable ON remotetable.rem_id = mytable.myid") - + def testdontovercorrelate(self): self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") - + def testexistsascolumnclause(self): self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5}) self.assert_compile(select([table1, exists([1], from_obj=[table2])]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) self.assert_compile(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) - + def test_generative_exists(self): self.assert_compile( table1.select(exists([1], table2.c.otherid == table1.c.myid).correlate(table1)), @@ -174,7 +174,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" ) - + def testwheresubquery(self): s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') self.assert_compile( @@ -182,10 +182,10 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") self.assert_compile( - table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), + table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name)" ) - + self.assert_compile( table1.select(table1.c.myid == select([table2.c.otherid], table1.c.name == table2.c.othername)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid FROM myothertable WHERE mytable.name = myothertable.othername)" @@ -207,7 +207,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile( select([users, s.c.street], from_obj=[s]), """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") - + # test constructing the outer query via append_column(), which occurs in the ORM's Query object s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=[table1]) s.append_column(table1) @@ -216,7 +216,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" ) - + def testorderbysubquery(self): self.assert_compile( table1.select(order_by=[select([table2.c.otherid], table1.c.myid==table2.c.otherid)]), @@ -226,8 +226,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A table1.select(order_by=[desc(select([table2.c.otherid], table1.c.myid==table2.c.otherid))]), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid) DESC" ) - - + + def test_scalar_select(self): s = select([table1.c.myid], scalar=True, correlate=False) self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") @@ -259,7 +259,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A s.columns.foo except exceptions.InvalidRequestError, err: assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.' - + zips = table('zips', column('zipcode'), column('latitude'), @@ -272,7 +272,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A zip = '12345' qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar() qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar() - + q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')], zips.c.zipcode==zip, order_by = ['dist', places.c.nm] @@ -281,7 +281,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " "zips.zipcode = :zips_zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_1)) AS dist " "FROM places, zips WHERE zips.zipcode = :zips_zipcode_2 ORDER BY dist, places.nm") - + zalias = zips.alias('main_zip') qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) @@ -295,14 +295,14 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A j1 = table1.join(table2, table1.c.myid==table2.c.otherid) s2 = select([table1, s1], from_obj=[j1]) self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") - + def testlabelcomparison(self): x = func.lala(table1.c.myid).label('foo') self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :literal") - + def testand(self): self.assert_compile( - select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")), + select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")), "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()" ) @@ -311,7 +311,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A select([table1], and_( table1.c.myid == 12, or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9), - "sysdate() = today()", + "sysdate() = today()", )), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_othername_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()", checkparams = {'myothertable_othername': 'asdf', 'myothertable_othername_1':'foo', 'myothertable_otherid': 9, 'mytable_myid': 12} @@ -337,7 +337,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile( select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) FROM mytable" ) - + def testoperators(self): # exercise arithmetic operators for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), @@ -384,22 +384,22 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A fwd_sql + "'\n or\n'" + rev_sql + "'") self.assert_compile( - table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), + table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND mytable.name != :mytable_name" ) self.assert_compile( - table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))), + table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name BETWEEN :mytable_name AND :mytable_name_1)" ) self.assert_compile( - table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')), + table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name AND mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2)" ) self.assert_compile( - table1.select((table1.c.myid != 12) & ~table1.c.name), + table1.select((table1.c.myid != 12) & ~table1.c.name), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name" ) @@ -412,7 +412,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A table1.select(table1.c.myid.op('hoho')(12)==14), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :literal" ) - + # test that clauses can be pickled (operators need to be module-level, etc.) clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & table1.c.myid.like('hoho') assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause))) @@ -439,7 +439,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def testmultiparam(self): self.assert_compile( - select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')), + select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')), "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2" ) @@ -466,10 +466,10 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]), "SELECT myothertable.othername, count(myothertable.otherid) FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername" ) - + def testforupdate(self): self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") - + self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE NOWAIT", dialect=oracle.dialect()) @@ -479,7 +479,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect()) self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE", dialect=oracle.dialect()) - + def testalias(self): # test the alias for a table1. column names stay the same, table name "changes" to "foo". self.assert_compile( @@ -500,7 +500,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A # labels tablename_columnname, which become the column keys accessible off the Selectable object. # also, only use one column from the second table and all columns from the first table1. q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True) - + # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view". a = alias(q, 't2view') @@ -514,13 +514,13 @@ t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_ot myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid" ) - - + + def test_prefixes(self): self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable" ) - + def testtext(self): self.assert_compile( text("select * from foo where lala = bar") , @@ -532,7 +532,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = ["foobar(a)", "pk_foo_bar(syslaal)"], "a = 12", from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"] - ), + ), "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") # test unicode @@ -540,7 +540,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = [u"foobar(a)", u"pk_foo_bar(syslaal)"], u"a = 12", from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"] - ), + ), u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") # test building a select query programmatically with text @@ -558,7 +558,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = select(["column1", "column2"], from_obj=[table1]).alias('somealias').select(), "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias" ) - + # test that use_labels doesnt interfere with literal columns self.assert_compile( select(["column1", "column2", table1.c.myid], from_obj=[table1], use_labels=True), @@ -570,7 +570,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1], use_labels=True), "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable" ) - + print "---------------------------------------------" s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]) print "---------------------------------------------" @@ -583,22 +583,22 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = def testtextbinds(self): self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=:bar and hoho=:whee", + text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), + "select * from foo where lala=:bar and hoho=:whee", checkparams={'bar':4, 'whee': 7}, ) self.assert_compile( - text("select * from foo where clock='05:06:07'"), - "select * from foo where clock='05:06:07'", + text("select * from foo where clock='05:06:07'"), + "select * from foo where clock='05:06:07'", checkparams={}, params={}, ) dialect = postgres.dialect() self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), - "select * from foo where lala=%(bar)s and hoho=%(whee)s", + text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), + "select * from foo where lala=%(bar)s and hoho=%(whee)s", checkparams={'bar':4, 'whee': 7}, dialect=dialect ) @@ -612,12 +612,12 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = dialect = sqlite.dialect() self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), - "select * from foo where lala=? and hoho=?", + text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), + "select * from foo where lala=? and hoho=?", checkparams={'bar':4, 'whee':7}, dialect=dialect ) - + def testtextmix(self): self.assert_compile(select( [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], @@ -626,7 +626,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = "datetime(foo) = Today", table1.c.myid == table2.c.otherid, ) - ), + ), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid") @@ -635,11 +635,11 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today [alias(table1, 't'), "foo.f"], "foo.f = t.id", from_obj = ["(select f from bar where lala=heyhey) foo"] - ), + ), "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id") def testliteral(self): - self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), + self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), "SELECT :literal || :literal_1 FROM mytable") def testcalculatedcolumns(self): @@ -665,35 +665,35 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0), "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :literal" ) - + def testfunction(self): """tests the generation of functions using the func keyword""" # test an expression with a function - self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, + self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, "lala(:lala, :lala_1, :literal, mytable.myid) * myothertable.otherid") # test it in a SELECT - self.assert_compile(select([func.count(table1.c.myid)]), + self.assert_compile(select([func.count(table1.c.myid)]), "SELECT count(mytable.myid) FROM mytable") # test a "dotted" function name - self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]), + self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]), "SELECT foo.bar.lala(mytable.myid) FROM mytable") # test the bind parameter name with a "dotted" function name is only the name # (limits the length of the bind param name) - self.assert_compile(select([func.foo.bar.lala(12)]), + self.assert_compile(select([func.foo.bar.lala(12)]), "SELECT foo.bar.lala(:lala)") # test a dotted func off the engine itself self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho)") - + # test None becomes NULL self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func, :my_func_1, NULL, :my_func_2)") - + # test pickling self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func, :my_func_1, NULL, :my_func_2)") - + # assert func raises AttributeError for __bases__ attribute, since its not a class # fixes pydoc try: @@ -705,10 +705,10 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today def test_functions_with_cols(self): from sqlalchemy.sql import column users = table('users', column('id'), column('name'), column('fullname')) - calculate = select([column('q'), column('z'), column('r')], + calculate = select([column('q'), column('z'), column('r')], from_obj=[func.calculate(bindparam('x'), bindparam('y'))]) - - self.assert_compile(select([users], users.c.id > calculate.c.z), + + self.assert_compile(select([users], users.c.id > calculate.c.z), "SELECT users.id, users.name, users.fullname " "FROM users, (SELECT q, z, r " "FROM calculate(:x, :y)) " @@ -717,10 +717,10 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today print "--------------------------------------------------" s = select([users], users.c.id.between( - calculate.alias('c1').unique_params(x=17, y=45).c.z, + calculate.alias('c1').unique_params(x=17, y=45).c.z, calculate.alias('c2').unique_params(x=5, y=12).c.z)) - self.assert_compile(s, + self.assert_compile(s, "SELECT users.id, users.name, users.fullname " "FROM users, (SELECT q, z, r " "FROM calculate(:x, :y)) AS c1, (SELECT q, z, r " @@ -731,9 +731,9 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today def testextract(self): """test the EXTRACT function""" self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) FROM thirdtable") - + self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date, :to_date_1))") - + def testjoin(self): self.assert_compile( join(table2, table1, table1.c.myid == table2.c.otherid).select(), @@ -754,18 +754,18 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl ]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid" ) - + self.assert_compile( join(users, addresses, users.c.user_id==addresses.c.user_id).select(), "SELECT users.user_id, users.user_name, users.password, addresses.address_id, addresses.user_id, addresses.street, addresses.city, addresses.state, addresses.zip FROM users JOIN addresses ON users.user_id = addresses.user_id" ) - + def testmultijoin(self): self.assert_compile( select([table1, table2, table3], - + from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).outerjoin(table3, table1.c.myid==table3.c.userid)] - + #from_obj = [outerjoin(join(table, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid==table3.c.userid)] ) ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid = thirdtable.userid" @@ -776,19 +776,19 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl ) ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON mytable.myid = myothertable.otherid" ) - + def testunion(self): x = union( select([table1], table1.c.myid == 5), select([table1], table1.c.myid == 12), order_by = [table1.c.myid], ) - + self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \ FROM mytable WHERE mytable.myid = :mytable_myid UNION \ SELECT mytable.myid, mytable.name, mytable.description \ FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid") - + self.assert_compile( union( select([table1]), @@ -799,14 +799,14 @@ FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid") "SELECT mytable.myid, mytable.name, mytable.description \ FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \ FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable") - + u = union( select([table1]), select([table2]), select([table3]) ) assert u.corresponding_column(table2.c.otherid) is u.c.otherid - + self.assert_compile( union( select([table1]), @@ -820,7 +820,7 @@ FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \ FROM myothertable ORDER BY myid \ LIMIT 5 OFFSET 10" ) - + self.assert_compile( union( select([table1.c.myid, table1.c.name, func.max(table1.c.description)], table1.c.name=='name2', group_by=[table1.c.myid, table1.c.name]), @@ -858,7 +858,7 @@ SELECT thirdtable.userid FROM thirdtable)" "SELECT myothertable.otherid FROM myothertable EXCEPT SELECT thirdtable.userid FROM thirdtable \ UNION SELECT mytable.myid FROM mytable" ) - + def testouterjoin(self): query = select( [table1, table2], @@ -870,7 +870,7 @@ UNION SELECT mytable.myid FROM mytable" ), from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ] ) - self.assert_compile(query, + self.assert_compile(query, "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \ WHERE mytable.name = %(mytable_name)s OR mytable.myid = %(mytable_myid)s OR \ @@ -884,9 +884,9 @@ EXISTS (select yay from foo where boo = lar)", stmt, expected_named_stmt, expected_positional_stmt, - expected_default_params_dict, + expected_default_params_dict, expected_default_params_list, - test_param_dict, + test_param_dict, expected_test_params_dict, expected_test_params_list ) in [ @@ -945,7 +945,7 @@ EXISTS (select yay from foo where boo = lar)", {'myid':5, 'myid_1':6}, {'myid':5, 'myid_1':6}, [5,6] ), ]: - + self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict) self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect()) nonpositional = stmt.compile() @@ -955,7 +955,7 @@ EXISTS (select yay from foo where boo = lar)", assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict).get_raw_dict())) pp = positional.get_params(**test_param_dict) assert [pp[k] for k in positional.positiontup] == expected_test_params_list - + # check that params() doesnt modify original statement s = select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid'))) s2 = s.params({'myid':8, 'myotherid':7}) @@ -963,8 +963,8 @@ EXISTS (select yay from foo where boo = lar)", assert s.compile().params == {'myid':None, 'myotherid':None} assert s2.compile().params == {'myid':8, 'myotherid':7} assert s3.compile().params == {'myid':9, 'myotherid':7} - - + + # check that conflicts with "unique" params are caught s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid'))) try: @@ -979,7 +979,7 @@ EXISTS (select yay from foo where boo = lar)", assert False except exceptions.CompileError, err: assert str(err) == "Bind parameter 'mytable_myid_1' conflicts with unique bind parameter of the same name" - + def testbindascol(self): t = table('foo', column('id')) @@ -987,7 +987,7 @@ EXISTS (select yay from foo where boo = lar)", s = select([t, literal('lala').label('hoho')]) self.assert_compile(s, "SELECT foo.id, :literal AS hoho FROM foo") assert [str(c) for c in s.c] == ["id", "hoho"] - + def testin(self): self.assert_compile(select([table1], table1.c.myid.in_(['a'])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)") @@ -1061,10 +1061,10 @@ EXISTS (select yay from foo where boo = lar)", WHERE mytable.myid IN (\ SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid \ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1)") - + # test that putting a select in an IN clause does not blow away its ORDER BY clause self.assert_compile( - select([table1, table2], + select([table1, table2], table2.c.otherid.in_( select([table2.c.otherid], order_by=[table2.c.othername], limit=10, correlate=False) ), @@ -1072,24 +1072,24 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE ), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid WHERE myothertable.otherid IN (SELECT myothertable.otherid FROM myothertable ORDER BY myothertable.othername LIMIT 10) ORDER BY mytable.myid" ) - + # test empty in clause self.assert_compile(select([table1], table1.c.myid.in_([])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)") - + def test_in_deprecated_api(self): self.assert_compile(select([table1], table1.c.myid.in_('abc')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)") - + self.assert_compile(select([table1], table1.c.myid.in_(1)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)") - + self.assert_compile(select([table1], table1.c.myid.in_(1,2)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)") - + self.assert_compile(select([table1], table1.c.myid.in_()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)") - + def testcast(self): tbl = table('casttest', column('id', Integer), @@ -1097,7 +1097,7 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE column('v2', Float), column('ts', TIMESTAMP), ) - + def check_results(dialect, expected_results, literal): self.assertEqual(len(expected_results), 5, 'Incorrect number of expected results') self.assertEqual(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[0]) @@ -1105,8 +1105,15 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE self.assertEqual(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), 'CAST(casttest.ts AS %s)' %expected_results[2]) self.assertEqual(str(cast(1234, TEXT).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3])) self.assertEqual(str(cast('test', String(20)).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[4])) - sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect) - self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) \nFROM casttest") + # fixme: shoving all of this dialect-specific stuff in one test + # is now officialy completely ridiculous AND non-obviously omits + # coverage on other dialects. + sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect) + if isinstance(dialect, type(mysql.dialect())): + self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) \nFROM casttest") + else: + self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) \nFROM casttest") + # first test with Postgres engine check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s') @@ -1116,18 +1123,17 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE # then the sqlite engine check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?') - # MySQL seems to only support DATE types for cast - self.assertEqual(str(cast(tbl.c.ts, Date).compile(dialect=mysql.dialect())), 'CAST(casttest.ts AS DATE)') - self.assertEqual(str(cast(tbl.c.ts, Numeric).compile(dialect=mysql.dialect())), 'casttest.ts') + # then the MySQL engine + check_results(mysql.dialect(), ['DECIMAL(10, 2)', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'], '%s') def testdatebetween(self): import datetime - table = Table('dt', metadata, + table = Table('dt', metadata, Column('date', Date)) self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)}) self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :literal AND :literal_1", checkparams={'literal':datetime.date(2006,6,1), 'literal_1':datetime.date(2006,6,5)}) - + def test_operator_precedence(self): table = Table('op', metadata, Column('field', Integer)) @@ -1164,39 +1170,39 @@ class CRUDTest(SQLCompileTest): # insert with user-supplied bind params for specific columns, # cols provided literally self.assert_compile( - insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}), + insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}), "INSERT INTO mytable (myid, name) VALUES (:userid, :username)") - + # insert with user-supplied bind params for specific columns, cols # provided as strings self.assert_compile( - insert(table1, dict(myid = 3, name = 'jack')), + insert(table1, dict(myid = 3, name = 'jack')), "INSERT INTO mytable (myid, name) VALUES (:myid, :name)" ) # test with a tuple of params instead of named self.assert_compile( - insert(table1, (3, 'jack', 'mydescription')), + insert(table1, (3, 'jack', 'mydescription')), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)", checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'} ) - + self.assert_compile( insert(table1, values={table1.c.myid : bindparam('userid')}).values({table1.c.name : bindparam('username')}), "INSERT INTO mytable (myid, name) VALUES (:userid, :username)" ) - + def testinlineinsert(self): metadata = MetaData() - table = Table('sometable', metadata, + table = Table('sometable', metadata, Column('id', Integer, primary_key=True), Column('foo', Integer, default=func.foobar())) - self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())") - self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={}) - + self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())") + self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={}) + def testinsertexpression(self): self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())") - + def testupdate(self): self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table1.c.name:'fred'}) self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'}) @@ -1209,9 +1215,9 @@ class CRUDTest(SQLCompileTest): c = s.compile(column_keys=['mytable_id', 'name']) self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name), description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}) self.assert_(str(s) == str(c)) - + def testupdateexpression(self): - self.assert_compile(update(table1, + self.assert_compile(update(table1, (table1.c.myid == func.hoho(4)) & (table1.c.name == literal('foo') + table1.c.name + literal('lala')), values = { @@ -1219,7 +1225,7 @@ class CRUDTest(SQLCompileTest): table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho')) }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal), name=(mytable.name || :mytable_name) " "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal_1 || mytable.name || :literal_2") - + def testcorrelatedupdate(self): # test against a straight text subquery u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")}) @@ -1228,7 +1234,7 @@ class CRUDTest(SQLCompileTest): mt = table1.alias() u = update(table1, values = {table1.c.name : select([mt.c.name], mt.c.myid==table1.c.myid)}) self.assert_compile(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)") - + # test against a regular constructed subquery s = select([table2], table2.c.otherid == table1.c.myid) u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s}) @@ -1243,10 +1249,10 @@ class CRUDTest(SQLCompileTest): s = select([table2.c.othername], table2.c.otherid == table1.c.myid) u = table1.update(table1.c.name==s) self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") - + def testdelete(self): self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid") - + def testcorrelateddelete(self): # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) @@ -1263,7 +1269,7 @@ class InlineDefaultTest(SQLCompileTest): m = MetaData() foo = Table('foo', m, Column('id', Integer)) - + t = Table('test', m, Column('col1', Integer, default=func.foo(1)), Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])), @@ -1283,7 +1289,7 @@ class InlineDefaultTest(SQLCompileTest): ) self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo), col2=(SELECT coalesce(max(foo.id)) FROM foo), col3=:col3") - + class SchemaTest(SQLCompileTest): def testselect(self): # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables @@ -1297,12 +1303,12 @@ class SchemaTest(SQLCompileTest): def testalias(self): a = alias(table4, 'remtable') self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable WHERE remtable.datatype_id = :remtable_datatype_id") - + def testupdate(self): self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id WHERE remotetable.value = :remotetable_value") - + def testinsert(self): self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES (:rem_id, :datatype_id, :value)") - + if __name__ == "__main__": testbase.main() |