summaryrefslogtreecommitdiff
path: root/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java
blob: 1e1017070092f81620ec1a51c2c7fc56e54884f6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.qpid.server.qmf2;

// Misc Imports

import static org.apache.qpid.qmf2.common.WorkItem.WorkItemType.METHOD_CALL;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.qmf2.agent.Agent;
import org.apache.qpid.qmf2.agent.MethodCallParams;
import org.apache.qpid.qmf2.agent.MethodCallWorkItem;
import org.apache.qpid.qmf2.agent.QmfAgentData;
import org.apache.qpid.qmf2.common.ObjectId;
import org.apache.qpid.qmf2.common.QmfEventListener;
import org.apache.qpid.qmf2.common.QmfException;
import org.apache.qpid.qmf2.common.WorkItem;
import org.apache.qpid.qmf2.util.ConnectionHelper;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;

// Simple Logging Facade 4 Java
// QMF2 Imports
// Java Broker model Imports

/**
 * This class implements a QMF2 Agent providing access to the Java broker Management Objects via QMF2 thus
 * allowing the Java Broker to be managed in the same way to the C++ Broker.
 * <p>
 * The intention is for the QmfManagementAgent to conform to the same Management Schema as the C++ Broker
 * (e.g. as specified in management-schema.xml) in order to provide maximum cohesion between the
 * two Broker implementations, however that's not entirely possible given differences between the underlying
 * Management Models.
 * <p>
 * This Plugin attempts to map properties from the Java org.apache.qpid.server.model.* classes to equivalent
 * properties and statistics in the C++ broker's Management Schema rather than expose them "natively", this is
 * in order to try and maximise alignment between the two implementations and to try to allow the Java Broker
 * to be managed by the Command Line tools used with the C++ Broker such as qpid-config etc. it's also to
 * enable the Java Broker to be accessed via the QMF2 REST API and GUI.
 *
 * @author Fraser Adams
 */
public class QmfManagementAgent implements ConfigurationChangeListener, QmfEventListener
{
    private static final Logger _log = LoggerFactory.getLogger(QmfManagementAgent.class);

    // Set heartbeat interval to 10 seconds. TODO Should probably be config driven, but I *think* that this is
    // different than "heartbeat.delay" and "heartbeat.timeoutFactor" currently present in the config?
    private static final int HEARTBEAT_INTERVAL = 10;
    private Agent _agent = null;

    // The first Connection Object relates to the QmfManagementAgent, we use this flag to avoid mapping that Connection
    // to a QMF Object thus hiding it from Consoles. This is done to provide consistency with the C++ Broker which
    // also "hides" its own private AMQP Connections, Queues & Bindings.
    private boolean agentConnection = true;

    private final Broker<?> _broker;          // Passed in by Plugin bootstrapping.
    private final String _defaultVirtualHost; // Pulled from the broker attributes.

    /**
     * A Map of QmfAgentData keyed by ConfiguredObject. This is mainly used for Management Object "lifecycle management".
     * In an ideal world the Agent class could retain all information, but I want to track ConfiguredObject state and
     * use that to create and delete QmfAgentData data, which means I need to be able to *find* the QmfAgentData and
     * the *official* Agent API doesn't have a public method to query Objects (though the evaluateQuery() method used
     * by Query Subscriptions could be used, but it's not really a "public API method" so probably best not to use it)
     * Arguably this is what the AgentExternal subclass of Agent is for whereby queries are handled in the Agent
     * implementation by handling "(wi.getType() == QUERY)" but the AgentExternal API forces some constructs that are 
     * actually likely to be less efficient, as an example sending a separate queryResponse() for each object forces a 
     * look up of a List of QmfAgentData objects keyed by the consoleHandle for each call. There is also the need to 
     * separately iterate through the List of QmfAgentData objects thus created to create the mapEncoded list needed
     * for sending via the QMF2 protocol.
     * <p>
     * So rather than go through all that faff we simply retain an additional Map as below which allows navigation
     * between the ConfiguredObject and QmfAgentData. The subclasses of QmfAgentData will contain references to
     * allow navigation back to the concrete subclasses of ConfiguredObject if necessary.
     * The capacity of 100 is pretty arbitrary but the default of 16 seems too low for a ManagementAgent.
     */
    private Map<ConfiguredObject, QmfAgentData> _objects = new ConcurrentHashMap<ConfiguredObject, QmfAgentData>(100);

    /**
     * Constructor. Creates the AMQP Connection to the Broker and starts the QMF2 Agent.
     * @param url the Connection URL to be used to construct the AMQP Connection.
     * @param broker the root Broker Management Object from which the other Management Objects may be obtained.
     * to work without explicitly setting a Virtual Host, which I think is necessary because the C++ Broker and
     * the python command line tools aren't currently Virtual Host aware (are they?). The intention is to mark
     * queues and exchanges with {@literal [vhost:<vhost-name>/]<object-name>} in other words if we want to add things to
     * the non-default Virtual Host prefix their names with {@literal [vhost:<vhost-name>/]}. This approach *ought* to allow
     * non-Virtual Host aware command line tools the ability to add queues/exchanges to a particular vhost.
     */
    public QmfManagementAgent(final String url, final Broker broker)
    {
        _broker = broker;
        _defaultVirtualHost = broker.getDefaultVirtualHost();

        try
        {
            // Create the actual JMS Connection. ConnectionHelper allows us to work with a variety of URL
            // formats so we can abstract away from the somewhat complex Java AMQP URL format.
            javax.jms.Connection connection = ConnectionHelper.createConnection(url);
            if (connection == null)
            {
                _log.info("QmfManagementAgent Constructor failed due to null AMQP Connection");
            }
            else
            {
                _agent = new Agent(this, HEARTBEAT_INTERVAL);
                // Vendor and Product are deliberately set to be the same as for the C++ broker.
                _agent.setVendor("apache.org");
                _agent.setProduct("qpidd");
                _agent.setConnection(connection);

                // Register the schema for the Management Objects. These don't have to be completely populated
                // the minimum is to register package name and class name for the QmfAgentData.
                _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Broker.getSchema());
                _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Connection.getSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Connection.getClientConnectSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Connection.getClientDisconnectSchema());

                _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Exchange.getSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Exchange.getExchangeDeclareSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Exchange.getExchangeDeleteSchema());

                _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Queue.getSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Queue.getQueueDeclareSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Queue.getQueueDeleteSchema());

                _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Binding.getSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Binding.getBindSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Binding.getUnbindSchema());

                _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getSubscribeSchema());
                _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getUnsubscribeSchema());

                _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Session.getSchema());

                // Initialise QmfAgentData Objects and track changes to the broker Management Objects.
                registerConfigurationChangeListeners();
            }
        }
        catch (QmfException qmfe)
        {
            _log.error("QmfException caught in QmfManagementAgent Constructor", qmfe);
            _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message.
        }
        catch (Exception e)
        {
            _log.error("Exception caught in QmfManagementAgent Constructor", e);
            _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message.
        }
    }

    /**
     * Close the QmfManagementAgent clearing the QMF2 Agent and freeing its resources.
     */
    public void close()
    {
        if (isConnected())
        {
            _agent.destroy();
        }
    }

    /**
     * Returns whether the Agent is connected and running.
     * @return true if the Agent is connected and running otherwise return false.
     */
    public boolean isConnected()
    {
        return _agent != null;
    }

    /**
     * This method initialises the initial set of QmfAgentData Objects and tracks changes to the Broker Management 
     * Objects via the childAdded() method call.
     */
    private void registerConfigurationChangeListeners()
    {
        childAdded(null, _broker);

        if (_log.isDebugEnabled())
        {
            _log.debug("Registering model listeners for broker " + _broker);
        }

        for (VirtualHostNode<?> vhostNode : _broker.getVirtualHostNodes())
        {

            if (_log.isDebugEnabled())
            {
                _log.debug("Considering virtualhostnode " + vhostNode);
            }

            VirtualHost<?,?,?> vhost = vhostNode.getVirtualHost();

            // We don't add QmfAgentData VirtualHost objects. Possibly TODO, but it's a bit awkward at the moment
            // because the C++ Broker doesn't *seem* to do much with them and the command line tools such
            // as qpid-config don't appear to be VirtualHost aware. A way to stay compatible is to mark queues,
            // exchanges etc with [vhost:<vhost-name>/]<object-name> (see Constructor comments).

            if (vhost != null)
            {
                vhost.addChangeListener(this);

                addListenersForConnectionsAndChildren(vhost);
                addListenersForExchangesAndChildren(vhost);
                addListenersForQueuesAndChildren(vhost);
            }
        }


        if (_log.isDebugEnabled())
        {
            _log.debug("Registered model listeners");
        }

    }

    private void addListenersForQueuesAndChildren(final VirtualHost<?, ?, ?> vhost)
    {
        for (Queue<?> queue : vhost.getQueues())
        {
            boolean agentQueue = false;
            for (Binding binding : queue.getBindings())
            {
                String key = binding.getName();
                if (key.equals("broker") || key.equals("console.request.agent_locate") ||
                    key.startsWith("apache.org:qpidd:"))
                {
                    agentQueue = true;
                    break;
                }
            }

            // Don't add QMF related bindings or Queues in registerConfigurationChangeListeners as those will
            // relate to the Agent itself and we want to "hide" those to be consistent with the C++ Broker.
            if (!agentQueue)
            {
                childAdded(vhost, queue);

                for (Binding binding : queue.getBindings())
                {
                    childAdded(queue, binding);
                }

                for (Consumer subscription : queue.getChildren(Consumer.class))
                {
                    childAdded(queue, subscription);
                }
            }
        }
    }

    private void addListenersForExchangesAndChildren(final VirtualHost<?, ?, ?> vhost)
    {
        // The code blocks for adding Bindings (and adding Queues) contain checks to see if what is being added
        // relates to Queues or Bindings for the QmfManagementAgent. If they are QmfManagementAgent related
        // we avoid registering the Object as a QMF Object, in other words we "hide" QmfManagementAgent QMF Objects.
        // This is done to be consistent with the C++ broker which also "hides" its own Connection, Queue & Binding.
        for (Exchange<?> exchange : vhost.getExchanges())
        {
            childAdded(vhost, exchange);

            for (Binding binding : exchange.getBindings())
            {
                String key = binding.getName();
                if (key.equals("broker") || key.equals("console.request.agent_locate") ||
                    key.startsWith("apache.org:qpidd:") || key.startsWith("TempQueue"))
                { // Don't add QMF related Bindings in registerConfigurationChangeListeners as those will relate
                } // to the Agent and we want to "hide" those.
                else
                {
                    childAdded(exchange, binding);
                }
            }
        }
    }

    private void addListenersForConnectionsAndChildren(final VirtualHost<?, ?, ?> vhost)
    {
        for (Connection<?> connection : vhost.getConnections())
        {
            childAdded(vhost, connection);

            for (Session<?> session : connection.getSessions())
            {
                childAdded(connection, session);

                if (session.getConsumers() != null)
                {
                    for (Consumer subscription : session.getConsumers())
                    {
                        childAdded(session, subscription);
                    }
                }
            }
        }
    }


    // ************************* ConfigurationChangeListener implementation methods *************************

    /**
     * ConfigurationChangeListener method called when the state is changed (ignored here).
     * @param object the object being modified.
     * @param oldState the state of the object prior to this method call.
     * @param newState the desired state of the object.
     */
    @Override
    public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
    {
        // no-op
    }

    /**
     * ConfigurationChangeListener method called when an attribute is set (ignored here).
     * @param object the object being modified.
     * @param attributeName the name of the object attribute that we want to change.
     * @param oldAttributeValue the value of the attribute prior to this method call.
     * @param newAttributeValue the desired value of the attribute.
     */
    @Override
    public void attributeSet(ConfiguredObject object, String attributeName,
                             Object oldAttributeValue, Object newAttributeValue)
    {
        // no-op
    }

    /**
     * ConfigurationChangeListener method called when a child ConfiguredObject is added.
     * <p>
     * This method checks the type of the child ConfiguredObject that has been added and creates the equivalent
     * QMF2 Management Object if one doesn't already exist. In most cases it's a one-to-one mapping, but for
     * Binding for example the Binding child is added to both Queue and Exchange so we only create the Binding
     * QMF2 Management Object once and add the queueRef and exchangeRef reference properties referencing the Queue
     * and Exchange parent Objects respectively, Similarly for Consumer (AKA Subscription).
     * <p>
     * This method is also responsible for raising the appropriate QMF2 Events when Management Objects are created.
     * @param object the parent object that the child is being added to.
     * @param child the child object being added.
     */
    @Override
    public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
    {

        if (_log.isDebugEnabled())
        {
            _log.debug("childAdded: " + child.getClass().getSimpleName() + "." + child.getName());
        }

        QmfAgentData data = null;

        // We current don't listen for new virtualhostnodes or new virtualhosts, so any new instances
        // of these objects wont be seen through QMF until the Broker is restarted.

        if (child instanceof Broker)
        {
            data = new org.apache.qpid.server.qmf2.agentdata.Broker((Broker)child);
        }
        else if (child instanceof Connection)
        {
            if (!agentConnection && !_objects.containsKey(child))
            {
                // If the parent object is the default vhost set it to null so that the Connection ignores it.
                VirtualHost vhost = (object.getName().equals(_defaultVirtualHost)) ? null : (VirtualHost)object;
                data = new org.apache.qpid.server.qmf2.agentdata.Connection(vhost, (Connection)child);
                _objects.put(child, data);

                // Raise a Client Connect Event.
                _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Connection)data).createClientConnectEvent());
            }
            agentConnection = false; // Only ignore the first Connection, which is the one from the Agent. 
        }
        else if (child instanceof Session)
        {
            if (!_objects.containsKey(child))
            {
                QmfAgentData ref = _objects.get(object); // Get the Connection QmfAgentData so we can get connectionRef.
                if (ref != null)
                {
                    data = new org.apache.qpid.server.qmf2.agentdata.Session((Session)child, ref.getObjectId());
                    _objects.put(child, data);
                }
            }
        }
        else if (child instanceof Exchange)
        {
            if (!_objects.containsKey(child))
            {
                // If the parent object is the default vhost set it to null so that the Connection ignores it.
                VirtualHost vhost = (object.getName().equals(_defaultVirtualHost)) ? null : (VirtualHost)object;
                data = new org.apache.qpid.server.qmf2.agentdata.Exchange(vhost, (Exchange)child);
                _objects.put(child, data);

                // Raise an Exchange Declare Event.
                _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Exchange)data).createExchangeDeclareEvent());

            }
        }
        else if (child instanceof Queue)
        {
            if (!_objects.containsKey(child))
            {
                // If the parent object is the default vhost set it to null so that the Connection ignores it.
                VirtualHost vhost = (object.getName().equals(_defaultVirtualHost)) ? null : (VirtualHost)object;
                data = new org.apache.qpid.server.qmf2.agentdata.Queue(vhost, (Queue)child);
                _objects.put(child, data);

                // Raise a Queue Declare Event.
                _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Queue)data).createQueueDeclareEvent());
            }
        }
        else if (child instanceof Binding)
        {
            // Bindings are a little more complex because in QMF bindings contain exchangeRef and queueRef properties
            // whereas with the Java Broker model Binding is a child of Queue and Exchange. To cope with this we
            // first try to create or retrieve the QMF Binding Object then add either the Queue or Exchange reference
            // depending on whether Queue or Exchange was the parent of this addChild() call.
            if (!_objects.containsKey(child))
            {
                data = new org.apache.qpid.server.qmf2.agentdata.Binding((Binding)child);
                _objects.put(child, data);

                String eName = ((Binding)child).getExchange().getName();
                if (!eName.equals("<<default>>")) // Don't send Event for Binding to default direct.
                {
                    // Raise a Bind Event.
                    _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Binding)data).createBindEvent());
                }
            }

            org.apache.qpid.server.qmf2.agentdata.Binding binding =
                (org.apache.qpid.server.qmf2.agentdata.Binding)_objects.get(child);

            QmfAgentData ref = _objects.get(object);
            if (ref != null)
            {
                if (object instanceof Queue)
                {
                    binding.setQueueRef(ref.getObjectId());
                }
                else if (object instanceof Exchange)
                {
                    binding.setExchangeRef(ref.getObjectId());
                }
            }
        }
        else if (child instanceof Consumer) // AKA Subscription
        {
            // Subscriptions are a little more complex because in QMF Subscriptions contain sessionRef and queueRef 
            // properties whereas with the Java Broker model Consumer is a child of Queue and Session. To cope with
            // this we first try to create or retrieve the QMF Subscription Object then add either the Queue or
            // Session reference depending on whether Queue or Session was the parent of this addChild() call.
            if (!_objects.containsKey(child))
            {
                data = new org.apache.qpid.server.qmf2.agentdata.Subscription((Consumer)child);
                _objects.put(child, data);
           }

            org.apache.qpid.server.qmf2.agentdata.Subscription subscription =
                (org.apache.qpid.server.qmf2.agentdata.Subscription)_objects.get(child);

            QmfAgentData ref = _objects.get(object);
            if (ref != null)
            {
                if (object instanceof Queue)
                {
                    subscription.setQueueRef(ref.getObjectId(), (Queue)object);
                    // Raise a Subscribe Event - N.B. Need to do it *after* we've set the queueRef.
                    _agent.raiseEvent(subscription.createSubscribeEvent());
                }
                else if (object instanceof Session)
                {
                    subscription.setSessionRef(ref.getObjectId());
                }
            }
        }

        try
        {
            // If we've created new QmfAgentData we register it with the Agent.
            if (data != null)
            {
                _agent.addObject(data);
            }
        }
        catch (QmfException qmfe)
        {
            _log.error("QmfException caught in QmfManagementAgent.addObject()", qmfe);
        }

        child.addChangeListener(this);
    }


    /**
     * ConfigurationChangeListener method called when a child ConfiguredObject is removed.
     * <p>
     * This method checks the type of the child ConfiguredObject that has been removed and raises the appropriate
     * QMF2 Events, it then destroys the QMF2 Management Object and removes the mapping between child and the QMF Object.
     *
     * @param object the parent object that the child is being removed from.
     * @param child the child object being removed.
     */
    @Override
    public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
    {


        if (_log.isDebugEnabled())
        {
            _log.debug("childRemoved: " + child.getClass().getSimpleName() + "." + child.getName());
        }

        child.removeChangeListener(this);

        // Look up the associated QmfAgentData and mark it for deletion by the Agent.
        QmfAgentData data = _objects.get(child);

        if (data != null)
        {
            if (child instanceof Connection)
            {
                // Raise a Client Disconnect Event.
                _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Connection)data).createClientDisconnectEvent());
            }
            else if (child instanceof Session)
            {
                // no-op, don't need to do anything specific when Session is removed.
            }
            else if (child instanceof Exchange)
            {
                // Raise an Exchange Delete Event.
                _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Exchange)data).createExchangeDeleteEvent());
            }
            else if (child instanceof Queue)
            {
                // Raise a Queue Delete Event.
                _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Queue)data).createQueueDeleteEvent());
            }
            else if (child instanceof Binding)
            {
                String eName = ((Binding)child).getExchange().getName();
                if (!eName.equals("<<default>>")) // Don't send Event for Unbinding from default direct.
                {
                    // Raise an Unbind Event.
                    _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Binding)data).createUnbindEvent());
                }
            }
            else if (child instanceof Consumer)
            {
                // Raise an Unsubscribe Event.
                _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Subscription)data).createUnsubscribeEvent());
            }

            data.destroy();
        }

        // Remove the mapping from the internal ConfiguredObject->QmfAgentData Map.
        _objects.remove(child);
    }

    // ******************************* QmfEventListener implementation method *******************************

    /**
     * Callback method triggered when the underlying QMF2 Agent has WorkItems available for processing.
     * The purpose of this method is mainly to handle the METHOD_CALL WorkItem and demultiplex &amp; delegate
     * to the invokeMethod() call on the relevant concrete QmfAgentData Object.
     * @param wi the WorkItem that has been passed by the QMF2 Agent to be processed here (mainly METHOD_CALL).
     */
    @Override
    public void onEvent(final WorkItem wi)
    {
        if (wi.getType() == METHOD_CALL)
        {
            MethodCallWorkItem item = (MethodCallWorkItem)wi;
            MethodCallParams methodCallParams = item.getMethodCallParams();

            String methodName = methodCallParams.getName();
            ObjectId objectId = methodCallParams.getObjectId();

            // Look up QmfAgentData by ObjectId from the Agent's internal Object store.
            QmfAgentData object = _agent.getObject(objectId);
            if (object == null)
            {
                _agent.raiseException(item.getHandle(), "No object found with ID=" + objectId);
            }
            else
            {
                // If we've found a valid QmfAgentData check it's a Broker or Queue and if so call the generic
                // invokeMethod on these objects, if not send an Exception as we don't support methods on
                // other classes yet.
                if (object instanceof org.apache.qpid.server.qmf2.agentdata.Broker)
                {
                    org.apache.qpid.server.qmf2.agentdata.Broker broker = 
                        (org.apache.qpid.server.qmf2.agentdata.Broker) object;
                    broker.invokeMethod(_agent, item.getHandle(), methodName, methodCallParams.getArgs());
                }
                else if (object instanceof org.apache.qpid.server.qmf2.agentdata.Queue)
                {
                    org.apache.qpid.server.qmf2.agentdata.Queue queue = 
                        (org.apache.qpid.server.qmf2.agentdata.Queue) object;
                    queue.invokeMethod(_agent, item.getHandle(), methodName, methodCallParams.getArgs());
                }
                else
                {
                    _agent.raiseException(item.getHandle(), "Unknown Method " + methodName + " on " + 
                                          object.getClass().getSimpleName());
                }
            }
        }
    }
}