summaryrefslogtreecommitdiff
path: root/pint/facets/group/objects.py
blob: 558a107512da45783109b200fc70bb4f25f3a7ad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
    pint.facets.group.objects
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    :copyright: 2022 by Pint Authors, see AUTHORS for more details.
    :license: BSD, see LICENSE for more details.
"""

from __future__ import annotations

from ...util import SharedRegistryObject, getattr_maybe_raise
from .definitions import GroupDefinition


class Group(SharedRegistryObject):
    """A group is a set of units.

    Units can be added directly or by including other groups.

    Members are computed dynamically, that is if a unit is added to a group X
    all groups that include X are affected.

    The group belongs to one Registry.

    See GroupDefinition for the definition file syntax.
    """

    def __init__(self, name):
        """
        :param name: Name of the group. If not given, a root Group will be created.
        :type name: str
        :param groups: dictionary like object groups and system.
                        The newly created group will be added after creation.
        :type groups: dict[str | Group]
        """

        # The name of the group.
        #: type: str
        self.name = name

        #: Names of the units in this group.
        #: :type: set[str]
        self._unit_names = set()

        #: Names of the groups in this group.
        #: :type: set[str]
        self._used_groups = set()

        #: Names of the groups in which this group is contained.
        #: :type: set[str]
        self._used_by = set()

        # Add this group to the group dictionary
        self._REGISTRY._groups[self.name] = self

        if name != "root":
            # All groups are added to root group
            self._REGISTRY._groups["root"].add_groups(name)

        #: A cache of the included units.
        #: None indicates that the cache has been invalidated.
        #: :type: frozenset[str] | None
        self._computed_members = None

    @property
    def members(self):
        """Names of the units that are members of the group.

        Calculated to include to all units in all included _used_groups.

        """
        if self._computed_members is None:
            self._computed_members = set(self._unit_names)

            for _, group in self.iter_used_groups():
                self._computed_members |= group.members

            self._computed_members = frozenset(self._computed_members)

        return self._computed_members

    def invalidate_members(self):
        """Invalidate computed members in this Group and all parent nodes."""
        self._computed_members = None
        d = self._REGISTRY._groups
        for name in self._used_by:
            d[name].invalidate_members()

    def iter_used_groups(self):
        pending = set(self._used_groups)
        d = self._REGISTRY._groups
        while pending:
            name = pending.pop()
            group = d[name]
            pending |= group._used_groups
            yield name, d[name]

    def is_used_group(self, group_name):
        for name, _ in self.iter_used_groups():
            if name == group_name:
                return True
        return False

    def add_units(self, *unit_names):
        """Add units to group."""
        for unit_name in unit_names:
            self._unit_names.add(unit_name)

        self.invalidate_members()

    @property
    def non_inherited_unit_names(self):
        return frozenset(self._unit_names)

    def remove_units(self, *unit_names):
        """Remove units from group."""
        for unit_name in unit_names:
            self._unit_names.remove(unit_name)

        self.invalidate_members()

    def add_groups(self, *group_names):
        """Add groups to group."""
        d = self._REGISTRY._groups
        for group_name in group_names:
            grp = d[group_name]

            if grp.is_used_group(self.name):
                raise ValueError(
                    "Cyclic relationship found between %s and %s"
                    % (self.name, group_name)
                )

            self._used_groups.add(group_name)
            grp._used_by.add(self.name)

        self.invalidate_members()

    def remove_groups(self, *group_names):
        """Remove groups from group."""
        d = self._REGISTRY._groups
        for group_name in group_names:
            grp = d[group_name]

            self._used_groups.remove(group_name)
            grp._used_by.remove(self.name)

        self.invalidate_members()

    @classmethod
    def from_lines(cls, lines, define_func, non_int_type=float) -> Group:
        """Return a Group object parsing an iterable of lines.

        Parameters
        ----------
        lines : list[str]
            iterable
        define_func : callable
            Function to define a unit in the registry; it must accept a single string as
            a parameter.

        Returns
        -------

        """
        group_definition = GroupDefinition.from_lines(lines, non_int_type)
        return cls.from_definition(group_definition, define_func)

    @classmethod
    def from_definition(
        cls, group_definition: GroupDefinition, add_unit_func=None
    ) -> Group:
        grp = cls(group_definition.name)

        add_unit_func = add_unit_func or grp._REGISTRY._add_unit

        # We first add all units defined within the group
        # to the registry.
        for definition in group_definition.definitions:
            add_unit_func(definition)

        # Then we add all units defined within the group
        # to this group (by name)
        grp.add_units(*group_definition.unit_names)

        # Finally, we add all grou0ps used by this group
        # tho this group (by name)
        if group_definition.using_group_names:
            grp.add_groups(*group_definition.using_group_names)

        return grp

    def __getattr__(self, item):
        getattr_maybe_raise(self, item)
        return self._REGISTRY