summaryrefslogtreecommitdiff
path: root/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/Agent.java
blob: 26c80df809bc8f79364ea6e072e7349236ebb105 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.qmf2.console;

// Misc Imports
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// QMF2 Imports
import org.apache.qpid.qmf2.common.ObjectId;
import org.apache.qpid.qmf2.common.QmfData;
import org.apache.qpid.qmf2.common.QmfException;
import org.apache.qpid.qmf2.common.SchemaClass;
import org.apache.qpid.qmf2.common.SchemaClassId;

/**
 * Local representation (proxy) of a remote Agent.
 * <p>
 * This class holds some state that relates to the Agent and in addition some methods may be called on the agent.
 * destroy(), invokeMethod() and refresh() are actually called by a proxy class AgentProxy. AgentProxy is actually
 * an interface that is implemented by Console (as that's where all the JMS stuff is), we use this approach to 
 * avoid introducing a circular dependency between Agent and Console.
 * <p>
 * The Console application maintains a list of all known remote Agents.
 * Each Agent is represented by an instance of the Agent class:
 * <p>
 * The following diagram illustrates the interactions between the Console, AgentProxy and the client side Agent
 * representation.
 * <p>
 * <img alt="" src="doc-files/Subscriptions.png">
 * @author Fraser Adams
 */
public final class Agent extends QmfData
{
    private AgentProxy                      _proxy;
    private List<String>                    _packages = new ArrayList<String>();
    private Map<SchemaClassId, SchemaClass> _schemaCache = new ConcurrentHashMap<SchemaClassId, SchemaClass>();
    private long                            _epoch;
    private long                            _heartbeatInterval;
    private long                            _timestamp;
    private boolean                         _eventsEnabled = true;
    private boolean                         _isActive = true;

    /**
     * The main constructor, taking a java.util.Map as a parameter. In essence it "deserialises" its state from the Map.
     *
     * @param m the map used to construct the SchemaClass
     * @param p the AgentProxy instance that implements some of the concrete behaviour of the local Agent representation.
     */
    public Agent(final Map m, final AgentProxy p)
    {
        super(m);
        // Populate attributes translating any old style keys if necessary.
        _epoch = hasValue("_epoch") ? getLongValue("_epoch") : getLongValue("epoch");
        _heartbeatInterval = hasValue("_heartbeat_interval") ? getLongValue("_heartbeat_interval") : 
                                                               getLongValue("heartbeat_interval");
        _timestamp = hasValue("_timestamp") ? getLongValue("_timestamp") : getLongValue("timestamp");
        _proxy = p;
    }

    /**
     * Sets the state of the Agent, used as an assignment operator.
     * 
     * @param m the Map used to initialise the Agent.
     */
    @SuppressWarnings("unchecked")
    public void initialise(final Map m)
    {        
        Map<String, Object> values = (Map<String, Object>)m.get("_values");
        _values = (values == null) ? m : values;

        // Populate attributes translating any old style keys if necessary.
        _epoch = hasValue("_epoch") ? getLongValue("_epoch") : getLongValue("epoch");
        _heartbeatInterval = hasValue("_heartbeat_interval") ? getLongValue("_heartbeat_interval") : 
                                                               getLongValue("heartbeat_interval");
        _timestamp = hasValue("_timestamp") ? getLongValue("_timestamp") : getLongValue("timestamp");
    }

    /**
     * Return whether or not events are enabled for this Agent.
     * @return a boolean indication of whether or not events are enabled for this Agent.
     */
    public boolean eventsEnabled()
    {
        return _eventsEnabled;
    }

    /**
     * Deactivated this Agent. Called by the Console when the Agent times out.
     */
    public void deactivate()
    {
        _isActive = false;
    }

    /**
     * Return the Agent instance name.
     * @return the Agent instance name.
     */
    public String getInstance()
    {
        return getStringValue("_instance");
    }

    /**
     * Return the identifying name string of the Agent. 
     * @return the identifying name string of the Agent. This name is used to send AMQP messages directly to this agent.
     */
    public String getName()
    {
        return getStringValue("_name");
    }

    /**
     * Return the product name string of the Agent.
     * @return the product name string of the Agent.
     */
    public String getProduct()
    {
        return getStringValue("_product");
    }

    /**
     * Return the Agent vendor name.
     * @return the Agent vendor name.
     */
    public String getVendor()
    {
        return getStringValue("_vendor");
    }

    /**
     * Return the Epoch stamp.
     * @return the Epoch stamp, used to determine if an Agent has been restarted.
     */
    public long getEpoch()
    {
        return _epoch;
    }

    /**
     * Set the Epoch stamp.
     * @param epoch the new Epoch stamp, used to indicate that an Agent has been restarted.
     */
    public void setEpoch(long epoch)
    {
        _epoch = epoch;
    }

    /**
     * Return the time that the Agent waits between sending hearbeat messages.
     * @return the time that the Agent waits between sending hearbeat messages.
     */
    public long getHeartbeatInterval()
    {
        return _heartbeatInterval;
    }

    /**
     * Return the timestamp of the Agent's last update.
     * @return the timestamp of the Agent's last update.
     */
    public long getTimestamp()
    {
        return _timestamp;
    }

    /**
     * Return true if the agent is alive.
     * @return true if the agent is alive (heartbeats have not timed out).
     */
    public boolean isActive()
    {
        return _isActive;
    }

    /**
     * Request that the Agent updates the value of this object's contents.
     *
     * @param objectId the ObjectId being queried for..
     * @param replyHandle the correlation handle used to tie asynchronous method requests with responses.
     * @param timeout the maximum time to wait for a response, overrides default replyTimeout.
     * @return the refreshed object.
     */    
    public QmfConsoleData refresh(final ObjectId objectId, final String replyHandle, final int timeout) throws QmfException
    {
        if (isActive())
        {
            return _proxy.refresh(this, objectId, replyHandle, timeout);
        }
        else
        {
            throw new QmfException("Agent.refresh() called from deactivated Agent");
        }
    }

    /**
     * Helper method to create a Map containing a QMF method request.
     *
     * @param objectId the objectId of the remote object.
     * @param name the remote method name.
     * @param inArgs the formal parameters of the remote method name.
     * @return a Map containing a QMF method request.
     */
    private Map<String, Object> createRequest(final ObjectId objectId, final String name, final QmfData inArgs)
    {
        // Default sizes for HashMap should be fine for request
        Map<String, Object> request = new HashMap<String, Object>();
        if (objectId != null)
        {
            request.put("_object_id", objectId.mapEncode());
        }
        request.put("_method_name", name);
        if (inArgs != null)
        {
            request.put("_arguments", inArgs.mapEncode());
            if (inArgs.getSubtypes() != null)
            {
                request.put("_subtypes", inArgs.getSubtypes());
            }
        }
        return request;
    }

    /**
     * Sends a method request to the Agent. Delegates to the AgentProxy to actually send the method as it's the
     * AgentProxy that knows about connections, sessions and messages.
     *
     * @param objectId the objectId of the remote object.
     * @param name the remote method name.
     * @param inArgs the formal parameters of the remote method name.
     * @param timeout the maximum time to wait for a response, overrides default replyTimeout.
     * @return the MethodResult.
     */
    protected MethodResult invokeMethod(final ObjectId objectId, final String name,
                                        final QmfData inArgs, final int timeout) throws QmfException
    {
        if (isActive())
        {
            return _proxy.invokeMethod(this, createRequest(objectId, name, inArgs), null, timeout);
        }
        else
        {
            throw new QmfException("Agent.invokeMethod() called from deactivated Agent");
        }
    }

    /**
     * Sends an asynchronous method request to the Agent. Delegates to the AgentProxy to actually send the method as
     * it's the AgentProxy that knows about connections, sessions and messages.
     *
     * @param objectId the objectId of the remote object.
     * @param name the remote method name.
     * @param inArgs the formal parameters of the remote method name.
     * @param replyHandle the correlation handle used to tie asynchronous method requests with responses.
     */
    protected void invokeMethod(final ObjectId objectId, final String name,
                                final QmfData inArgs, final String replyHandle) throws QmfException
    {
        if (isActive())
        {
            _proxy.invokeMethod(this, createRequest(objectId, name, inArgs), replyHandle, -1);
        }
        else
        {
            throw new QmfException("Agent.invokeMethod() called from deactivated Agent");
        }
    }

    /**
     * Sends a method request to the Agent. Delegates to the AgentProxy to actually send the method as it's the
     * AgentProxy that knows about connections, sessions and messages.
     *
     * @param name the remote method name.
     * @param inArgs the formal parameters of the remote method name.
     * @return the MethodResult.
     */
    public MethodResult invokeMethod(final String name, final QmfData inArgs) throws QmfException
    {
        return invokeMethod(null, name, inArgs, -1);
    }

    /**
     * Sends a method request to the Agent. Delegates to the AgentProxy to actually send the method as it's the
     * AgentProxy that knows about connections, sessions and messages.
     *
     * @param name the remote method name.
     * @param inArgs the formal parameters of the remote method name.
     * @param timeout the maximum time to wait for a response, overrides default replyTimeout.
     * @return the MethodResult.
     */
    public MethodResult invokeMethod(final String name, final QmfData inArgs, final int timeout) throws QmfException
    {
        return invokeMethod(null, name, inArgs, timeout);
    }

    /**
     * Sends a method request to the Agent. Delegates to the AgentProxy to actually send the method as it's the
     * AgentProxy that knows about connections, sessions and messages.
     *
     * @param name the remote method name.
     * @param inArgs the formal parameters of the remote method name.
     * @param replyHandle the correlation handle used to tie asynchronous method requests with responses.
     */
    public void invokeMethod(final String name, final QmfData inArgs, final String replyHandle) throws QmfException
    {
        invokeMethod(null, name, inArgs, replyHandle);
    }

    /**
     * Remove a Subscription. Delegates to the AgentProxy to actually remove the Subscription as it's the AgentProxy
     * that really knows about subscriptions.
     *
     * @param subscription the SubscriptionManager that we wish to remove.
     */
    public void removeSubscription(final SubscriptionManager subscription)
    {
        _proxy.removeSubscription(subscription);
    }

    /**
     * Allows reception of events from this agent.
     */
    public void enableEvents()
    {
        _eventsEnabled = true;
    }

    /**
     * Prevents reception of events from this agent.
     */
    public void disableEvents()
    {
        _eventsEnabled = false;
    }

    /**
     * Releases this Agent instance. Once called, the Console application should not reference this instance again.
     */
    public void destroy()
    {
        _timestamp = 0;
        _proxy.destroy(this);
    }

    /**
     * Clears the internally cached schema. Generally done when we wich to refresh the schema information from the
     * remote Agent.
     */
    public void clearSchemaCache()
    {
        _schemaCache.clear();
        _packages.clear();
    }

    /**
     * Stores the schema and package information obtained by querying the remote Agent.
     *
     * @param classes the list of SchemaClassIds obtained by querying the remote Agent.
     */
    public void setClasses(final List<SchemaClassId> classes)
    {
        if (classes == null)
        {
            clearSchemaCache();
            return;
        }

        for (SchemaClassId classId : classes)
        {
            _schemaCache.put(classId, SchemaClass.EMPTY_SCHEMA);
            if (!_packages.contains(classId.getPackageName()))
            {
                _packages.add(classId.getPackageName());
            }
        }
    }

    /**
     * Return the list of SchemaClassIds associated with this Agent.
     * @return the list of SchemaClassIds associated with this Agent.
     */
    public List<SchemaClassId> getClasses()
    {
        if (_schemaCache.size() == 0)
        {
            return Collections.emptyList();
        }
        return new ArrayList<SchemaClassId>(_schemaCache.keySet());
    }

    /**
     * Return the list of packages associated with this Agent.
     * @return the list of packages associated with this Agent.
     */
    public List<String> getPackages()
    {
        return _packages;
    }

    /**
     * Return the SchemaClass associated with this Agent.
     * @return the list of SchemaClass associated with this Agent.
     * <p>
     * I <i>believe</i> that there should only be one entry in the list returned when looking up a specific chema by classId.
     */
    public List<SchemaClass> getSchema(final SchemaClassId classId)
    {
        SchemaClass schema = _schemaCache.get(classId);
        if (schema == SchemaClass.EMPTY_SCHEMA)
        {
            return Collections.emptyList();
        }
        
        List<SchemaClass> results = new ArrayList<SchemaClass>();
        results.add(schema);
        return results;
    }

    /**
     * Set a schema keyed by SchemaClassId.
     *
     * @param classId the SchemaClassId indexing the particular schema.
     * @param schemaList the schema being indexed.
     * <p>
     * I <i>believe</i> that there should only be one entry in the list returned when looking up a specific chema by classId.
     */
    public void setSchema(final SchemaClassId classId, final List<SchemaClass> schemaList)
    {
        if (schemaList == null || schemaList.size() == 0)
        {
            _schemaCache.put(classId, SchemaClass.EMPTY_SCHEMA);
        }
        else
        {
            // I believe that there should only be one entry in the list returned when looking up
            // a specific chema by classId
            _schemaCache.put(classId, schemaList.get(0));
        }
    }

    /**
     * Helper/debug method to list the QMF Object properties and their type.
     */
    @Override
    public void listValues()
    {
        super.listValues();
        System.out.println("Agent:");
        System.out.println("instance: " + getInstance());
        System.out.println("name: " + getName());
        System.out.println("product: " + getProduct());
        System.out.println("vendor: " + getVendor());
        System.out.println("epoch: " + getEpoch());
        System.out.println("heartbeatInterval: " + getHeartbeatInterval());
        System.out.println("timestamp: " + new Date(getTimestamp()/1000000l));
    }
}