diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-08-17 19:09:31 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-08-17 19:09:31 -0400 |
commit | 54824f50f84b6a2c4ced67f8445067904e3119bd (patch) | |
tree | 52a10287306613c4d61ac0ee3ea6b1d754ddaa72 /examples/postgis/postgis.py | |
parent | a2468c8a31c8308cdb5740f2401e9dedd003836e (diff) | |
download | sqlalchemy-54824f50f84b6a2c4ced67f8445067904e3119bd.tar.gz |
- really start making postgis example slick.
Diffstat (limited to 'examples/postgis/postgis.py')
-rw-r--r-- | examples/postgis/postgis.py | 122 |
1 files changed, 62 insertions, 60 deletions
diff --git a/examples/postgis/postgis.py b/examples/postgis/postgis.py index 77fcacba1..ef9b77c81 100644 --- a/examples/postgis/postgis.py +++ b/examples/postgis/postgis.py @@ -1,20 +1,13 @@ -from sqlalchemy.types import UserDefinedType -from sqlalchemy.sql import expression +from sqlalchemy.types import UserDefinedType, _Binary, TypeDecorator +from sqlalchemy.sql import expression, type_coerce from sqlalchemy import event, Table +import binascii # Python datatypes class GisElement(object): """Represents a geometry value.""" - @property - def wkt(self): - return func.ST_AsText(literal(self, Geometry)) - - @property - def wkb(self): - return func.ST_AsBinary(literal(self, Geometry)) - def __str__(self): return self.desc @@ -22,40 +15,43 @@ class GisElement(object): return "<%s at 0x%x; %r>" % (self.__class__.__name__, id(self), self.desc) -class PersistentGisElement(GisElement): - """Represents a Geometry value as loaded from the database.""" +class BinaryGisElement(GisElement, expression.Function): + """Represents a Geometry value expressed as binary.""" - def __init__(self, desc): - self.desc = desc + def __init__(self, data): + self.data = data + expression.Function.__init__(self, "ST_GeomFromEWKB", data, + type_=Geometry(coerce_="binary")) -class TextualGisElement(GisElement, expression.Function): - """Represents a Geometry value as expressed within application code; - i.e. in wkt format. + @property + def desc(self): + return self.as_hex - Extends expression.Function so that the value is interpreted as - GeomFromText(value) in a SQL expression context. + @property + def as_hex(self): + return binascii.hexlify(self.data) - """ +class TextualGisElement(GisElement, expression.Function): + """Represents a Geometry value expressed as text.""" def __init__(self, desc, srid=-1): self.desc = desc - expression.Function.__init__(self, "ST_GeomFromText", desc, srid) + expression.Function.__init__(self, "ST_GeomFromText", desc, srid, + type_=Geometry) # SQL datatypes. class Geometry(UserDefinedType): - """Base PostGIS Geometry column type. - - Converts bind/result values to/from a PersistentGisElement. - - """ + """Base PostGIS Geometry column type.""" name = "GEOMETRY" - def __init__(self, dimension=None, srid=-1): + def __init__(self, dimension=None, srid=-1, + coerce_="text"): self.dimension = dimension self.srid = srid + self.coerce = coerce_ class comparator_factory(UserDefinedType.Comparator): """Define custom operations for geometry types.""" @@ -78,10 +74,20 @@ class Geometry(UserDefinedType): return self.name def bind_expression(self, bindvalue): - return TextualGisElement(bindvalue) + if self.coerce == "text": + return TextualGisElement(bindvalue) + elif self.coerce == "binary": + return BinaryGisElement(bindvalue) + else: + assert False def column_expression(self, col): - return func.ST_AsText(col, type_=self) + if self.coerce == "text": + return func.ST_AsText(col, type_=self) + elif self.coerce == "binary": + return func.ST_AsBinary(col, type_=self) + else: + assert False def bind_processor(self, dialect): def process(value): @@ -92,13 +98,23 @@ class Geometry(UserDefinedType): return process def result_processor(self, dialect, coltype): + if self.coerce == "text": + fac = TextualGisElement + elif self.coerce == "binary": + fac = BinaryGisElement + else: + assert False def process(value): if value is not None: - return PersistentGisElement(value) + return fac(value) else: return value return process + def adapt(self, impltype): + return impltype(dimension=self.dimension, + srid=self.srid, coerce_=self.coerce) + # other datatypes can be added as needed. class Point(Geometry): @@ -173,33 +189,12 @@ def setup_ddl_events(): table.columns = table.info.pop('_saved_columns') setup_ddl_events() -def _to_postgis(value): - """Interpret a value as a GIS-compatible construct. - - - TODO. I'd like to make this unnecessary also, - and see if the Geometry type can do the coersion. - This would require [ticket:1534]. - - """ - - if hasattr(value, '__clause_element__'): - return value.__clause_element__() - elif isinstance(value, (expression.ClauseElement, GisElement)): - return value - elif isinstance(value, basestring): - return TextualGisElement(value) - elif value is None: - return None - else: - raise Exception("Invalid type") - # illustrate usage if __name__ == '__main__': from sqlalchemy import (create_engine, MetaData, Column, Integer, String, - func, literal, select) + func, select) from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base @@ -235,31 +230,38 @@ if __name__ == '__main__': # pre flush, the TextualGisElement represents the string we sent. assert str(r.road_geom) == 'LINESTRING(198231 263418,198213 268322)' - assert session.scalar(r.road_geom.wkt) == 'LINESTRING(198231 263418,198213 268322)' session.commit() # after flush and/or commit, all the TextualGisElements become PersistentGisElements. assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)" - r1 = session.query(Road).filter(Road.road_name=='Graeme Ave').one() + r1 = session.query(Road).filter(Road.road_name == 'Graeme Ave').one() # illustrate the overridden __eq__() operator. # strings come in as TextualGisElements r2 = session.query(Road).filter(Road.road_geom == 'LINESTRING(189412 252431,189631 259122)').one() - # PersistentGisElements work directly r3 = session.query(Road).filter(Road.road_geom == r1.road_geom).one() assert r1 is r2 is r3 - # illustrate the "intersects" operator - print session.query(Road).filter(Road.road_geom.intersects(r1.road_geom)).all() + # core usage just fine: + + road_table = Road.__table__ + stmt = select([road_table]).where(road_table.c.road_geom.intersects(r1.road_geom)) + print session.execute(stmt).fetchall() + + # look up the hex binary version, using SQLAlchemy casts + as_binary = session.scalar(select([type_coerce(r.road_geom, Geometry(coerce_="binary"))])) + assert as_binary.as_hex == \ + '01020000000200000000000000b832084100000000e813104100000000283208410000000088601041' + + # back again, same method ! + as_text = session.scalar(select([type_coerce(as_binary, Geometry(coerce_="text"))])) + assert as_text.desc == "LINESTRING(198231 263418,198213 268322)" - # illustrate usage of the "wkt" accessor. this requires a DB - # execution to call the AsText() function so we keep this explicit. - assert session.scalar(r1.road_geom.wkt) == 'LINESTRING(189412 252431,189631 259122)' session.rollback() |