summaryrefslogtreecommitdiff
path: root/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/SubscriptionManager.java
blob: 69889d85de0b4f028d7e020be488e7086bc6776e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.qmf2.console;

// Simple Logging Facade 4 Java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Misc Imports
import java.util.HashMap;
import java.util.Map;
import java.util.TimerTask;

// QMF2 Imports
import org.apache.qpid.qmf2.common.QmfQuery;

/** 
 * A SubscriptionManager represents a running Subscription on the Console.
 * <p>
 * The main reason we have SubscriptionManagers as TimerTasks is to enable proper cleanup of the references stored in
 * the subscriptionByHandle and subscriptionById Maps. Ideally these will be cleaned up by a client calling 
 * cancelSubscription but we can't rely on that as the client may forget or the Agent may not respond.
 * <p>
 * The SubscriptionManager acts like a client/Console side representation of a Subscription running on an Agent.
 * As mentioned above its primary purpose is to enable references to Subscriptions maintained by the Console to
 * be cleaned up should the Subscription time out rather than being cancelled, however as a side effect it is
 * used to enable emulation of Subscriptions to the broker ManagementAgent, which does not yet natively implement
 * Subscription.
 * <p>
 * To emulate Subscriptions the Console receives the periodic _data indications pushed by the ManagementAgent. The
 * Console then iterates through Subscriptions referencing the broker Agent and evaluates their queries against
 * the QmfConsoleData returned by the _data indication. Any QmfConsoleData that match the query are passed to the
 * client application with the consoleHandle of the matching Subscription.
 * <p>
 * The following diagram illustrates the Subscription relationships with the Console and local Agent proxy.
 * <p>
 * <img alt="" src="doc-files/Subscriptions.png">
 *
 * @author Fraser Adams
 */
public final class SubscriptionManager extends TimerTask
{
    private static final Logger _log = LoggerFactory.getLogger(SubscriptionManager.class);

    private final Agent _agent;
    private long     _startTime = System.currentTimeMillis();
    private String   _subscriptionId;
    private String   _consoleHandle;
    private String   _replyHandle;
    private QmfQuery _query;
    private long     _duration = 0;
    private long     _interval = 0;
    private boolean  _waiting = true;

    /**
     * Construct a Console side proxy of a Subscription. Primarily to manage references to the Subscription.
     *
     * @param agent the Agent from which the Subscription has been requested
     * @param query the QmfQuery that the Subscription will run
     * @param consoleHandle the handle that uniquely identifies the Subscription
     * @param interval the interval between subscription updates
     * @param duration the duration of the subscription (assuming it doesn't get refreshed)
     */
    SubscriptionManager(final Agent agent, final QmfQuery query, final String consoleHandle,
                        final String replyHandle, final long interval, final long duration)
    {
        _agent = agent;
        _query = query;
        _consoleHandle = consoleHandle;
        _replyHandle = replyHandle;
        _interval = interval;
        _duration = duration;
        _log.debug("Creating SubscriptionManager {}, on Agent {}",_consoleHandle, _agent.getName());
    }

    /**
     * This method gets called periodically by the Timer scheduling this TimerTask.
     * <p>
     * First a check is made to see if the Subscription has expired, if it has then it is cancelled.
     */
    public void run()
    {
        long elapsed = (long)Math.round((System.currentTimeMillis() - _startTime)/1000.0f);
        if (elapsed >= _duration || !_agent.isActive())
        {
            _log.debug("Subscription {} has expired, removing", _subscriptionId);
            // The Subscription has expired so cancel it
            cancel();
        }
    }

    /**
     * Causes the current thread to wait until it is signalled or times out.
     * <p>
     * This method is primarily used as a means to enable a synchronous call to createSubscription().
     * For most synchronous calls we simply use the receive() call on the synchronous session, but we can't do that
     * for createSubscription() as we specifically need to use the replyTo on the asynchronous session as once
     * subscriptions are created the results are asynchronously pushed. This means we have to get the response to 
     * createSession() on the asynchronous replyTo then signal the (blocked) main thread that the response has
     * been received.
     *
     * @param timeout the maximum time to wait to be signalled.
     */
    public synchronized void await(final long timeout)
    {
        while (_waiting)
        {
            long _startTime = System.currentTimeMillis();
            try
            {
                wait(timeout);
            }
            catch (InterruptedException ie)
            {
                continue;
            }
            // Measure elapsed time to test against spurious wakeups and ensure we really have timed out
            long elapsedTime = (System.currentTimeMillis() - _startTime);
            if (elapsedTime >= timeout)
            {
                break;
            }
        }
        _waiting = true;
    }

    /**
     * Wakes up all waiting threads.
     */
    public synchronized void signal()
    {
        _waiting = false;
        notifyAll();
    }

    /**
     * Refresh the subscription by zeroing its elapsed time.
     */
    public void refresh()
    {
        _log.debug("Refreshing Subscription {}", _subscriptionId);
        _startTime = System.currentTimeMillis();
    }

    /**
     * Cancel the Subscription, tidying references up and cancelling the TimerTask.
     */
    @Override
    public boolean cancel()
    {
        _log.debug("Cancelling Subscription {}, {}", _consoleHandle, _subscriptionId);
        _agent.removeSubscription(this);
        signal(); // Just in case anything is blocking on this Subscription.
        return super.cancel(); // Cancel the TimerTask
    }

    /**
     * Set the SubscriptionId.
     * @param subscriptionId the new SubscriptionId of this Subscription.
     */
    public void setSubscriptionId(final String subscriptionId)
    {
        _subscriptionId = subscriptionId;
    }

    /**
     * return the SubscriptionId of this Subscription.
     * @return the SubscriptionId of this Subscription.
     */
    public String getSubscriptionId()
    {
        return _subscriptionId;
    }

    /**
     * Return the consoleHandle of this Subscription.
     * @return the consoleHandle of this Subscription.
     */
    public String getConsoleHandle()
    {
        return _consoleHandle;
    }

    /**
     * Return the replyHandle of this Subscription.
     * @return the replyHandle of this Subscription.
     */
    public String getReplyHandle()
    {
        return _replyHandle;
    }

    /**
     * Return the Agent running this Subscription.
     * @return the Agent running this Subscription.
     */
    public Agent getAgent()
    {
        return _agent;
    }

    /**
     * Set the Subscription lifetime in seconds.
     *
     * @param duration the new Subscription lifetime in seconds
     */
    public void setDuration(final long duration)
    {
        _duration = duration;
    }

    /**
     * Return The Subscription's QmfQuery.
     * @return The Subscription's QmfQuery.
     */
    public QmfQuery getQuery()
    {
        return _query;
    }

    /**
     * Create a Map encoded version.
     * <p>
     * When we do a synchronous createSubscription the Subscription itself holds the info needed to populate
     * the SubscriptionParams result. We encode the info in a Map to pass to the SubscribeParams Constructor
     */
    public Map<String, Object> mapEncode()
    {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("_interval", _interval);
        map.put("_duration", _duration);
        map.put("_subscription_id", _subscriptionId);
        return map;
    }
}