summaryrefslogtreecommitdiff
path: root/test/sql/test_compiler.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-08-18 10:02:24 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2019-08-30 17:38:09 -0400
commit36e8fe48b2332ecc44b506d1f86cc6ab3bb65f07 (patch)
tree38a218519ba4618fb6290c6851a4510b0ffed0a3 /test/sql/test_compiler.py
parentf499671ccc30cd42d6e3beb6ddec60e104bff9c5 (diff)
downloadsqlalchemy-36e8fe48b2332ecc44b506d1f86cc6ab3bb65f07.tar.gz
Render LIMIT/OFFSET conditions after compile on select dialects
Added new "post compile parameters" feature. This feature allows a :func:`.bindparam` construct to have its value rendered into the SQL string before being passed to the DBAPI driver, but after the compilation step, using the "literal render" feature of the compiler. The immediate rationale for this feature is to support LIMIT/OFFSET schemes that don't work or perform well as bound parameters handled by the database driver, while still allowing for SQLAlchemy SQL constructs to be cacheable in their compiled form. The immediate targets for the new feature are the "TOP N" clause used by SQL Server (and Sybase) which does not support a bound parameter, as well as the "ROWNUM" and optional "FIRST_ROWS()" schemes used by the Oracle dialect, the former of which has been known to perform better without bound parameters and the latter of which does not support a bound parameter. The feature builds upon the mechanisms first developed to support "expanding" parameters for IN expressions. As part of this feature, the Oracle ``use_binds_for_limits`` feature is turned on unconditionally and this flag is now deprecated. - adds limited support for "unique" bound parameters within a text() construct. - adds an additional int() check within the literal render function of the Integer datatype and tests that non-int values raise ValueError. Fixes: #4808 Change-Id: Iace97d544d1a7351ee07db970c6bc06a19c712c6
Diffstat (limited to 'test/sql/test_compiler.py')
-rw-r--r--test/sql/test_compiler.py961
1 files changed, 501 insertions, 460 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index 03e18e921..ebc0fc631 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -311,20 +311,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
checkparams=params,
)
- def test_limit_offset_select_literal_binds(self):
- stmt = select([1]).limit(5).offset(6)
- self.assert_compile(
- stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
- )
-
- def test_limit_offset_compound_select_literal_binds(self):
- stmt = select([1]).union(select([2])).limit(5).offset(6)
- self.assert_compile(
- stmt,
- "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
- literal_binds=True,
- )
-
def test_select_precol_compile_ordering(self):
s1 = (
select([column("x")])
@@ -1304,20 +1290,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT mytable.myid FROM mytable",
)
- def test_multiple_col_binds(self):
- self.assert_compile(
- select(
- [literal_column("*")],
- or_(
- table1.c.myid == 12,
- table1.c.myid == "asdf",
- table1.c.myid == "foo",
- ),
- ),
- "SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
- "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
- )
-
def test_order_by_nulls(self):
self.assert_compile(
table2.select(
@@ -1631,71 +1603,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=mysql.dialect(),
)
- def test_render_binds_as_literal(self):
- """test a compiler that renders binds inline into
- SQL in the columns clause."""
-
- dialect = default.DefaultDialect()
-
- class Compiler(dialect.statement_compiler):
- ansi_bind_rules = True
-
- dialect.statement_compiler = Compiler
-
- self.assert_compile(
- select([literal("someliteral")]),
- "SELECT 'someliteral' AS anon_1",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([table1.c.myid + 3]),
- "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([table1.c.myid.in_([4, 5, 6])]),
- "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([func.mod(table1.c.myid, 5)]),
- "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([literal("foo").in_([])]),
- "SELECT 1 != 1 AS anon_1",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([literal(util.b("foo"))]),
- "SELECT 'foo' AS anon_1",
- dialect=dialect,
- )
-
- # test callable
- self.assert_compile(
- select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
- "SELECT mytable.myid = 5 AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
- empty_in_dialect.statement_compiler = Compiler
-
- assert_raises_message(
- exc.CompileError,
- "Bind parameter 'foo' without a "
- "renderable value not allowed here.",
- bindparam("foo").in_([]).compile,
- dialect=empty_in_dialect,
- )
-
def test_collate(self):
# columns clause
self.assert_compile(
@@ -2214,373 +2121,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
" LIMIT -1 OFFSET :param_2) AS anon_2",
)
- def test_binds(self):
- for (
- stmt,
- expected_named_stmt,
- expected_positional_stmt,
- expected_default_params_dict,
- expected_default_params_list,
- test_param_dict,
- expected_test_params_dict,
- expected_test_params_list,
- ) in [
- (
- select(
- [table1, table2],
- and_(
- table1.c.myid == table2.c.otherid,
- table1.c.name == bindparam("mytablename"),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable WHERE mytable.myid = myothertable.otherid "
- "AND mytable.name = :mytablename",
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable WHERE mytable.myid = myothertable.otherid AND "
- "mytable.name = ?",
- {"mytablename": None},
- [None],
- {"mytablename": 5},
- {"mytablename": 5},
- [5],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myid"),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable, myothertable WHERE mytable.myid = :myid "
- "OR myothertable.otherid = :myid",
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable, myothertable WHERE mytable.myid = ? "
- "OR myothertable.otherid = ?",
- {"myid": None},
- [None, None],
- {"myid": 5},
- {"myid": 5},
- [5, 5],
- ),
- (
- text(
- "SELECT mytable.myid, mytable.name, "
- "mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid"
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = ? OR "
- "myothertable.otherid = ?",
- {"myid": None},
- [None, None],
- {"myid": 5},
- {"myid": 5},
- [5, 5],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid", unique=True),
- table2.c.otherid == bindparam("myid", unique=True),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid_1 OR myothertable.otherid = :myid_2",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = ? "
- "OR myothertable.otherid = ?",
- {"myid_1": None, "myid_2": None},
- [None, None],
- {"myid_1": 5, "myid_2": 6},
- {"myid_1": 5, "myid_2": 6},
- [5, 6],
- ),
- (
- bindparam("test", type_=String, required=False) + text("'hi'"),
- ":test || 'hi'",
- "? || 'hi'",
- {"test": None},
- [None],
- {},
- {"test": None},
- [None],
- ),
- (
- # testing select.params() here - bindparam() objects
- # must get required flag set to False
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myotherid"),
- ),
- ).params({"myid": 8, "myotherid": 7}),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid OR myothertable.otherid = :myotherid",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- "? OR myothertable.otherid = ?",
- {"myid": 8, "myotherid": 7},
- [8, 7],
- {"myid": 5},
- {"myid": 5, "myotherid": 7},
- [5, 7],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid
- == bindparam("myid", value=7, unique=True),
- table2.c.otherid
- == bindparam("myid", value=8, unique=True),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid_1 OR myothertable.otherid = :myid_2",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- "? OR myothertable.otherid = ?",
- {"myid_1": 7, "myid_2": 8},
- [7, 8],
- {"myid_1": 5, "myid_2": 6},
- {"myid_1": 5, "myid_2": 6},
- [5, 6],
- ),
- ]:
-
- self.assert_compile(
- stmt, expected_named_stmt, params=expected_default_params_dict
- )
- self.assert_compile(
- stmt, expected_positional_stmt, dialect=sqlite.dialect()
- )
- nonpositional = stmt.compile()
- positional = stmt.compile(dialect=sqlite.dialect())
- pp = positional.params
- eq_(
- [pp[k] for k in positional.positiontup],
- expected_default_params_list,
- )
-
- eq_(
- nonpositional.construct_params(test_param_dict),
- expected_test_params_dict,
- )
- pp = positional.construct_params(test_param_dict)
- eq_(
- [pp[k] for k in positional.positiontup],
- expected_test_params_list,
- )
-
- # check that params() doesn't modify original statement
- s = select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myotherid"),
- ),
- )
- s2 = s.params({"myid": 8, "myotherid": 7})
- s3 = s2.params({"myid": 9})
- assert s.compile().params == {"myid": None, "myotherid": None}
- assert s2.compile().params == {"myid": 8, "myotherid": 7}
- assert s3.compile().params == {"myid": 9, "myotherid": 7}
-
- # test using same 'unique' param object twice in one compile
- s = (
- select([table1.c.myid])
- .where(table1.c.myid == 12)
- .scalar_subquery()
- )
- s2 = select([table1, s], table1.c.myid == s)
- self.assert_compile(
- s2,
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
- ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
- )
- positional = s2.compile(dialect=sqlite.dialect())
-
- pp = positional.params
- assert [pp[k] for k in positional.positiontup] == [12, 12]
-
- # check that conflicts with "unique" params are caught
- s = select(
- [table1],
- or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
- )
- assert_raises_message(
- exc.CompileError,
- "conflicts with unique bind parameter " "of the same name",
- str,
- s,
- )
-
- s = select(
- [table1],
- or_(
- table1.c.myid == 7,
- table1.c.myid == 8,
- table1.c.myid == bindparam("myid_1"),
- ),
- )
- assert_raises_message(
- exc.CompileError,
- "conflicts with unique bind parameter " "of the same name",
- str,
- s,
- )
-
- def _test_binds_no_hash_collision(self):
- """test that construct_params doesn't corrupt dict
- due to hash collisions"""
-
- total_params = 100000
-
- in_clause = [":in%d" % i for i in range(total_params)]
- params = dict(("in%d" % i, i) for i in range(total_params))
- t = text("text clause %s" % ", ".join(in_clause))
- eq_(len(t.bindparams), total_params)
- c = t.compile()
- pp = c.construct_params(params)
- eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
- eq_(len(set(pp.values())), total_params)
-
- def test_bind_as_col(self):
- t = table("foo", column("id"))
-
- s = select([t, literal("lala").label("hoho")])
- self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
-
- assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
-
- def test_bind_callable(self):
- expr = column("x") == bindparam("key", callable_=lambda: 12)
- self.assert_compile(expr, "x = :key", {"x": 12})
-
- def test_bind_params_missing(self):
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x'",
- select([table1])
- .where(
- and_(
- table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True),
- )
- )
- .compile()
- .construct_params,
- params=dict(y=5),
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x'",
- select([table1])
- .where(table1.c.myid == bindparam("x", required=True))
- .compile()
- .construct_params,
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x', "
- "in parameter group 2",
- select([table1])
- .where(
- and_(
- table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True),
- )
- )
- .compile()
- .construct_params,
- params=dict(y=5),
- _group_number=2,
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x', "
- "in parameter group 2",
- select([table1])
- .where(table1.c.myid == bindparam("x", required=True))
- .compile()
- .construct_params,
- _group_number=2,
- )
-
- def test_tuple(self):
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
- "(mytable.myid, mytable.name) IN "
- "((:param_1, :param_2), (:param_3, :param_4))",
- )
-
- dialect = default.DefaultDialect()
- dialect.tuple_in_values = True
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
- "(mytable.myid, mytable.name) IN "
- "(VALUES (:param_1, :param_2), (:param_3, :param_4))",
- dialect=dialect,
- )
-
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- [tuple_(table2.c.otherid, table2.c.othername)]
- ),
- "(mytable.myid, mytable.name) IN "
- "((myothertable.otherid, myothertable.othername))",
- )
-
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- select([table2.c.otherid, table2.c.othername])
- ),
- "(mytable.myid, mytable.name) IN (SELECT "
- "myothertable.otherid, myothertable.othername FROM myothertable)",
- )
-
- def test_expanding_parameter(self):
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- bindparam("foo", expanding=True)
- ),
- "(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
- )
-
- dialect = default.DefaultDialect()
- dialect.tuple_in_values = True
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- bindparam("foo", expanding=True)
- ),
- "(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
- dialect=dialect,
- )
-
- self.assert_compile(
- table1.c.myid.in_(bindparam("foo", expanding=True)),
- "mytable.myid IN ([EXPANDING_foo])",
- )
-
def test_cast(self):
tbl = table(
"casttest",
@@ -3250,6 +2790,507 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises(exc.ArgumentError, and_, ("a",), ("b",))
+class BindParameterTest(AssertsCompiledSQL, fixtures.TestBase):
+ __dialect__ = "default"
+
+ def test_binds(self):
+ for (
+ stmt,
+ expected_named_stmt,
+ expected_positional_stmt,
+ expected_default_params_dict,
+ expected_default_params_list,
+ test_param_dict,
+ expected_test_params_dict,
+ expected_test_params_list,
+ ) in [
+ (
+ select(
+ [table1, table2],
+ and_(
+ table1.c.myid == table2.c.otherid,
+ table1.c.name == bindparam("mytablename"),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable, "
+ "myothertable WHERE mytable.myid = myothertable.otherid "
+ "AND mytable.name = :mytablename",
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable, "
+ "myothertable WHERE mytable.myid = myothertable.otherid AND "
+ "mytable.name = ?",
+ {"mytablename": None},
+ [None],
+ {"mytablename": 5},
+ {"mytablename": 5},
+ [5],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myid"),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, myothertable WHERE mytable.myid = :myid "
+ "OR myothertable.otherid = :myid",
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, myothertable WHERE mytable.myid = ? "
+ "OR myothertable.otherid = ?",
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
+ ),
+ (
+ text(
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid"
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = ? OR "
+ "myothertable.otherid = ?",
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid", unique=True),
+ table2.c.otherid == bindparam("myid", unique=True),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid_1 OR myothertable.otherid = :myid_2",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = ? "
+ "OR myothertable.otherid = ?",
+ {"myid_1": None, "myid_2": None},
+ [None, None],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
+ ),
+ (
+ bindparam("test", type_=String, required=False) + text("'hi'"),
+ ":test || 'hi'",
+ "? || 'hi'",
+ {"test": None},
+ [None],
+ {},
+ {"test": None},
+ [None],
+ ),
+ (
+ # testing select.params() here - bindparam() objects
+ # must get required flag set to False
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ ).params({"myid": 8, "myotherid": 7}),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid OR myothertable.otherid = :myotherid",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ "? OR myothertable.otherid = ?",
+ {"myid": 8, "myotherid": 7},
+ [8, 7],
+ {"myid": 5},
+ {"myid": 5, "myotherid": 7},
+ [5, 7],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid
+ == bindparam("myid", value=7, unique=True),
+ table2.c.otherid
+ == bindparam("myid", value=8, unique=True),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid_1 OR myothertable.otherid = :myid_2",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ "? OR myothertable.otherid = ?",
+ {"myid_1": 7, "myid_2": 8},
+ [7, 8],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
+ ),
+ ]:
+
+ self.assert_compile(
+ stmt, expected_named_stmt, params=expected_default_params_dict
+ )
+ self.assert_compile(
+ stmt, expected_positional_stmt, dialect=sqlite.dialect()
+ )
+ nonpositional = stmt.compile()
+ positional = stmt.compile(dialect=sqlite.dialect())
+ pp = positional.params
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_default_params_list,
+ )
+
+ eq_(
+ nonpositional.construct_params(test_param_dict),
+ expected_test_params_dict,
+ )
+ pp = positional.construct_params(test_param_dict)
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_test_params_list,
+ )
+
+ # check that params() doesn't modify original statement
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ )
+ s2 = s.params({"myid": 8, "myotherid": 7})
+ s3 = s2.params({"myid": 9})
+ assert s.compile().params == {"myid": None, "myotherid": None}
+ assert s2.compile().params == {"myid": 8, "myotherid": 7}
+ assert s3.compile().params == {"myid": 9, "myotherid": 7}
+
+ # test using same 'unique' param object twice in one compile
+ s = (
+ select([table1.c.myid])
+ .where(table1.c.myid == 12)
+ .scalar_subquery()
+ )
+ s2 = select([table1, s], table1.c.myid == s)
+ self.assert_compile(
+ s2,
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
+ ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
+ )
+ positional = s2.compile(dialect=sqlite.dialect())
+
+ pp = positional.params
+ assert [pp[k] for k in positional.positiontup] == [12, 12]
+
+ # check that conflicts with "unique" params are caught
+ s = select(
+ [table1],
+ or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
+
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == 7,
+ table1.c.myid == 8,
+ table1.c.myid == bindparam("myid_1"),
+ ),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
+
+ def _test_binds_no_hash_collision(self):
+ """test that construct_params doesn't corrupt dict
+ due to hash collisions"""
+
+ total_params = 100000
+
+ in_clause = [":in%d" % i for i in range(total_params)]
+ params = dict(("in%d" % i, i) for i in range(total_params))
+ t = text("text clause %s" % ", ".join(in_clause))
+ eq_(len(t.bindparams), total_params)
+ c = t.compile()
+ pp = c.construct_params(params)
+ eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
+ eq_(len(set(pp.values())), total_params)
+
+ def test_bind_as_col(self):
+ t = table("foo", column("id"))
+
+ s = select([t, literal("lala").label("hoho")])
+ self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
+
+ assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
+
+ def test_bind_callable(self):
+ expr = column("x") == bindparam("key", callable_=lambda: 12)
+ self.assert_compile(expr, "x = :key", {"x": 12})
+
+ def test_bind_params_missing(self):
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x'",
+ select([table1])
+ .where(
+ and_(
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True),
+ )
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x'",
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x', "
+ "in parameter group 2",
+ select([table1])
+ .where(
+ and_(
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True),
+ )
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
+ _group_number=2,
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x', "
+ "in parameter group 2",
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ _group_number=2,
+ )
+
+ def test_tuple(self):
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
+ "(mytable.myid, mytable.name) IN "
+ "((:param_1, :param_2), (:param_3, :param_4))",
+ )
+
+ dialect = default.DefaultDialect()
+ dialect.tuple_in_values = True
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
+ "(mytable.myid, mytable.name) IN "
+ "(VALUES (:param_1, :param_2), (:param_3, :param_4))",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ [tuple_(table2.c.otherid, table2.c.othername)]
+ ),
+ "(mytable.myid, mytable.name) IN "
+ "((myothertable.otherid, myothertable.othername))",
+ )
+
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ select([table2.c.otherid, table2.c.othername])
+ ),
+ "(mytable.myid, mytable.name) IN (SELECT "
+ "myothertable.otherid, myothertable.othername FROM myothertable)",
+ )
+
+ def test_expanding_parameter(self):
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ bindparam("foo", expanding=True)
+ ),
+ "(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
+ )
+
+ dialect = default.DefaultDialect()
+ dialect.tuple_in_values = True
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ bindparam("foo", expanding=True)
+ ),
+ "(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ table1.c.myid.in_(bindparam("foo", expanding=True)),
+ "mytable.myid IN ([POSTCOMPILE_foo])",
+ )
+
+ def test_limit_offset_select_literal_binds(self):
+ stmt = select([1]).limit(5).offset(6)
+ self.assert_compile(
+ stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
+ )
+
+ def test_limit_offset_compound_select_literal_binds(self):
+ stmt = select([1]).union(select([2])).limit(5).offset(6)
+ self.assert_compile(
+ stmt,
+ "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
+ literal_binds=True,
+ )
+
+ def test_multiple_col_binds(self):
+ self.assert_compile(
+ select(
+ [literal_column("*")],
+ or_(
+ table1.c.myid == 12,
+ table1.c.myid == "asdf",
+ table1.c.myid == "foo",
+ ),
+ ),
+ "SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
+ "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
+ )
+
+ def test_render_binds_as_literal(self):
+ """test a compiler that renders binds inline into
+ SQL in the columns clause."""
+
+ dialect = default.DefaultDialect()
+
+ class Compiler(dialect.statement_compiler):
+ ansi_bind_rules = True
+
+ dialect.statement_compiler = Compiler
+
+ self.assert_compile(
+ select([literal("someliteral")]),
+ "SELECT 'someliteral' AS anon_1",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([table1.c.myid + 3]),
+ "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([table1.c.myid.in_([4, 5, 6])]),
+ "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([func.mod(table1.c.myid, 5)]),
+ "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([literal("foo").in_([])]),
+ "SELECT 1 != 1 AS anon_1",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([literal(util.b("foo"))]),
+ "SELECT 'foo' AS anon_1",
+ dialect=dialect,
+ )
+
+ # test callable
+ self.assert_compile(
+ select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
+ "SELECT mytable.myid = 5 AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
+ empty_in_dialect.statement_compiler = Compiler
+
+ assert_raises_message(
+ exc.CompileError,
+ "Bind parameter 'foo' without a "
+ "renderable value not allowed here.",
+ bindparam("foo").in_([]).compile,
+ dialect=empty_in_dialect,
+ )
+
+ def test_render_literal_execute_parameter(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid == bindparam("foo", 5, literal_execute=True)
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid = [POSTCOMPILE_foo]",
+ )
+
+ def test_render_literal_execute_parameter_literal_binds(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid == bindparam("foo", 5, literal_execute=True)
+ ),
+ "SELECT mytable.myid FROM mytable " "WHERE mytable.myid = 5",
+ literal_binds=True,
+ )
+
+ def test_render_expanding_parameter(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid.in_(bindparam("foo", expanding=True))
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid IN ([POSTCOMPILE_foo])",
+ )
+
+ def test_render_expanding_parameter_literal_binds(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid.in_(bindparam("foo", [1, 2, 3], expanding=True))
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid IN (1, 2, 3)",
+ literal_binds=True,
+ )
+
+
class UnsupportedTest(fixtures.TestBase):
def test_unsupported_element_str_visit_name(self):
from sqlalchemy.sql.expression import ClauseElement