diff options
Diffstat (limited to 'lib/sqlalchemy/sql/traversals.py')
-rw-r--r-- | lib/sqlalchemy/sql/traversals.py | 68 |
1 files changed, 65 insertions, 3 deletions
diff --git a/lib/sqlalchemy/sql/traversals.py b/lib/sqlalchemy/sql/traversals.py index f41480a94..cb38df6af 100644 --- a/lib/sqlalchemy/sql/traversals.py +++ b/lib/sqlalchemy/sql/traversals.py @@ -190,7 +190,10 @@ class HasCacheKey(object): # statements, not so much, but they usually won't have # annotations. result += self._annotations_cache_key - elif meth is InternalTraversal.dp_clauseelement_list: + elif ( + meth is InternalTraversal.dp_clauseelement_list + or meth is InternalTraversal.dp_clauseelement_tuple + ): result += ( attrname, tuple( @@ -390,6 +393,7 @@ class _CacheKey(ExtendedInternalTraversal): visit_has_cache_key = visit_clauseelement = CALL_GEN_CACHE_KEY visit_clauseelement_list = InternalTraversal.dp_clauseelement_list visit_annotations_key = InternalTraversal.dp_annotations_key + visit_clauseelement_tuple = InternalTraversal.dp_clauseelement_tuple visit_string = ( visit_boolean @@ -451,6 +455,8 @@ class _CacheKey(ExtendedInternalTraversal): tuple(elem._gen_cache_key(anon_map, bindparams) for elem in obj), ) + visit_executable_options = visit_has_cache_key_list + def visit_inspectable_list( self, attrname, obj, parent, anon_map, bindparams ): @@ -682,6 +688,41 @@ class _CacheKey(ExtendedInternalTraversal): _cache_key_traversal_visitor = _CacheKey() +class HasCopyInternals(object): + def _clone(self, **kw): + raise NotImplementedError() + + def _copy_internals(self, omit_attrs=(), **kw): + """Reassign internal elements to be clones of themselves. + + Called during a copy-and-traverse operation on newly + shallow-copied elements to create a deep copy. + + The given clone function should be used, which may be applying + additional transformations to the element (i.e. replacement + traversal, cloned traversal, annotations). + + """ + + try: + traverse_internals = self._traverse_internals + except AttributeError: + # user-defined classes may not have a _traverse_internals + return + + for attrname, obj, meth in _copy_internals.run_generated_dispatch( + self, traverse_internals, "_generated_copy_internals_traversal" + ): + if attrname in omit_attrs: + continue + + if obj is not None: + + result = meth(attrname, self, obj, **kw) + if result is not None: + setattr(self, attrname, result) + + class _CopyInternals(InternalTraversal): """Generate a _copy_internals internal traversal dispatch for classes with a _traverse_internals collection.""" @@ -696,6 +737,16 @@ class _CopyInternals(InternalTraversal): ): return [clone(clause, **kw) for clause in element] + def visit_clauseelement_tuple( + self, attrname, parent, element, clone=_clone, **kw + ): + return tuple([clone(clause, **kw) for clause in element]) + + def visit_executable_options( + self, attrname, parent, element, clone=_clone, **kw + ): + return tuple([clone(clause, **kw) for clause in element]) + def visit_clauseelement_unordered_set( self, attrname, parent, element, clone=_clone, **kw ): @@ -817,6 +868,9 @@ class _GetChildren(InternalTraversal): def visit_clauseelement_list(self, element, **kw): return element + def visit_clauseelement_tuple(self, element, **kw): + return element + def visit_clauseelement_tuples(self, element, **kw): return itertools.chain.from_iterable(element) @@ -840,8 +894,8 @@ class _GetChildren(InternalTraversal): if not isinstance(target, str): yield _flatten_clauseelement(target) - # if onclause is not None and not isinstance(onclause, str): - # yield _flatten_clauseelement(onclause) + if onclause is not None and not isinstance(onclause, str): + yield _flatten_clauseelement(onclause) def visit_dml_ordered_values(self, element, **kw): for k, v in element: @@ -1015,6 +1069,8 @@ class TraversalComparatorStrategy(InternalTraversal, util.MemoizedSlots): ): return COMPARE_FAILED + visit_executable_options = visit_has_cache_key_list + def visit_clauseelement( self, attrname, left_parent, left, right_parent, right, **kw ): @@ -1057,6 +1113,12 @@ class TraversalComparatorStrategy(InternalTraversal, util.MemoizedSlots): for l, r in util.zip_longest(left, right, fillvalue=None): self.stack.append((l, r)) + def visit_clauseelement_tuple( + self, attrname, left_parent, left, right_parent, right, **kw + ): + for l, r in util.zip_longest(left, right, fillvalue=None): + self.stack.append((l, r)) + def _compare_unordered_sequences(self, seq1, seq2, **kw): if seq1 is None: return seq2 is None |