diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-09-27 12:09:24 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-09-27 12:09:24 -0400 |
commit | a8e1d33ae514a045d71d0a26d0c1325eecd4ca99 (patch) | |
tree | 1470e63f9b1b9faf3c46705f062c3f27e2b68211 /test/sql/test_query.py | |
parent | 91255618ddb47553774c620a23479adf88c27b74 (diff) | |
download | sqlalchemy-a8e1d33ae514a045d71d0a26d0c1325eecd4ca99.tar.gz |
- break out critical aspects of test_query into their own tests
finally, test_resultset and test_insert_exec. Update all
idioms within these.
Diffstat (limited to 'test/sql/test_query.py')
-rw-r--r-- | test/sql/test_query.py | 1420 |
1 files changed, 4 insertions, 1416 deletions
diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 0313a9cd0..aca933fc9 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -2,13 +2,12 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \ is_, in_, not_in_ from sqlalchemy import testing from sqlalchemy.testing import fixtures, engines -from sqlalchemy import util from sqlalchemy import ( exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey, - union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence, - bindparam, literal, not_, type_coerce, literal_column, desc, asc, - TypeDecorator, or_, cast, table, column) -from sqlalchemy.engine import default, result as _result + union, intersect, except_, union_all, VARCHAR, INT, text, + bindparam, literal, not_, literal_column, desc, asc, + TypeDecorator, or_, cast) +from sqlalchemy.engine import default from sqlalchemy.testing.schema import Table, Column # ongoing - these are old tests. those which are of general use @@ -62,260 +61,6 @@ class QueryTest(fixtures.TestBase): def teardown_class(cls): metadata.drop_all() - @testing.requires.multivalues_inserts - def test_multivalues_insert(self): - users.insert( - values=[ - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}]).execute() - rows = users.select().order_by(users.c.user_id).execute().fetchall() - self.assert_(rows[0] == (7, 'jack')) - self.assert_(rows[1] == (8, 'ed')) - users.insert(values=[(9, 'jack'), (10, 'ed')]).execute() - rows = users.select().order_by(users.c.user_id).execute().fetchall() - self.assert_(rows[2] == (9, 'jack')) - self.assert_(rows[3] == (10, 'ed')) - - def test_insert_heterogeneous_params(self): - """test that executemany parameters are asserted to match the - parameter set of the first.""" - - assert_raises_message( - exc.StatementError, - r"\(sqlalchemy.exc.InvalidRequestError\) A value is required for " - "bind parameter 'user_name', in " - "parameter group 2 " - r"\[SQL: u?'INSERT INTO query_users", - users.insert().execute, - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9} - ) - - # this succeeds however. We aren't yet doing - # a length check on all subsequent parameters. - users.insert().execute( - {'user_id': 7}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9} - ) - - def test_lastrow_accessor(self): - """Tests the inserted_primary_key and lastrow_has_id() functions.""" - - def insert_values(engine, table, values): - """ - Inserts a row into a table, returns the full list of values - INSERTed including defaults that fired off on the DB side and - detects rows that had defaults and post-fetches. - """ - - # verify implicit_returning is working - if engine.dialect.implicit_returning: - ins = table.insert() - comp = ins.compile(engine, column_keys=list(values)) - if not set(values).issuperset( - c.key for c in table.primary_key): - assert comp.returning - - result = engine.execute(table.insert(), **values) - ret = values.copy() - - for col, id in zip(table.primary_key, result.inserted_primary_key): - ret[col.key] = id - - if result.lastrow_has_defaults(): - criterion = and_( - *[ - col == id for col, id in - zip(table.primary_key, result.inserted_primary_key)]) - row = engine.execute(table.select(criterion)).first() - for c in table.c: - ret[c.key] = row[c] - return ret - - if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): - assert testing.db.dialect.implicit_returning - - if testing.db.dialect.implicit_returning: - test_engines = [ - engines.testing_engine(options={'implicit_returning': False}), - engines.testing_engine(options={'implicit_returning': True}), - ] - else: - test_engines = [testing.db] - - for engine in test_engines: - metadata = MetaData() - for supported, table, values, assertvalues in [ - ( - {'unsupported': ['sqlite']}, - Table( - "t1", metadata, - Column( - 'id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('foo', String(30), primary_key=True)), - {'foo': 'hi'}, - {'id': 1, 'foo': 'hi'} - ), - ( - {'unsupported': ['sqlite']}, - Table( - "t2", metadata, - Column( - 'id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'foo': 'hi'}, - {'id': 1, 'foo': 'hi', 'bar': 'hi'} - ), - ( - {'unsupported': []}, - Table( - "t3", metadata, - Column("id", String(40), primary_key=True), - Column('foo', String(30), primary_key=True), - Column("bar", String(30)) - ), - {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}, - {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"} - ), - ( - {'unsupported': []}, - Table( - "t4", metadata, - Column( - 'id', Integer, - Sequence('t4_id_seq', optional=True), - primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'foo': 'hi', 'id': 1}, - {'id': 1, 'foo': 'hi', 'bar': 'hi'} - ), - ( - {'unsupported': []}, - Table( - "t5", metadata, - Column('id', String(10), primary_key=True), - Column('bar', String(30), server_default='hi') - ), - {'id': 'id1'}, - {'id': 'id1', 'bar': 'hi'}, - ), - ( - {'unsupported': ['sqlite']}, - Table( - "t6", metadata, - Column( - 'id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('bar', Integer, primary_key=True) - ), - {'bar': 0}, - {'id': 1, 'bar': 0}, - ), - ]: - if testing.db.name in supported['unsupported']: - continue - try: - table.create(bind=engine, checkfirst=True) - i = insert_values(engine, table, values) - assert i == assertvalues, "tablename: %s %r %r" % \ - (table.name, repr(i), repr(assertvalues)) - finally: - table.drop(bind=engine) - - # TODO: why not in the sqlite suite? - @testing.only_on('sqlite+pysqlite') - @testing.provide_metadata - def test_lastrowid_zero(self): - from sqlalchemy.dialects import sqlite - eng = engines.testing_engine() - - class ExcCtx(sqlite.base.SQLiteExecutionContext): - - def get_lastrowid(self): - return 0 - eng.dialect.execution_ctx_cls = ExcCtx - t = Table( - 't', self.metadata, Column('x', Integer, primary_key=True), - Column('y', Integer)) - t.create(eng) - r = eng.execute(t.insert().values(y=5)) - eq_(r.inserted_primary_key, [0]) - - @testing.fails_on( - 'sqlite', "sqlite autoincremnt doesn't work with composite pks") - def test_misordered_lastrow(self): - related = Table( - 'related', metadata, - Column('id', Integer, primary_key=True), - mysql_engine='MyISAM' - ) - t6 = Table( - "t6", metadata, - Column( - 'manual_id', Integer, ForeignKey('related.id'), - primary_key=True), - Column( - 'auto_id', Integer, primary_key=True, - test_needs_autoincrement=True), - mysql_engine='MyISAM' - ) - - metadata.create_all() - r = related.insert().values(id=12).execute() - id = r.inserted_primary_key[0] - assert id == 12 - - r = t6.insert().values(manual_id=id).execute() - eq_(r.inserted_primary_key, [12, 1]) - - def test_implicit_id_insert_select_columns(self): - stmt = users.insert().from_select( - (users.c.user_id, users.c.user_name), - users.select().where(users.c.user_id == 20)) - - testing.db.execute(stmt) - - def test_implicit_id_insert_select_keys(self): - stmt = users.insert().from_select( - ["user_id", "user_name"], - users.select().where(users.c.user_id == 20)) - - testing.db.execute(stmt) - - def test_row_iteration(self): - users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, - ) - r = users.select().execute() - l = [] - for row in r: - l.append(row) - self.assert_(len(l) == 3) - - @testing.requires.subqueries - def test_anonymous_rows(self): - users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, - ) - - sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \ - as_scalar() - for row in select([sel + 1, sel + 3], bind=users.bind).execute(): - assert row['anon_1'] == 8 - assert row['anon_2'] == 10 - @testing.fails_on( 'firebird', "kinterbasdb doesn't send full type information") def test_order_by_label(self): @@ -365,154 +110,6 @@ class QueryTest(fixtures.TestBase): [("test: ed",), ("test: fred",), ("test: jack",)] ) - def test_row_comparison(self): - users.insert().execute(user_id=7, user_name='jack') - rp = users.select().execute().first() - - self.assert_(rp == rp) - self.assert_(not(rp != rp)) - - equal = (7, 'jack') - - self.assert_(rp == equal) - self.assert_(equal == rp) - self.assert_(not (rp != equal)) - self.assert_(not (equal != equal)) - - def endless(): - while True: - yield 1 - self.assert_(rp != endless()) - self.assert_(endless() != rp) - - # test that everything compares the same - # as it would against a tuple - import operator - for compare in [False, 8, endless(), 'xyz', (7, 'jack')]: - for op in [ - operator.eq, operator.ne, operator.gt, - operator.lt, operator.ge, operator.le - ]: - - try: - control = op(equal, compare) - except TypeError: - # Py3K raises TypeError for some invalid comparisons - assert_raises(TypeError, op, rp, compare) - else: - eq_(control, op(rp, compare)) - - try: - control = op(compare, equal) - except TypeError: - # Py3K raises TypeError for some invalid comparisons - assert_raises(TypeError, op, compare, rp) - else: - eq_(control, op(compare, rp)) - - @testing.provide_metadata - def test_column_label_overlap_fallback(self): - content = Table( - 'content', self.metadata, - Column('type', String(30)), - ) - bar = Table( - 'bar', self.metadata, - Column('content_type', String(30)) - ) - self.metadata.create_all(testing.db) - testing.db.execute(content.insert().values(type="t1")) - - row = testing.db.execute(content.select(use_labels=True)).first() - assert content.c.type in row - assert bar.c.content_type not in row - assert sql.column('content_type') in row - - row = testing.db.execute( - select([content.c.type.label("content_type")])).first() - assert content.c.type in row - - assert bar.c.content_type not in row - - assert sql.column('content_type') in row - - row = testing.db.execute(select([func.now().label("content_type")])). \ - first() - assert content.c.type not in row - - assert bar.c.content_type not in row - - assert sql.column('content_type') in row - - def test_pickled_rows(self): - users.insert().execute( - {'user_id': 7, 'user_name': 'jack'}, - {'user_id': 8, 'user_name': 'ed'}, - {'user_id': 9, 'user_name': 'fred'}, - ) - - for pickle in False, True: - for use_labels in False, True: - result = users.select(use_labels=use_labels).order_by( - users.c.user_id).execute().fetchall() - - if pickle: - result = util.pickle.loads(util.pickle.dumps(result)) - - eq_( - result, - [(7, "jack"), (8, "ed"), (9, "fred")] - ) - if use_labels: - eq_(result[0]['query_users_user_id'], 7) - eq_( - list(result[0].keys()), - ["query_users_user_id", "query_users_user_name"]) - else: - eq_(result[0]['user_id'], 7) - eq_(list(result[0].keys()), ["user_id", "user_name"]) - - eq_(result[0][0], 7) - eq_(result[0][users.c.user_id], 7) - eq_(result[0][users.c.user_name], 'jack') - - if not pickle or use_labels: - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.user_id]) - else: - # test with a different table. name resolution is - # causing 'user_id' to match when use_labels wasn't used. - eq_(result[0][addresses.c.user_id], 7) - - assert_raises( - exc.NoSuchColumnError, lambda: result[0]['fake key']) - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.address_id]) - - def test_column_error_printing(self): - row = testing.db.execute(select([1])).first() - - class unprintable(object): - - def __str__(self): - raise ValueError("nope") - - msg = r"Could not locate column in row for column '%s'" - - for accessor, repl in [ - ("x", "x"), - (Column("q", Integer), "q"), - (Column("q", Integer) + 12, r"q \+ :q_1"), - (unprintable(), "unprintable element.*"), - ]: - assert_raises_message( - exc.NoSuchColumnError, - msg % repl, - lambda: row[accessor] - ) - @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) @@ -539,16 +136,6 @@ class QueryTest(fixtures.TestBase): assert row.x == True # noqa assert row.y == False # noqa - def test_fetchmany(self): - users.insert().execute(user_id=7, user_name='jack') - users.insert().execute(user_id=8, user_name='ed') - users.insert().execute(user_id=9, user_name='fred') - r = users.select().execute() - l = [] - for row in r.fetchmany(size=2): - l.append(row) - self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l)) - def test_like_ops(self): users.insert().execute( {'user_id': 1, 'user_name': 'apples'}, @@ -817,618 +404,6 @@ class QueryTest(fixtures.TestBase): use_labels=labels), [(3, 'a'), (2, 'b'), (1, None)]) - def test_column_slices(self): - users.insert().execute(user_id=1, user_name='john') - users.insert().execute(user_id=2, user_name='jack') - addresses.insert().execute( - address_id=1, user_id=2, address='foo@bar.com') - - r = text( - "select * from query_addresses", bind=testing.db).execute().first() - self.assert_(r[0:1] == (1,)) - self.assert_(r[1:] == (2, 'foo@bar.com')) - self.assert_(r[:-1] == (1, 2)) - - def test_column_accessor_basic_compiled(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') - ) - - r = users.select(users.c.user_id == 2).execute().first() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_( - r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - def test_column_accessor_basic_text(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') - ) - r = testing.db.execute( - text("select * from query_users where user_id=2")).first() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_( - r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - def test_column_accessor_textual_select(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='jack') - ) - # this will create column() objects inside - # the select(), these need to match on name anyway - r = testing.db.execute( - select([ - column('user_id'), column('user_name') - ]).select_from(table('query_users')). - where(text('user_id=2')) - ).first() - self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_( - r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - def test_column_accessor_dotted_union(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - # test a little sqlite weirdness - with the UNION, - # cols come back as "query_users.user_id" in cursor.description - r = testing.db.execute( - text( - "select query_users.user_id, query_users.user_name " - "from query_users " - "UNION select query_users.user_id, " - "query_users.user_name from query_users" - ) - ).first() - eq_(r['user_id'], 1) - eq_(r['user_name'], "john") - eq_(list(r.keys()), ["user_id", "user_name"]) - - @testing.only_on("sqlite", "sqlite specific feature") - def test_column_accessor_sqlite_raw(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - r = text( - "select query_users.user_id, query_users.user_name " - "from query_users " - "UNION select query_users.user_id, " - "query_users.user_name from query_users", - bind=testing.db).execution_options(sqlite_raw_colnames=True). \ - execute().first() - assert 'user_id' not in r - assert 'user_name' not in r - eq_(r['query_users.user_id'], 1) - eq_(r['query_users.user_name'], "john") - eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"]) - - @testing.only_on("sqlite", "sqlite specific feature") - def test_column_accessor_sqlite_translated(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - r = text( - "select query_users.user_id, query_users.user_name " - "from query_users " - "UNION select query_users.user_id, " - "query_users.user_name from query_users", - bind=testing.db).execute().first() - eq_(r['user_id'], 1) - eq_(r['user_name'], "john") - eq_(r['query_users.user_id'], 1) - eq_(r['query_users.user_name'], "john") - eq_(list(r.keys()), ["user_id", "user_name"]) - - def test_column_accessor_labels_w_dots(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - # test using literal tablename.colname - r = text( - 'select query_users.user_id AS "query_users.user_id", ' - 'query_users.user_name AS "query_users.user_name" ' - 'from query_users', bind=testing.db).\ - execution_options(sqlite_raw_colnames=True).execute().first() - eq_(r['query_users.user_id'], 1) - eq_(r['query_users.user_name'], "john") - assert "user_name" not in r - eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"]) - - def test_column_accessor_unary(self): - users.insert().execute( - dict(user_id=1, user_name='john'), - ) - - # unary experssions - r = select([users.c.user_name.distinct()]).order_by( - users.c.user_name).execute().first() - eq_(r[users.c.user_name], 'john') - eq_(r.user_name, 'john') - - def test_column_accessor_err(self): - r = testing.db.execute(select([1])).first() - assert_raises_message( - AttributeError, - "Could not locate column in row for column 'foo'", - getattr, r, "foo" - ) - assert_raises_message( - KeyError, - "Could not locate column in row for column 'foo'", - lambda: r['foo'] - ) - - def test_graceful_fetch_on_non_rows(self): - """test that calling fetchone() etc. on a result that doesn't - return rows fails gracefully. - - """ - - # these proxies don't work with no cursor.description present. - # so they don't apply to this test at the moment. - # result.FullyBufferedResultProxy, - # result.BufferedRowResultProxy, - # result.BufferedColumnResultProxy - - conn = testing.db.connect() - for meth in [ - lambda r: r.fetchone(), - lambda r: r.fetchall(), - lambda r: r.first(), - lambda r: r.scalar(), - lambda r: r.fetchmany(), - lambda r: r._getter('user'), - lambda r: r._has_key('user'), - ]: - trans = conn.begin() - result = conn.execute(users.insert(), user_id=1) - assert_raises_message( - exc.ResourceClosedError, - "This result object does not return rows. " - "It has been closed automatically.", - meth, result, - ) - trans.rollback() - - @testing.requires.empty_inserts - @testing.requires.returning - def test_no_inserted_pk_on_returning(self): - result = testing.db.execute(users.insert().returning( - users.c.user_id, users.c.user_name)) - assert_raises_message( - exc.InvalidRequestError, - r"Can't call inserted_primary_key when returning\(\) is used.", - getattr, result, 'inserted_primary_key' - ) - - def test_fetchone_til_end(self): - result = testing.db.execute("select * from query_users") - eq_(result.fetchone(), None) - eq_(result.fetchone(), None) - eq_(result.fetchone(), None) - result.close() - assert_raises_message( - exc.ResourceClosedError, - "This result object is closed.", - result.fetchone - ) - - def test_row_case_sensitive(self): - row = testing.db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive") - ]) - ).first() - - eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - not_in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - - assert_raises( - KeyError, - lambda: row["Case_insensitive"] - ) - assert_raises( - KeyError, - lambda: row["casesensitive"] - ) - - def test_row_case_sensitive_unoptimized(self): - ins_db = engines.testing_engine(options={"case_sensitive": True}) - row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols") - ]) - ).first() - - eq_( - list(row.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - not_in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["screw_up_the_cols"], 3) - - assert_raises(KeyError, lambda: row["Case_insensitive"]) - assert_raises(KeyError, lambda: row["casesensitive"]) - assert_raises(KeyError, lambda: row["screw_UP_the_cols"]) - - def test_row_case_insensitive(self): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive") - ]) - ).first() - - eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["Case_insensitive"], 1) - eq_(row["casesensitive"], 2) - - def test_row_case_insensitive_unoptimized(self): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select([ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols") - ]) - ).first() - - eq_( - list(row.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) - - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) - - eq_(row["case_insensitive"], 1) - eq_(row["CaseSensitive"], 2) - eq_(row["screw_up_the_cols"], 3) - eq_(row["Case_insensitive"], 1) - eq_(row["casesensitive"], 2) - eq_(row["screw_UP_the_cols"], 3) - - def test_row_as_args(self): - users.insert().execute(user_id=1, user_name='john') - r = users.select(users.c.user_id == 1).execute().first() - users.delete().execute() - users.insert().execute(r) - eq_(users.select().execute().fetchall(), [(1, 'john')]) - - def test_result_as_args(self): - users.insert().execute([ - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='ed')]) - r = users.select().execute() - users2.insert().execute(list(r)) - eq_( - users2.select().order_by(users2.c.user_id).execute().fetchall(), - [(1, 'john'), (2, 'ed')] - ) - - users2.delete().execute() - r = users.select().execute() - users2.insert().execute(*list(r)) - eq_( - users2.select().order_by(users2.c.user_id).execute().fetchall(), - [(1, 'john'), (2, 'ed')] - ) - - @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column(self): - users.insert().execute(user_id=1, user_name='john') - result = users.outerjoin(addresses).select().execute() - r = result.first() - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r['user_id'] - ) - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r[users.c.user_id] - ) - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r[addresses.c.user_id] - ) - - # try to trick it - fake_table isn't in the result! - # we get the correct error - fake_table = Table('fake', MetaData(), Column('user_id', Integer)) - assert_raises_message( - exc.InvalidRequestError, - "Could not locate column in row for column 'fake.user_id'", - lambda: r[fake_table.c.user_id] - ) - - r = util.pickle.loads(util.pickle.dumps(r)) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r['user_id'] - ) - - result = users.outerjoin(addresses).select().execute() - result = _result.BufferedColumnResultProxy(result.context) - r = result.first() - assert isinstance(r, _result.BufferedColumnRow) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r['user_id'] - ) - - @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column_by_col(self): - users.insert().execute(user_id=1, user_name='john') - ua = users.alias() - u2 = users.alias() - result = select([users.c.user_id, ua.c.user_id]).execute() - row = result.first() - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row[users.c.user_id] - ) - - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row[ua.c.user_id] - ) - - # Unfortunately, this fails - - # we'd like - # "Could not locate column in row" - # to be raised here, but the check for - # "common column" in _compare_name_for_result() - # has other requirements to be more liberal. - # Ultimately the - # expression system would need a way to determine - # if given two columns in a "proxy" relationship, if they - # refer to a different parent table - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row[u2.c.user_id] - ) - - @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column_contains(self): - # ticket 2702. in 0.7 we'd get True, False. - # in 0.8, both columns are present so it's True; - # but when they're fetched you'll get the ambiguous error. - users.insert().execute(user_id=1, user_name='john') - result = select([users.c.user_id, addresses.c.user_id]).\ - select_from(users.outerjoin(addresses)).execute() - row = result.first() - - eq_( - set([users.c.user_id in row, addresses.c.user_id in row]), - set([True]) - ) - - def test_ambiguous_column_by_col_plus_label(self): - users.insert().execute(user_id=1, user_name='john') - result = select( - [users.c.user_id, - type_coerce(users.c.user_id, Integer).label('foo')]).execute() - row = result.first() - eq_( - row[users.c.user_id], 1 - ) - eq_( - row[1], 1 - ) - - def test_fetch_partial_result_map(self): - users.insert().execute(user_id=7, user_name='ed') - - t = text("select * from query_users").columns( - user_name=String() - ) - eq_( - testing.db.execute(t).fetchall(), [(7, 'ed')] - ) - - def test_fetch_unordered_result_map(self): - users.insert().execute(user_id=7, user_name='ed') - - class Goofy1(TypeDecorator): - impl = String - - def process_result_value(self, value, dialect): - return value + "a" - - class Goofy2(TypeDecorator): - impl = String - - def process_result_value(self, value, dialect): - return value + "b" - - class Goofy3(TypeDecorator): - impl = String - - def process_result_value(self, value, dialect): - return value + "c" - - t = text( - "select user_name as a, user_name as b, " - "user_name as c from query_users").columns( - a=Goofy1(), b=Goofy2(), c=Goofy3() - ) - eq_( - testing.db.execute(t).fetchall(), [ - ('eda', 'edb', 'edc') - ] - ) - - @testing.requires.subqueries - def test_column_label_targeting(self): - users.insert().execute(user_id=7, user_name='ed') - - for s in ( - users.select().alias('foo'), - users.select().alias(users.name), - ): - row = s.select(use_labels=True).execute().first() - assert row[s.c.user_id] == 7 - assert row[s.c.user_name] == 'ed' - - def test_keys(self): - users.insert().execute(user_id=1, user_name='foo') - result = users.select().execute() - eq_( - result.keys(), - ['user_id', 'user_name'] - ) - row = result.first() - eq_( - row.keys(), - ['user_id', 'user_name'] - ) - - def test_keys_anon_labels(self): - """test [ticket:3483]""" - - users.insert().execute(user_id=1, user_name='foo') - result = testing.db.execute( - select([ - users.c.user_id, - users.c.user_name.label(None), - func.count(literal_column('1'))]). - group_by(users.c.user_id, users.c.user_name) - ) - - eq_( - result.keys(), - ['user_id', 'user_name_1', 'count_1'] - ) - row = result.first() - eq_( - row.keys(), - ['user_id', 'user_name_1', 'count_1'] - ) - - def test_items(self): - users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().first() - eq_( - [(x[0].lower(), x[1]) for x in list(r.items())], - [('user_id', 1), ('user_name', 'foo')]) - - def test_len(self): - users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().first() - eq_(len(r), 2) - - r = testing.db.execute('select user_name, user_id from query_users'). \ - first() - eq_(len(r), 2) - r = testing.db.execute('select user_name from query_users').first() - eq_(len(r), 1) - - def test_sorting_in_python(self): - users.insert().execute( - dict(user_id=1, user_name='foo'), - dict(user_id=2, user_name='bar'), - dict(user_id=3, user_name='def'), - ) - - rows = users.select().order_by(users.c.user_name).execute().fetchall() - - eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')]) - - eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')]) - - def test_column_order_with_simple_query(self): - # should return values in column definition order - users.insert().execute(user_id=1, user_name='foo') - r = users.select(users.c.user_id == 1).execute().first() - eq_(r[0], 1) - eq_(r[1], 'foo') - eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) - eq_(list(r.values()), [1, 'foo']) - - def test_column_order_with_text_query(self): - # should return values in query order - users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from query_users'). \ - first() - eq_(r[0], 'foo') - eq_(r[1], 1) - eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) - eq_(list(r.values()), ['foo', 1]) - - @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') - @testing.crashes('firebird', 'An identifier must begin with a letter') - def test_column_accessor_shadow(self): - meta = MetaData(testing.db) - shadowed = Table( - 'test_shadowed', meta, - Column('shadow_id', INT, primary_key=True), - Column('shadow_name', VARCHAR(20)), - Column('parent', VARCHAR(20)), - Column('row', VARCHAR(40)), - Column('_parent', VARCHAR(20)), - Column('_row', VARCHAR(20)), - ) - shadowed.create(checkfirst=True) - try: - shadowed.insert().execute( - shadow_id=1, shadow_name='The Shadow', parent='The Light', - row='Without light there is no shadow', - _parent='Hidden parent', _row='Hidden row') - r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() - self.assert_( - r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) - self.assert_( - r.shadow_name == r['shadow_name'] == - r[shadowed.c.shadow_name] == 'The Shadow') - self.assert_( - r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') - self.assert_( - r.row == r['row'] == r[shadowed.c.row] == - 'Without light there is no shadow') - self.assert_(r['_parent'] == 'Hidden parent') - self.assert_(r['_row'] == 'Hidden row') - finally: - shadowed.drop(checkfirst=True) - @testing.emits_warning('.*empty sequence.*') def test_in_filtering(self): """test the behavior of the in_() function.""" @@ -1578,393 +553,6 @@ class RequiredBindTest(fixtures.TablesTest): is_(bindparam('foo', callable_=c, required=False).required, False) -class TableInsertTest(fixtures.TablesTest): - - """test for consistent insert behavior across dialects - regarding the inline=True flag, lower-case 't' tables. - - """ - run_create_tables = 'each' - __backend__ = True - - @classmethod - def define_tables(cls, metadata): - Table( - 'foo', metadata, - Column('id', Integer, Sequence('t_id_seq'), primary_key=True), - Column('data', String(50)), - Column('x', Integer) - ) - - def _fixture(self, types=True): - if types: - t = sql.table( - 'foo', sql.column('id', Integer), - sql.column('data', String), - sql.column('x', Integer)) - else: - t = sql.table( - 'foo', sql.column('id'), sql.column('data'), sql.column('x')) - return t - - def _test(self, stmt, row, returning=None, inserted_primary_key=False): - r = testing.db.execute(stmt) - - if returning: - returned = r.first() - eq_(returned, returning) - elif inserted_primary_key is not False: - eq_(r.inserted_primary_key, inserted_primary_key) - - eq_(testing.db.execute(self.tables.foo.select()).first(), row) - - def _test_multi(self, stmt, rows, data): - testing.db.execute(stmt, rows) - eq_( - testing.db.execute( - self.tables.foo.select(). - order_by(self.tables.foo.c.id)).fetchall(), - data) - - @testing.requires.sequences - def test_expicit_sequence(self): - t = self._fixture() - self._test( - t.insert().values( - id=func.next_value(Sequence('t_id_seq')), data='data', x=5), - (1, 'data', 5) - ) - - def test_uppercase(self): - t = self.tables.foo - self._test( - t.insert().values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - def test_uppercase_inline(self): - t = self.tables.foo - self._test( - t.insert(inline=True).values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - @testing.crashes( - "mssql+pyodbc", - "Pyodbc + SQL Server + Py3K, some decimal handling issue") - def test_uppercase_inline_implicit(self): - t = self.tables.foo - self._test( - t.insert(inline=True).values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[None] - ) - - def test_uppercase_implicit(self): - t = self.tables.foo - self._test( - t.insert().values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - def test_uppercase_direct_params(self): - t = self.tables.foo - self._test( - t.insert().values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[1] - ) - - @testing.requires.returning - def test_uppercase_direct_params_returning(self): - t = self.tables.foo - self._test( - t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), - (1, 'data', 5), - returning=(1, 5) - ) - - @testing.fails_on( - 'mssql', "lowercase table doesn't support identity insert disable") - def test_direct_params(self): - t = self._fixture() - self._test( - t.insert().values(id=1, data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[] - ) - - @testing.fails_on( - 'mssql', "lowercase table doesn't support identity insert disable") - @testing.requires.returning - def test_direct_params_returning(self): - t = self._fixture() - self._test( - t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), - (1, 'data', 5), - returning=(1, 5) - ) - - @testing.requires.emulated_lastrowid - def test_implicit_pk(self): - t = self._fixture() - self._test( - t.insert().values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[] - ) - - @testing.requires.emulated_lastrowid - def test_implicit_pk_multi_rows(self): - t = self._fixture() - self._test_multi( - t.insert(), - [ - {'data': 'd1', 'x': 5}, - {'data': 'd2', 'x': 6}, - {'data': 'd3', 'x': 7}, - ], - [ - (1, 'd1', 5), - (2, 'd2', 6), - (3, 'd3', 7) - ], - ) - - @testing.requires.emulated_lastrowid - def test_implicit_pk_inline(self): - t = self._fixture() - self._test( - t.insert(inline=True).values(data='data', x=5), - (1, 'data', 5), - inserted_primary_key=[] - ) - - -class KeyTargetingTest(fixtures.TablesTest): - run_inserts = 'once' - run_deletes = None - __backend__ = True - - @classmethod - def define_tables(cls, metadata): - Table( - 'keyed1', metadata, Column("a", CHAR(2), key="b"), - Column("c", CHAR(2), key="q") - ) - Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) - Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) - Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) - Table('content', metadata, Column('t', String(30), key="type")) - Table('bar', metadata, Column('ctype', String(30), key="content_type")) - - if testing.requires.schemas.enabled: - Table( - 'wschema', metadata, - Column("a", CHAR(2), key="b"), - Column("c", CHAR(2), key="q"), - schema=testing.config.test_schema - ) - - @classmethod - def insert_data(cls): - cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) - cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) - cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) - cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) - cls.tables.content.insert().execute(type="t1") - - if testing.requires.schemas.enabled: - cls.tables['%s.wschema' % testing.config.test_schema].insert().execute( - dict(b="a1", q="c1")) - - @testing.requires.schemas - def test_keyed_accessor_wschema(self): - keyed1 = self.tables['%s.wschema' % testing.config.test_schema] - row = testing.db.execute(keyed1.select()).first() - - eq_(row.b, "a1") - eq_(row.q, "c1") - eq_(row.a, "a1") - eq_(row.c, "c1") - - def test_keyed_accessor_single(self): - keyed1 = self.tables.keyed1 - row = testing.db.execute(keyed1.select()).first() - - eq_(row.b, "a1") - eq_(row.q, "c1") - eq_(row.a, "a1") - eq_(row.c, "c1") - - def test_keyed_accessor_single_labeled(self): - keyed1 = self.tables.keyed1 - row = testing.db.execute(keyed1.select().apply_labels()).first() - - eq_(row.keyed1_b, "a1") - eq_(row.keyed1_q, "c1") - eq_(row.keyed1_a, "a1") - eq_(row.keyed1_c, "c1") - - @testing.requires.duplicate_names_in_cursor_description - def test_keyed_accessor_composite_conflict_2(self): - keyed1 = self.tables.keyed1 - keyed2 = self.tables.keyed2 - - row = testing.db.execute(select([keyed1, keyed2])).first() - # row.b is unambiguous - eq_(row.b, "b2") - # row.a is ambiguous - assert_raises_message( - exc.InvalidRequestError, - "Ambig", - getattr, row, "a" - ) - - def test_keyed_accessor_composite_names_precedent(self): - keyed1 = self.tables.keyed1 - keyed4 = self.tables.keyed4 - - row = testing.db.execute(select([keyed1, keyed4])).first() - eq_(row.b, "b4") - eq_(row.q, "q4") - eq_(row.a, "a1") - eq_(row.c, "c1") - - @testing.requires.duplicate_names_in_cursor_description - def test_keyed_accessor_composite_keys_precedent(self): - keyed1 = self.tables.keyed1 - keyed3 = self.tables.keyed3 - - row = testing.db.execute(select([keyed1, keyed3])).first() - eq_(row.q, "c1") - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name 'b'", - getattr, row, "b" - ) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name 'a'", - getattr, row, "a" - ) - eq_(row.d, "d3") - - def test_keyed_accessor_composite_labeled(self): - keyed1 = self.tables.keyed1 - keyed2 = self.tables.keyed2 - - row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \ - first() - eq_(row.keyed1_b, "a1") - eq_(row.keyed1_a, "a1") - eq_(row.keyed1_q, "c1") - eq_(row.keyed1_c, "c1") - eq_(row.keyed2_a, "a2") - eq_(row.keyed2_b, "b2") - assert_raises(KeyError, lambda: row['keyed2_c']) - assert_raises(KeyError, lambda: row['keyed2_q']) - - def test_column_label_overlap_fallback(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute( - select([content.c.type.label("content_type")])).first() - assert content.c.type not in row - assert bar.c.content_type not in row - assert sql.column('content_type') in row - - row = testing.db.execute(select([func.now().label("content_type")])). \ - first() - assert content.c.type not in row - assert bar.c.content_type not in row - assert sql.column('content_type') in row - - def test_column_label_overlap_fallback_2(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute(content.select(use_labels=True)).first() - assert content.c.type in row - assert bar.c.content_type not in row - assert sql.column('content_type') not in row - - def test_columnclause_schema_column_one(self): - keyed2 = self.tables.keyed2 - - # this is addressed by [ticket:2932] - # ColumnClause._compare_name_for_result allows the - # columns which the statement is against to be lightweight - # cols, which results in a more liberal comparison scheme - a, b = sql.column('a'), sql.column('b') - stmt = select([a, b]).select_from(table("keyed2")) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - - def test_columnclause_schema_column_two(self): - keyed2 = self.tables.keyed2 - - a, b = sql.column('a'), sql.column('b') - stmt = select([keyed2.c.a, keyed2.c.b]) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - - def test_columnclause_schema_column_three(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] - - a, b = sql.column('a'), sql.column('b') - stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - assert stmt.c.a in row - assert stmt.c.b in row - - def test_columnclause_schema_column_four(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] - - a, b = sql.column('keyed2_a'), sql.column('keyed2_b') - stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - a, b) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert a in row - assert b in row - assert stmt.c.keyed2_a in row - assert stmt.c.keyed2_b in row - - def test_columnclause_schema_column_five(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] - - stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - keyed2_a=CHAR, keyed2_b=CHAR) - row = testing.db.execute(stmt).first() - - assert keyed2.c.a in row - assert keyed2.c.b in row - assert stmt.c.keyed2_a in row - assert stmt.c.keyed2_b in row - - class LimitTest(fixtures.TestBase): __backend__ = True |