diff options
Diffstat (limited to 'examples/postgis/postgis.py')
-rw-r--r-- | examples/postgis/postgis.py | 186 |
1 files changed, 128 insertions, 58 deletions
diff --git a/examples/postgis/postgis.py b/examples/postgis/postgis.py index ffea3d018..508d63398 100644 --- a/examples/postgis/postgis.py +++ b/examples/postgis/postgis.py @@ -5,6 +5,7 @@ import binascii # Python datatypes + class GisElement(object): """Represents a geometry value.""" @@ -12,16 +13,21 @@ class GisElement(object): return self.desc def __repr__(self): - return "<%s at 0x%x; %r>" % (self.__class__.__name__, - id(self), self.desc) + return "<%s at 0x%x; %r>" % ( + self.__class__.__name__, + id(self), + self.desc, + ) + class BinaryGisElement(GisElement, expression.Function): """Represents a Geometry value expressed as binary.""" def __init__(self, data): self.data = data - expression.Function.__init__(self, "ST_GeomFromEWKB", data, - type_=Geometry(coerce_="binary")) + expression.Function.__init__( + self, "ST_GeomFromEWKB", data, type_=Geometry(coerce_="binary") + ) @property def desc(self): @@ -31,24 +37,26 @@ class BinaryGisElement(GisElement, expression.Function): def as_hex(self): return binascii.hexlify(self.data) + class TextualGisElement(GisElement, expression.Function): """Represents a Geometry value expressed as text.""" def __init__(self, desc, srid=-1): self.desc = desc - expression.Function.__init__(self, "ST_GeomFromText", desc, srid, - type_=Geometry) + expression.Function.__init__( + self, "ST_GeomFromText", desc, srid, type_=Geometry + ) # SQL datatypes. + class Geometry(UserDefinedType): """Base PostGIS Geometry column type.""" name = "GEOMETRY" - def __init__(self, dimension=None, srid=-1, - coerce_="text"): + def __init__(self, dimension=None, srid=-1, coerce_="text"): self.dimension = dimension self.srid = srid self.coerce = coerce_ @@ -58,11 +66,11 @@ class Geometry(UserDefinedType): # override the __eq__() operator def __eq__(self, other): - return self.op('~=')(other) + return self.op("~=")(other) # add a custom operator def intersects(self, other): - return self.op('&&')(other) + return self.op("&&")(other) # any number of GIS operators can be overridden/added here # using the techniques above. @@ -95,6 +103,7 @@ class Geometry(UserDefinedType): return value.desc else: return value + return process def result_processor(self, dialect, coltype): @@ -104,27 +113,35 @@ class Geometry(UserDefinedType): fac = BinaryGisElement else: assert False + def process(value): if value is not None: return fac(value) else: return value + return process def adapt(self, impltype): - return impltype(dimension=self.dimension, - srid=self.srid, coerce_=self.coerce) + return impltype( + dimension=self.dimension, srid=self.srid, coerce_=self.coerce + ) + # other datatypes can be added as needed. + class Point(Geometry): - name = 'POINT' + name = "POINT" + class Curve(Geometry): - name = 'CURVE' + name = "CURVE" + class LineString(Curve): - name = 'LINESTRING' + name = "LINESTRING" + # ... etc. @@ -135,6 +152,7 @@ class LineString(Curve): # versions don't appear to require these special steps anymore. However, # here we illustrate how to set up these features in any case. + def setup_ddl_events(): @event.listens_for(Table, "before_create") def before_create(target, connection, **kw): @@ -153,9 +171,10 @@ def setup_ddl_events(): dispatch("after-drop", target, connection) def dispatch(event, table, bind): - if event in ('before-create', 'before-drop'): - regular_cols = [c for c in table.c if not - isinstance(c.type, Geometry)] + if event in ("before-create", "before-drop"): + regular_cols = [ + c for c in table.c if not isinstance(c.type, Geometry) + ] gis_cols = set(table.c).difference(regular_cols) table.info["_saved_columns"] = table.c @@ -163,85 +182,129 @@ def setup_ddl_events(): # Geometry columns table.columns = expression.ColumnCollection(*regular_cols) - if event == 'before-drop': + if event == "before-drop": for c in gis_cols: bind.execute( - select([ + select( + [ func.DropGeometryColumn( - 'public', table.name, c.name)], - autocommit=True) - ) + "public", table.name, c.name + ) + ], + autocommit=True, + ) + ) - elif event == 'after-create': - table.columns = table.info.pop('_saved_columns') + elif event == "after-create": + table.columns = table.info.pop("_saved_columns") for c in table.c: if isinstance(c.type, Geometry): bind.execute( - select([ - func.AddGeometryColumn( - table.name, c.name, - c.type.srid, - c.type.name, - c.type.dimension)], - autocommit=True) + select( + [ + func.AddGeometryColumn( + table.name, + c.name, + c.type.srid, + c.type.name, + c.type.dimension, + ) + ], + autocommit=True, ) - elif event == 'after-drop': - table.columns = table.info.pop('_saved_columns') -setup_ddl_events() + ) + elif event == "after-drop": + table.columns = table.info.pop("_saved_columns") +setup_ddl_events() + # illustrate usage -if __name__ == '__main__': - from sqlalchemy import (create_engine, MetaData, Column, Integer, String, - func, select) +if __name__ == "__main__": + from sqlalchemy import ( + create_engine, + MetaData, + Column, + Integer, + String, + func, + select, + ) from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base - engine = create_engine('postgresql://scott:tiger@localhost/test', echo=True) + engine = create_engine( + "postgresql://scott:tiger@localhost/test", echo=True + ) metadata = MetaData(engine) Base = declarative_base(metadata=metadata) class Road(Base): - __tablename__ = 'roads' + __tablename__ = "roads" road_id = Column(Integer, primary_key=True) road_name = Column(String) road_geom = Column(Geometry(2)) - metadata.drop_all() metadata.create_all() session = sessionmaker(bind=engine)() # Add objects. We can use strings... - session.add_all([ - Road(road_name='Jeff Rd', road_geom='LINESTRING(191232 243118,191108 243242)'), - Road(road_name='Geordie Rd', road_geom='LINESTRING(189141 244158,189265 244817)'), - Road(road_name='Paul St', road_geom='LINESTRING(192783 228138,192612 229814)'), - Road(road_name='Graeme Ave', road_geom='LINESTRING(189412 252431,189631 259122)'), - Road(road_name='Phil Tce', road_geom='LINESTRING(190131 224148,190871 228134)'), - ]) + session.add_all( + [ + Road( + road_name="Jeff Rd", + road_geom="LINESTRING(191232 243118,191108 243242)", + ), + Road( + road_name="Geordie Rd", + road_geom="LINESTRING(189141 244158,189265 244817)", + ), + Road( + road_name="Paul St", + road_geom="LINESTRING(192783 228138,192612 229814)", + ), + Road( + road_name="Graeme Ave", + road_geom="LINESTRING(189412 252431,189631 259122)", + ), + Road( + road_name="Phil Tce", + road_geom="LINESTRING(190131 224148,190871 228134)", + ), + ] + ) # or use an explicit TextualGisElement (similar to saying func.GeomFromText()) - r = Road(road_name='Dave Cres', road_geom=TextualGisElement('LINESTRING(198231 263418,198213 268322)', -1)) + r = Road( + road_name="Dave Cres", + road_geom=TextualGisElement( + "LINESTRING(198231 263418,198213 268322)", -1 + ), + ) session.add(r) # pre flush, the TextualGisElement represents the string we sent. - assert str(r.road_geom) == 'LINESTRING(198231 263418,198213 268322)' + assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)" session.commit() # after flush and/or commit, all the TextualGisElements become PersistentGisElements. assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)" - r1 = session.query(Road).filter(Road.road_name == 'Graeme Ave').one() + r1 = session.query(Road).filter(Road.road_name == "Graeme Ave").one() # illustrate the overridden __eq__() operator. # strings come in as TextualGisElements - r2 = session.query(Road).filter(Road.road_geom == 'LINESTRING(189412 252431,189631 259122)').one() + r2 = ( + session.query(Road) + .filter(Road.road_geom == "LINESTRING(189412 252431,189631 259122)") + .one() + ) r3 = session.query(Road).filter(Road.road_geom == r1.road_geom).one() @@ -250,22 +313,29 @@ if __name__ == '__main__': # core usage just fine: road_table = Road.__table__ - stmt = select([road_table]).where(road_table.c.road_geom.intersects(r1.road_geom)) + stmt = select([road_table]).where( + road_table.c.road_geom.intersects(r1.road_geom) + ) print(session.execute(stmt).fetchall()) # TODO: for some reason the auto-generated labels have the internal replacement # strings exposed, even though PG doesn't complain # look up the hex binary version, using SQLAlchemy casts - as_binary = session.scalar(select([type_coerce(r.road_geom, Geometry(coerce_="binary"))])) - assert as_binary.as_hex == \ - '01020000000200000000000000b832084100000000e813104100000000283208410000000088601041' + as_binary = session.scalar( + select([type_coerce(r.road_geom, Geometry(coerce_="binary"))]) + ) + assert ( + as_binary.as_hex + == "01020000000200000000000000b832084100000000e813104100000000283208410000000088601041" + ) # back again, same method ! - as_text = session.scalar(select([type_coerce(as_binary, Geometry(coerce_="text"))])) + as_text = session.scalar( + select([type_coerce(as_binary, Geometry(coerce_="text"))]) + ) assert as_text.desc == "LINESTRING(198231 263418,198213 268322)" - session.rollback() metadata.drop_all() |