 CHANGES                          |  23
 lib/sqlalchemy/__init__.py       |   2
 lib/sqlalchemy/orm/dependency.py |  53
 lib/sqlalchemy/orm/sync.py       |   3
 lib/sqlalchemy/orm/unitofwork.py |  56
 test/orm/test_cascade.py         | 146
 test/orm/test_unitofworkv2.py    |  16
 test/perf/large_flush.py         |  84
 8 files changed, 330 insertions(+), 53 deletions(-)
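
In user terms, the [ticket:1764] change described in the CHANGES entry below means that moving a child to a new parent now wins over the old parent's deletion within the same flush. The following is a minimal standalone sketch of that scenario, mirroring the new O2MConflictTest added in test/orm/test_cascade.py; it is illustrative only (0.6-era classic mapping APIs), not code from this patch:

    import sqlalchemy as sa
    from sqlalchemy import orm

    metadata = sa.MetaData()
    parent = sa.Table('parent', metadata,
        sa.Column('id', sa.Integer, primary_key=True))
    child = sa.Table('child', metadata,
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('parent_id', sa.Integer, sa.ForeignKey('parent.id')))

    class Parent(object):
        pass

    class Child(object):
        pass

    orm.mapper(Parent, parent, properties={'children': orm.relationship(Child)})
    orm.mapper(Child, child)

    engine = sa.create_engine('sqlite://')
    metadata.create_all(engine)
    sess = orm.sessionmaker(bind=engine)()

    p1, p2, c1 = Parent(), Parent(), Child()
    p1.children.append(c1)
    sess.add_all([p1, c1])
    sess.flush()

    sess.delete(p1)           # old parent marked deleted in the same flush...
    p2.children.append(c1)    # ...but c1 now has a positive new association
    sess.add(p2)
    sess.flush()              # c1 keeps its row, pointed at the new parent
    assert c1.parent_id == p2.id

Before this change, the deleted p1 could still claim c1 and cascade a DELETE or a NULL foreign key onto it, even though the flush also contains a new parent for it.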
diff --git a/CHANGES b/CHANGES
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,29 @@
 CHANGES
 =======
 
+0.6uow_refactor
+===============
+- orm
+    - Unit of work internals have been rewritten.  Units of work
+      with large numbers of interdependent objects
+      can now be flushed without recursion overflows
+      as there is no longer reliance upon recursive calls
+      [ticket:1081].  The number of internal structures now stays
+      constant for a particular session state, regardless of
+      how many relationships are present on mappings.  The flow
+      of events now corresponds to a linear list of steps,
+      generated by the mappers and relationships based on actual
+      work to be done, filtered through a single topological sort
+      for correct ordering.  Flush actions are assembled using
+      far fewer steps and less memory. [ticket:1742]
+
+    - one-to-many relationships now maintain a list of positive
+      parent-child associations within the flush, preventing
+      previous parents marked as deleted from cascading a
+      delete or NULL foreign key set on those child objects,
+      despite the end-user not removing the child from the old
+      association. [ticket:1764]
+
 0.6.0
 =====
 
diff --git a/lib/sqlalchemy/__init__.py b/lib/sqlalchemy/__init__.py
index 376b13e64..774ac24d9 100644
--- a/lib/sqlalchemy/__init__.py
+++ b/lib/sqlalchemy/__init__.py
@@ -114,6 +114,6 @@ from sqlalchemy.engine import create_engine, engine_from_config
 __all__ = sorted(name for name, obj in locals().items()
                  if not (name.startswith('_') or inspect.ismodule(obj)))
 
-__version__ = '0.6beta3'
+__version__ = '0.6uow_refactor'
 
 del inspect, sys
diff --git a/lib/sqlalchemy/orm/dependency.py b/lib/sqlalchemy/orm/dependency.py
index dd595a052..316c5382d 100644
--- a/lib/sqlalchemy/orm/dependency.py
+++ b/lib/sqlalchemy/orm/dependency.py
@@ -10,7 +10,7 @@
 
 from sqlalchemy import sql, util
 import sqlalchemy.exceptions as sa_exc
-from sqlalchemy.orm import attributes, exc, sync, unitofwork
+from sqlalchemy.orm import attributes, exc, sync, unitofwork, util as mapperutil
 from sqlalchemy.orm.interfaces import ONETOMANY, MANYTOONE, MANYTOMANY
 
@@ -42,21 +42,13 @@ class DependencyProcessor(object):
                     "child are present" % 
                     self.prop)
 
-    def _get_instrumented_attribute(self):
-        """Return the ``InstrumentedAttribute`` handled by this 
-        ``DependencyProecssor``.
-        
-        """
-        return self.parent.class_manager.get_impl(self.key)
-        
     def hasparent(self, state):
         """return True if the given object instance has a parent,
         according to the ``InstrumentedAttribute`` handled by this 
         ``DependencyProcessor``.
         
         """
-        # TODO: use correct API for this
-        return self._get_instrumented_attribute().hasparent(state)
+        return self.parent.class_manager.get_impl(self.key).hasparent(state)
 
     def per_property_preprocessors(self, uow):
         """establish actions and dependencies related to a flush.
@@ -326,7 +318,7 @@ class OneToManyDP(DependencyProcessor):
                 (parent_saves, after_save),
                 (after_save, child_saves),
                 (after_save, child_deletes),
-                
+
                 (child_saves, parent_deletes),
                 (child_deletes, parent_deletes),
 
@@ -397,6 +389,7 @@ class OneToManyDP(DependencyProcessor):
                         uowcommit.register_object(child, isdelete=True)
                     else:
                         uowcommit.register_object(child)
+
         if should_null_fks:
             for child in history.unchanged:
                 if child is not None:
@@ -404,15 +397,23 @@ class OneToManyDP(DependencyProcessor):
 
     def presort_saves(self, uowcommit, states):
+        children_added = uowcommit.memo(('children_added', self), set)
+        
         for state in states:
+            pks_changed = self._pks_changed(uowcommit, state)
+            
             history = uowcommit.get_attribute_history(
                                             state, 
                                             self.key, 
-                                            passive=True)
+                                            passive=not pks_changed 
+                                                    or self.passive_updates)
             if history:
                 for child in history.added:
                     if child is not None:
-                        uowcommit.register_object(child)
+                        uowcommit.register_object(child, cancel_delete=True)
+
+                children_added.update(history.added)
+
                 for child in history.deleted:
                     if not self.cascade.delete_orphan:
                         uowcommit.register_object(child, isdelete=False)
@@ -422,11 +423,8 @@ class OneToManyDP(DependencyProcessor):
                             uowcommit.register_object(
                                 attributes.instance_state(c), 
                                 isdelete=True)
-            if self._pks_changed(uowcommit, state):
-                if not history:
-                    history = uowcommit.get_attribute_history(
-                                        state, self.key, 
-                                        passive=self.passive_updates)
+            
+            if pks_changed:
                 if history:
                     for child in history.unchanged:
                         if child is not None:
@@ -442,6 +440,8 @@ class OneToManyDP(DependencyProcessor):
         # safely for any cascade but is unnecessary if delete cascade
         # is on.
         if self.post_update or not self.passive_deletes == 'all':
+            children_added = uowcommit.memo(('children_added', self), set)
+
             for state in states:
                 history = uowcommit.get_attribute_history(
                                             state, 
@@ -461,7 +461,8 @@ class OneToManyDP(DependencyProcessor):
                                             uowcommit, 
                                             [state])
                     if self.post_update or not self.cascade.delete:
-                        for child in history.unchanged:
+                        for child in set(history.unchanged).\
+                                            difference(children_added):
                             if child is not None:
                                 self._synchronize(
                                             state, 
@@ -472,7 +473,11 @@ class OneToManyDP(DependencyProcessor):
                                             child, 
                                             uowcommit, 
                                             [state])
-                        
+                    # technically, we can even remove each child from the
+                    # collection here too. but this would be a somewhat 
+                    # inconsistent behavior since it wouldn't happen if the old 
+                    # parent wasn't deleted but child was moved.
+    
     def process_saves(self, uowcommit, states):
         for state in states:
             history = uowcommit.get_attribute_history(state, self.key, passive=True)
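
Both the `children_added` bookkeeping above and the `_key_switchers` rewrite below go through `uowcommit.memo()`, a create-on-first-use scratch-value helper this patch adds to UOWTransaction (see the unitofwork.py hunks further down). A standalone sketch of the pattern, with a stand-in FakeUOW class that is illustrative only:

    # Standalone sketch of the memo() helper this patch adds to
    # UOWTransaction; FakeUOW is a stand-in, not SQLAlchemy code.
    class FakeUOW(object):
        def __init__(self):
            # same role as UOWTransaction.attributes: per-flush scratch storage
            self.attributes = {}

        def memo(self, key, callable_):
            # create the value on first access, then hand back the same
            # object for every later call with the same key in this flush
            if key in self.attributes:
                return self.attributes[key]
            self.attributes[key] = ret = callable_()
            return ret

    uow = FakeUOW()
    children_added = uow.memo(('children_added', 'some_dep'), set)
    children_added.add('child-A')
    # presort_saves and process_deletes observe the same set:
    assert uow.memo(('children_added', 'some_dep'), set) is children_added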
@@ -731,10 +736,10 @@ class DetectKeySwitch(DependencyProcessor):
             self._process_key_switches(states, uowcommit)
 
     def _key_switchers(self, uow, states):
-        if ('pk_switchers', self) in uow.attributes:
-            switched, notswitched = uow.attributes[('pk_switchers', self)]
-        else:
-            uow.attributes[('pk_switchers', self)] = (switched, notswitched) = (set(), set())
+        switched, notswitched = uow.memo(
+                                    ('pk_switchers', self),
+                                    lambda: (set(), set())
+                                )
         
         allstates = switched.union(notswitched)
         for s in states:
diff --git a/lib/sqlalchemy/orm/sync.py b/lib/sqlalchemy/orm/sync.py
index b9ddbb6e7..184ae8c84 100644
--- a/lib/sqlalchemy/orm/sync.py
+++ b/lib/sqlalchemy/orm/sync.py
@@ -75,8 +75,7 @@ def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
         except exc.UnmappedColumnError:
             _raise_col_to_prop(False, source_mapper, l, None, r)
         history = uowcommit.get_attribute_history(source, prop.key, passive=True)
-        if len(history.deleted):
-            return True
+        return bool(history.deleted)
     else:
         return False
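
In the unitofwork.py diff that follows, register_object() grows a cancel_delete flag, and re-registration stops OR-ing flags together: a later non-listonly registration may now overwrite an earlier delete. A hypothetical mini-model of just that flag logic (not SQLAlchemy code; the real method also does session and mapper bookkeeping):

    # Hypothetical mini-model of the new register_object() flag semantics:
    # a state already registered for deletion can be "rescued" by a later
    # register_object(child, cancel_delete=True) from OneToManyDP.presort_saves.
    states = {}

    def register_object(state, isdelete=False, listonly=False, cancel_delete=False):
        if state not in states:
            states[state] = (isdelete, listonly)
        elif not listonly and (isdelete or cancel_delete):
            states[state] = (isdelete, False)

    register_object('c1', isdelete=True)        # old parent's delete cascades
    register_object('c1', cancel_delete=True)   # new parent claims the child
    assert states['c1'] == (False, False)       # flush will UPDATE, not DELETE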
diff --git a/lib/sqlalchemy/orm/unitofwork.py b/lib/sqlalchemy/orm/unitofwork.py
index 1095c1347..2c616943f 100644
--- a/lib/sqlalchemy/orm/unitofwork.py
+++ b/lib/sqlalchemy/orm/unitofwork.py
@@ -12,13 +12,12 @@ organizes them in order of dependency, and executes.
 
 """
 
-from sqlalchemy import util, log, topological
+from sqlalchemy import util, topological
 from sqlalchemy.orm import attributes, interfaces
 from sqlalchemy.orm import util as mapperutil
 from sqlalchemy.orm.util import _state_mapper
 
 # Load lazily
-object_session = None
 _state_session = None
 
 class UOWEventHandler(interfaces.AttributeExtension):
@@ -90,10 +89,10 @@ class UOWTransaction(object):
         # as a parent.
         self.mappers = util.defaultdict(set)
 
-        # a set of Preprocess objects, which gather
+        # a dictionary of Preprocess objects, which gather
         # additional states impacted by the flush
         # and determine if a flush action is needed
-        self.presort_actions = set()
+        self.presort_actions = {}
 
         # dictionary of PostSortRec objects, each
         # one issues work during the flush within
@@ -121,6 +120,13 @@ class UOWTransaction(object):
         return state in self.states and self.states[state][0]
     
+    def memo(self, key, callable_):
+        if key in self.attributes:
+            return self.attributes[key]
+        else:
+            self.attributes[key] = ret = callable_()
+            return ret
+        
     def remove_state_actions(self, state):
         """remove pending actions for a state from the uowtransaction."""
         
@@ -139,10 +145,10 @@ class UOWTransaction(object):
             # if the cached lookup was "passive" and now we want non-passive, do a non-passive
             # lookup and re-cache
             if cached_passive and not passive:
-                history = attributes.get_state_history(state, key, passive=False)
+                history = state.get_history(key, passive=False)
                 self.attributes[hashkey] = (history, passive)
         else:
-            history = attributes.get_state_history(state, key, passive=passive)
+            history = state.get_history(key, passive=passive)
             self.attributes[hashkey] = (history, passive)
         
         if not history or not state.get_impl(key).uses_objects:
@@ -151,9 +157,12 @@ class UOWTransaction(object):
             return history.as_state()
     
     def register_preprocessor(self, processor, fromparent):
-        self.presort_actions.add(Preprocess(processor, fromparent))
+        key = (processor, fromparent)
+        if key not in self.presort_actions:
+            self.presort_actions[key] = Preprocess(processor, fromparent)
 
-    def register_object(self, state, isdelete=False, listonly=False):
+    def register_object(self, state, isdelete=False, 
+                            listonly=False, cancel_delete=False):
         if not self.session._contains_state(state):
             return
 
@@ -166,11 +175,8 @@ class UOWTransaction(object):
             self.mappers[mapper].add(state)
             self.states[state] = (isdelete, listonly)
         else:
-            existing_isdelete, existing_listonly = self.states[state]
-            self.states[state] = (
-                existing_isdelete or isdelete, 
-                existing_listonly and listonly
-            )
+            if not listonly and (isdelete or cancel_delete):
+                self.states[state] = (isdelete, False)
     
     def issue_post_update(self, state, post_update_cols):
         mapper = state.manager.mapper.base_mapper
@@ -180,11 +186,23 @@ class UOWTransaction(object):
 
     @util.memoized_property
     def _mapper_for_dep(self):
+        """return a dynamic mapping of (Mapper, DependencyProcessor) to 
+        True or False, indicating if the DependencyProcessor operates 
+        on objects of that Mapper.
+        
+        The result is stored in the dictionary persistently once
+        calculated.
+        
+        """
         return util.PopulateDict(
             lambda tup:tup[0]._props.get(tup[1].key) is tup[1].prop
         )
     
     def filter_states_for_dep(self, dep, states):
+        """Filter the given list of InstanceStates to those relevant to the 
+        given DependencyProcessor.
+        
+        """
         mapper_for_dep = self._mapper_for_dep
         return [s for s in states if mapper_for_dep[(s.manager.mapper, dep)]]
     
@@ -196,12 +214,16 @@ class UOWTransaction(object):
                 yield state
     
     def _generate_actions(self):
+        """Generate the full, unsorted collection of PostSortRecs as 
+        well as dependency pairs for this UOWTransaction.
+        
+        """
         # execute presort_actions, until all states
         # have been processed.   a presort_action might
         # add new states to the uow.
         while True:
             ret = False
-            for action in list(self.presort_actions):
+            for action in list(self.presort_actions.values()):
                 if action.execute(self):
                     ret = True
             if not ret:
@@ -211,7 +233,7 @@ class UOWTransaction(object):
         self.cycles = cycles = topological.find_cycles(
                         self.dependencies, 
                         self.postsort_actions.values())
-        
+
         if cycles:
             # if yes, break the per-mapper actions into
             # per-state actions
@@ -248,7 +270,7 @@ class UOWTransaction(object):
         #sort = topological.sort(self.dependencies, postsort_actions)
         #print "--------------"
         #print self.dependencies
-        #print postsort_actions
+        #print list(sort)
         #print "COUNT OF POSTSORT ACTIONS", len(postsort_actions)
         
         # execute
@@ -281,8 +303,6 @@ class UOWTransaction(object):
             # debug... would like to see how many do this
             self.session._register_newly_persistent(state)
 
-log.class_logger(UOWTransaction)
-
 class IterateMappersMixin(object):
     def _mappers(self, uow):
         if self.fromparent:
diff --git a/test/orm/test_cascade.py b/test/orm/test_cascade.py
index a7152ecc1..7b07898a5 100644
--- a/test/orm/test_cascade.py
+++ b/test/orm/test_cascade.py
@@ -1156,7 +1156,6 @@ class UnsavedOrphansTest3(_base.MappedTest):
 
         assert c not in s, "Should expunge customer when both parents are gone"
 
-
 class DoubleParentOrphanTest(_base.MappedTest):
     """test orphan detection for an entity with two parent relationships"""
 
@@ -1276,6 +1275,151 @@ class CollectionAssignmentOrphanTest(_base.MappedTest):
         eq_(sess.query(A).get(a1.id),
             A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]))
 
+class O2MConflictTest(_base.MappedTest):
+    """test that O2M dependency detects a change in parent, does the
+    right thing, and even updates the collection/attribute.
+    
+    """
+    
+    @classmethod
+    def define_tables(cls, metadata):
+        Table("parent", metadata,
+            Column("id", Integer, primary_key=True, test_needs_autoincrement=True)
+        )
+        Table("child", metadata,
+            Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+            Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False)
+        )
+        
+    @classmethod
+    def setup_classes(cls):
+        class Parent(_base.ComparableEntity):
+            pass
+        class Child(_base.ComparableEntity):
+            pass
+    
+    @testing.resolve_artifact_names
+    def _do_delete_old_test(self):
+        sess = create_session()
+        
+        p1, p2, c1 = Parent(), Parent(), Child()
+        if Parent.child.property.uselist:
+            p1.child.append(c1)
+        else:
+            p1.child = c1
+        sess.add_all([p1, c1])
+        sess.flush()
+        
+        sess.delete(p1)
+        
+        if Parent.child.property.uselist:
+            p2.child.append(c1)
+        else:
+            p2.child = c1
+        sess.add(p2)
+        
+        sess.flush()
+        eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
+    
+    @testing.resolve_artifact_names
+    def _do_move_test(self):
+        sess = create_session()
+        
+        p1, p2, c1 = Parent(), Parent(), Child()
+        if Parent.child.property.uselist:
+            p1.child.append(c1)
+        else:
+            p1.child = c1
+        sess.add_all([p1, c1])
+        sess.flush()
+        
+        if Parent.child.property.uselist:
+            p2.child.append(c1)
+        else:
+            p2.child = c1
+        sess.add(p2)
+        
+        sess.flush()
+        eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1])
+        
+    @testing.resolve_artifact_names
+    def test_o2o_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False)
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+    
+    @testing.resolve_artifact_names
+    def test_o2m_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=True)
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2o_backref_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, backref='parent')
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2o_delcascade_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, cascade="all, delete")
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+    
+    @testing.resolve_artifact_names
+    def test_o2o_delorphan_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, cascade="all, delete, delete-orphan")
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+    
+    @testing.resolve_artifact_names
+    def test_o2o_delorphan_backref_delete_old(self):
+        mapper(Parent, parent, properties={
+            'child':relationship(Child, uselist=False, 
+                                    cascade="all, delete, delete-orphan", 
+                                    backref='parent')
+        })
+        mapper(Child, child)
+        self._do_delete_old_test()
+        self._do_move_test()
+    
+    @testing.resolve_artifact_names
+    def test_o2o_backref_delorphan_delete_old(self):
+        mapper(Parent, parent)
+        mapper(Child, child, properties = {
+            'parent' : relationship(Parent, uselist=False, single_parent=True, 
+                                backref=backref('child', uselist=False), 
+                                cascade="all,delete,delete-orphan")
+        })
+        self._do_delete_old_test()
+        self._do_move_test()
+
+    @testing.resolve_artifact_names
+    def test_o2m_backref_delorphan_delete_old(self):
+        mapper(Parent, parent)
+        mapper(Child, child, properties = {
+            'parent' : relationship(Parent, uselist=False, single_parent=True, 
+                                backref=backref('child', uselist=True), 
+                                cascade="all,delete,delete-orphan")
+        })
+        self._do_delete_old_test()
+        self._do_move_test()
+
 class PartialFlushTest(_base.MappedTest):
     """test cascade behavior as it relates to object lists passed to flush().
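
The test_unitofworkv2.py diff below factors UOW construction out of _assert_uow_size() into a reusable _get_test_uow() helper, so tests can build a UOWTransaction and inspect the generated flush plan directly. A hypothetical test sketch using it (the class name and fixture setup are assumed, not from the patch; User/users come from the same _fixtures module the real tests import):

    # Hypothetical use of the _get_test_uow() helper introduced below:
    class FlushPlanSizeTest(UOWTest):
        def test_plain_insert_plan(self):
            mapper(User, users)
            sess = create_session()
            sess.add(User(name='u1'))
            uow = self._get_test_uow(sess)
            postsort_actions = uow._generate_actions()
            # inspect the generated flush steps; with no relationships
            # mapped, the plan stays tiny regardless of pending row count
            print postsort_actions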
diff --git a/test/orm/test_unitofworkv2.py b/test/orm/test_unitofworkv2.py
index 7fef87d33..e28537b00 100644
--- a/test/orm/test_unitofworkv2.py
+++ b/test/orm/test_unitofworkv2.py
@@ -14,10 +14,7 @@ from test.orm._fixtures import keywords, addresses, Base, Keyword, \
     composite_pk_table, CompositePk
 
 class AssertsUOW(object):
-    def _assert_uow_size(self,
-        session, 
-        expected 
-    ):
+    def _get_test_uow(self, session):
         uow = unitofwork.UOWTransaction(session)
         deleted = set(session._deleted)
         new = set(session._new)
@@ -26,6 +23,13 @@ class AssertsUOW(object):
             uow.register_object(s)
         for d in deleted:
             uow.register_object(d, isdelete=True)
+        return uow
+        
+    def _assert_uow_size(self,
+        session, 
+        expected 
+    ):
+        uow = self._get_test_uow(session)
         postsort_actions = uow._generate_actions()
         print postsort_actions
         eq_(len(postsort_actions), expected, postsort_actions)
@@ -33,8 +37,6 @@ class AssertsUOW(object):
 class UOWTest(_fixtures.FixtureTest,
             testing.AssertsExecutionResults, AssertsUOW):
     run_inserts = None
-    
-    
 
 class RudimentaryFlushTest(UOWTest):
     def test_one_to_many_save(self):
@@ -215,7 +217,7 @@ class RudimentaryFlushTest(UOWTest):
                     {'id':u1.id}
             ),
         )
-        
+
     def test_m2o_flush_size(self):
         mapper(User, users)
         mapper(Address, addresses, properties={
diff --git a/test/perf/large_flush.py b/test/perf/large_flush.py
new file mode 100644
index 000000000..431a28944
--- /dev/null
+++ b/test/perf/large_flush.py
@@ -0,0 +1,84 @@
+import sqlalchemy as sa
+from sqlalchemy import create_engine, MetaData, orm
+from sqlalchemy import Column, ForeignKey
+from sqlalchemy import Integer, String
+from sqlalchemy.orm import mapper
+from sqlalchemy.test import profiling
+
+class Object(object):
+    pass
+
+class Q(Object):
+    pass
+
+class A(Object):
+    pass
+
+class C(Object):
+    pass
+
+class WC(C):
+    pass
+
+engine = create_engine('sqlite:///:memory:', echo=True)
+
+sm = orm.sessionmaker(bind=engine)
+
+SA_Session = orm.scoped_session(sm)
+
+SA_Metadata = MetaData()
+
+object_table = sa.Table('Object',
+                        SA_Metadata,
+                        Column('ObjectID', Integer,primary_key=True),
+                        Column('Type', String(1), nullable=False))
+
+q_table = sa.Table('Q',
+                   SA_Metadata,
+                   Column('QID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
+
+c_table = sa.Table('C',
+                   SA_Metadata,
+                   Column('CID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
+
+wc_table = sa.Table('WC',
+                    SA_Metadata,
+                    Column('WCID', Integer, ForeignKey('C.CID'), primary_key=True))
+
+a_table = sa.Table('A',
+                   SA_Metadata,
+                   Column('AID', Integer, ForeignKey('Object.ObjectID'),primary_key=True),
+                   Column('QID', Integer, ForeignKey('Q.QID')),
+                   Column('CID', Integer, ForeignKey('C.CID')))
+
+mapper(Object, object_table, polymorphic_on=object_table.c.Type, polymorphic_identity='O')
+
+mapper(Q, q_table, inherits=Object, polymorphic_identity='Q')
+mapper(C, c_table, inherits=Object, polymorphic_identity='C')
+mapper(WC, wc_table, inherits=C, polymorphic_identity='W')
+
+mapper(A, a_table, inherits=Object, polymorphic_identity='A',
+       properties = {
+           'Q' : orm.relation(Q,primaryjoin=a_table.c.QID==q_table.c.QID,
+                              backref='As'
+                              ),
+           'C' : orm.relation(C,primaryjoin=a_table.c.CID==c_table.c.CID,
+                              backref='A',
+                              uselist=False)
+           }
+       )
+
+SA_Metadata.create_all(engine)
+
+@profiling.profiled('large_flush', always=True, sort=['file'])
+def generate_error():
+    q = Q()
+    for j in range(100): #at 306 the error does not pop out (depending on recursion depth)
+        a = A()
+        a.Q = q
+        a.C = WC()
+
+    SA_Session.add(q)
+    SA_Session.commit() #here the error pops out
+
+generate_error()
\ No newline at end of file
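
For reference, the "single topological sort" described in the CHANGES entry consumes dependency pairs like the (parent_saves, after_save) tuples registered in dependency.py above. The real implementation lives in sqlalchemy.topological; the following is only a minimal Kahn's-algorithm sketch of the same idea:

    # Minimal Kahn's-algorithm sketch of ordering flush steps by
    # dependency pairs; not SQLAlchemy's actual topological module.
    def toposort(pairs, items):
        remaining = dict((item, set()) for item in items)
        for before, after in pairs:
            remaining[after].add(before)
        output = []
        while remaining:
            ready = [i for i, deps in remaining.items() if not deps]
            if not ready:
                # the UOW handles this case by breaking per-mapper
                # actions into per-state actions (see _generate_actions)
                raise Exception("cycle detected")
            for item in ready:
                del remaining[item]
                output.append(item)
            for deps in remaining.values():
                deps.difference_update(ready)
        return output

    print toposort(
        [('parent_saves', 'after_save'), ('after_save', 'child_saves')],
        ['child_saves', 'after_save', 'parent_saves'])
    # -> ['parent_saves', 'after_save', 'child_saves']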