summaryrefslogtreecommitdiff
path: root/test/sql/test_query.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2015-09-27 12:09:24 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2015-09-27 12:09:24 -0400
commita8e1d33ae514a045d71d0a26d0c1325eecd4ca99 (patch)
tree1470e63f9b1b9faf3c46705f062c3f27e2b68211 /test/sql/test_query.py
parent91255618ddb47553774c620a23479adf88c27b74 (diff)
downloadsqlalchemy-a8e1d33ae514a045d71d0a26d0c1325eecd4ca99.tar.gz
- break out critical aspects of test_query into their own tests
finally, test_resultset and test_insert_exec. Update all idioms within these.
Diffstat (limited to 'test/sql/test_query.py')
-rw-r--r--test/sql/test_query.py1420
1 files changed, 4 insertions, 1416 deletions
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 0313a9cd0..aca933fc9 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -2,13 +2,12 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \
is_, in_, not_in_
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, engines
-from sqlalchemy import util
from sqlalchemy import (
exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey,
- union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence,
- bindparam, literal, not_, type_coerce, literal_column, desc, asc,
- TypeDecorator, or_, cast, table, column)
-from sqlalchemy.engine import default, result as _result
+ union, intersect, except_, union_all, VARCHAR, INT, text,
+ bindparam, literal, not_, literal_column, desc, asc,
+ TypeDecorator, or_, cast)
+from sqlalchemy.engine import default
from sqlalchemy.testing.schema import Table, Column
# ongoing - these are old tests. those which are of general use
@@ -62,260 +61,6 @@ class QueryTest(fixtures.TestBase):
def teardown_class(cls):
metadata.drop_all()
- @testing.requires.multivalues_inserts
- def test_multivalues_insert(self):
- users.insert(
- values=[
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'}]).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
- self.assert_(rows[0] == (7, 'jack'))
- self.assert_(rows[1] == (8, 'ed'))
- users.insert(values=[(9, 'jack'), (10, 'ed')]).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
- self.assert_(rows[2] == (9, 'jack'))
- self.assert_(rows[3] == (10, 'ed'))
-
- def test_insert_heterogeneous_params(self):
- """test that executemany parameters are asserted to match the
- parameter set of the first."""
-
- assert_raises_message(
- exc.StatementError,
- r"\(sqlalchemy.exc.InvalidRequestError\) A value is required for "
- "bind parameter 'user_name', in "
- "parameter group 2 "
- r"\[SQL: u?'INSERT INTO query_users",
- users.insert().execute,
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9}
- )
-
- # this succeeds however. We aren't yet doing
- # a length check on all subsequent parameters.
- users.insert().execute(
- {'user_id': 7},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9}
- )
-
- def test_lastrow_accessor(self):
- """Tests the inserted_primary_key and lastrow_has_id() functions."""
-
- def insert_values(engine, table, values):
- """
- Inserts a row into a table, returns the full list of values
- INSERTed including defaults that fired off on the DB side and
- detects rows that had defaults and post-fetches.
- """
-
- # verify implicit_returning is working
- if engine.dialect.implicit_returning:
- ins = table.insert()
- comp = ins.compile(engine, column_keys=list(values))
- if not set(values).issuperset(
- c.key for c in table.primary_key):
- assert comp.returning
-
- result = engine.execute(table.insert(), **values)
- ret = values.copy()
-
- for col, id in zip(table.primary_key, result.inserted_primary_key):
- ret[col.key] = id
-
- if result.lastrow_has_defaults():
- criterion = and_(
- *[
- col == id for col, id in
- zip(table.primary_key, result.inserted_primary_key)])
- row = engine.execute(table.select(criterion)).first()
- for c in table.c:
- ret[c.key] = row[c]
- return ret
-
- if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
- assert testing.db.dialect.implicit_returning
-
- if testing.db.dialect.implicit_returning:
- test_engines = [
- engines.testing_engine(options={'implicit_returning': False}),
- engines.testing_engine(options={'implicit_returning': True}),
- ]
- else:
- test_engines = [testing.db]
-
- for engine in test_engines:
- metadata = MetaData()
- for supported, table, values, assertvalues in [
- (
- {'unsupported': ['sqlite']},
- Table(
- "t1", metadata,
- Column(
- 'id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('foo', String(30), primary_key=True)),
- {'foo': 'hi'},
- {'id': 1, 'foo': 'hi'}
- ),
- (
- {'unsupported': ['sqlite']},
- Table(
- "t2", metadata,
- Column(
- 'id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
- ),
- {'foo': 'hi'},
- {'id': 1, 'foo': 'hi', 'bar': 'hi'}
- ),
- (
- {'unsupported': []},
- Table(
- "t3", metadata,
- Column("id", String(40), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column("bar", String(30))
- ),
- {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"},
- {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}
- ),
- (
- {'unsupported': []},
- Table(
- "t4", metadata,
- Column(
- 'id', Integer,
- Sequence('t4_id_seq', optional=True),
- primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
- ),
- {'foo': 'hi', 'id': 1},
- {'id': 1, 'foo': 'hi', 'bar': 'hi'}
- ),
- (
- {'unsupported': []},
- Table(
- "t5", metadata,
- Column('id', String(10), primary_key=True),
- Column('bar', String(30), server_default='hi')
- ),
- {'id': 'id1'},
- {'id': 'id1', 'bar': 'hi'},
- ),
- (
- {'unsupported': ['sqlite']},
- Table(
- "t6", metadata,
- Column(
- 'id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('bar', Integer, primary_key=True)
- ),
- {'bar': 0},
- {'id': 1, 'bar': 0},
- ),
- ]:
- if testing.db.name in supported['unsupported']:
- continue
- try:
- table.create(bind=engine, checkfirst=True)
- i = insert_values(engine, table, values)
- assert i == assertvalues, "tablename: %s %r %r" % \
- (table.name, repr(i), repr(assertvalues))
- finally:
- table.drop(bind=engine)
-
- # TODO: why not in the sqlite suite?
- @testing.only_on('sqlite+pysqlite')
- @testing.provide_metadata
- def test_lastrowid_zero(self):
- from sqlalchemy.dialects import sqlite
- eng = engines.testing_engine()
-
- class ExcCtx(sqlite.base.SQLiteExecutionContext):
-
- def get_lastrowid(self):
- return 0
- eng.dialect.execution_ctx_cls = ExcCtx
- t = Table(
- 't', self.metadata, Column('x', Integer, primary_key=True),
- Column('y', Integer))
- t.create(eng)
- r = eng.execute(t.insert().values(y=5))
- eq_(r.inserted_primary_key, [0])
-
- @testing.fails_on(
- 'sqlite', "sqlite autoincremnt doesn't work with composite pks")
- def test_misordered_lastrow(self):
- related = Table(
- 'related', metadata,
- Column('id', Integer, primary_key=True),
- mysql_engine='MyISAM'
- )
- t6 = Table(
- "t6", metadata,
- Column(
- 'manual_id', Integer, ForeignKey('related.id'),
- primary_key=True),
- Column(
- 'auto_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- mysql_engine='MyISAM'
- )
-
- metadata.create_all()
- r = related.insert().values(id=12).execute()
- id = r.inserted_primary_key[0]
- assert id == 12
-
- r = t6.insert().values(manual_id=id).execute()
- eq_(r.inserted_primary_key, [12, 1])
-
- def test_implicit_id_insert_select_columns(self):
- stmt = users.insert().from_select(
- (users.c.user_id, users.c.user_name),
- users.select().where(users.c.user_id == 20))
-
- testing.db.execute(stmt)
-
- def test_implicit_id_insert_select_keys(self):
- stmt = users.insert().from_select(
- ["user_id", "user_name"],
- users.select().where(users.c.user_id == 20))
-
- testing.db.execute(stmt)
-
- def test_row_iteration(self):
- users.insert().execute(
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9, 'user_name': 'fred'},
- )
- r = users.select().execute()
- l = []
- for row in r:
- l.append(row)
- self.assert_(len(l) == 3)
-
- @testing.requires.subqueries
- def test_anonymous_rows(self):
- users.insert().execute(
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9, 'user_name': 'fred'},
- )
-
- sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \
- as_scalar()
- for row in select([sel + 1, sel + 3], bind=users.bind).execute():
- assert row['anon_1'] == 8
- assert row['anon_2'] == 10
-
@testing.fails_on(
'firebird', "kinterbasdb doesn't send full type information")
def test_order_by_label(self):
@@ -365,154 +110,6 @@ class QueryTest(fixtures.TestBase):
[("test: ed",), ("test: fred",), ("test: jack",)]
)
- def test_row_comparison(self):
- users.insert().execute(user_id=7, user_name='jack')
- rp = users.select().execute().first()
-
- self.assert_(rp == rp)
- self.assert_(not(rp != rp))
-
- equal = (7, 'jack')
-
- self.assert_(rp == equal)
- self.assert_(equal == rp)
- self.assert_(not (rp != equal))
- self.assert_(not (equal != equal))
-
- def endless():
- while True:
- yield 1
- self.assert_(rp != endless())
- self.assert_(endless() != rp)
-
- # test that everything compares the same
- # as it would against a tuple
- import operator
- for compare in [False, 8, endless(), 'xyz', (7, 'jack')]:
- for op in [
- operator.eq, operator.ne, operator.gt,
- operator.lt, operator.ge, operator.le
- ]:
-
- try:
- control = op(equal, compare)
- except TypeError:
- # Py3K raises TypeError for some invalid comparisons
- assert_raises(TypeError, op, rp, compare)
- else:
- eq_(control, op(rp, compare))
-
- try:
- control = op(compare, equal)
- except TypeError:
- # Py3K raises TypeError for some invalid comparisons
- assert_raises(TypeError, op, compare, rp)
- else:
- eq_(control, op(compare, rp))
-
- @testing.provide_metadata
- def test_column_label_overlap_fallback(self):
- content = Table(
- 'content', self.metadata,
- Column('type', String(30)),
- )
- bar = Table(
- 'bar', self.metadata,
- Column('content_type', String(30))
- )
- self.metadata.create_all(testing.db)
- testing.db.execute(content.insert().values(type="t1"))
-
- row = testing.db.execute(content.select(use_labels=True)).first()
- assert content.c.type in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') in row
-
- row = testing.db.execute(
- select([content.c.type.label("content_type")])).first()
- assert content.c.type in row
-
- assert bar.c.content_type not in row
-
- assert sql.column('content_type') in row
-
- row = testing.db.execute(select([func.now().label("content_type")])). \
- first()
- assert content.c.type not in row
-
- assert bar.c.content_type not in row
-
- assert sql.column('content_type') in row
-
- def test_pickled_rows(self):
- users.insert().execute(
- {'user_id': 7, 'user_name': 'jack'},
- {'user_id': 8, 'user_name': 'ed'},
- {'user_id': 9, 'user_name': 'fred'},
- )
-
- for pickle in False, True:
- for use_labels in False, True:
- result = users.select(use_labels=use_labels).order_by(
- users.c.user_id).execute().fetchall()
-
- if pickle:
- result = util.pickle.loads(util.pickle.dumps(result))
-
- eq_(
- result,
- [(7, "jack"), (8, "ed"), (9, "fred")]
- )
- if use_labels:
- eq_(result[0]['query_users_user_id'], 7)
- eq_(
- list(result[0].keys()),
- ["query_users_user_id", "query_users_user_name"])
- else:
- eq_(result[0]['user_id'], 7)
- eq_(list(result[0].keys()), ["user_id", "user_name"])
-
- eq_(result[0][0], 7)
- eq_(result[0][users.c.user_id], 7)
- eq_(result[0][users.c.user_name], 'jack')
-
- if not pickle or use_labels:
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0][addresses.c.user_id])
- else:
- # test with a different table. name resolution is
- # causing 'user_id' to match when use_labels wasn't used.
- eq_(result[0][addresses.c.user_id], 7)
-
- assert_raises(
- exc.NoSuchColumnError, lambda: result[0]['fake key'])
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0][addresses.c.address_id])
-
- def test_column_error_printing(self):
- row = testing.db.execute(select([1])).first()
-
- class unprintable(object):
-
- def __str__(self):
- raise ValueError("nope")
-
- msg = r"Could not locate column in row for column '%s'"
-
- for accessor, repl in [
- ("x", "x"),
- (Column("q", Integer), "q"),
- (Column("q", Integer) + 12, r"q \+ :q_1"),
- (unprintable(), "unprintable element.*"),
- ]:
- assert_raises_message(
- exc.NoSuchColumnError,
- msg % repl,
- lambda: row[accessor]
- )
-
@testing.requires.boolean_col_expressions
def test_or_and_as_columns(self):
true, false = literal(True), literal(False)
@@ -539,16 +136,6 @@ class QueryTest(fixtures.TestBase):
assert row.x == True # noqa
assert row.y == False # noqa
- def test_fetchmany(self):
- users.insert().execute(user_id=7, user_name='jack')
- users.insert().execute(user_id=8, user_name='ed')
- users.insert().execute(user_id=9, user_name='fred')
- r = users.select().execute()
- l = []
- for row in r.fetchmany(size=2):
- l.append(row)
- self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
-
def test_like_ops(self):
users.insert().execute(
{'user_id': 1, 'user_name': 'apples'},
@@ -817,618 +404,6 @@ class QueryTest(fixtures.TestBase):
use_labels=labels),
[(3, 'a'), (2, 'b'), (1, None)])
- def test_column_slices(self):
- users.insert().execute(user_id=1, user_name='john')
- users.insert().execute(user_id=2, user_name='jack')
- addresses.insert().execute(
- address_id=1, user_id=2, address='foo@bar.com')
-
- r = text(
- "select * from query_addresses", bind=testing.db).execute().first()
- self.assert_(r[0:1] == (1,))
- self.assert_(r[1:] == (2, 'foo@bar.com'))
- self.assert_(r[:-1] == (1, 2))
-
- def test_column_accessor_basic_compiled(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='jack')
- )
-
- r = users.select(users.c.user_id == 2).execute().first()
- self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(
- r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- def test_column_accessor_basic_text(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='jack')
- )
- r = testing.db.execute(
- text("select * from query_users where user_id=2")).first()
- self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(
- r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- def test_column_accessor_textual_select(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='jack')
- )
- # this will create column() objects inside
- # the select(), these need to match on name anyway
- r = testing.db.execute(
- select([
- column('user_id'), column('user_name')
- ]).select_from(table('query_users')).
- where(text('user_id=2'))
- ).first()
- self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(
- r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- def test_column_accessor_dotted_union(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- # test a little sqlite weirdness - with the UNION,
- # cols come back as "query_users.user_id" in cursor.description
- r = testing.db.execute(
- text(
- "select query_users.user_id, query_users.user_name "
- "from query_users "
- "UNION select query_users.user_id, "
- "query_users.user_name from query_users"
- )
- ).first()
- eq_(r['user_id'], 1)
- eq_(r['user_name'], "john")
- eq_(list(r.keys()), ["user_id", "user_name"])
-
- @testing.only_on("sqlite", "sqlite specific feature")
- def test_column_accessor_sqlite_raw(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- r = text(
- "select query_users.user_id, query_users.user_name "
- "from query_users "
- "UNION select query_users.user_id, "
- "query_users.user_name from query_users",
- bind=testing.db).execution_options(sqlite_raw_colnames=True). \
- execute().first()
- assert 'user_id' not in r
- assert 'user_name' not in r
- eq_(r['query_users.user_id'], 1)
- eq_(r['query_users.user_name'], "john")
- eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"])
-
- @testing.only_on("sqlite", "sqlite specific feature")
- def test_column_accessor_sqlite_translated(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- r = text(
- "select query_users.user_id, query_users.user_name "
- "from query_users "
- "UNION select query_users.user_id, "
- "query_users.user_name from query_users",
- bind=testing.db).execute().first()
- eq_(r['user_id'], 1)
- eq_(r['user_name'], "john")
- eq_(r['query_users.user_id'], 1)
- eq_(r['query_users.user_name'], "john")
- eq_(list(r.keys()), ["user_id", "user_name"])
-
- def test_column_accessor_labels_w_dots(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
- # test using literal tablename.colname
- r = text(
- 'select query_users.user_id AS "query_users.user_id", '
- 'query_users.user_name AS "query_users.user_name" '
- 'from query_users', bind=testing.db).\
- execution_options(sqlite_raw_colnames=True).execute().first()
- eq_(r['query_users.user_id'], 1)
- eq_(r['query_users.user_name'], "john")
- assert "user_name" not in r
- eq_(list(r.keys()), ["query_users.user_id", "query_users.user_name"])
-
- def test_column_accessor_unary(self):
- users.insert().execute(
- dict(user_id=1, user_name='john'),
- )
-
- # unary experssions
- r = select([users.c.user_name.distinct()]).order_by(
- users.c.user_name).execute().first()
- eq_(r[users.c.user_name], 'john')
- eq_(r.user_name, 'john')
-
- def test_column_accessor_err(self):
- r = testing.db.execute(select([1])).first()
- assert_raises_message(
- AttributeError,
- "Could not locate column in row for column 'foo'",
- getattr, r, "foo"
- )
- assert_raises_message(
- KeyError,
- "Could not locate column in row for column 'foo'",
- lambda: r['foo']
- )
-
- def test_graceful_fetch_on_non_rows(self):
- """test that calling fetchone() etc. on a result that doesn't
- return rows fails gracefully.
-
- """
-
- # these proxies don't work with no cursor.description present.
- # so they don't apply to this test at the moment.
- # result.FullyBufferedResultProxy,
- # result.BufferedRowResultProxy,
- # result.BufferedColumnResultProxy
-
- conn = testing.db.connect()
- for meth in [
- lambda r: r.fetchone(),
- lambda r: r.fetchall(),
- lambda r: r.first(),
- lambda r: r.scalar(),
- lambda r: r.fetchmany(),
- lambda r: r._getter('user'),
- lambda r: r._has_key('user'),
- ]:
- trans = conn.begin()
- result = conn.execute(users.insert(), user_id=1)
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows. "
- "It has been closed automatically.",
- meth, result,
- )
- trans.rollback()
-
- @testing.requires.empty_inserts
- @testing.requires.returning
- def test_no_inserted_pk_on_returning(self):
- result = testing.db.execute(users.insert().returning(
- users.c.user_id, users.c.user_name))
- assert_raises_message(
- exc.InvalidRequestError,
- r"Can't call inserted_primary_key when returning\(\) is used.",
- getattr, result, 'inserted_primary_key'
- )
-
- def test_fetchone_til_end(self):
- result = testing.db.execute("select * from query_users")
- eq_(result.fetchone(), None)
- eq_(result.fetchone(), None)
- eq_(result.fetchone(), None)
- result.close()
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object is closed.",
- result.fetchone
- )
-
- def test_row_case_sensitive(self):
- row = testing.db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive")
- ])
- ).first()
-
- eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- not_in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
-
- assert_raises(
- KeyError,
- lambda: row["Case_insensitive"]
- )
- assert_raises(
- KeyError,
- lambda: row["casesensitive"]
- )
-
- def test_row_case_sensitive_unoptimized(self):
- ins_db = engines.testing_engine(options={"case_sensitive": True})
- row = ins_db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- text("3 AS screw_up_the_cols")
- ])
- ).first()
-
- eq_(
- list(row.keys()),
- ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- not_in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
- eq_(row["screw_up_the_cols"], 3)
-
- assert_raises(KeyError, lambda: row["Case_insensitive"])
- assert_raises(KeyError, lambda: row["casesensitive"])
- assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
-
- def test_row_case_insensitive(self):
- ins_db = engines.testing_engine(options={"case_sensitive": False})
- row = ins_db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive")
- ])
- ).first()
-
- eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
- eq_(row["Case_insensitive"], 1)
- eq_(row["casesensitive"], 2)
-
- def test_row_case_insensitive_unoptimized(self):
- ins_db = engines.testing_engine(options={"case_sensitive": False})
- row = ins_db.execute(
- select([
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- text("3 AS screw_up_the_cols")
- ])
- ).first()
-
- eq_(
- list(row.keys()),
- ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])
-
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- in_("casesensitive", row._keymap)
-
- eq_(row["case_insensitive"], 1)
- eq_(row["CaseSensitive"], 2)
- eq_(row["screw_up_the_cols"], 3)
- eq_(row["Case_insensitive"], 1)
- eq_(row["casesensitive"], 2)
- eq_(row["screw_UP_the_cols"], 3)
-
- def test_row_as_args(self):
- users.insert().execute(user_id=1, user_name='john')
- r = users.select(users.c.user_id == 1).execute().first()
- users.delete().execute()
- users.insert().execute(r)
- eq_(users.select().execute().fetchall(), [(1, 'john')])
-
- def test_result_as_args(self):
- users.insert().execute([
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='ed')])
- r = users.select().execute()
- users2.insert().execute(list(r))
- eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
- [(1, 'john'), (2, 'ed')]
- )
-
- users2.delete().execute()
- r = users.select().execute()
- users2.insert().execute(*list(r))
- eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
- [(1, 'john'), (2, 'ed')]
- )
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column(self):
- users.insert().execute(user_id=1, user_name='john')
- result = users.outerjoin(addresses).select().execute()
- r = result.first()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r['user_id']
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r[users.c.user_id]
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r[addresses.c.user_id]
- )
-
- # try to trick it - fake_table isn't in the result!
- # we get the correct error
- fake_table = Table('fake', MetaData(), Column('user_id', Integer))
- assert_raises_message(
- exc.InvalidRequestError,
- "Could not locate column in row for column 'fake.user_id'",
- lambda: r[fake_table.c.user_id]
- )
-
- r = util.pickle.loads(util.pickle.dumps(r))
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r['user_id']
- )
-
- result = users.outerjoin(addresses).select().execute()
- result = _result.BufferedColumnResultProxy(result.context)
- r = result.first()
- assert isinstance(r, _result.BufferedColumnRow)
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r['user_id']
- )
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_by_col(self):
- users.insert().execute(user_id=1, user_name='john')
- ua = users.alias()
- u2 = users.alias()
- result = select([users.c.user_id, ua.c.user_id]).execute()
- row = result.first()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row[users.c.user_id]
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row[ua.c.user_id]
- )
-
- # Unfortunately, this fails -
- # we'd like
- # "Could not locate column in row"
- # to be raised here, but the check for
- # "common column" in _compare_name_for_result()
- # has other requirements to be more liberal.
- # Ultimately the
- # expression system would need a way to determine
- # if given two columns in a "proxy" relationship, if they
- # refer to a different parent table
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row[u2.c.user_id]
- )
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_contains(self):
- # ticket 2702. in 0.7 we'd get True, False.
- # in 0.8, both columns are present so it's True;
- # but when they're fetched you'll get the ambiguous error.
- users.insert().execute(user_id=1, user_name='john')
- result = select([users.c.user_id, addresses.c.user_id]).\
- select_from(users.outerjoin(addresses)).execute()
- row = result.first()
-
- eq_(
- set([users.c.user_id in row, addresses.c.user_id in row]),
- set([True])
- )
-
- def test_ambiguous_column_by_col_plus_label(self):
- users.insert().execute(user_id=1, user_name='john')
- result = select(
- [users.c.user_id,
- type_coerce(users.c.user_id, Integer).label('foo')]).execute()
- row = result.first()
- eq_(
- row[users.c.user_id], 1
- )
- eq_(
- row[1], 1
- )
-
- def test_fetch_partial_result_map(self):
- users.insert().execute(user_id=7, user_name='ed')
-
- t = text("select * from query_users").columns(
- user_name=String()
- )
- eq_(
- testing.db.execute(t).fetchall(), [(7, 'ed')]
- )
-
- def test_fetch_unordered_result_map(self):
- users.insert().execute(user_id=7, user_name='ed')
-
- class Goofy1(TypeDecorator):
- impl = String
-
- def process_result_value(self, value, dialect):
- return value + "a"
-
- class Goofy2(TypeDecorator):
- impl = String
-
- def process_result_value(self, value, dialect):
- return value + "b"
-
- class Goofy3(TypeDecorator):
- impl = String
-
- def process_result_value(self, value, dialect):
- return value + "c"
-
- t = text(
- "select user_name as a, user_name as b, "
- "user_name as c from query_users").columns(
- a=Goofy1(), b=Goofy2(), c=Goofy3()
- )
- eq_(
- testing.db.execute(t).fetchall(), [
- ('eda', 'edb', 'edc')
- ]
- )
-
- @testing.requires.subqueries
- def test_column_label_targeting(self):
- users.insert().execute(user_id=7, user_name='ed')
-
- for s in (
- users.select().alias('foo'),
- users.select().alias(users.name),
- ):
- row = s.select(use_labels=True).execute().first()
- assert row[s.c.user_id] == 7
- assert row[s.c.user_name] == 'ed'
-
- def test_keys(self):
- users.insert().execute(user_id=1, user_name='foo')
- result = users.select().execute()
- eq_(
- result.keys(),
- ['user_id', 'user_name']
- )
- row = result.first()
- eq_(
- row.keys(),
- ['user_id', 'user_name']
- )
-
- def test_keys_anon_labels(self):
- """test [ticket:3483]"""
-
- users.insert().execute(user_id=1, user_name='foo')
- result = testing.db.execute(
- select([
- users.c.user_id,
- users.c.user_name.label(None),
- func.count(literal_column('1'))]).
- group_by(users.c.user_id, users.c.user_name)
- )
-
- eq_(
- result.keys(),
- ['user_id', 'user_name_1', 'count_1']
- )
- row = result.first()
- eq_(
- row.keys(),
- ['user_id', 'user_name_1', 'count_1']
- )
-
- def test_items(self):
- users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().first()
- eq_(
- [(x[0].lower(), x[1]) for x in list(r.items())],
- [('user_id', 1), ('user_name', 'foo')])
-
- def test_len(self):
- users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().first()
- eq_(len(r), 2)
-
- r = testing.db.execute('select user_name, user_id from query_users'). \
- first()
- eq_(len(r), 2)
- r = testing.db.execute('select user_name from query_users').first()
- eq_(len(r), 1)
-
- def test_sorting_in_python(self):
- users.insert().execute(
- dict(user_id=1, user_name='foo'),
- dict(user_id=2, user_name='bar'),
- dict(user_id=3, user_name='def'),
- )
-
- rows = users.select().order_by(users.c.user_name).execute().fetchall()
-
- eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')])
-
- eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')])
-
- def test_column_order_with_simple_query(self):
- # should return values in column definition order
- users.insert().execute(user_id=1, user_name='foo')
- r = users.select(users.c.user_id == 1).execute().first()
- eq_(r[0], 1)
- eq_(r[1], 'foo')
- eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name'])
- eq_(list(r.values()), [1, 'foo'])
-
- def test_column_order_with_text_query(self):
- # should return values in query order
- users.insert().execute(user_id=1, user_name='foo')
- r = testing.db.execute('select user_name, user_id from query_users'). \
- first()
- eq_(r[0], 'foo')
- eq_(r[1], 1)
- eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id'])
- eq_(list(r.values()), ['foo', 1])
-
- @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()')
- @testing.crashes('firebird', 'An identifier must begin with a letter')
- def test_column_accessor_shadow(self):
- meta = MetaData(testing.db)
- shadowed = Table(
- 'test_shadowed', meta,
- Column('shadow_id', INT, primary_key=True),
- Column('shadow_name', VARCHAR(20)),
- Column('parent', VARCHAR(20)),
- Column('row', VARCHAR(40)),
- Column('_parent', VARCHAR(20)),
- Column('_row', VARCHAR(20)),
- )
- shadowed.create(checkfirst=True)
- try:
- shadowed.insert().execute(
- shadow_id=1, shadow_name='The Shadow', parent='The Light',
- row='Without light there is no shadow',
- _parent='Hidden parent', _row='Hidden row')
- r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
- self.assert_(
- r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
- self.assert_(
- r.shadow_name == r['shadow_name'] ==
- r[shadowed.c.shadow_name] == 'The Shadow')
- self.assert_(
- r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
- self.assert_(
- r.row == r['row'] == r[shadowed.c.row] ==
- 'Without light there is no shadow')
- self.assert_(r['_parent'] == 'Hidden parent')
- self.assert_(r['_row'] == 'Hidden row')
- finally:
- shadowed.drop(checkfirst=True)
-
@testing.emits_warning('.*empty sequence.*')
def test_in_filtering(self):
"""test the behavior of the in_() function."""
@@ -1578,393 +553,6 @@ class RequiredBindTest(fixtures.TablesTest):
is_(bindparam('foo', callable_=c, required=False).required, False)
-class TableInsertTest(fixtures.TablesTest):
-
- """test for consistent insert behavior across dialects
- regarding the inline=True flag, lower-case 't' tables.
-
- """
- run_create_tables = 'each'
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- 'foo', metadata,
- Column('id', Integer, Sequence('t_id_seq'), primary_key=True),
- Column('data', String(50)),
- Column('x', Integer)
- )
-
- def _fixture(self, types=True):
- if types:
- t = sql.table(
- 'foo', sql.column('id', Integer),
- sql.column('data', String),
- sql.column('x', Integer))
- else:
- t = sql.table(
- 'foo', sql.column('id'), sql.column('data'), sql.column('x'))
- return t
-
- def _test(self, stmt, row, returning=None, inserted_primary_key=False):
- r = testing.db.execute(stmt)
-
- if returning:
- returned = r.first()
- eq_(returned, returning)
- elif inserted_primary_key is not False:
- eq_(r.inserted_primary_key, inserted_primary_key)
-
- eq_(testing.db.execute(self.tables.foo.select()).first(), row)
-
- def _test_multi(self, stmt, rows, data):
- testing.db.execute(stmt, rows)
- eq_(
- testing.db.execute(
- self.tables.foo.select().
- order_by(self.tables.foo.c.id)).fetchall(),
- data)
-
- @testing.requires.sequences
- def test_expicit_sequence(self):
- t = self._fixture()
- self._test(
- t.insert().values(
- id=func.next_value(Sequence('t_id_seq')), data='data', x=5),
- (1, 'data', 5)
- )
-
- def test_uppercase(self):
- t = self.tables.foo
- self._test(
- t.insert().values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- def test_uppercase_inline(self):
- t = self.tables.foo
- self._test(
- t.insert(inline=True).values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- @testing.crashes(
- "mssql+pyodbc",
- "Pyodbc + SQL Server + Py3K, some decimal handling issue")
- def test_uppercase_inline_implicit(self):
- t = self.tables.foo
- self._test(
- t.insert(inline=True).values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[None]
- )
-
- def test_uppercase_implicit(self):
- t = self.tables.foo
- self._test(
- t.insert().values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- def test_uppercase_direct_params(self):
- t = self.tables.foo
- self._test(
- t.insert().values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[1]
- )
-
- @testing.requires.returning
- def test_uppercase_direct_params_returning(self):
- t = self.tables.foo
- self._test(
- t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
- (1, 'data', 5),
- returning=(1, 5)
- )
-
- @testing.fails_on(
- 'mssql', "lowercase table doesn't support identity insert disable")
- def test_direct_params(self):
- t = self._fixture()
- self._test(
- t.insert().values(id=1, data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[]
- )
-
- @testing.fails_on(
- 'mssql', "lowercase table doesn't support identity insert disable")
- @testing.requires.returning
- def test_direct_params_returning(self):
- t = self._fixture()
- self._test(
- t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
- (1, 'data', 5),
- returning=(1, 5)
- )
-
- @testing.requires.emulated_lastrowid
- def test_implicit_pk(self):
- t = self._fixture()
- self._test(
- t.insert().values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[]
- )
-
- @testing.requires.emulated_lastrowid
- def test_implicit_pk_multi_rows(self):
- t = self._fixture()
- self._test_multi(
- t.insert(),
- [
- {'data': 'd1', 'x': 5},
- {'data': 'd2', 'x': 6},
- {'data': 'd3', 'x': 7},
- ],
- [
- (1, 'd1', 5),
- (2, 'd2', 6),
- (3, 'd3', 7)
- ],
- )
-
- @testing.requires.emulated_lastrowid
- def test_implicit_pk_inline(self):
- t = self._fixture()
- self._test(
- t.insert(inline=True).values(data='data', x=5),
- (1, 'data', 5),
- inserted_primary_key=[]
- )
-
-
-class KeyTargetingTest(fixtures.TablesTest):
- run_inserts = 'once'
- run_deletes = None
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- 'keyed1', metadata, Column("a", CHAR(2), key="b"),
- Column("c", CHAR(2), key="q")
- )
- Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
- Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
- Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
- Table('content', metadata, Column('t', String(30), key="type"))
- Table('bar', metadata, Column('ctype', String(30), key="content_type"))
-
- if testing.requires.schemas.enabled:
- Table(
- 'wschema', metadata,
- Column("a", CHAR(2), key="b"),
- Column("c", CHAR(2), key="q"),
- schema=testing.config.test_schema
- )
-
- @classmethod
- def insert_data(cls):
- cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
- cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
- cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
- cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
- cls.tables.content.insert().execute(type="t1")
-
- if testing.requires.schemas.enabled:
- cls.tables['%s.wschema' % testing.config.test_schema].insert().execute(
- dict(b="a1", q="c1"))
-
- @testing.requires.schemas
- def test_keyed_accessor_wschema(self):
- keyed1 = self.tables['%s.wschema' % testing.config.test_schema]
- row = testing.db.execute(keyed1.select()).first()
-
- eq_(row.b, "a1")
- eq_(row.q, "c1")
- eq_(row.a, "a1")
- eq_(row.c, "c1")
-
- def test_keyed_accessor_single(self):
- keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select()).first()
-
- eq_(row.b, "a1")
- eq_(row.q, "c1")
- eq_(row.a, "a1")
- eq_(row.c, "c1")
-
- def test_keyed_accessor_single_labeled(self):
- keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select().apply_labels()).first()
-
- eq_(row.keyed1_b, "a1")
- eq_(row.keyed1_q, "c1")
- eq_(row.keyed1_a, "a1")
- eq_(row.keyed1_c, "c1")
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_conflict_2(self):
- keyed1 = self.tables.keyed1
- keyed2 = self.tables.keyed2
-
- row = testing.db.execute(select([keyed1, keyed2])).first()
- # row.b is unambiguous
- eq_(row.b, "b2")
- # row.a is ambiguous
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambig",
- getattr, row, "a"
- )
-
- def test_keyed_accessor_composite_names_precedent(self):
- keyed1 = self.tables.keyed1
- keyed4 = self.tables.keyed4
-
- row = testing.db.execute(select([keyed1, keyed4])).first()
- eq_(row.b, "b4")
- eq_(row.q, "q4")
- eq_(row.a, "a1")
- eq_(row.c, "c1")
-
- @testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_keys_precedent(self):
- keyed1 = self.tables.keyed1
- keyed3 = self.tables.keyed3
-
- row = testing.db.execute(select([keyed1, keyed3])).first()
- eq_(row.q, "c1")
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name 'b'",
- getattr, row, "b"
- )
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name 'a'",
- getattr, row, "a"
- )
- eq_(row.d, "d3")
-
- def test_keyed_accessor_composite_labeled(self):
- keyed1 = self.tables.keyed1
- keyed2 = self.tables.keyed2
-
- row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \
- first()
- eq_(row.keyed1_b, "a1")
- eq_(row.keyed1_a, "a1")
- eq_(row.keyed1_q, "c1")
- eq_(row.keyed1_c, "c1")
- eq_(row.keyed2_a, "a2")
- eq_(row.keyed2_b, "b2")
- assert_raises(KeyError, lambda: row['keyed2_c'])
- assert_raises(KeyError, lambda: row['keyed2_q'])
-
- def test_column_label_overlap_fallback(self):
- content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(
- select([content.c.type.label("content_type")])).first()
- assert content.c.type not in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') in row
-
- row = testing.db.execute(select([func.now().label("content_type")])). \
- first()
- assert content.c.type not in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') in row
-
- def test_column_label_overlap_fallback_2(self):
- content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(content.select(use_labels=True)).first()
- assert content.c.type in row
- assert bar.c.content_type not in row
- assert sql.column('content_type') not in row
-
- def test_columnclause_schema_column_one(self):
- keyed2 = self.tables.keyed2
-
- # this is addressed by [ticket:2932]
- # ColumnClause._compare_name_for_result allows the
- # columns which the statement is against to be lightweight
- # cols, which results in a more liberal comparison scheme
- a, b = sql.column('a'), sql.column('b')
- stmt = select([a, b]).select_from(table("keyed2"))
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
-
- def test_columnclause_schema_column_two(self):
- keyed2 = self.tables.keyed2
-
- a, b = sql.column('a'), sql.column('b')
- stmt = select([keyed2.c.a, keyed2.c.b])
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
-
- def test_columnclause_schema_column_three(self):
- keyed2 = self.tables.keyed2
-
- # this is also addressed by [ticket:2932]
-
- a, b = sql.column('a'), sql.column('b')
- stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
- assert stmt.c.a in row
- assert stmt.c.b in row
-
- def test_columnclause_schema_column_four(self):
- keyed2 = self.tables.keyed2
-
- # this is also addressed by [ticket:2932]
-
- a, b = sql.column('keyed2_a'), sql.column('keyed2_b')
- stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
- a, b)
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert a in row
- assert b in row
- assert stmt.c.keyed2_a in row
- assert stmt.c.keyed2_b in row
-
- def test_columnclause_schema_column_five(self):
- keyed2 = self.tables.keyed2
-
- # this is also addressed by [ticket:2932]
-
- stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
- keyed2_a=CHAR, keyed2_b=CHAR)
- row = testing.db.execute(stmt).first()
-
- assert keyed2.c.a in row
- assert keyed2.c.b in row
- assert stmt.c.keyed2_a in row
- assert stmt.c.keyed2_b in row
-
-
class LimitTest(fixtures.TestBase):
__backend__ = True