summaryrefslogtreecommitdiff
path: root/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Subscription.java
blob: 2f49702c7ccc6edccb0c252da5ea0dd8ee1f153e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.qmf2.agent;

// Simple Logging Facade 4 Java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.UUID;

// QMF2 Imports
import org.apache.qpid.qmf2.common.Handle;
import org.apache.qpid.qmf2.common.QmfException;
import org.apache.qpid.qmf2.common.QmfQuery;
import org.apache.qpid.qmf2.common.QmfQueryTarget;

/** 
 * This TimerTask represents a running Subscription on the Agent.
 * <p>
 * The main reason we have Subscriptions as TimerTasks is to enable proper cleanup of the references stored in
 * the _subscriptions Map when the Subscription expires. The timer also causes QmfAgenData that have been updated
 * since the last interval to be published.
 * <p>
 * The following diagram illustrates the Subscription relationships with the Agent and QmfAgentData.
 * <p>
 * <img alt="" src="doc-files/Subscriptions.png">
 * @author Fraser Adams
 */
public final class Subscription extends TimerTask
{
    private static final Logger _log = LoggerFactory.getLogger(Subscription.class);

    // Duration is the time (in seconds) the Subscription is active before it automatically expires unless refreshed
    private static final int DEFAULT_DURATION = 300;
    private static final int MAX_DURATION = 3600;
    private static final int MIN_DURATION = 10;

    // Interval is the period (in milliseconds) between subscription ubdates.
    private static final int DEFAULT_INTERVAL = 30000;
    private static final int MIN_INTERVAL = 1000;

    private SubscribableAgent _agent;
    private long _startTime = System.currentTimeMillis();
    private long _lastUpdate = _startTime*1000000l;
    private String _subscriptionId;
    private Handle _consoleHandle;
    private QmfQuery _query;
    private long _duration = 0;
    private long _interval = 0;

    /**
     * Tells the SubscribableAgent to send the results to the Console via a subscription indicate message.
     *
     * @param results the list of mapEncoded QmfAgentData that currently match the query associated with this
     * Subscription.
     */
    protected void publish(List<Map> results)
    {
        _agent.sendSubscriptionIndicate(_consoleHandle, results);
        _lastUpdate = System.currentTimeMillis()*1000000l;
    }

    /**
     * Construct a new Subscription.
     * @param agent the SubscribableAgent to which this Subscription is associated.
     * @param params the SubscriptionParams object that contains the information needed to create a Subscription.
     */
    public Subscription(SubscribableAgent agent, SubscriptionParams params) throws QmfException
    {
        _agent = agent;
        _subscriptionId = UUID.randomUUID().toString();
        _consoleHandle = params.getConsoleHandle();
        _query = params.getQuery();
        setDuration(params.getLifetime());
        setInterval(params.getPublishInterval());

        _log.debug("Creating Subscription {}, duration = {}, interval = {}", new Object[] {_subscriptionId, _duration, _interval});
    }

    /**
     * This method gets called periodically by the Timer scheduling this TimerTask.
     * <p>
     * First a check is made to see if the Subscription has expired, if it has then it is cancelled.
     * <p>
     * If the Subscription isn't cancelled the Query gets evaluated against all registered objects and any that match
     * which are new to the Subscription or have changed since the last update get published.
     */
    public void run()
    {
        long elapsed = (long)Math.round((System.currentTimeMillis() - _startTime)/1000.0f);
        if (elapsed >= _duration)
        {
            _log.debug("Subscription {} has expired, removing", _subscriptionId);
            // The Subscription has expired so cancel it
            cancel();
        }
        else
        {
            List<QmfAgentData> objects = _agent.evaluateQuery(_query);
            List<Map> results = new ArrayList<Map>(objects.size());
            for (QmfAgentData object : objects)
            {
                if (object.getSubscription(_subscriptionId) == null)
                {
                    // The object is new to this Subscription so publish it
                    object.addSubscription(_subscriptionId, this);
                    results.add(object.mapEncode());
                }
                else
                {
                    // If the object has had update() called since last Subscription update publish it.
                    // Note that in many cases an Agent might call publish() on a managed object rather than
                    // update() which immediately forces a data indication to be sent to the subscriber on
                    // the Console.
                    if (object.getUpdateTime() > _lastUpdate)
                    {
                        results.add(object.mapEncode());
                    }
                }
            }

            if (results.size() > 0)
            {
                publish(results);
            }
        }
    }

    /**
     * Refresh the subscription by zeroing its elapsed time.
     *
     * @param resubscribeParams the ResubscribeParams passed by the Console potentially containing new duration
     * information.
     */
    public void refresh(ResubscribeParams resubscribeParams)
    {
        _log.debug("Refreshing Subscription {}", _subscriptionId);
        _startTime = System.currentTimeMillis();
        setDuration(resubscribeParams.getLifetime());
    }

    /**
     * Cancel the Subscription, tidying references up and cancelling the TimerTask.
     */
    @Override
    public boolean cancel()
    {
        _log.debug("Cancelling Subscription {}", _subscriptionId);
        // This Subscription is about to be deleted, remove it from any Objects that may be referencing it.
        List<QmfAgentData> objects = _agent.evaluateQuery(_query);
        for (QmfAgentData object : objects)
        {
            object.removeSubscription(_subscriptionId);
        }

        _agent.removeSubscription(this);
        return super.cancel(); // Cancel the TimerTask
    }

    /**
     * Return the SubscriptionId of this subscription.
     * @return the SubscriptionId of this subscription.
     */
    public String getSubscriptionId()
    {
        return _subscriptionId;
    }

    /**
     * Return the consoleHandle of this subscription.
     * @return the consoleHandle of this subscription.
     */
    public Handle getConsoleHandle()
    {
        return _consoleHandle;
    }

    /**
     * Set the Subscription lifetime in seconds. If the value passed to this method is zero the duration gets
     * set to the Agent's DEFAULT_DURATION is the duration has not already been set, if the duration has already
     * been set passing in a zero value has no effect on the duration.
     * If the value passed is non-zero the duration passed gets restricted between the Agent's MIN_DURATION
     * and MAX_DURATION.
     *
     * @param duration the new Subscription lifetime in seconds.
     */
    public void setDuration(long duration)
    {
        if (duration == 0)
        {
            if (_duration == 0)
            {
                _duration = DEFAULT_DURATION;
            } 
            return;
        }
        else
        {
            if (duration > MAX_DURATION)
            {
                duration = MAX_DURATION;
            }
            else if (duration < MIN_DURATION)
            {
                duration = MIN_DURATION;
            }
        }
        _duration = duration;
    }

    /**
     * Return the current Subscription lifetime value in seconds.
     * @return the current Subscription lifetime value in seconds.
     */
    public long getDuration()
    {
        return _duration;
    }

    /**
     * Set the Subscription refresh interval in seconds. If the value passed to this method is zero the interval gets
     * set to the Agent's DEFAULT_INTERVAL otherwise the interval passed gets restricted to be {@literal >= } the Agent's
     * MIN_INTERVAL.
     *
     * @param interval the time (in milliseconds) between periodic updates of data in this Subscription. 
     */
    public void setInterval(long interval)
    {
        if (interval == 0)
        {
            interval = DEFAULT_INTERVAL;
        }
        else if (interval < MIN_INTERVAL)
        {
            interval = MIN_INTERVAL;
        }
        _interval = interval;
    }

    /**
     * Return The time (in milliseconds) between periodic updates of data in this Subscription. 
     * @return The time (in milliseconds) between periodic updates of data in this Subscription. 
     */
    public long getInterval()
    {
        return _interval;
    }

    /**
     * Return The Subscription's QmfQuery.
     * @return The Subscription's QmfQuery.
     */
    public QmfQuery getQuery()
    {
        return _query;
    }
}