summaryrefslogtreecommitdiff
path: root/examples/postgis/postgis.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/postgis/postgis.py')
-rw-r--r--examples/postgis/postgis.py186
1 files changed, 128 insertions, 58 deletions
diff --git a/examples/postgis/postgis.py b/examples/postgis/postgis.py
index ffea3d018..508d63398 100644
--- a/examples/postgis/postgis.py
+++ b/examples/postgis/postgis.py
@@ -5,6 +5,7 @@ import binascii
# Python datatypes
+
class GisElement(object):
"""Represents a geometry value."""
@@ -12,16 +13,21 @@ class GisElement(object):
return self.desc
def __repr__(self):
- return "<%s at 0x%x; %r>" % (self.__class__.__name__,
- id(self), self.desc)
+ return "<%s at 0x%x; %r>" % (
+ self.__class__.__name__,
+ id(self),
+ self.desc,
+ )
+
class BinaryGisElement(GisElement, expression.Function):
"""Represents a Geometry value expressed as binary."""
def __init__(self, data):
self.data = data
- expression.Function.__init__(self, "ST_GeomFromEWKB", data,
- type_=Geometry(coerce_="binary"))
+ expression.Function.__init__(
+ self, "ST_GeomFromEWKB", data, type_=Geometry(coerce_="binary")
+ )
@property
def desc(self):
@@ -31,24 +37,26 @@ class BinaryGisElement(GisElement, expression.Function):
def as_hex(self):
return binascii.hexlify(self.data)
+
class TextualGisElement(GisElement, expression.Function):
"""Represents a Geometry value expressed as text."""
def __init__(self, desc, srid=-1):
self.desc = desc
- expression.Function.__init__(self, "ST_GeomFromText", desc, srid,
- type_=Geometry)
+ expression.Function.__init__(
+ self, "ST_GeomFromText", desc, srid, type_=Geometry
+ )
# SQL datatypes.
+
class Geometry(UserDefinedType):
"""Base PostGIS Geometry column type."""
name = "GEOMETRY"
- def __init__(self, dimension=None, srid=-1,
- coerce_="text"):
+ def __init__(self, dimension=None, srid=-1, coerce_="text"):
self.dimension = dimension
self.srid = srid
self.coerce = coerce_
@@ -58,11 +66,11 @@ class Geometry(UserDefinedType):
# override the __eq__() operator
def __eq__(self, other):
- return self.op('~=')(other)
+ return self.op("~=")(other)
# add a custom operator
def intersects(self, other):
- return self.op('&&')(other)
+ return self.op("&&")(other)
# any number of GIS operators can be overridden/added here
# using the techniques above.
@@ -95,6 +103,7 @@ class Geometry(UserDefinedType):
return value.desc
else:
return value
+
return process
def result_processor(self, dialect, coltype):
@@ -104,27 +113,35 @@ class Geometry(UserDefinedType):
fac = BinaryGisElement
else:
assert False
+
def process(value):
if value is not None:
return fac(value)
else:
return value
+
return process
def adapt(self, impltype):
- return impltype(dimension=self.dimension,
- srid=self.srid, coerce_=self.coerce)
+ return impltype(
+ dimension=self.dimension, srid=self.srid, coerce_=self.coerce
+ )
+
# other datatypes can be added as needed.
+
class Point(Geometry):
- name = 'POINT'
+ name = "POINT"
+
class Curve(Geometry):
- name = 'CURVE'
+ name = "CURVE"
+
class LineString(Curve):
- name = 'LINESTRING'
+ name = "LINESTRING"
+
# ... etc.
@@ -135,6 +152,7 @@ class LineString(Curve):
# versions don't appear to require these special steps anymore. However,
# here we illustrate how to set up these features in any case.
+
def setup_ddl_events():
@event.listens_for(Table, "before_create")
def before_create(target, connection, **kw):
@@ -153,9 +171,10 @@ def setup_ddl_events():
dispatch("after-drop", target, connection)
def dispatch(event, table, bind):
- if event in ('before-create', 'before-drop'):
- regular_cols = [c for c in table.c if not
- isinstance(c.type, Geometry)]
+ if event in ("before-create", "before-drop"):
+ regular_cols = [
+ c for c in table.c if not isinstance(c.type, Geometry)
+ ]
gis_cols = set(table.c).difference(regular_cols)
table.info["_saved_columns"] = table.c
@@ -163,85 +182,129 @@ def setup_ddl_events():
# Geometry columns
table.columns = expression.ColumnCollection(*regular_cols)
- if event == 'before-drop':
+ if event == "before-drop":
for c in gis_cols:
bind.execute(
- select([
+ select(
+ [
func.DropGeometryColumn(
- 'public', table.name, c.name)],
- autocommit=True)
- )
+ "public", table.name, c.name
+ )
+ ],
+ autocommit=True,
+ )
+ )
- elif event == 'after-create':
- table.columns = table.info.pop('_saved_columns')
+ elif event == "after-create":
+ table.columns = table.info.pop("_saved_columns")
for c in table.c:
if isinstance(c.type, Geometry):
bind.execute(
- select([
- func.AddGeometryColumn(
- table.name, c.name,
- c.type.srid,
- c.type.name,
- c.type.dimension)],
- autocommit=True)
+ select(
+ [
+ func.AddGeometryColumn(
+ table.name,
+ c.name,
+ c.type.srid,
+ c.type.name,
+ c.type.dimension,
+ )
+ ],
+ autocommit=True,
)
- elif event == 'after-drop':
- table.columns = table.info.pop('_saved_columns')
-setup_ddl_events()
+ )
+ elif event == "after-drop":
+ table.columns = table.info.pop("_saved_columns")
+setup_ddl_events()
+
# illustrate usage
-if __name__ == '__main__':
- from sqlalchemy import (create_engine, MetaData, Column, Integer, String,
- func, select)
+if __name__ == "__main__":
+ from sqlalchemy import (
+ create_engine,
+ MetaData,
+ Column,
+ Integer,
+ String,
+ func,
+ select,
+ )
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
- engine = create_engine('postgresql://scott:tiger@localhost/test', echo=True)
+ engine = create_engine(
+ "postgresql://scott:tiger@localhost/test", echo=True
+ )
metadata = MetaData(engine)
Base = declarative_base(metadata=metadata)
class Road(Base):
- __tablename__ = 'roads'
+ __tablename__ = "roads"
road_id = Column(Integer, primary_key=True)
road_name = Column(String)
road_geom = Column(Geometry(2))
-
metadata.drop_all()
metadata.create_all()
session = sessionmaker(bind=engine)()
# Add objects. We can use strings...
- session.add_all([
- Road(road_name='Jeff Rd', road_geom='LINESTRING(191232 243118,191108 243242)'),
- Road(road_name='Geordie Rd', road_geom='LINESTRING(189141 244158,189265 244817)'),
- Road(road_name='Paul St', road_geom='LINESTRING(192783 228138,192612 229814)'),
- Road(road_name='Graeme Ave', road_geom='LINESTRING(189412 252431,189631 259122)'),
- Road(road_name='Phil Tce', road_geom='LINESTRING(190131 224148,190871 228134)'),
- ])
+ session.add_all(
+ [
+ Road(
+ road_name="Jeff Rd",
+ road_geom="LINESTRING(191232 243118,191108 243242)",
+ ),
+ Road(
+ road_name="Geordie Rd",
+ road_geom="LINESTRING(189141 244158,189265 244817)",
+ ),
+ Road(
+ road_name="Paul St",
+ road_geom="LINESTRING(192783 228138,192612 229814)",
+ ),
+ Road(
+ road_name="Graeme Ave",
+ road_geom="LINESTRING(189412 252431,189631 259122)",
+ ),
+ Road(
+ road_name="Phil Tce",
+ road_geom="LINESTRING(190131 224148,190871 228134)",
+ ),
+ ]
+ )
# or use an explicit TextualGisElement (similar to saying func.GeomFromText())
- r = Road(road_name='Dave Cres', road_geom=TextualGisElement('LINESTRING(198231 263418,198213 268322)', -1))
+ r = Road(
+ road_name="Dave Cres",
+ road_geom=TextualGisElement(
+ "LINESTRING(198231 263418,198213 268322)", -1
+ ),
+ )
session.add(r)
# pre flush, the TextualGisElement represents the string we sent.
- assert str(r.road_geom) == 'LINESTRING(198231 263418,198213 268322)'
+ assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)"
session.commit()
# after flush and/or commit, all the TextualGisElements become PersistentGisElements.
assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)"
- r1 = session.query(Road).filter(Road.road_name == 'Graeme Ave').one()
+ r1 = session.query(Road).filter(Road.road_name == "Graeme Ave").one()
# illustrate the overridden __eq__() operator.
# strings come in as TextualGisElements
- r2 = session.query(Road).filter(Road.road_geom == 'LINESTRING(189412 252431,189631 259122)').one()
+ r2 = (
+ session.query(Road)
+ .filter(Road.road_geom == "LINESTRING(189412 252431,189631 259122)")
+ .one()
+ )
r3 = session.query(Road).filter(Road.road_geom == r1.road_geom).one()
@@ -250,22 +313,29 @@ if __name__ == '__main__':
# core usage just fine:
road_table = Road.__table__
- stmt = select([road_table]).where(road_table.c.road_geom.intersects(r1.road_geom))
+ stmt = select([road_table]).where(
+ road_table.c.road_geom.intersects(r1.road_geom)
+ )
print(session.execute(stmt).fetchall())
# TODO: for some reason the auto-generated labels have the internal replacement
# strings exposed, even though PG doesn't complain
# look up the hex binary version, using SQLAlchemy casts
- as_binary = session.scalar(select([type_coerce(r.road_geom, Geometry(coerce_="binary"))]))
- assert as_binary.as_hex == \
- '01020000000200000000000000b832084100000000e813104100000000283208410000000088601041'
+ as_binary = session.scalar(
+ select([type_coerce(r.road_geom, Geometry(coerce_="binary"))])
+ )
+ assert (
+ as_binary.as_hex
+ == "01020000000200000000000000b832084100000000e813104100000000283208410000000088601041"
+ )
# back again, same method !
- as_text = session.scalar(select([type_coerce(as_binary, Geometry(coerce_="text"))]))
+ as_text = session.scalar(
+ select([type_coerce(as_binary, Geometry(coerce_="text"))])
+ )
assert as_text.desc == "LINESTRING(198231 263418,198213 268322)"
-
session.rollback()
metadata.drop_all()