-rw-r--r--   CHANGES                             23
-rw-r--r--   lib/sqlalchemy/__init__.py           2
-rw-r--r--   lib/sqlalchemy/orm/dependency.py    53
-rw-r--r--   lib/sqlalchemy/orm/sync.py           3
-rw-r--r--   lib/sqlalchemy/orm/unitofwork.py    56
-rw-r--r--   test/orm/test_cascade.py           146
-rw-r--r--   test/orm/test_unitofworkv2.py       16
-rw-r--r--   test/perf/large_flush.py            84
8 files changed, 330 insertions, 53 deletions
diff --git a/CHANGES b/CHANGES
index c556ce6c6..f6beb1ab0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,29 @@
CHANGES
=======
+0.6uow_refactor
+===============
+- orm
+ - Unit of work internals have been rewritten. Units of work
+      with large numbers of interdependent objects
+ can now be flushed without recursion overflows
+ as there is no longer reliance upon recursive calls
+ [ticket:1081]. The number of internal structures now stays
+ constant for a particular session state, regardless of
+ how many relationships are present on mappings. The flow
+ of events now corresponds to a linear list of steps,
+ generated by the mappers and relationships based on actual
+ work to be done, filtered through a single topological sort
+ for correct ordering. Flush actions are assembled using
+ far fewer steps and less memory. [ticket:1742]
+
+ - one-to-many relationships now maintain a list of positive
+ parent-child associations within the flush, preventing
+ previous parents marked as deleted from cascading a
+ delete or NULL foreign key set on those child objects,
+      even though the end-user has not removed the child from
+      the old association. [ticket:1764]
+
0.6.0
=====
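
The entry above describes the heart of the rewrite: mappers and
relationships emit discrete action records plus (before, after)
dependency pairs, and a single iterative topological sort linearizes
them into the flush plan.  A minimal sketch of that general pattern,
with hypothetical names -- an illustration of the idea, not the code
in lib/sqlalchemy/topological.py:

    # Kahn's algorithm: iterative, so the depth of the object graph
    # can never overflow the interpreter stack the way the old
    # recursive flush could.
    def topological_sort(dependencies, items):
        edges = {}                          # node -> nodes after it
        indegree = dict.fromkeys(items, 0)
        for before, after in dependencies:
            if after not in edges.setdefault(before, set()):
                edges[before].add(after)
                indegree[after] += 1
        queue = [n for n in items if indegree[n] == 0]
        output = []
        while queue:
            node = queue.pop()
            output.append(node)
            for child in edges.get(node, ()):
                indegree[child] -= 1
                if indegree[child] == 0:
                    queue.append(child)
        if len(output) < len(items):
            raise ValueError("cycle detected")   # cf. find_cycles()
        return output

    # e.g. parent INSERTs must precede dependent child INSERTs:
    steps = topological_sort(
        [("parent_saves", "child_saves")],
        ["child_saves", "parent_saves"])
    assert steps == ["parent_saves", "child_saves"]
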
diff --git a/lib/sqlalchemy/__init__.py b/lib/sqlalchemy/__init__.py
index 376b13e64..774ac24d9 100644
--- a/lib/sqlalchemy/__init__.py
+++ b/lib/sqlalchemy/__init__.py
@@ -114,6 +114,6 @@ from sqlalchemy.engine import create_engine, engine_from_config
__all__ = sorted(name for name, obj in locals().items()
if not (name.startswith('_') or inspect.ismodule(obj)))
-__version__ = '0.6beta3'
+__version__ = '0.6uow_refactor'
del inspect, sys
diff --git a/lib/sqlalchemy/orm/dependency.py b/lib/sqlalchemy/orm/dependency.py
index dd595a052..316c5382d 100644
--- a/lib/sqlalchemy/orm/dependency.py
+++ b/lib/sqlalchemy/orm/dependency.py
@@ -10,7 +10,7 @@
from sqlalchemy import sql, util
import sqlalchemy.exceptions as sa_exc
-from sqlalchemy.orm import attributes, exc, sync, unitofwork
+from sqlalchemy.orm import attributes, exc, sync, unitofwork, util as mapperutil
from sqlalchemy.orm.interfaces import ONETOMANY, MANYTOONE, MANYTOMANY
@@ -42,21 +42,13 @@ class DependencyProcessor(object):
"child are present" %
self.prop)
- def _get_instrumented_attribute(self):
- """Return the ``InstrumentedAttribute`` handled by this
- ``DependencyProecssor``.
-
- """
- return self.parent.class_manager.get_impl(self.key)
-
def hasparent(self, state):
"""return True if the given object instance has a parent,
according to the ``InstrumentedAttribute`` handled by this
``DependencyProcessor``.
"""
- # TODO: use correct API for this
- return self._get_instrumented_attribute().hasparent(state)
+ return self.parent.class_manager.get_impl(self.key).hasparent(state)
def per_property_preprocessors(self, uow):
"""establish actions and dependencies related to a flush.
@@ -326,7 +318,7 @@ class OneToManyDP(DependencyProcessor):
(parent_saves, after_save),
(after_save, child_saves),
(after_save, child_deletes),
-
+
(child_saves, parent_deletes),
(child_deletes, parent_deletes),
@@ -397,6 +389,7 @@ class OneToManyDP(DependencyProcessor):
uowcommit.register_object(child, isdelete=True)
else:
uowcommit.register_object(child)
+
if should_null_fks:
for child in history.unchanged:
if child is not None:
@@ -404,15 +397,23 @@ class OneToManyDP(DependencyProcessor):
def presort_saves(self, uowcommit, states):
+ children_added = uowcommit.memo(('children_added', self), set)
+
for state in states:
+ pks_changed = self._pks_changed(uowcommit, state)
+
history = uowcommit.get_attribute_history(
state,
self.key,
- passive=True)
+ passive=not pks_changed
+ or self.passive_updates)
if history:
for child in history.added:
if child is not None:
- uowcommit.register_object(child)
+ uowcommit.register_object(child, cancel_delete=True)
+
+ children_added.update(history.added)
+
for child in history.deleted:
if not self.cascade.delete_orphan:
uowcommit.register_object(child, isdelete=False)
@@ -422,11 +423,8 @@ class OneToManyDP(DependencyProcessor):
uowcommit.register_object(
attributes.instance_state(c),
isdelete=True)
- if self._pks_changed(uowcommit, state):
- if not history:
- history = uowcommit.get_attribute_history(
- state, self.key,
- passive=self.passive_updates)
+
+ if pks_changed:
if history:
for child in history.unchanged:
if child is not None:
@@ -442,6 +440,8 @@ class OneToManyDP(DependencyProcessor):
# safely for any cascade but is unnecessary if delete cascade
# is on.
if self.post_update or not self.passive_deletes == 'all':
+ children_added = uowcommit.memo(('children_added', self), set)
+
for state in states:
history = uowcommit.get_attribute_history(
state,
@@ -461,7 +461,8 @@ class OneToManyDP(DependencyProcessor):
uowcommit,
[state])
if self.post_update or not self.cascade.delete:
- for child in history.unchanged:
+ for child in set(history.unchanged).\
+ difference(children_added):
if child is not None:
self._synchronize(
state,
@@ -472,7 +473,11 @@ class OneToManyDP(DependencyProcessor):
child,
uowcommit,
[state])
-
+            # technically, we could remove each child from the
+            # collection here as well.  But that would be somewhat
+            # inconsistent behavior, since it wouldn't happen if the
+            # old parent weren't deleted but the child was moved.
+
def process_saves(self, uowcommit, states):
for state in states:
history = uowcommit.get_attribute_history(state, self.key, passive=True)
@@ -731,10 +736,10 @@ class DetectKeySwitch(DependencyProcessor):
self._process_key_switches(states, uowcommit)
def _key_switchers(self, uow, states):
- if ('pk_switchers', self) in uow.attributes:
- switched, notswitched = uow.attributes[('pk_switchers', self)]
- else:
- uow.attributes[('pk_switchers', self)] = (switched, notswitched) = (set(), set())
+ switched, notswitched = uow.memo(
+ ('pk_switchers', self),
+ lambda: (set(), set())
+ )
allstates = switched.union(notswitched)
for s in states:
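
The hunks above share a single per-flush set between presort_saves()
and presort_deletes() via the new uowcommit.memo() helper: any child
positively claimed by a (new) parent is recorded, and the delete pass
subtracts that set before issuing its DELETE / SET NULL, implementing
[ticket:1764].  A rough standalone sketch of the bookkeeping, using a
hypothetical stand-in for the real UOWTransaction:

    class FakeUOW(object):
        def __init__(self):
            self.attributes = {}
        def memo(self, key, callable_):
            # same contract as UOWTransaction.memo() in this patch
            if key in self.attributes:
                return self.attributes[key]
            self.attributes[key] = ret = callable_()
            return ret

    uow = FakeUOW()
    dep = object()   # stands in for the DependencyProcessor

    # presort_saves: child 'c1' was appended to a surviving parent
    uow.memo(('children_added', dep), set).add('c1')

    # presort_deletes: the old, deleted parent still lists both
    # children as unchanged; subtracting children_added spares 'c1'
    unchanged = set(['c1', 'c2'])
    to_disassociate = unchanged.difference(
        uow.memo(('children_added', dep), set))
    assert to_disassociate == set(['c2'])
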
diff --git a/lib/sqlalchemy/orm/sync.py b/lib/sqlalchemy/orm/sync.py
index b9ddbb6e7..184ae8c84 100644
--- a/lib/sqlalchemy/orm/sync.py
+++ b/lib/sqlalchemy/orm/sync.py
@@ -75,8 +75,7 @@ def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
except exc.UnmappedColumnError:
_raise_col_to_prop(False, source_mapper, l, None, r)
history = uowcommit.get_attribute_history(source, prop.key, passive=True)
- if len(history.deleted):
- return True
+ return bool(history.deleted)
else:
return False
diff --git a/lib/sqlalchemy/orm/unitofwork.py b/lib/sqlalchemy/orm/unitofwork.py
index 1095c1347..2c616943f 100644
--- a/lib/sqlalchemy/orm/unitofwork.py
+++ b/lib/sqlalchemy/orm/unitofwork.py
@@ -12,13 +12,12 @@ organizes them in order of dependency, and executes.
"""
-from sqlalchemy import util, log, topological
+from sqlalchemy import util, topological
from sqlalchemy.orm import attributes, interfaces
from sqlalchemy.orm import util as mapperutil
from sqlalchemy.orm.util import _state_mapper
# Load lazily
-object_session = None
_state_session = None
class UOWEventHandler(interfaces.AttributeExtension):
@@ -90,10 +89,10 @@ class UOWTransaction(object):
# as a parent.
self.mappers = util.defaultdict(set)
- # a set of Preprocess objects, which gather
+ # a dictionary of Preprocess objects, which gather
# additional states impacted by the flush
# and determine if a flush action is needed
- self.presort_actions = set()
+ self.presort_actions = {}
# dictionary of PostSortRec objects, each
# one issues work during the flush within
@@ -121,6 +120,13 @@ class UOWTransaction(object):
return state in self.states and self.states[state][0]
+ def memo(self, key, callable_):
+ if key in self.attributes:
+ return self.attributes[key]
+ else:
+ self.attributes[key] = ret = callable_()
+ return ret
+
def remove_state_actions(self, state):
"""remove pending actions for a state from the uowtransaction."""
@@ -139,10 +145,10 @@ class UOWTransaction(object):
# if the cached lookup was "passive" and now we want non-passive, do a non-passive
# lookup and re-cache
if cached_passive and not passive:
- history = attributes.get_state_history(state, key, passive=False)
+ history = state.get_history(key, passive=False)
self.attributes[hashkey] = (history, passive)
else:
- history = attributes.get_state_history(state, key, passive=passive)
+ history = state.get_history(key, passive=passive)
self.attributes[hashkey] = (history, passive)
if not history or not state.get_impl(key).uses_objects:
@@ -151,9 +157,12 @@ class UOWTransaction(object):
return history.as_state()
def register_preprocessor(self, processor, fromparent):
- self.presort_actions.add(Preprocess(processor, fromparent))
+ key = (processor, fromparent)
+ if key not in self.presort_actions:
+ self.presort_actions[key] = Preprocess(processor, fromparent)
- def register_object(self, state, isdelete=False, listonly=False):
+ def register_object(self, state, isdelete=False,
+ listonly=False, cancel_delete=False):
if not self.session._contains_state(state):
return
@@ -166,11 +175,8 @@ class UOWTransaction(object):
self.mappers[mapper].add(state)
self.states[state] = (isdelete, listonly)
else:
- existing_isdelete, existing_listonly = self.states[state]
- self.states[state] = (
- existing_isdelete or isdelete,
- existing_listonly and listonly
- )
+ if not listonly and (isdelete or cancel_delete):
+ self.states[state] = (isdelete, False)
def issue_post_update(self, state, post_update_cols):
mapper = state.manager.mapper.base_mapper
@@ -180,11 +186,23 @@ class UOWTransaction(object):
@util.memoized_property
def _mapper_for_dep(self):
+ """return a dynamic mapping of (Mapper, DependencyProcessor) to
+ True or False, indicating if the DependencyProcessor operates
+ on objects of that Mapper.
+
+ The result is stored in the dictionary persistently once
+ calculated.
+
+ """
return util.PopulateDict(
lambda tup:tup[0]._props.get(tup[1].key) is tup[1].prop
)
def filter_states_for_dep(self, dep, states):
+ """Filter the given list of InstanceStates to those relevant to the
+ given DependencyProcessor.
+
+ """
mapper_for_dep = self._mapper_for_dep
return [s for s in states if mapper_for_dep[(s.manager.mapper, dep)]]
@@ -196,12 +214,16 @@ class UOWTransaction(object):
yield state
def _generate_actions(self):
+ """Generate the full, unsorted collection of PostSortRecs as
+ well as dependency pairs for this UOWTransaction.
+
+ """
# execute presort_actions, until all states
# have been processed. a presort_action might
# add new states to the uow.
while True:
ret = False
- for action in list(self.presort_actions):
+ for action in list(self.presort_actions.values()):
if action.execute(self):
ret = True
if not ret:
@@ -211,7 +233,7 @@ class UOWTransaction(object):
self.cycles = cycles = topological.find_cycles(
self.dependencies,
self.postsort_actions.values())
-
+
if cycles:
# if yes, break the per-mapper actions into
# per-state actions
@@ -248,7 +270,7 @@ class UOWTransaction(object):
#sort = topological.sort(self.dependencies, postsort_actions)
#print "--------------"
#print self.dependencies
- #print postsort_actions
+ #print list(sort)
#print "COUNT OF POSTSORT ACTIONS", len(postsort_actions)
# execute
@@ -281,8 +303,6 @@ class UOWTransaction(object):
# debug... would like to see how many do this
self.session._register_newly_persistent(state)
-log.class_logger(UOWTransaction)
-
class IterateMappersMixin(object):
def _mappers(self, uow):
if self.fromparent:
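
The register_object() change above replaces the old flag merge with a
simpler overwrite rule: once a state is registered, only a
non-listonly call carrying isdelete=True or the new cancel_delete=True
rewrites its entry -- which is how a child claimed by a new parent
escapes a deletion queued by its old one.  Restated as a hypothetical
standalone function (not the actual method):

    # states maps state -> (isdelete, listonly)
    def register(states, state, isdelete=False, listonly=False,
                 cancel_delete=False):
        if state not in states:
            states[state] = (isdelete, listonly)
        elif not listonly and (isdelete or cancel_delete):
            # a concrete save/delete request overrides the previous
            # entry; cancel_delete un-marks a pending deletion
            states[state] = (isdelete, False)

    states = {}
    register(states, 's1', isdelete=True)       # old parent cascaded
    register(states, 's1', cancel_delete=True)  # new parent claimed it
    assert states['s1'] == (False, False)       # saved, not deleted
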
diff --git a/test/orm/test_cascade.py b/test/orm/test_cascade.py
index a7152ecc1..7b07898a5 100644
--- a/test/orm/test_cascade.py
+++ b/test/orm/test_cascade.py
@@ -1156,7 +1156,6 @@ class UnsavedOrphansTest3(_base.MappedTest):
assert c not in s, "Should expunge customer when both parents are gone"
-
class DoubleParentOrphanTest(_base.MappedTest):
"""test orphan detection for an entity with two parent relationships"""
@@ -1276,6 +1275,151 @@ class CollectionAssignmentOrphanTest(_base.MappedTest):
eq_(sess.query(A).get(a1.id),
A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]))
+class O2MConflictTest(_base.MappedTest):
+ """test that O2M dependency detects a change in parent, does the
+ right thing, and even updates the collection/attribute.
+
+ """
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table("parent", metadata,
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True)
+ )
+ Table("child", metadata,
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False)
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class Parent(_base.ComparableEntity):
+ pass
+ class Child(_base.ComparableEntity):
+ pass
+
+ @testing.resolve_artifact_names
+ def _do_delete_old_test(self):
+ sess = create_session()
+
+ p1, p2, c1 = Parent(), Parent(), Child()
+ if Parent.child.property.uselist:
+ p1.child.append(c1)
+ else:
+ p1.child = c1
+ sess.add_all([p1, c1])
+ sess.flush()
+
+ sess.delete(p1)
+
+ if Parent.child.property.uselist:
+ p2.child.append(c1)
+ else:
+ p2.child = c1
+ sess.add(p2)
+
+ sess.flush()
+ eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
+
+ @testing.resolve_artifact_names
+ def _do_move_test(self):
+ sess = create_session()
+
+ p1, p2, c1 = Parent(), Parent(), Child()
+ if Parent.child.property.uselist:
+ p1.child.append(c1)
+ else:
+ p1.child = c1
+ sess.add_all([p1, c1])
+ sess.flush()
+
+ if Parent.child.property.uselist:
+ p2.child.append(c1)
+ else:
+ p2.child = c1
+ sess.add(p2)
+
+ sess.flush()
+ eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
+
+ @testing.resolve_artifact_names
+ def test_o2o_delete_old(self):
+ mapper(Parent, parent, properties={
+ 'child':relationship(Child, uselist=False)
+ })
+ mapper(Child, child)
+ self._do_delete_old_test()
+ self._do_move_test()
+
+ @testing.resolve_artifact_names
+ def test_o2m_delete_old(self):
+ mapper(Parent, parent, properties={
+ 'child':relationship(Child, uselist=True)
+ })
+ mapper(Child, child)
+ self._do_delete_old_test()
+ self._do_move_test()
+
+ @testing.resolve_artifact_names
+ def test_o2o_backref_delete_old(self):
+ mapper(Parent, parent, properties={
+ 'child':relationship(Child, uselist=False, backref='parent')
+ })
+ mapper(Child, child)
+ self._do_delete_old_test()
+ self._do_move_test()
+
+ @testing.resolve_artifact_names
+ def test_o2o_delcascade_delete_old(self):
+ mapper(Parent, parent, properties={
+ 'child':relationship(Child, uselist=False, cascade="all, delete")
+ })
+ mapper(Child, child)
+ self._do_delete_old_test()
+ self._do_move_test()
+
+ @testing.resolve_artifact_names
+ def test_o2o_delorphan_delete_old(self):
+ mapper(Parent, parent, properties={
+ 'child':relationship(Child, uselist=False, cascade="all, delete, delete-orphan")
+ })
+ mapper(Child, child)
+ self._do_delete_old_test()
+ self._do_move_test()
+
+ @testing.resolve_artifact_names
+ def test_o2o_delorphan_backref_delete_old(self):
+ mapper(Parent, parent, properties={
+ 'child':relationship(Child, uselist=False,
+ cascade="all, delete, delete-orphan",
+ backref='parent')
+ })
+ mapper(Child, child)
+ self._do_delete_old_test()
+ self._do_move_test()
+
+ @testing.resolve_artifact_names
+ def test_o2o_backref_delorphan_delete_old(self):
+ mapper(Parent, parent)
+ mapper(Child, child, properties = {
+ 'parent' : relationship(Parent, uselist=False, single_parent=True,
+ backref=backref('child', uselist=False),
+ cascade="all,delete,delete-orphan")
+ })
+ self._do_delete_old_test()
+ self._do_move_test()
+
+ @testing.resolve_artifact_names
+ def test_o2m_backref_delorphan_delete_old(self):
+ mapper(Parent, parent)
+ mapper(Child, child, properties = {
+ 'parent' : relationship(Parent, uselist=False, single_parent=True,
+ backref=backref('child', uselist=True),
+ cascade="all,delete,delete-orphan")
+ })
+ self._do_delete_old_test()
+ self._do_move_test()
+
class PartialFlushTest(_base.MappedTest):
"""test cascade behavior as it relates to object lists passed to flush().
diff --git a/test/orm/test_unitofworkv2.py b/test/orm/test_unitofworkv2.py
index 7fef87d33..e28537b00 100644
--- a/test/orm/test_unitofworkv2.py
+++ b/test/orm/test_unitofworkv2.py
@@ -14,10 +14,7 @@ from test.orm._fixtures import keywords, addresses, Base, Keyword, \
composite_pk_table, CompositePk
class AssertsUOW(object):
- def _assert_uow_size(self,
- session,
- expected
- ):
+ def _get_test_uow(self, session):
uow = unitofwork.UOWTransaction(session)
deleted = set(session._deleted)
new = set(session._new)
@@ -26,6 +23,13 @@ class AssertsUOW(object):
uow.register_object(s)
for d in deleted:
uow.register_object(d, isdelete=True)
+ return uow
+
+ def _assert_uow_size(self,
+ session,
+ expected
+ ):
+ uow = self._get_test_uow(session)
postsort_actions = uow._generate_actions()
print postsort_actions
eq_(len(postsort_actions), expected, postsort_actions)
@@ -33,8 +37,6 @@ class AssertsUOW(object):
class UOWTest(_fixtures.FixtureTest, testing.AssertsExecutionResults, AssertsUOW):
run_inserts = None
-
-
class RudimentaryFlushTest(UOWTest):
def test_one_to_many_save(self):
@@ -215,7 +217,7 @@ class RudimentaryFlushTest(UOWTest):
{'id':u1.id}
),
)
-
+
def test_m2o_flush_size(self):
mapper(User, users)
mapper(Address, addresses, properties={
diff --git a/test/perf/large_flush.py b/test/perf/large_flush.py
new file mode 100644
index 000000000..431a28944
--- /dev/null
+++ b/test/perf/large_flush.py
@@ -0,0 +1,84 @@
+import sqlalchemy as sa
+from sqlalchemy import create_engine, MetaData, orm
+from sqlalchemy import Column, ForeignKey
+from sqlalchemy import Integer, String
+from sqlalchemy.orm import mapper
+from sqlalchemy.test import profiling
+
+class Object(object):
+ pass
+
+class Q(Object):
+ pass
+
+class A(Object):
+ pass
+
+class C(Object):
+ pass
+
+class WC(C):
+ pass
+
+engine = create_engine('sqlite:///:memory:', echo=True)
+
+sm = orm.sessionmaker(bind=engine)
+
+SA_Session = orm.scoped_session(sm)
+
+SA_Metadata = MetaData()
+
+object_table = sa.Table('Object',
+ SA_Metadata,
+ Column('ObjectID', Integer,primary_key=True),
+ Column('Type', String(1), nullable=False))
+
+q_table = sa.Table('Q',
+ SA_Metadata,
+ Column('QID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
+
+c_table = sa.Table('C',
+ SA_Metadata,
+ Column('CID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
+
+wc_table = sa.Table('WC',
+ SA_Metadata,
+ Column('WCID', Integer, ForeignKey('C.CID'), primary_key=True))
+
+a_table = sa.Table('A',
+ SA_Metadata,
+ Column('AID', Integer, ForeignKey('Object.ObjectID'),primary_key=True),
+ Column('QID', Integer, ForeignKey('Q.QID')),
+ Column('CID', Integer, ForeignKey('C.CID')))
+
+mapper(Object, object_table, polymorphic_on=object_table.c.Type, polymorphic_identity='O')
+
+mapper(Q, q_table, inherits=Object, polymorphic_identity='Q')
+mapper(C, c_table, inherits=Object, polymorphic_identity='C')
+mapper(WC, wc_table, inherits=C, polymorphic_identity='W')
+
+mapper(A, a_table, inherits=Object, polymorphic_identity='A',
+ properties = {
+ 'Q' : orm.relation(Q,primaryjoin=a_table.c.QID==q_table.c.QID,
+ backref='As'
+ ),
+ 'C' : orm.relation(C,primaryjoin=a_table.c.CID==c_table.c.CID,
+ backref='A',
+ uselist=False)
+ }
+ )
+
+SA_Metadata.create_all(engine)
+
+@profiling.profiled('large_flush', always=True, sort=['file'])
+def generate_error():
+ q = Q()
+    for j in range(100): # at 306 the error does not pop out (depending on recursion depth)
+ a = A()
+ a.Q = q
+ a.C = WC()
+
+ SA_Session.add(q)
+    SA_Session.commit() # here the error pops out
+
+generate_error()
\ No newline at end of file
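
The script above exists to reproduce the recursion overflow of
[ticket:1081] under profiling.  For context, a hypothetical
illustration of the failure mode, independent of SQLAlchemy -- the old
flush visited dependent rows via recursive calls, so a deep enough
graph exhausted the interpreter stack:

    import sys
    sys.setrecursionlimit(1000)

    def visit(depth):
        # stands in for the old recursive dependency traversal
        if depth:
            visit(depth - 1)

    try:
        visit(5000)
    except RuntimeError, e:   # RecursionError in modern Python
        print "recursion overflow:", e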