summaryrefslogtreecommitdiff
path: root/neutron/agent/common/resource_processing_queue.py
blob: 56878e9f73bd8c10eff8c924b4c9141f7c5fdf4b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#

import datetime

from oslo_utils import timeutils
from six.moves import queue as Queue

# Queue priorities.  A numerically smaller value is served first.
PRIORITY_RELATED_ROUTER = 0
PRIORITY_RPC = 1
PRIORITY_SYNC_ROUTERS_TASK = 2
PRIORITY_PD_UPDATE = 3

# Action codes that an update can carry.
DELETE_ROUTER = 1
DELETE_RELATED_ROUTER = 2
ADD_UPDATE_ROUTER = 3
ADD_UPDATE_RELATED_ROUTER = 4
PD_UPDATE = 5

# Translates a router action into its "*_RELATED_ROUTER" counterpart.
# PD_UPDATE intentionally has no entry here.
RELATED_ACTION_MAP = {
    DELETE_ROUTER: DELETE_RELATED_ROUTER,
    ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER,
}


class ResourceUpdate(object):
    """A single pending request to update a resource.

    The object bundles everything a consumer needs to order and handle the
    request: the resource ``id``, a numeric ``priority`` (smaller numbers
    are served first), an optional ``action`` and ``resource`` payload, the
    ``timestamp`` of the request and the number of ``tries`` left.

    The class attaches no meaning to particular priority values; it only
    compares them, so choosing the values is up to the caller.
    """

    def __init__(self, id, priority,
                 action=None, resource=None, timestamp=None, tries=5):
        self.id = id
        self.priority = priority
        self.action = action
        self.resource = resource
        self.tries = tries
        # A falsy timestamp (such as the default None) means "stamp it now".
        self.timestamp = timestamp if timestamp else timeutils.utcnow()

    def __lt__(self, other):
        """Order updates for a priority queue.

        The update with the numerically lower priority sorts first.  Ties
        are broken by the earlier timestamp, and if those match as well the
        ids are compared, which makes the final order effectively arbitrary.
        """
        for field in ('priority', 'timestamp'):
            mine = getattr(self, field)
            theirs = getattr(other, field)
            if mine != theirs:
                return mine < theirs
        return self.id < other.id

    def hit_retry_limit(self):
        """Return True once the remaining ``tries`` count is negative."""
        return self.tries < 0


class ExclusiveResourceProcessor(object):
    """Grants one worker at a time the right to process a resource.

    Access is arbitrated per resource ID without blocking.  The first
    instance created for an ID becomes the "master" for that ID and owns
    a private priority queue of updates.

    Any further instance created for the same ID while the master is still
    registered does not wait.  It merely hands its update to the master's
    queue, which tells the master that something changed since it started
    working.  The master finishes its current pass and then picks up the
    newly queued update.

    The class also remembers, per resource ID, the latest timestamp for
    which data was fetched and processed.  Callers record that timestamp
    through fetched_and_processed() only after the resource has actually
    been processed with the fetched data; updates that are not newer than
    the recorded timestamp are dropped by updates().
    """
    # Both mappings are class attributes and are therefore shared by every
    # instance: resource ID -> master instance / last processed timestamp.
    _masters = {}
    _resource_timestamps = {}

    def __init__(self, id):
        self._id = id

        try:
            self._master = self._masters[id]
        except KeyError:
            # Nobody owns this ID yet, so this instance becomes the master.
            # A non-positive maxsize makes the queue unbounded.
            self._masters[id] = self
            self._queue = Queue.PriorityQueue(-1)
            self._master = self

    def _i_am_master(self):
        """Tell whether this instance is the master for its resource ID."""
        return self == self._master

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Only the master is registered, so only the master unregisters.
        if not self._i_am_master():
            return
        del self._masters[self._id]

    def _get_resource_data_timestamp(self):
        """Return the recorded timestamp, or datetime.min if none exists."""
        never_processed = datetime.datetime.min
        return self._resource_timestamps.get(self._id, never_processed)

    def fetched_and_processed(self, timestamp):
        """Remember ``timestamp`` once it has been used for processing.

        The stored value never moves backwards: the later of ``timestamp``
        and the previously recorded value is kept.
        """
        previous = self._get_resource_data_timestamp()
        self._resource_timestamps[self._id] = max(timestamp, previous)

    def queue_update(self, update):
        """Hand an update to the master's queue.

        The queue collects updates that arrive while the resource is being
        processed.  Such updates have already reached the front of the
        ResourceProcessingQueue.
        """
        master_queue = self._master._queue
        master_queue.put(update)

    def updates(self):
        """Yield queued updates until the queue runs dry.

        Only the master yields anything; for any other instance the
        generator finishes immediately.  Other workers may keep queueing
        updates while the master is busy, so the queue is checked again
        before every pass.
        """
        while self._i_am_master() and not self._queue.empty():
            # Always drain the entry, even when it turns out to be stale.
            candidate = self._queue.get()
            # Hand it out only if it is newer than the last processed data.
            if self._get_resource_data_timestamp() < candidate.timestamp:
                yield candidate


class ResourceProcessingQueue(object):
    """Priority queue of resource updates waiting to be processed."""

    def __init__(self):
        self._queue = Queue.PriorityQueue()

    def add(self, update):
        """Enqueue ``update``, using up one of its remaining tries."""
        update.tries -= 1
        self._queue.put(update)

    def each_update_to_next_resource(self):
        """Take the next update off the queue and process its resource.

        This is a generator of ``(processor, update)`` pairs.  It keeps
        yielding for as long as the processor of the resource has updates
        to hand out.
        """
        head = self._queue.get()

        with ExclusiveResourceProcessor(head.id) as processor:
            # The update is queued regardless of whether this worker is
            # the master for the resource.
            processor.queue_update(head)

            # For a worker that is not the master, processor.updates()
            # yields nothing, which turns this loop into a no-op.
            for pending in processor.updates():
                yield (processor, pending)