author     David Baumgold <david@davidbaumgold.com>  2022-02-11 12:30:24 -0500
committer  Mike Bayer <mike_mp@zzzcomputing.com>     2022-06-21 10:17:40 -0400
commit     017fd9ae0645eaf2a0fbdd067d10c721505b018c (patch)
tree       80adc525448f11b11bb34d0cf3b1a0e708725542 /lib/sqlalchemy
parent     4e2a89c41b0bb423891767d10bdc3cb1b75eaa5e (diff)
download   sqlalchemy-017fd9ae0645eaf2a0fbdd067d10c721505b018c.tar.gz
Domain type
Added a new PostgreSQL :class:`_postgresql.DOMAIN` datatype, which follows the
same CREATE TYPE / DROP TYPE behaviors as that of the PostgreSQL
:class:`_postgresql.ENUM` datatype. Much thanks to David Baumgold for the
efforts on this.

Fixes: #7316
Closes: #7317
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/7317
Pull-request-sha: bc9a82f010e6ca2f70a6e8a7620b748e483c26c3
Change-Id: Id8d7e48843a896de17d20cc466b115b3cc065132
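
As a rough usage sketch of the new datatype (based on the DOMAIN constructor and
DDL hooks added below; the table, column, and domain names are illustrative only)::

    from sqlalchemy import Column, Integer, MetaData, Table
    from sqlalchemy.dialects.postgresql import DOMAIN

    metadata = MetaData()

    # a domain restricting an integer column to positive values
    positive_int = DOMAIN(
        "positive_int",
        Integer,
        check="VALUE > 0",
        not_null=True,
        metadata=metadata,
    )

    scores = Table("scores", metadata, Column("points", positive_int))

    # metadata.create_all(engine) / metadata.drop_all(engine) would emit
    # CREATE DOMAIN / DROP DOMAIN alongside the table DDL, following the
    # same CREATE TYPE / DROP TYPE rules as the existing ENUM type.
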
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--  lib/sqlalchemy/dialects/postgresql/__init__.py    |  14
-rw-r--r--  lib/sqlalchemy/dialects/postgresql/base.py        | 303
-rw-r--r--  lib/sqlalchemy/dialects/postgresql/named_types.py | 476
-rw-r--r--  lib/sqlalchemy/dialects/postgresql/types.py       | 285
-rw-r--r--  lib/sqlalchemy/sql/compiler.py                    |  17
5 files changed, 734 insertions, 361 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/__init__.py b/lib/sqlalchemy/dialects/postgresql/__init__.py
index 85bbf8c5b..62195f59e 100644
--- a/lib/sqlalchemy/dialects/postgresql/__init__.py
+++ b/lib/sqlalchemy/dialects/postgresql/__init__.py
@@ -22,6 +22,7 @@ from .base import BIGINT
from .base import BOOLEAN
from .base import CHAR
from .base import DATE
+from .base import DOMAIN
from .base import DOUBLE_PRECISION
from .base import FLOAT
from .base import INTEGER
@@ -40,6 +41,12 @@ from .hstore import HSTORE
from .hstore import hstore
from .json import JSON
from .json import JSONB
+from .named_types import CreateDomainType
+from .named_types import CreateEnumType
+from .named_types import DropDomainType
+from .named_types import DropEnumType
+from .named_types import ENUM
+from .named_types import NamedType
from .ranges import DATERANGE
from .ranges import INT4RANGE
from .ranges import INT8RANGE
@@ -49,9 +56,6 @@ from .ranges import TSTZRANGE
from .types import BIT
from .types import BYTEA
from .types import CIDR
-from .types import CreateEnumType
-from .types import DropEnumType
-from .types import ENUM
from .types import INET
from .types import INTERVAL
from .types import MACADDR
@@ -97,6 +101,7 @@ __all__ = (
"INTERVAL",
"ARRAY",
"ENUM",
+ "DOMAIN",
"dialect",
"array",
"HSTORE",
@@ -113,6 +118,9 @@ __all__ = (
"Any",
"All",
"DropEnumType",
+ "DropDomainType",
+ "CreateDomainType",
+ "NamedType",
"CreateEnumType",
"ExcludeConstraint",
"aggregate_order_by",
diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py
index 8402341f6..8fc24c933 100644
--- a/lib/sqlalchemy/dialects/postgresql/base.py
+++ b/lib/sqlalchemy/dialects/postgresql/base.py
@@ -1450,6 +1450,9 @@ from __future__ import annotations
from collections import defaultdict
from functools import lru_cache
import re
+from typing import Any
+from typing import List
+from typing import Optional
from . import array as _array
from . import dml
@@ -1457,30 +1460,34 @@ from . import hstore as _hstore
from . import json as _json
from . import pg_catalog
from . import ranges as _ranges
-from .types import _DECIMAL_TYPES # noqa
-from .types import _FLOAT_TYPES # noqa
-from .types import _INT_TYPES # noqa
-from .types import BIT
-from .types import BYTEA
-from .types import CIDR
-from .types import CreateEnumType # noqa
-from .types import DropEnumType # noqa
-from .types import ENUM
-from .types import INET
-from .types import INTERVAL
-from .types import MACADDR
-from .types import MONEY
-from .types import OID
-from .types import PGBit # noqa
-from .types import PGCidr # noqa
-from .types import PGInet # noqa
-from .types import PGInterval # noqa
-from .types import PGMacAddr # noqa
-from .types import PGUuid
-from .types import REGCLASS
-from .types import TIME
-from .types import TIMESTAMP
-from .types import TSVECTOR
+from .named_types import CreateDomainType as CreateDomainType # noqa: F401
+from .named_types import CreateEnumType as CreateEnumType # noqa: F401
+from .named_types import DOMAIN as DOMAIN # noqa: F401
+from .named_types import DropDomainType as DropDomainType # noqa: F401
+from .named_types import DropEnumType as DropEnumType # noqa: F401
+from .named_types import ENUM as ENUM # noqa: F401
+from .named_types import NamedType as NamedType # noqa: F401
+from .types import _DECIMAL_TYPES # noqa: F401
+from .types import _FLOAT_TYPES # noqa: F401
+from .types import _INT_TYPES # noqa: F401
+from .types import BIT as BIT
+from .types import BYTEA as BYTEA
+from .types import CIDR as CIDR
+from .types import INET as INET
+from .types import INTERVAL as INTERVAL
+from .types import MACADDR as MACADDR
+from .types import MONEY as MONEY
+from .types import OID as OID
+from .types import PGBit as PGBit # noqa: F401
+from .types import PGCidr as PGCidr # noqa: F401
+from .types import PGInet as PGInet # noqa: F401
+from .types import PGInterval as PGInterval # noqa: F401
+from .types import PGMacAddr as PGMacAddr # noqa: F401
+from .types import PGUuid as PGUuid
+from .types import REGCLASS as REGCLASS
+from .types import TIME as TIME
+from .types import TIMESTAMP as TIMESTAMP
+from .types import TSVECTOR as TSVECTOR
from ... import exc
from ... import schema
from ... import select
@@ -1515,6 +1522,7 @@ from ...types import SMALLINT
from ...types import TEXT
from ...types import UUID as UUID
from ...types import VARCHAR
+from ...util.typing import TypedDict
IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
@@ -2198,6 +2206,38 @@ class PGDDLCompiler(compiler.DDLCompiler):
return "DROP TYPE %s" % (self.preparer.format_type(type_))
+ def visit_create_domain_type(self, create):
+ domain: DOMAIN = create.element
+
+ options = []
+ if domain.collation is not None:
+ options.append(f"COLLATE {self.preparer.quote(domain.collation)}")
+ if domain.default is not None:
+ default = self.render_default_string(domain.default)
+ options.append(f"DEFAULT {default}")
+ if domain.constraint_name is not None:
+ name = self.preparer.truncate_and_render_constraint_name(
+ domain.constraint_name
+ )
+ options.append(f"CONSTRAINT {name}")
+ if domain.not_null:
+ options.append("NOT NULL")
+ if domain.check is not None:
+ check = self.sql_compiler.process(
+ domain.check, include_table=False, literal_binds=True
+ )
+ options.append(f"CHECK ({check})")
+
+ return (
+ f"CREATE DOMAIN {self.preparer.format_type(domain)} AS "
+ f"{self.type_compiler.process(domain.data_type)} "
+ f"{' '.join(options)}"
+ )
+
+ def visit_drop_domain_type(self, drop):
+ domain = drop.element
+ return f"DROP DOMAIN {self.preparer.format_type(domain)}"
+
def visit_create_index(self, create):
preparer = self.preparer
index = create.element
@@ -2470,6 +2510,11 @@ class PGTypeCompiler(compiler.GenericTypeCompiler):
identifier_preparer = self.dialect.identifier_preparer
return identifier_preparer.format_type(type_)
+ def visit_DOMAIN(self, type_, identifier_preparer=None, **kw):
+ if identifier_preparer is None:
+ identifier_preparer = self.dialect.identifier_preparer
+ return identifier_preparer.format_type(type_)
+
def visit_TIMESTAMP(self, type_, **kw):
return "TIMESTAMP%s %s" % (
"(%d)" % type_.precision
@@ -2548,7 +2593,9 @@ class PGIdentifierPreparer(compiler.IdentifierPreparer):
def format_type(self, type_, use_schema=True):
if not type_.name:
- raise exc.CompileError("PostgreSQL ENUM type requires a name.")
+ raise exc.CompileError(
+ f"PostgreSQL {type_.__class__.__name__} type requires a name."
+ )
name = self.quote(type_.name)
effective_schema = self.schema_for_object(type_)
@@ -2558,14 +2605,60 @@ class PGIdentifierPreparer(compiler.IdentifierPreparer):
and use_schema
and effective_schema is not None
):
- name = self.quote_schema(effective_schema) + "." + name
+ name = f"{self.quote_schema(effective_schema)}.{name}"
return name
+class ReflectedNamedType(TypedDict):
+ """Represents a reflected named type."""
+
+ name: str
+ """Name of the type."""
+ schema: str
+ """The schema of the type."""
+ visible: bool
+ """Indicates if this type is in the current search path."""
+
+
+class ReflectedDomainConstraint(TypedDict):
+ """Represents a reflect check constraint of a domain."""
+
+ name: str
+ """Name of the constraint."""
+ check: str
+ """The check constraint text."""
+
+
+class ReflectedDomain(ReflectedNamedType):
+ """Represents a reflected enum."""
+
+ type: str
+ """The string name of the underlying data type of the domain."""
+ nullable: bool
+ """Indicates if the domain allows null or not."""
+ default: Optional[str]
+ """The string representation of the default value of this domain
+ or ``None`` if none present.
+ """
+ constraints: List[ReflectedDomainConstraint]
+ """The constraints defined in the domain, if any.
+ The constraints are in order of evaluation by PostgreSQL.
+ """
+
+
+class ReflectedEnum(ReflectedNamedType):
+ """Represents a reflected enum."""
+
+ labels: List[str]
+ """The labels that compose the enum."""
+
+
class PGInspector(reflection.Inspector):
dialect: PGDialect
- def get_table_oid(self, table_name, schema=None):
+ def get_table_oid(
+ self, table_name: str, schema: Optional[str] = None
+ ) -> int:
"""Return the OID for the given table name.
:param table_name: string name of the table. For special quoting,
@@ -2582,7 +2675,38 @@ class PGInspector(reflection.Inspector):
conn, table_name, schema, info_cache=self.info_cache
)
- def get_enums(self, schema=None):
+ def get_domains(
+ self, schema: Optional[str] = None
+ ) -> List[ReflectedDomain]:
+ """Return a list of DOMAIN objects.
+
+ Each member is a dictionary containing these fields:
+
+ * name - name of the domain
+ * schema - the schema name for the domain.
+ * visible - boolean, whether or not this domain is visible
+ in the default search path.
+ * type - the type defined by this domain.
+ * nullable - Indicates if this domain can be ``NULL``.
+ * default - The default value of the domain or ``None`` if the
+ domain has no default.
+ * constraints - A list of dicts with the constraints defined by this
+ domain. Each element contains two keys: ``name`` of the
+ constraint and ``check`` with the constraint text.
+
+ :param schema: schema name. If None, the default schema
+ (typically 'public') is used. May also be set to ``'*'`` to
+ indicate load domains for all schemas.
+
+ .. versionadded:: 2.0
+
+ """
+ with self._operation_context() as conn:
+ return self.dialect._load_domains(
+ conn, schema, info_cache=self.info_cache
+ )
+
+ def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]:
"""Return a list of ENUM objects.
Each member is a dictionary containing these fields:
@@ -2594,7 +2718,7 @@ class PGInspector(reflection.Inspector):
* labels - a list of string labels that apply to the enum.
:param schema: schema name. If None, the default schema
- (typically 'public') is used. May also be set to '*' to
+ (typically 'public') is used. May also be set to ``'*'`` to
indicate load enums for all schemas.
.. versionadded:: 1.0.0
@@ -2605,7 +2729,9 @@ class PGInspector(reflection.Inspector):
conn, schema, info_cache=self.info_cache
)
- def get_foreign_table_names(self, schema=None):
+ def get_foreign_table_names(
+ self, schema: Optional[str] = None
+ ) -> List[str]:
"""Return a list of FOREIGN TABLE names.
Behavior is similar to that of
@@ -2621,13 +2747,15 @@ class PGInspector(reflection.Inspector):
conn, schema, info_cache=self.info_cache
)
- def has_type(self, type_name, schema=None, **kw):
+ def has_type(
+ self, type_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> bool:
"""Return if the database has the specified type in the provided
schema.
:param type_name: the type to check.
:param schema: schema name. If None, the default schema
- (typically 'public') is used. May also be set to '*' to
+ (typically 'public') is used. May also be set to ``'*'`` to
check in all schemas.
.. versionadded:: 2.0
@@ -2941,10 +3069,12 @@ class PGDialect(default.DefaultDialect):
pg_catalog.pg_namespace,
pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace,
)
+
if scope is ObjectScope.DEFAULT:
query = query.where(pg_class_table.c.relpersistence != "t")
elif scope is ObjectScope.TEMPORARY:
query = query.where(pg_class_table.c.relpersistence == "t")
+
if schema is None:
query = query.where(
pg_catalog.pg_table_is_visible(pg_class_table.c.oid),
@@ -3319,9 +3449,12 @@ class PGDialect(default.DefaultDialect):
# dictionary with (name, ) if default search path or (schema, name)
# as keys
- domains = self._load_domains(
- connection, info_cache=kw.get("info_cache")
- )
+ domains = {
+ ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d
+ for d in self._load_domains(
+ connection, schema="*", info_cache=kw.get("info_cache")
+ )
+ }
# dictionary with (name, ) if default search path or (schema, name)
# as keys
@@ -3446,7 +3579,7 @@ class PGDialect(default.DefaultDialect):
break
elif enum_or_domain_key in domains:
domain = domains[enum_or_domain_key]
- attype = domain["attype"]
+ attype = domain["type"]
attype, is_array = _handle_array_type(attype)
# strip quotes from case sensitive enum or domain names
enum_or_domain_key = tuple(
@@ -3736,7 +3869,7 @@ class PGDialect(default.DefaultDialect):
@util.memoized_property
def _fk_regex_pattern(self):
- # https://www.postgresql.org/docs/14.0/static/sql-createtable.html
+ # https://www.postgresql.org/docs/current/static/sql-createtable.html
return re.compile(
r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
@@ -4201,7 +4334,7 @@ class PGDialect(default.DefaultDialect):
(
pg_catalog.pg_constraint.c.oid.is_not(None),
pg_catalog.pg_get_constraintdef(
- pg_catalog.pg_constraint.c.oid
+ pg_catalog.pg_constraint.c.oid, True
),
),
else_=None,
@@ -4265,6 +4398,17 @@ class PGDialect(default.DefaultDialect):
check_constraints[(schema, table_name)].append(entry)
return check_constraints.items()
+ def _pg_type_filter_schema(self, query, schema):
+ if schema is None:
+ query = query.where(
+ pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
+ # ignore pg_catalog schema
+ pg_catalog.pg_namespace.c.nspname != "pg_catalog",
+ )
+ elif schema != "*":
+ query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
+ return query
+
@lru_cache()
def _enum_query(self, schema):
lbl_sq = (
@@ -4310,15 +4454,7 @@ class PGDialect(default.DefaultDialect):
)
)
- if schema is None:
- query = query.where(
- pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
- # ignore pg_catalog schema
- pg_catalog.pg_namespace.c.nspname != "pg_catalog",
- )
- elif schema != "*":
- query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
- return query
+ return self._pg_type_filter_schema(query, schema)
@reflection.cache
def _load_enums(self, connection, schema=None, **kw):
@@ -4339,9 +4475,27 @@ class PGDialect(default.DefaultDialect):
)
return enums
- @util.memoized_property
- def _domain_query(self):
- return (
+ @lru_cache()
+ def _domain_query(self, schema):
+ con_sq = (
+ select(
+ pg_catalog.pg_constraint.c.contypid,
+ sql.func.array_agg(
+ pg_catalog.pg_get_constraintdef(
+ pg_catalog.pg_constraint.c.oid, True
+ )
+ ).label("condefs"),
+ sql.func.array_agg(pg_catalog.pg_constraint.c.conname).label(
+ "connames"
+ ),
+ )
+ # The domain this constraint is on; zero if not a domain constraint
+ .where(pg_catalog.pg_constraint.c.contypid != 0)
+ .group_by(pg_catalog.pg_constraint.c.contypid)
+ .subquery("domain_constraints")
+ )
+
+ query = (
select(
pg_catalog.pg_type.c.typname.label("name"),
pg_catalog.format_type(
@@ -4354,38 +4508,57 @@ class PGDialect(default.DefaultDialect):
"visible"
),
pg_catalog.pg_namespace.c.nspname.label("schema"),
+ con_sq.c.condefs,
+ con_sq.c.connames,
)
.join(
pg_catalog.pg_namespace,
pg_catalog.pg_namespace.c.oid
== pg_catalog.pg_type.c.typnamespace,
)
+ .outerjoin(
+ con_sq,
+ pg_catalog.pg_type.c.oid == con_sq.c.contypid,
+ )
.where(pg_catalog.pg_type.c.typtype == "d")
+ .order_by(
+ pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
+ )
)
+ return self._pg_type_filter_schema(query, schema)
@reflection.cache
- def _load_domains(self, connection, **kw):
+ def _load_domains(self, connection, schema=None, **kw):
# Load data types for domains:
- result = connection.execute(self._domain_query)
+ result = connection.execute(self._domain_query(schema))
- domains = {}
+ domains = []
for domain in result.mappings():
- domain = domain
# strip (30) from character varying(30)
attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
- # 'visible' just means whether or not the domain is in a
- # schema that's on the search path -- or not overridden by
- # a schema with higher precedence. If it's not visible,
- # it will be prefixed with the schema-name when it's used.
- if domain["visible"]:
- key = (domain["name"],)
- else:
- key = (domain["schema"], domain["name"])
-
- domains[key] = {
- "attype": attype,
+ constraints = []
+ if domain["connames"]:
+ # When a domain has multiple CHECK constraints, they will
+ # be tested in alphabetical order by name.
+ sorted_constraints = sorted(
+ zip(domain["connames"], domain["condefs"]),
+ key=lambda t: t[0],
+ )
+ for name, def_ in sorted_constraints:
+ # constraint is in the form "CHECK (expression)".
+ # remove "CHECK (" and the tailing ")".
+ check = def_[7:-1]
+ constraints.append({"name": name, "check": check})
+
+ domain_rec = {
+ "name": domain["name"],
+ "schema": domain["schema"],
+ "visible": domain["visible"],
+ "type": attype,
"nullable": domain["nullable"],
"default": domain["default"],
+ "constraints": constraints,
}
+ domains.append(domain_rec)
return domains
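
Between the reflection additions above and the new named_types module below, a
minimal sketch of consuming the new ``PGInspector.get_domains()`` entry point
(assuming a reachable PostgreSQL database at the illustrative URL; the keys
mirror the ``ReflectedDomain`` dictionary documented above)::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    inspector = inspect(engine)

    # domains visible in the default search path; schema="*" would load all
    for domain in inspector.get_domains():
        print(domain["name"], domain["type"], domain["nullable"], domain["default"])
        for constraint in domain["constraints"]:
            # each constraint carries "name" and "check", in evaluation order
            print("    ", constraint["name"], "->", constraint["check"])
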
diff --git a/lib/sqlalchemy/dialects/postgresql/named_types.py b/lib/sqlalchemy/dialects/postgresql/named_types.py
new file mode 100644
index 000000000..b2f274b78
--- /dev/null
+++ b/lib/sqlalchemy/dialects/postgresql/named_types.py
@@ -0,0 +1,476 @@
+# postgresql/named_types.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+from __future__ import annotations
+
+from typing import Any
+from typing import Optional
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+
+from ... import schema
+from ... import util
+from ...sql import coercions
+from ...sql import elements
+from ...sql import roles
+from ...sql import sqltypes
+from ...sql import type_api
+from ...sql.ddl import InvokeDDLBase
+
+if TYPE_CHECKING:
+ from ...sql._typing import _TypeEngineArgument
+
+
+class NamedType(sqltypes.TypeEngine):
+ """Base for named types."""
+
+ __abstract__ = True
+ DDLGenerator: Type["NamedTypeGenerator"]
+ DDLDropper: Type["NamedTypeDropper"]
+ create_type: bool
+
+ def create(self, bind, checkfirst=True, **kw):
+ """Emit ``CREATE`` DDL for this type.
+
+ :param bind: a connectable :class:`_engine.Engine`,
+ :class:`_engine.Connection`, or similar object to emit
+ SQL.
+ :param checkfirst: if ``True``, a query against
+ the PG catalog will be first performed to see
+ if the type does not exist already before
+ creating.
+
+ """
+ bind._run_ddl_visitor(self.DDLGenerator, self, checkfirst=checkfirst)
+
+ def drop(self, bind, checkfirst=True, **kw):
+ """Emit ``DROP`` DDL for this type.
+
+ :param bind: a connectable :class:`_engine.Engine`,
+ :class:`_engine.Connection`, or similar object to emit
+ SQL.
+ :param checkfirst: if ``True``, a query against
+ the PG catalog will be first performed to see
+ if the type actually exists before dropping.
+
+ """
+ bind._run_ddl_visitor(self.DDLDropper, self, checkfirst=checkfirst)
+
+ def _check_for_name_in_memos(self, checkfirst, kw):
+ """Look in the 'ddl runner' for 'memos', then
+ note our name in that collection.
+
+ This is to ensure a particular named type is operated
+ upon only once within any kind of create/drop
+ sequence without relying upon "checkfirst".
+
+ """
+ if not self.create_type:
+ return True
+ if "_ddl_runner" in kw:
+ ddl_runner = kw["_ddl_runner"]
+ type_name = f"pg_{self.__visit_name__}"
+ if type_name in ddl_runner.memo:
+ existing = ddl_runner.memo[type_name]
+ else:
+ existing = ddl_runner.memo[type_name] = set()
+ present = (self.schema, self.name) in existing
+ existing.add((self.schema, self.name))
+ return present
+ else:
+ return False
+
+ def _on_table_create(self, target, bind, checkfirst=False, **kw):
+ if (
+ checkfirst
+ or (
+ not self.metadata
+ and not kw.get("_is_metadata_operation", False)
+ )
+ ) and not self._check_for_name_in_memos(checkfirst, kw):
+ self.create(bind=bind, checkfirst=checkfirst)
+
+ def _on_table_drop(self, target, bind, checkfirst=False, **kw):
+ if (
+ not self.metadata
+ and not kw.get("_is_metadata_operation", False)
+ and not self._check_for_name_in_memos(checkfirst, kw)
+ ):
+ self.drop(bind=bind, checkfirst=checkfirst)
+
+ def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
+ if not self._check_for_name_in_memos(checkfirst, kw):
+ self.create(bind=bind, checkfirst=checkfirst)
+
+ def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
+ if not self._check_for_name_in_memos(checkfirst, kw):
+ self.drop(bind=bind, checkfirst=checkfirst)
+
+
+class NamedTypeGenerator(InvokeDDLBase):
+ def __init__(self, dialect, connection, checkfirst=False, **kwargs):
+ super().__init__(connection, **kwargs)
+ self.checkfirst = checkfirst
+
+ def _can_create_type(self, type_):
+ if not self.checkfirst:
+ return True
+
+ effective_schema = self.connection.schema_for_object(type_)
+ return not self.connection.dialect.has_type(
+ self.connection, type_.name, schema=effective_schema
+ )
+
+
+class NamedTypeDropper(InvokeDDLBase):
+ def __init__(self, dialect, connection, checkfirst=False, **kwargs):
+ super().__init__(connection, **kwargs)
+ self.checkfirst = checkfirst
+
+ def _can_drop_type(self, type_):
+ if not self.checkfirst:
+ return True
+
+ effective_schema = self.connection.schema_for_object(type_)
+ return self.connection.dialect.has_type(
+ self.connection, type_.name, schema=effective_schema
+ )
+
+
+class EnumGenerator(NamedTypeGenerator):
+ def visit_enum(self, enum):
+ if not self._can_create_type(enum):
+ return
+
+ self.connection.execute(CreateEnumType(enum))
+
+
+class EnumDropper(NamedTypeDropper):
+ def visit_enum(self, enum):
+ if not self._can_drop_type(enum):
+ return
+
+ self.connection.execute(DropEnumType(enum))
+
+
+class ENUM(NamedType, sqltypes.NativeForEmulated, sqltypes.Enum):
+
+ """PostgreSQL ENUM type.
+
+ This is a subclass of :class:`_types.Enum` which includes
+ support for PG's ``CREATE TYPE`` and ``DROP TYPE``.
+
+ When the builtin type :class:`_types.Enum` is used and the
+ :paramref:`.Enum.native_enum` flag is left at its default of
+ True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
+ type as the implementation, so the special create/drop rules
+ will be used.
+
+ The create/drop behavior of ENUM is necessarily intricate, due to the
+ awkward relationship the ENUM type has in relationship to the
+ parent table, in that it may be "owned" by just a single table, or
+ may be shared among many tables.
+
+ When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
+ in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
+ corresponding to when the :meth:`_schema.Table.create` and
+ :meth:`_schema.Table.drop`
+ methods are called::
+
+ table = Table('sometable', metadata,
+ Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
+ )
+
+ table.create(engine) # will emit CREATE ENUM and CREATE TABLE
+ table.drop(engine) # will emit DROP TABLE and DROP ENUM
+
+ To use a common enumerated type between multiple tables, the best
+ practice is to declare the :class:`_types.Enum` or
+ :class:`_postgresql.ENUM` independently, and associate it with the
+ :class:`_schema.MetaData` object itself::
+
+ my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)
+
+ t1 = Table('sometable_one', metadata,
+ Column('some_enum', myenum)
+ )
+
+ t2 = Table('sometable_two', metadata,
+ Column('some_enum', myenum)
+ )
+
+ When this pattern is used, care must still be taken at the level
+ of individual table creates. Emitting CREATE TABLE without also
+ specifying ``checkfirst=True`` will still cause issues::
+
+ t1.create(engine) # will fail: no such type 'myenum'
+
+ If we specify ``checkfirst=True``, the individual table-level create
+ operation will check for the ``ENUM`` and create if not exists::
+
+ # will check if enum exists, and emit CREATE TYPE if not
+ t1.create(engine, checkfirst=True)
+
+ When using a metadata-level ENUM type, the type will always be created
+ and dropped if either the metadata-wide create/drop is called::
+
+ metadata.create_all(engine) # will emit CREATE TYPE
+ metadata.drop_all(engine) # will emit DROP TYPE
+
+ The type can also be created and dropped directly::
+
+ my_enum.create(engine)
+ my_enum.drop(engine)
+
+ .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
+ now behaves more strictly with regards to CREATE/DROP. A metadata-level
+ ENUM type will only be created and dropped at the metadata level,
+ not the table level, with the exception of
+ ``table.create(checkfirst=True)``.
+ The ``table.drop()`` call will now emit a DROP TYPE for a table-level
+ enumerated type.
+
+ """
+
+ native_enum = True
+ DDLGenerator = EnumGenerator
+ DDLDropper = EnumDropper
+
+ def __init__(self, *enums, name: str, create_type: bool = True, **kw):
+ """Construct an :class:`_postgresql.ENUM`.
+
+ Arguments are the same as that of
+ :class:`_types.Enum`, but also including
+ the following parameters.
+
+ :param create_type: Defaults to True.
+ Indicates that ``CREATE TYPE`` should be
+ emitted, after optionally checking for the
+ presence of the type, when the parent
+ table is being created; and additionally
+ that ``DROP TYPE`` is called when the table
+ is dropped. When ``False``, no check
+ will be performed and no ``CREATE TYPE``
+ or ``DROP TYPE`` is emitted, unless
+ :meth:`~.postgresql.ENUM.create`
+ or :meth:`~.postgresql.ENUM.drop`
+ are called directly.
+ Setting to ``False`` is helpful
+ when invoking a creation scheme to a SQL file
+ without access to the actual database -
+ the :meth:`~.postgresql.ENUM.create` and
+ :meth:`~.postgresql.ENUM.drop` methods can
+ be used to emit SQL to a target bind.
+
+ """
+ native_enum = kw.pop("native_enum", None)
+ if native_enum is False:
+ util.warn(
+ "the native_enum flag does not apply to the "
+ "sqlalchemy.dialects.postgresql.ENUM datatype; this type "
+ "always refers to ENUM. Use sqlalchemy.types.Enum for "
+ "non-native enum."
+ )
+ self.create_type = create_type
+ super().__init__(*enums, name=name, **kw)
+
+ @classmethod
+ def __test_init__(cls):
+ return cls(name="name")
+
+ @classmethod
+ def adapt_emulated_to_native(cls, impl, **kw):
+ """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
+ :class:`.Enum`.
+
+ """
+ kw.setdefault("validate_strings", impl.validate_strings)
+ kw.setdefault("name", impl.name)
+ kw.setdefault("schema", impl.schema)
+ kw.setdefault("inherit_schema", impl.inherit_schema)
+ kw.setdefault("metadata", impl.metadata)
+ kw.setdefault("_create_events", False)
+ kw.setdefault("values_callable", impl.values_callable)
+ kw.setdefault("omit_aliases", impl._omit_aliases)
+ return cls(**kw)
+
+ def create(self, bind=None, checkfirst=True):
+ """Emit ``CREATE TYPE`` for this
+ :class:`_postgresql.ENUM`.
+
+ If the underlying dialect does not support
+ PostgreSQL CREATE TYPE, no action is taken.
+
+ :param bind: a connectable :class:`_engine.Engine`,
+ :class:`_engine.Connection`, or similar object to emit
+ SQL.
+ :param checkfirst: if ``True``, a query against
+ the PG catalog will be first performed to see
+ if the type does not exist already before
+ creating.
+
+ """
+ if not bind.dialect.supports_native_enum:
+ return
+
+ super().create(bind, checkfirst=checkfirst)
+
+ def drop(self, bind=None, checkfirst=True):
+ """Emit ``DROP TYPE`` for this
+ :class:`_postgresql.ENUM`.
+
+ If the underlying dialect does not support
+ PostgreSQL DROP TYPE, no action is taken.
+
+ :param bind: a connectable :class:`_engine.Engine`,
+ :class:`_engine.Connection`, or similar object to emit
+ SQL.
+ :param checkfirst: if ``True``, a query against
+ the PG catalog will be first performed to see
+ if the type actually exists before dropping.
+
+ """
+ if not bind.dialect.supports_native_enum:
+ return
+
+ super().drop(bind, checkfirst=checkfirst)
+
+ def get_dbapi_type(self, dbapi):
+ """dont return dbapi.STRING for ENUM in PostgreSQL, since that's
+ a different type"""
+
+ return None
+
+
+class DomainGenerator(NamedTypeGenerator):
+ def visit_DOMAIN(self, domain):
+ if not self._can_create_type(domain):
+ return
+ self.connection.execute(CreateDomainType(domain))
+
+
+class DomainDropper(NamedTypeDropper):
+ def visit_DOMAIN(self, domain):
+ if not self._can_drop_type(domain):
+ return
+
+ self.connection.execute(DropDomainType(domain))
+
+
+class DOMAIN(NamedType, sqltypes.SchemaType):
+ r"""Represent the DOMAIN PostgreSQL type.
+
+ A domain is essentially a data type with optional constraints
+ that restrict the allowed set of values. E.g.::
+
+ PositiveInt = DOMAIN(
+ "pos_int", Integer, check="VALUE > 0", not_null=True
+ )
+
+ UsPostalCode = DOMAIN(
+ "us_postal_code",
+ Text,
+ check="VALUE ~ '^\d{5}$' OR VALUE ~ '^\d{5}-\d{4}$'"
+ )
+
+ See the `PostgreSQL documentation`__ for additional details.
+
+ __ https://www.postgresql.org/docs/current/sql-createdomain.html
+
+ .. versionadded:: 2.0
+
+ """
+
+ DDLGenerator = DomainGenerator
+ DDLDropper = DomainDropper
+
+ __visit_name__ = "DOMAIN"
+
+ def __init__(
+ self,
+ name: str,
+ data_type: _TypeEngineArgument[Any],
+ *,
+ collation: Optional[str] = None,
+ default: Optional[Union[str, elements.TextClause]] = None,
+ constraint_name: Optional[str] = None,
+ not_null: Optional[bool] = None,
+ check: Optional[str] = None,
+ create_type: bool = True,
+ **kw: Any,
+ ):
+ """
+ Construct a DOMAIN.
+
+ :param name: the name of the domain
+ :param data_type: The underlying data type of the domain.
+ This can include array specifiers.
+ :param collation: An optional collation for the domain.
+ If no collation is specified, the underlying data type's default
+ collation is used. The underlying type must be collatable if
+ ``collation`` is specified.
+ :param default: The DEFAULT clause specifies a default value for
+ columns of the domain data type. The default should be a string
+ or a :func:`_expression.text` value.
+ If no default value is specified, then the default value is
+ the null value.
+ :param constraint_name: An optional name for a constraint.
+ If not specified, the backend generates a name.
+ :param not_null: Values of this domain are prevented from being null.
+ By default domains are allowed to be null. If not specified,
+ no nullability clause will be emitted.
+ :param check: A CHECK clause specifying an integrity constraint or
+ test which values of the domain must satisfy. A constraint must
+ be an expression producing a Boolean result that can use the
+ keyword VALUE to refer to the value being tested.
+ Unlike PostgreSQL, only a single check clause is currently
+ allowed in SQLAlchemy.
+ :param schema: optional schema name
+ :param metadata: optional :class:`_schema.MetaData` object to which
+ this :class:`_postgresql.DOMAIN` will be directly associated
+ :param create_type: Defaults to True.
+ Indicates that ``CREATE TYPE`` should be emitted, after optionally
+ checking for the presence of the type, when the parent table is
+ being created; and additionally that ``DROP TYPE`` is called
+ when the table is dropped.
+
+ """
+ self.data_type = type_api.to_instance(data_type)
+ self.default = default
+ self.collation = collation
+ self.constraint_name = constraint_name
+ self.not_null = not_null
+ if check is not None:
+ check = coercions.expect(roles.DDLExpressionRole, check)
+ self.check = check
+ self.create_type = create_type
+ super().__init__(name=name, **kw)
+
+ @classmethod
+ def __test_init__(cls):
+ return cls("name", sqltypes.Integer)
+
+
+class CreateEnumType(schema._CreateDropBase):
+ __visit_name__ = "create_enum_type"
+
+
+class DropEnumType(schema._CreateDropBase):
+ __visit_name__ = "drop_enum_type"
+
+
+class CreateDomainType(schema._CreateDropBase):
+ """Represent a CREATE DOMAIN statement."""
+
+ __visit_name__ = "create_domain_type"
+
+
+class DropDomainType(schema._CreateDropBase):
+ """Represent a DROP DOMAIN statement."""
+
+ __visit_name__ = "drop_domain_type"
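
As a quick sanity check on the DDL constructs defined above (a sketch only; the
expected strings are inferred from ``visit_create_domain_type`` and
``visit_drop_domain_type`` in base.py)::

    from sqlalchemy import Integer
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.dialects.postgresql import DOMAIN, CreateDomainType, DropDomainType

    positive_int = DOMAIN("positive_int", Integer, check="VALUE > 0", not_null=True)

    print(CreateDomainType(positive_int).compile(dialect=postgresql.dialect()))
    # roughly: CREATE DOMAIN positive_int AS INTEGER NOT NULL CHECK (VALUE > 0)

    print(DropDomainType(positive_int).compile(dialect=postgresql.dialect()))
    # roughly: DROP DOMAIN positive_int
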
diff --git a/lib/sqlalchemy/dialects/postgresql/types.py b/lib/sqlalchemy/dialects/postgresql/types.py
index 55735953b..374adcac1 100644
--- a/lib/sqlalchemy/dialects/postgresql/types.py
+++ b/lib/sqlalchemy/dialects/postgresql/types.py
@@ -8,10 +8,7 @@
import datetime as dt
from typing import Any
-from ... import schema
-from ... import util
from ...sql import sqltypes
-from ...sql.ddl import InvokeDDLBase
_DECIMAL_TYPES = (1231, 1700)
@@ -201,285 +198,3 @@ class TSVECTOR(sqltypes.TypeEngine[Any]):
"""
__visit_name__ = "TSVECTOR"
-
-
-class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum):
-
- """PostgreSQL ENUM type.
-
- This is a subclass of :class:`_types.Enum` which includes
- support for PG's ``CREATE TYPE`` and ``DROP TYPE``.
-
- When the builtin type :class:`_types.Enum` is used and the
- :paramref:`.Enum.native_enum` flag is left at its default of
- True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
- type as the implementation, so the special create/drop rules
- will be used.
-
- The create/drop behavior of ENUM is necessarily intricate, due to the
- awkward relationship the ENUM type has in relationship to the
- parent table, in that it may be "owned" by just a single table, or
- may be shared among many tables.
-
- When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
- in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
- corresponding to when the :meth:`_schema.Table.create` and
- :meth:`_schema.Table.drop`
- methods are called::
-
- table = Table('sometable', metadata,
- Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
- )
-
- table.create(engine) # will emit CREATE ENUM and CREATE TABLE
- table.drop(engine) # will emit DROP TABLE and DROP ENUM
-
- To use a common enumerated type between multiple tables, the best
- practice is to declare the :class:`_types.Enum` or
- :class:`_postgresql.ENUM` independently, and associate it with the
- :class:`_schema.MetaData` object itself::
-
- my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)
-
- t1 = Table('sometable_one', metadata,
- Column('some_enum', myenum)
- )
-
- t2 = Table('sometable_two', metadata,
- Column('some_enum', myenum)
- )
-
- When this pattern is used, care must still be taken at the level
- of individual table creates. Emitting CREATE TABLE without also
- specifying ``checkfirst=True`` will still cause issues::
-
- t1.create(engine) # will fail: no such type 'myenum'
-
- If we specify ``checkfirst=True``, the individual table-level create
- operation will check for the ``ENUM`` and create if not exists::
-
- # will check if enum exists, and emit CREATE TYPE if not
- t1.create(engine, checkfirst=True)
-
- When using a metadata-level ENUM type, the type will always be created
- and dropped if either the metadata-wide create/drop is called::
-
- metadata.create_all(engine) # will emit CREATE TYPE
- metadata.drop_all(engine) # will emit DROP TYPE
-
- The type can also be created and dropped directly::
-
- my_enum.create(engine)
- my_enum.drop(engine)
-
- .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
- now behaves more strictly with regards to CREATE/DROP. A metadata-level
- ENUM type will only be created and dropped at the metadata level,
- not the table level, with the exception of
- ``table.create(checkfirst=True)``.
- The ``table.drop()`` call will now emit a DROP TYPE for a table-level
- enumerated type.
-
- """
-
- native_enum = True
-
- def __init__(self, *enums, **kw):
- """Construct an :class:`_postgresql.ENUM`.
-
- Arguments are the same as that of
- :class:`_types.Enum`, but also including
- the following parameters.
-
- :param create_type: Defaults to True.
- Indicates that ``CREATE TYPE`` should be
- emitted, after optionally checking for the
- presence of the type, when the parent
- table is being created; and additionally
- that ``DROP TYPE`` is called when the table
- is dropped. When ``False``, no check
- will be performed and no ``CREATE TYPE``
- or ``DROP TYPE`` is emitted, unless
- :meth:`~.postgresql.ENUM.create`
- or :meth:`~.postgresql.ENUM.drop`
- are called directly.
- Setting to ``False`` is helpful
- when invoking a creation scheme to a SQL file
- without access to the actual database -
- the :meth:`~.postgresql.ENUM.create` and
- :meth:`~.postgresql.ENUM.drop` methods can
- be used to emit SQL to a target bind.
-
- """
- native_enum = kw.pop("native_enum", None)
- if native_enum is False:
- util.warn(
- "the native_enum flag does not apply to the "
- "sqlalchemy.dialects.postgresql.ENUM datatype; this type "
- "always refers to ENUM. Use sqlalchemy.types.Enum for "
- "non-native enum."
- )
- self.create_type = kw.pop("create_type", True)
- super(ENUM, self).__init__(*enums, **kw)
-
- @classmethod
- def adapt_emulated_to_native(cls, impl, **kw):
- """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
- :class:`.Enum`.
-
- """
- kw.setdefault("validate_strings", impl.validate_strings)
- kw.setdefault("name", impl.name)
- kw.setdefault("schema", impl.schema)
- kw.setdefault("inherit_schema", impl.inherit_schema)
- kw.setdefault("metadata", impl.metadata)
- kw.setdefault("_create_events", False)
- kw.setdefault("values_callable", impl.values_callable)
- kw.setdefault("omit_aliases", impl._omit_aliases)
- return cls(**kw)
-
- def create(self, bind=None, checkfirst=True):
- """Emit ``CREATE TYPE`` for this
- :class:`_postgresql.ENUM`.
-
- If the underlying dialect does not support
- PostgreSQL CREATE TYPE, no action is taken.
-
- :param bind: a connectable :class:`_engine.Engine`,
- :class:`_engine.Connection`, or similar object to emit
- SQL.
- :param checkfirst: if ``True``, a query against
- the PG catalog will be first performed to see
- if the type does not exist already before
- creating.
-
- """
- if not bind.dialect.supports_native_enum:
- return
-
- bind._run_ddl_visitor(self.EnumGenerator, self, checkfirst=checkfirst)
-
- def drop(self, bind=None, checkfirst=True):
- """Emit ``DROP TYPE`` for this
- :class:`_postgresql.ENUM`.
-
- If the underlying dialect does not support
- PostgreSQL DROP TYPE, no action is taken.
-
- :param bind: a connectable :class:`_engine.Engine`,
- :class:`_engine.Connection`, or similar object to emit
- SQL.
- :param checkfirst: if ``True``, a query against
- the PG catalog will be first performed to see
- if the type actually exists before dropping.
-
- """
- if not bind.dialect.supports_native_enum:
- return
-
- bind._run_ddl_visitor(self.EnumDropper, self, checkfirst=checkfirst)
-
- class EnumGenerator(InvokeDDLBase):
- def __init__(self, dialect, connection, checkfirst=False, **kwargs):
- super(ENUM.EnumGenerator, self).__init__(connection, **kwargs)
- self.checkfirst = checkfirst
-
- def _can_create_enum(self, enum):
- if not self.checkfirst:
- return True
-
- effective_schema = self.connection.schema_for_object(enum)
-
- return not self.connection.dialect.has_type(
- self.connection, enum.name, schema=effective_schema
- )
-
- def visit_enum(self, enum):
- if not self._can_create_enum(enum):
- return
-
- self.connection.execute(CreateEnumType(enum))
-
- class EnumDropper(InvokeDDLBase):
- def __init__(self, dialect, connection, checkfirst=False, **kwargs):
- super(ENUM.EnumDropper, self).__init__(connection, **kwargs)
- self.checkfirst = checkfirst
-
- def _can_drop_enum(self, enum):
- if not self.checkfirst:
- return True
-
- effective_schema = self.connection.schema_for_object(enum)
-
- return self.connection.dialect.has_type(
- self.connection, enum.name, schema=effective_schema
- )
-
- def visit_enum(self, enum):
- if not self._can_drop_enum(enum):
- return
-
- self.connection.execute(DropEnumType(enum))
-
- def get_dbapi_type(self, dbapi):
- """dont return dbapi.STRING for ENUM in PostgreSQL, since that's
- a different type"""
-
- return None
-
- def _check_for_name_in_memos(self, checkfirst, kw):
- """Look in the 'ddl runner' for 'memos', then
- note our name in that collection.
-
- This to ensure a particular named enum is operated
- upon only once within any kind of create/drop
- sequence without relying upon "checkfirst".
-
- """
- if not self.create_type:
- return True
- if "_ddl_runner" in kw:
- ddl_runner = kw["_ddl_runner"]
- if "_pg_enums" in ddl_runner.memo:
- pg_enums = ddl_runner.memo["_pg_enums"]
- else:
- pg_enums = ddl_runner.memo["_pg_enums"] = set()
- present = (self.schema, self.name) in pg_enums
- pg_enums.add((self.schema, self.name))
- return present
- else:
- return False
-
- def _on_table_create(self, target, bind, checkfirst=False, **kw):
- if (
- checkfirst
- or (
- not self.metadata
- and not kw.get("_is_metadata_operation", False)
- )
- ) and not self._check_for_name_in_memos(checkfirst, kw):
- self.create(bind=bind, checkfirst=checkfirst)
-
- def _on_table_drop(self, target, bind, checkfirst=False, **kw):
- if (
- not self.metadata
- and not kw.get("_is_metadata_operation", False)
- and not self._check_for_name_in_memos(checkfirst, kw)
- ):
- self.drop(bind=bind, checkfirst=checkfirst)
-
- def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
- if not self._check_for_name_in_memos(checkfirst, kw):
- self.create(bind=bind, checkfirst=checkfirst)
-
- def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
- if not self._check_for_name_in_memos(checkfirst, kw):
- self.drop(bind=bind, checkfirst=checkfirst)
-
-
-class CreateEnumType(schema._CreateDropBase):
- __visit_name__ = "create_enum_type"
-
-
-class DropEnumType(schema._CreateDropBase):
- __visit_name__ = "drop_enum_type"
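
The removals above are a move rather than a deletion: ENUM and its CREATE/DROP
constructs now live in named_types.py, and the public names re-exported from
``sqlalchemy.dialects.postgresql`` are unchanged. A small sketch (assuming this
build of SQLAlchemy)::

    # dialect-level names continue to resolve after the module split
    from sqlalchemy.dialects.postgresql import ENUM, CreateEnumType, DropEnumType
    from sqlalchemy.dialects.postgresql import DOMAIN, CreateDomainType, DropDomainType

    assert ENUM.__module__.endswith("named_types")
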
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index 8ce0c65e4..1ad547b79 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -5251,17 +5251,18 @@ class DDLCompiler(Compiled):
def get_column_default_string(self, column):
if isinstance(column.server_default, schema.DefaultClause):
- if isinstance(column.server_default.arg, str):
- return self.sql_compiler.render_literal_value(
- column.server_default.arg, sqltypes.STRINGTYPE
- )
- else:
- return self.sql_compiler.process(
- column.server_default.arg, literal_binds=True
- )
+ return self.render_default_string(column.server_default.arg)
else:
return None
+ def render_default_string(self, default):
+ if isinstance(default, str):
+ return self.sql_compiler.render_literal_value(
+ default, sqltypes.STRINGTYPE
+ )
+ else:
+ return self.sql_compiler.process(default, literal_binds=True)
+
def visit_table_or_column_check_constraint(self, constraint, **kw):
if constraint.is_column_level:
return self.visit_column_check_constraint(constraint)
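
The ``render_default_string`` extraction above is what lets the PostgreSQL DDL
compiler render DOMAIN defaults the same way as column server defaults; a rough
sketch of the effect (illustrative names, output approximate)::

    from sqlalchemy import Text, text
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.dialects.postgresql import DOMAIN, CreateDomainType

    # a plain string default is rendered as a quoted SQL literal
    code = DOMAIN("us_postal_code", Text, default="00000")
    print(CreateDomainType(code).compile(dialect=postgresql.dialect()))
    # roughly: CREATE DOMAIN us_postal_code AS TEXT DEFAULT '00000'

    # a text() default is rendered verbatim with literal binds
    stamped = DOMAIN("stamped_text", Text, default=text("now()::text"))
    print(CreateDomainType(stamped).compile(dialect=postgresql.dialect()))
    # roughly: CREATE DOMAIN stamped_text AS TEXT DEFAULT now()::text
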