1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
"""'dynamic' collection API. returns Query() objects on the 'read' side, alters
a special AttributeHistory on the 'write' side."""
from sqlalchemy import exceptions
from sqlalchemy.orm import attributes, object_session
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.mapper import has_identity, object_mapper
class DynamicCollectionAttribute(attributes.InstrumentedAttribute):
    """Instrumented attribute backing a 'dynamic' collection.

    Reading the attribute yields an ``AppenderQuery`` (or, for passive
    reads, the list of pending added items); writes are recorded in a
    ``CollectionHistory`` stored in the instance ``__dict__`` under this
    attribute's key.
    """

    def __init__(self, class_, attribute_manager, key, typecallable, target_mapper, **kwargs):
        """Initialize the base attribute and remember ``target_mapper``,
        which is used to build queries for the collection's members."""
        super(DynamicCollectionAttribute, self).__init__(
            class_, attribute_manager, key, typecallable, **kwargs)
        self.target_mapper = target_mapper

    def get(self, obj, passive=False):
        """Return an ``AppenderQuery`` for ``obj``; when ``passive`` is
        true, return the pending added items from the history instead."""
        if not passive:
            return AppenderQuery(self, obj)
        pending = self.get_history(obj, passive=True)
        return pending.added_items()

    def commit_to_state(self, state, obj, value=attributes.NO_VALUE):
        """Reset the change history held for ``obj``."""
        # This attribute keeps its own AttributeHistory, so CommittedState
        # is not needed; committing just installs a fresh, empty history.
        fresh_history = CollectionHistory(self, obj)
        obj.__dict__[self.key] = fresh_history

    def set(self, obj, value, initiator):
        """Replace the whole collection on ``obj`` with ``value`` and flag
        the instance state as modified.  Does nothing when this attribute
        is itself the initiator."""
        if initiator is not self:
            instance_state = obj._state
            self.get(obj).assign(value)
            # TODO: emit events ???
            instance_state['modified'] = True

    def delete(self, *args, **kwargs):
        """Deleting the collection is not supported."""
        raise NotImplementedError()

    def get_history(self, obj, passive=False):
        """Return the ``CollectionHistory`` stored on ``obj`` for this
        attribute, creating and storing an empty one on first access."""
        instance_dict = obj.__dict__
        try:
            history = instance_dict[self.key]
        except KeyError:
            history = instance_dict[self.key] = CollectionHistory(self, obj)
        return history

    def append(self, obj, value, initiator):
        """Record ``value`` as added to the collection and fire the append
        event, unless this attribute is itself the initiator."""
        if initiator is self:
            return
        history = self.get_history(obj)
        history._added_items.append(value)
        self.fire_append_event(obj, value, self)

    def remove(self, obj, value, initiator):
        """Record ``value`` as removed from the collection and fire the
        remove event, unless this attribute is itself the initiator."""
        if initiator is self:
            return
        history = self.get_history(obj)
        history._deleted_items.append(value)
        self.fire_remove_event(obj, value, self)
class AppenderQuery(Query):
    """Query returned on the 'read' side of a dynamic collection attribute.

    Container-style access (``len()``, iteration, indexing) is answered from
    the database through a session-bound query when the parent instance has
    an identity and a session; otherwise it falls back to the pending added
    items recorded in the attribute's history.  ``append``, ``remove`` and
    ``assign`` forward writes to the owning attribute.
    """

    def __init__(self, attr, instance):
        """Bind the query to ``attr`` (the owning dynamic attribute) and
        ``instance`` (the parent object whose collection is represented)."""
        super(AppenderQuery, self).__init__(attr.target_mapper, None)
        self.instance = instance
        self.attr = attr

    def __session(self):
        """Return the session to query through, or None.

        If the parent instance belongs to an autoflushing session, that
        session is flushed first.  None is returned when the instance has no
        identity or when no session is associated with it.
        """
        sess = object_session(self.instance)
        if sess is not None and self.instance in sess and sess.autoflush:
            sess.flush()
        if not has_identity(self.instance):
            return None
        return sess

    def __len__(self):
        """Return the row count from the database, or the number of pending
        added items when no usable session is available."""
        sess = self.__session()
        if sess is None:
            return len(self.attr.get_history(self.instance)._added_items)
        return self._clone(sess).count()

    def __iter__(self):
        """Iterate over the query results, or over the pending added items
        when no usable session is available."""
        sess = self.__session()
        if sess is None:
            return iter(self.attr.get_history(self.instance)._added_items)
        return iter(self._clone(sess))

    def __getitem__(self, index):
        """Index into the query results, or into the pending added items
        when no usable session is available."""
        sess = self.__session()
        if sess is None:
            return self.attr.get_history(self.instance)._added_items[index]
        return self._clone(sess)[index]

    def _clone(self, sess=None):
        """Return a new session-bound query for the collection's members.

        Raises ``exceptions.InvalidRequestError`` when no session is passed,
        the parent instance is not attached to one, and the mapper cannot
        supply a contextual session.
        """
        # note we're returning an entirely new Query class instance here
        # without any assignment capabilities;
        # the class of this query is determined by the session.
        if sess is None:
            sess = object_session(self.instance)
            if sess is None:
                try:
                    sess = object_mapper(self.instance).get_session()
                except exceptions.InvalidRequestError:
                    # The attribute name lives on the owning attribute
                    # object; this class never sets a ``key`` of its own.
                    raise exceptions.InvalidRequestError(
                        "Parent instance %s is not bound to a Session, and no "
                        "contextual session is established; lazy load operation "
                        "of attribute '%s' cannot proceed"
                        % (self.instance.__class__, self.attr.key))
        return sess.query(self.attr.target_mapper).with_parent(self.instance)

    def assign(self, collection):
        """Replace the collection's contents with ``collection``.

        The current members (loaded only when the parent has an identity)
        are recorded as deleted and ``collection`` as added.  Returns the
        list of previous members.
        """
        if has_identity(self.instance):
            oldlist = list(self)
        else:
            oldlist = []
        self.attr.get_history(self.instance).replace(oldlist, collection)
        return oldlist

    def append(self, item):
        """Add ``item`` to the collection via the owning attribute."""
        self.attr.append(self.instance, item, None)

    def remove(self, item):
        """Remove ``item`` from the collection via the owning attribute."""
        self.attr.remove(self.instance, item, None)
class CollectionHistory(attributes.AttributeHistory):
    """Overrides AttributeHistory to receive append/remove events directly.

    Keeps three plain lists (added, unchanged and deleted items) which the
    owning attribute appends to as the collection is modified.
    """

    def __init__(self, attr, obj):
        """Start with an empty history for ``obj``.

        ``attr`` is accepted but not stored.
        """
        # NOTE(review): the base-class initializer is not called here;
        # presumably intentional since this class tracks changes itself --
        # confirm against AttributeHistory.
        self._deleted_items = []
        self._added_items = []
        self._unchanged_items = []
        self._obj = obj

    def replace(self, olditems, newitems):
        """Record a wholesale replacement of the collection.

        ``olditems`` become the deleted items and ``newitems`` the added
        items.  Both are copied into fresh lists so that later appends to
        this history never mutate the caller's own collection, and so that
        any iterable (tuple, set, generator) may be assigned.
        """
        self._added_items = list(newitems)
        self._deleted_items = list(olditems)

    def is_modified(self):
        """Return True if any items have been added or deleted."""
        return len(self._deleted_items) > 0 or len(self._added_items) > 0

    def added_items(self):
        """Return the list of items added to the collection."""
        return self._added_items

    def unchanged_items(self):
        """Return the list of unchanged items."""
        return self._unchanged_items

    def deleted_items(self):
        """Return the list of items removed from the collection."""
        return self._deleted_items
|