1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# engine/ddl.py
# Copyright (C) 2009-2011 the SQLAlchemy authors and contributors <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""Routines to handle CREATE/DROP workflow."""
from sqlalchemy import engine, schema
from sqlalchemy.sql import util as sql_util
class DDLBase(schema.SchemaVisitor):
    """Base schema visitor that keeps hold of a connection.

    The connection is stored on the instance as ``self.connection``.
    """

    def __init__(self, connection):
        """Store ``connection`` on the instance.

        :param connection: object kept as ``self.connection``.
        """
        self.connection = connection
class SchemaGenerator(DDLBase):
    """Schema visitor that executes CREATE constructs on a connection.

    Visiting a metadata object creates its tables in the order returned by
    ``sql_util.sort_tables``; visiting a table also visits its column
    defaults and its indexes.  When ``checkfirst`` is true, tables and
    sequences the dialect reports as already present are skipped.
    """

    def __init__(self, dialect, connection, checkfirst=False, tables=None,
                 **kwargs):
        """Set up the generator.

        :param dialect: dialect supplying ``identifier_preparer``,
            ``validate_identifier``, ``has_table``, ``has_sequence``,
            ``supports_sequences`` and ``sequences_optional``.
        :param connection: connection the DDL constructs are executed on.
        :param checkfirst: when true, ask the dialect whether an object
            exists before creating it.
        :param tables: optional iterable of tables restricting what
            ``visit_metadata`` creates; stored as a set, or ``None``.
        :param kwargs: forwarded unchanged to the base class constructor.
        """
        super(SchemaGenerator, self).__init__(connection, **kwargs)
        self.checkfirst = checkfirst
        if tables:
            # A truthy argument that yields no items still ends up as None.
            self.tables = set(tables) or None
        else:
            self.tables = None
        self.dialect = dialect
        self.preparer = dialect.identifier_preparer

    def _can_create(self, table):
        """Validate the table's identifiers and decide whether to create it.

        The name (and the schema, when set) are always passed to
        ``dialect.validate_identifier``.  Without ``checkfirst`` the answer
        is always true; otherwise it is true only when the dialect reports
        that the table does not exist.
        """
        dialect = self.dialect
        dialect.validate_identifier(table.name)
        if table.schema:
            dialect.validate_identifier(table.schema)

        if not self.checkfirst:
            return True
        return not dialect.has_table(
            self.connection, table.name, schema=table.schema)

    def visit_metadata(self, metadata):
        """Create the creatable tables of ``metadata``.

        Uses ``self.tables`` when it is non-empty, otherwise every table of
        the metadata.  The ``before_create`` / ``after_create`` metadata
        events are fired around the loop with the filtered table list.
        """
        candidates = self.tables or metadata.tables.values()

        to_create = []
        for tbl in sql_util.sort_tables(candidates):
            if self._can_create(tbl):
                to_create.append(tbl)

        metadata.dispatch.before_create(
            metadata, self.connection, tables=to_create)

        # _can_create() has already been consulted for each table above,
        # so tell visit_table not to repeat the check.
        for tbl in to_create:
            self.traverse_single(tbl, create_ok=True)

        metadata.dispatch.after_create(
            metadata, self.connection, tables=to_create)

    def visit_table(self, table, create_ok=False):
        """Execute CREATE TABLE for ``table`` plus its defaults and indexes.

        :param create_ok: true when the caller has already established that
            the table may be created; skips the ``_can_create`` check.
        """
        if not (create_ok or self._can_create(table)):
            return

        table.dispatch.before_create(table, self.connection)

        # Column defaults are visited before the table itself is created.
        for col in table.columns:
            default = col.default
            if default is None:
                continue
            self.traverse_single(default)

        self.connection.execute(schema.CreateTable(table))

        if hasattr(table, 'indexes'):
            for idx in table.indexes:
                self.traverse_single(idx)

        table.dispatch.after_create(table, self.connection)

    def visit_sequence(self, sequence):
        """Execute CREATE SEQUENCE unless one of the skip conditions holds.

        Nothing is executed when the dialect does not support sequences,
        when the dialect treats sequences as optional and this one is
        marked optional, or when ``checkfirst`` is set and the dialect
        reports that the sequence already exists.
        """
        dialect = self.dialect
        if not dialect.supports_sequences:
            return
        if dialect.sequences_optional and sequence.optional:
            return
        if self.checkfirst and dialect.has_sequence(
                self.connection, sequence.name, schema=sequence.schema):
            return
        self.connection.execute(schema.CreateSequence(sequence))

    def visit_index(self, index):
        """Execute CREATE INDEX for ``index``."""
        create_stmt = schema.CreateIndex(index)
        self.connection.execute(create_stmt)
class SchemaDropper(DDLBase):
    """Schema visitor that executes DROP constructs on a connection.

    Visiting a metadata object drops its tables in the reverse of the order
    returned by ``sql_util.sort_tables``.  When ``checkfirst`` is true,
    tables and sequences the dialect reports as absent are skipped.
    """

    def __init__(self, dialect, connection, checkfirst=False, tables=None,
                 **kwargs):
        """Set up the dropper.

        :param dialect: dialect supplying ``identifier_preparer``,
            ``validate_identifier``, ``has_table``, ``has_sequence``,
            ``supports_sequences`` and ``sequences_optional``.
        :param connection: connection the DDL constructs are executed on.
        :param checkfirst: when true, ask the dialect whether an object
            exists before dropping it.
        :param tables: optional collection of tables restricting what
            ``visit_metadata`` drops; stored exactly as given.
        :param kwargs: forwarded unchanged to the base class constructor.
        """
        super(SchemaDropper, self).__init__(connection, **kwargs)
        self.tables = tables
        self.checkfirst = checkfirst
        self.dialect = dialect
        self.preparer = dialect.identifier_preparer

    def visit_metadata(self, metadata):
        """Drop the droppable tables of ``metadata``.

        Uses ``self.tables`` when it is non-empty, otherwise every table of
        the metadata.  The ``before_drop`` / ``after_drop`` metadata events
        are fired around the loop with the filtered table list.
        """
        candidates = self.tables or metadata.tables.values()

        to_drop = []
        for tbl in reversed(sql_util.sort_tables(candidates)):
            if self._can_drop(tbl):
                to_drop.append(tbl)

        metadata.dispatch.before_drop(
            metadata, self.connection, tables=to_drop)

        # _can_drop() has already been consulted for each table above,
        # so tell visit_table not to repeat the check.
        for tbl in to_drop:
            self.traverse_single(tbl, drop_ok=True)

        metadata.dispatch.after_drop(
            metadata, self.connection, tables=to_drop)

    def _can_drop(self, table):
        """Validate the table's identifiers and decide whether to drop it.

        The name (and the schema, when set) are always passed to
        ``dialect.validate_identifier``.  Without ``checkfirst`` the answer
        is ``True``; otherwise it is whatever ``dialect.has_table`` returns.
        """
        dialect = self.dialect
        dialect.validate_identifier(table.name)
        if table.schema:
            dialect.validate_identifier(table.schema)

        if not self.checkfirst:
            return True
        return dialect.has_table(
            self.connection, table.name, schema=table.schema)

    def visit_index(self, index):
        """Execute DROP INDEX for ``index``."""
        drop_stmt = schema.DropIndex(index)
        self.connection.execute(drop_stmt)

    def visit_table(self, table, drop_ok=False):
        """Visit the column defaults of ``table``, then execute DROP TABLE.

        :param drop_ok: true when the caller has already established that
            the table may be dropped; skips the ``_can_drop`` check.
        """
        if not (drop_ok or self._can_drop(table)):
            return

        table.dispatch.before_drop(table, self.connection)

        # Column defaults are visited before the table itself is dropped.
        for col in table.columns:
            default = col.default
            if default is None:
                continue
            self.traverse_single(default)

        self.connection.execute(schema.DropTable(table))

        table.dispatch.after_drop(table, self.connection)

    def visit_sequence(self, sequence):
        """Execute DROP SEQUENCE unless one of the skip conditions holds.

        Nothing is executed when the dialect does not support sequences,
        when the dialect treats sequences as optional and this one is
        marked optional, or when ``checkfirst`` is set and the dialect
        reports that the sequence does not exist.
        """
        dialect = self.dialect
        if not dialect.supports_sequences:
            return
        if dialect.sequences_optional and sequence.optional:
            return
        if self.checkfirst and not dialect.has_sequence(
                self.connection, sequence.name, schema=sequence.schema):
            return
        self.connection.execute(schema.DropSequence(sequence))
|