diff options
Diffstat (limited to 'test/dialect/test_postgresql.py')
-rw-r--r-- | test/dialect/test_postgresql.py | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py index 33753b48f..25b6bcb54 100644 --- a/test/dialect/test_postgresql.py +++ b/test/dialect/test_postgresql.py @@ -2745,6 +2745,61 @@ class HStoreTest(fixtures.TestBase): ) % expected ) + def test_bind_serialize_default(self): + from sqlalchemy.engine import default + + dialect = default.DefaultDialect() + proc = self.test_table.c.hash.type._cached_bind_processor(dialect) + eq_( + proc({"key1": "value1", "key2": "value2"}), + '"key2"=>"value2", "key1"=>"value1"' + ) + + def test_result_deserialize_default(self): + from sqlalchemy.engine import default + + dialect = default.DefaultDialect() + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + eq_( + proc('"key2"=>"value2", "key1"=>"value1"'), + {"key1": "value1", "key2": "value2"} + ) + + def test_bind_serialize_psycopg2(self): + from sqlalchemy.dialects.postgresql import psycopg2 + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = True + proc = self.test_table.c.hash.type._cached_bind_processor(dialect) + is_(proc, None) + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = False + proc = self.test_table.c.hash.type._cached_bind_processor(dialect) + eq_( + proc({"key1": "value1", "key2": "value2"}), + '"key2"=>"value2", "key1"=>"value1"' + ) + + def test_result_deserialize_psycopg2(self): + from sqlalchemy.dialects.postgresql import psycopg2 + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = True + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + is_(proc, None) + + dialect = psycopg2.PGDialect_psycopg2() + dialect._has_native_hstore = False + proc = self.test_table.c.hash.type._cached_result_processor( + dialect, None) + eq_( + proc('"key2"=>"value2", "key1"=>"value1"'), + {"key1": "value1", "key2": "value2"} + ) + def test_where_has_key(self): self._test_where( self.hashcol.has_key('foo'), @@ -2897,3 +2952,76 @@ class HStoreTest(fixtures.TestBase): "hstore_to_matrix(test_table.hash) AS hstore_to_matrix_1", True ) + +class HStoreRoundTripTest(fixtures.TablesTest): + #__only_on__ = 'postgresql' + __requires__ = 'hstore', + __dialect__ = postgresql.dialect() + + @classmethod + def define_tables(cls, metadata): + Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False), + Column('data', HSTORE) + ) + + def _fixture_data(self, engine): + data_table = self.tables.data_table + engine.execute( + data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, + {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, + {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, + {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, + {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}}, + ) + + def _assert_data(self, compare): + data = testing.db.execute( + select([self.tables.data_table.c.data]). + order_by(self.tables.data_table.c.name) + ).fetchall() + eq_([d for d, in data], compare) + + def _test_insert(self, engine): + engine.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}} + ) + self._assert_data([{"k1": "r1v1", "k2": "r1v2"}]) + + def _non_native_engine(self): + if testing.against("postgresql+psycopg2"): + engine = engines.testing_engine(options=dict(use_native_hstore=False)) + else: + engine = testing.db + engine.connect() + return engine + + @testing.only_on("postgresql+psycopg2") + def test_insert_native(self): + engine = testing.db + self._test_insert(engine) + + def test_insert_python(self): + engine = self._non_native_engine() + self._test_insert(engine) + + @testing.only_on("postgresql+psycopg2") + def test_criterion_native(self): + engine = testing.db + self._fixture_data(engine) + self._test_criterion(engine) + + def test_criterion_python(self): + engine = self._non_native_engine() + self._fixture_data(engine) + self._test_criterion(engine) + + def _test_criterion(self, engine): + data_table = self.tables.data_table + result = engine.execute( + select([data_table.c.data]).where(data_table.c.data['k1'] == 'r3v1') + ).first() + eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) |