from sqlalchemy import and_
from sqlalchemy import case
from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import Integer
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy.sql import column
from sqlalchemy.sql import table
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures


# Assigned in CaseTest.setup_class and dropped in CaseTest.teardown_class.
info_table = None


class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
    """Exercise the ``case()`` construct, compiled and executed."""

    __dialect__ = "default"

    @classmethod
    def setup_class(cls):
        """Create the ``infos`` table and insert six rows (pk 1..6)."""
        global info_table

        meta = MetaData(testing.db)
        info_table = Table(
            "infos",
            meta,
            Column("pk", Integer, primary_key=True),
            Column("info", String(30)),
        )
        info_table.create()

        seed_rows = (
            {"pk": 1, "info": "pk_1_data"},
            {"pk": 2, "info": "pk_2_data"},
            {"pk": 3, "info": "pk_3_data"},
            {"pk": 4, "info": "pk_4_data"},
            {"pk": 5, "info": "pk_5_data"},
            {"pk": 6, "info": "pk_6_data"},
        )
        info_table.insert().execute(*seed_rows)

    @classmethod
    def teardown_class(cls):
        """Drop the table created by ``setup_class``."""
        info_table.drop()

    @testing.fails_on("firebird", "FIXME: unknown")
    @testing.requires.subqueries
    def test_case(self):
        """Run a searched CASE directly, through an alias, and with ELSE."""
        pk = info_table.c.pk

        bucket = case(
            [
                [pk < 3, "lessthan3"],
                [and_(pk >= 3, pk < 7), "gt3"],
            ]
        ).label("x")
        inner = select(
            [bucket, pk, info_table.c.info],
            from_obj=[info_table],
        )
        inner_result = inner.execute().fetchall()

        # Outputs:
        # lessthan3 1 pk_1_data
        # lessthan3 2 pk_2_data
        # gt3 3 pk_3_data
        # gt3 4 pk_4_data
        # gt3 5 pk_5_data
        # gt3 6 pk_6_data
        labelled_rows = [
            ("lessthan3", 1, "pk_1_data"),
            ("lessthan3", 2, "pk_2_data"),
            ("gt3", 3, "pk_3_data"),
            ("gt3", 4, "pk_4_data"),
            ("gt3", 5, "pk_5_data"),
            ("gt3", 6, "pk_6_data"),
        ]
        assert inner_result == labelled_rows

        # Selecting from the aliased statement must return the same rows.
        outer = select([inner.alias("q_inner")])
        outer_result = outer.execute().fetchall()
        assert outer_result == labelled_rows

        # pk 6 matches neither WHEN below, so it falls through to else_.
        numeric_bucket = case(
            [
                [pk < 3, cast(3, Integer)],
                [and_(pk >= 3, pk < 6), 6],
            ],
            else_=0,
        ).label("x")
        w_else = select(
            [numeric_bucket, pk, info_table.c.info],
            from_obj=[info_table],
        )
        else_result = w_else.execute().fetchall()
        assert else_result == [
            (3, 1, "pk_1_data"),
            (3, 2, "pk_2_data"),
            (6, 3, "pk_3_data"),
            (6, 4, "pk_4_data"),
            (6, 5, "pk_5_data"),
            (0, 6, "pk_6_data"),
        ]

    def test_literal_interpretation_ambiguous(self):
        """A plain string as a WHEN condition raises ArgumentError."""
        whens = [("x", "y")]
        assert_raises_message(
            exc.ArgumentError,
            r"Ambiguous literal: 'x'. Use the 'text\(\)' function",
            case,
            whens,
        )

    def test_literal_interpretation_ambiguous_tuple(self):
        """A tuple as a WHEN condition raises ArgumentError."""
        whens = [(("x", "y"), "z")]
        assert_raises_message(
            exc.ArgumentError,
            r"Ambiguous literal: \('x', 'y'\). Use the 'text\(\)' function",
            case,
            whens,
        )

    def test_literal_interpretation(self):
        """Plain literals compile to bound parameters in CASE."""
        tbl = table("test", column("col1"))

        # Simple CASE: with value= given, the string "x" is a bound value.
        simple = case([("x", "y")], value=tbl.c.col1)
        self.assert_compile(
            simple,
            "CASE test.col1 WHEN :param_1 THEN :param_2 END",
        )

        # Searched CASE: result and else_ literals are bound as well.
        searched = case([(tbl.c.col1 == 7, "y")], else_="z")
        self.assert_compile(
            searched,
            "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END",
        )

    def test_text_doesnt_explode(self):
        """``text()`` and ``literal_column()`` work as CASE results."""

        def yes_no_select(wrap):
            # ``wrap`` turns a raw SQL string literal into a SQL element.
            flag = case(
                [(info_table.c.info == "pk_4_data", wrap("'yes'"))],
                else_=wrap("'no'"),
            )
            return select([flag]).order_by(info_table.c.info)

        statements = [yes_no_select(text), yes_no_select(literal_column)]

        if testing.against("firebird"):
            # On Firebird the expected "no" values carry a trailing space.
            expected = [
                ("no ",),
                ("no ",),
                ("no ",),
                ("yes",),
                ("no ",),
                ("no ",),
            ]
        else:
            expected = [("no",), ("no",), ("no",), ("yes",), ("no",), ("no",)]

        for stmt in statements:
            eq_(stmt.execute().fetchall(), expected)

    @testing.fails_on("firebird", "FIXME: unknown")
    def testcase_with_dict(self):
        """``case()`` accepts a dict of WHEN/THEN pairs in both forms."""
        pk = info_table.c.pk

        # Searched form: the dict keys are boolean expressions.
        whens = {
            pk < 3: "lessthan3",
            pk >= 3: "gt3",
        }
        query = select(
            [case(whens, else_="other"), pk, info_table.c.info],
            from_obj=[info_table],
        )
        assert query.execute().fetchall() == [
            ("lessthan3", 1, "pk_1_data"),
            ("lessthan3", 2, "pk_2_data"),
            ("gt3", 3, "pk_3_data"),
            ("gt3", 4, "pk_4_data"),
            ("gt3", 5, "pk_5_data"),
            ("gt3", 6, "pk_6_data"),
        ]

        # Simple form: the dict keys are values compared against ``value``.
        number_name = case(
            {1: "one", 2: "two"}, value=pk, else_="other"
        )
        simple_query = select(
            [number_name, pk],
            whereclause=pk < 4,
            from_obj=[info_table],
        )
        assert simple_query.execute().fetchall() == [
            ("one", 1),
            ("two", 2),
            ("other", 3),
        ]