summaryrefslogtreecommitdiff
path: root/test/dialect/test_postgresql.py
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2013-06-22 07:47:02 -0700
committermike bayer <mike_mp@zzzcomputing.com>2013-06-22 07:47:02 -0700
commit29fa6913be46c4e4c95b2b2810baea24c4b211dd (patch)
tree858b755e10ec1dd30235c9f96925f56fa4361544 /test/dialect/test_postgresql.py
parent8c555f24b197832b9944f25d47d5989aa942bdea (diff)
parentb2da12e070e9d83bea5284dae11b8e6d4d509818 (diff)
downloadsqlalchemy-29fa6913be46c4e4c95b2b2810baea24c4b211dd.tar.gz
Merge pull request #5 from cjw296/pg-ranges
Support for Postgres range types.
Diffstat (limited to 'test/dialect/test_postgresql.py')
-rw-r--r--test/dialect/test_postgresql.py330
1 files changed, 329 insertions, 1 deletions
diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py
index d1ba960c4..88554a34d 100644
--- a/test/dialect/test_postgresql.py
+++ b/test/dialect/test_postgresql.py
@@ -17,7 +17,9 @@ from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
from sqlalchemy.orm import Session, mapper, aliased
from sqlalchemy import exc, schema, types
from sqlalchemy.dialects.postgresql import base as postgresql
-from sqlalchemy.dialects.postgresql import HSTORE, hstore, array
+from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \
+ INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE, \
+ ExcludeConstraint
import decimal
from sqlalchemy import util
from sqlalchemy.testing.util import round_decimal
@@ -182,6 +184,53 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'USING hash (data)',
dialect=postgresql.dialect())
+ def test_exclude_constraint_min(self):
+ m = MetaData()
+ tbl = Table('testtbl', m,
+ Column('room', Integer, primary_key=True))
+ cons = ExcludeConstraint(('room', '='))
+ tbl.append_constraint(cons)
+ self.assert_compile(schema.AddConstraint(cons),
+ 'ALTER TABLE testtbl ADD EXCLUDE USING gist '
+ '(room WITH =)',
+ dialect=postgresql.dialect())
+
+ def test_exclude_constraint_full(self):
+ m = MetaData()
+ room = Column('room', Integer, primary_key=True)
+ tbl = Table('testtbl', m,
+ room,
+ Column('during', TSRANGE))
+ room = Column('room', Integer, primary_key=True)
+ cons = ExcludeConstraint((room, '='), ('during', '&&'),
+ name='my_name',
+ using='gist',
+ where="room > 100",
+ deferrable=True,
+ initially='immediate')
+ tbl.append_constraint(cons)
+ self.assert_compile(schema.AddConstraint(cons),
+ 'ALTER TABLE testtbl ADD CONSTRAINT my_name '
+ 'EXCLUDE USING gist '
+ '(room WITH =, during WITH ''&&) WHERE '
+ '(room > 100) DEFERRABLE INITIALLY immediate',
+ dialect=postgresql.dialect())
+
+ def test_exclude_constraint_copy(self):
+ m = MetaData()
+ cons = ExcludeConstraint(('room', '='))
+ tbl = Table('testtbl', m,
+ Column('room', Integer, primary_key=True),
+ cons)
+ # apparently you can't copy a ColumnCollectionConstraint until
+ # after it has been bound to a table...
+ cons_copy = cons.copy()
+ tbl.append_constraint(cons_copy)
+ self.assert_compile(schema.AddConstraint(cons_copy),
+ 'ALTER TABLE testtbl ADD EXCLUDE USING gist '
+ '(room WITH =)',
+ dialect=postgresql.dialect())
+
def test_substring(self):
self.assert_compile(func.substring('abc', 1, 2),
'SUBSTRING(%(substring_1)s FROM %(substring_2)s '
@@ -3242,3 +3291,282 @@ class HStoreRoundTripTest(fixtures.TablesTest):
def test_unicode_round_trip_native(self):
engine = testing.db
self._test_unicode_round_trip(engine)
+
+class _RangeTypeMixin(object):
+ __requires__ = 'range_types',
+ __dialect__ = 'postgresql+psycopg2'
+
+ @property
+ def extras(self):
+ # done this way so we don't get ImportErrors with
+ # older psycopg2 versions.
+ from psycopg2 import extras
+ return extras
+
+ @classmethod
+ def define_tables(cls, metadata):
+ # no reason ranges shouldn't be primary keys,
+ # so lets just use them as such
+ table = Table('data_table', metadata,
+ Column('range', cls._col_type, primary_key=True),
+ )
+ cls.col = table.c.range
+
+ def test_actual_type(self):
+ eq_(str(self._col_type()), self._col_str)
+
+ def test_reflect(self):
+ from sqlalchemy import inspect
+ insp = inspect(testing.db)
+ cols = insp.get_columns('data_table')
+ assert isinstance(cols[0]['type'], self._col_type)
+
+ def _assert_data(self):
+ data = testing.db.execute(
+ select([self.tables.data_table.c.range])
+ ).fetchall()
+ eq_(data, [(self._data_obj(), )])
+
+ def test_insert_obj(self):
+ testing.db.engine.execute(
+ self.tables.data_table.insert(),
+ {'range': self._data_obj()}
+ )
+ self._assert_data()
+
+ def test_insert_text(self):
+ testing.db.engine.execute(
+ self.tables.data_table.insert(),
+ {'range': self._data_str}
+ )
+ self._assert_data()
+
+ # operator tests
+
+ def _test_clause(self, colclause, expected):
+ dialect = postgresql.dialect()
+ compiled = str(colclause.compile(dialect=dialect))
+ eq_(compiled, expected)
+
+ def test_where_equal(self):
+ self._test_clause(
+ self.col==self._data_str,
+ "data_table.range = %(range_1)s"
+ )
+
+ def test_where_not_equal(self):
+ self._test_clause(
+ self.col!=self._data_str,
+ "data_table.range <> %(range_1)s"
+ )
+
+ def test_where_less_than(self):
+ self._test_clause(
+ self.col < self._data_str,
+ "data_table.range < %(range_1)s"
+ )
+
+ def test_where_greater_than(self):
+ self._test_clause(
+ self.col > self._data_str,
+ "data_table.range > %(range_1)s"
+ )
+
+ def test_where_less_than_or_equal(self):
+ self._test_clause(
+ self.col <= self._data_str,
+ "data_table.range <= %(range_1)s"
+ )
+
+ def test_where_greater_than_or_equal(self):
+ self._test_clause(
+ self.col >= self._data_str,
+ "data_table.range >= %(range_1)s"
+ )
+
+ def test_contains(self):
+ self._test_clause(
+ self.col.contains(self._data_str),
+ "data_table.range @> %(range_1)s"
+ )
+
+ def test_contained_by(self):
+ self._test_clause(
+ self.col.contained_by(self._data_str),
+ "data_table.range <@ %(range_1)s"
+ )
+
+ def test_overlaps(self):
+ self._test_clause(
+ self.col.overlaps(self._data_str),
+ "data_table.range && %(range_1)s"
+ )
+
+ def test_strictly_left_of(self):
+ self._test_clause(
+ self.col << self._data_str,
+ "data_table.range << %(range_1)s"
+ )
+ self._test_clause(
+ self.col.strictly_left_of(self._data_str),
+ "data_table.range << %(range_1)s"
+ )
+
+ def test_strictly_right_of(self):
+ self._test_clause(
+ self.col >> self._data_str,
+ "data_table.range >> %(range_1)s"
+ )
+ self._test_clause(
+ self.col.strictly_right_of(self._data_str),
+ "data_table.range >> %(range_1)s"
+ )
+
+ def test_not_extend_right_of(self):
+ self._test_clause(
+ self.col.not_extend_right_of(self._data_str),
+ "data_table.range &< %(range_1)s"
+ )
+
+ def test_not_extend_left_of(self):
+ self._test_clause(
+ self.col.not_extend_left_of(self._data_str),
+ "data_table.range &> %(range_1)s"
+ )
+
+ def test_adjacent_to(self):
+ self._test_clause(
+ self.col.adjacent_to(self._data_str),
+ "data_table.range -|- %(range_1)s"
+ )
+
+ def test_union(self):
+ self._test_clause(
+ self.col + self.col,
+ "data_table.range + data_table.range"
+ )
+
+ def test_union_result(self):
+ # insert
+ testing.db.engine.execute(
+ self.tables.data_table.insert(),
+ {'range': self._data_str}
+ )
+ # select
+ range = self.tables.data_table.c.range
+ data = testing.db.execute(
+ select([range + range])
+ ).fetchall()
+ eq_(data, [(self._data_obj(), )])
+
+
+ def test_intersection(self):
+ self._test_clause(
+ self.col * self.col,
+ "data_table.range * data_table.range"
+ )
+
+ def test_intersection_result(self):
+ # insert
+ testing.db.engine.execute(
+ self.tables.data_table.insert(),
+ {'range': self._data_str}
+ )
+ # select
+ range = self.tables.data_table.c.range
+ data = testing.db.execute(
+ select([range * range])
+ ).fetchall()
+ eq_(data, [(self._data_obj(), )])
+
+ def test_different(self):
+ self._test_clause(
+ self.col - self.col,
+ "data_table.range - data_table.range"
+ )
+
+ def test_difference_result(self):
+ # insert
+ testing.db.engine.execute(
+ self.tables.data_table.insert(),
+ {'range': self._data_str}
+ )
+ # select
+ range = self.tables.data_table.c.range
+ data = testing.db.execute(
+ select([range - range])
+ ).fetchall()
+ eq_(data, [(self._data_obj().__class__(empty=True), )])
+
+class Int4RangeTests(_RangeTypeMixin, fixtures.TablesTest):
+
+ _col_type = INT4RANGE
+ _col_str = 'INT4RANGE'
+ _data_str = '[1,2)'
+ def _data_obj(self):
+ return self.extras.NumericRange(1, 2)
+
+class Int8RangeTests(_RangeTypeMixin, fixtures.TablesTest):
+
+ _col_type = INT8RANGE
+ _col_str = 'INT8RANGE'
+ _data_str = '[9223372036854775806,9223372036854775807)'
+ def _data_obj(self):
+ return self.extras.NumericRange(
+ 9223372036854775806, 9223372036854775807
+ )
+
+class NumRangeTests(_RangeTypeMixin, fixtures.TablesTest):
+
+ _col_type = NUMRANGE
+ _col_str = 'NUMRANGE'
+ _data_str = '[1.0,2.0)'
+ def _data_obj(self):
+ return self.extras.NumericRange(
+ decimal.Decimal('1.0'), decimal.Decimal('2.0')
+ )
+
+class DateRangeTests(_RangeTypeMixin, fixtures.TablesTest):
+
+ _col_type = DATERANGE
+ _col_str = 'DATERANGE'
+ _data_str = '[2013-03-23,2013-03-24)'
+ def _data_obj(self):
+ return self.extras.DateRange(
+ datetime.date(2013, 3, 23), datetime.date(2013, 3, 24)
+ )
+
+class DateTimeRangeTests(_RangeTypeMixin, fixtures.TablesTest):
+
+ _col_type = TSRANGE
+ _col_str = 'TSRANGE'
+ _data_str = '[2013-03-23 14:30,2013-03-23 23:30)'
+ def _data_obj(self):
+ return self.extras.DateTimeRange(
+ datetime.datetime(2013, 3, 23, 14, 30),
+ datetime.datetime(2013, 3, 23, 23, 30)
+ )
+
+class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest):
+
+ _col_type = TSTZRANGE
+ _col_str = 'TSTZRANGE'
+
+ # make sure we use one, steady timestamp with timezone pair
+ # for all parts of all these tests
+ _tstzs = None
+ def tstzs(self):
+ if self._tstzs is None:
+ lower = testing.db.connect().scalar(
+ func.current_timestamp().select()
+ )
+ upper = lower+datetime.timedelta(1)
+ self._tstzs = (lower, upper)
+ return self._tstzs
+
+ @property
+ def _data_str(self):
+ return '[%s,%s)' % self.tstzs()
+
+ def _data_obj(self):
+ return self.extras.DateTimeTZRange(*self.tstzs())