diff options
Diffstat (limited to 'qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main')
14 files changed, 3242 insertions, 0 deletions
diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/LICENSE b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/LICENSE new file mode 100644 index 0000000000..6b108e5ecb --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/LICENSE @@ -0,0 +1,235 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + +############################################### +# Third Party Dependency Licensing Information: +############################################### + +This product bundles the slf4j-api jar, which is under the MIT licence: + +Copyright (c) 2004-2013 QOS.ch + All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +############################################### + diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/NOTICE b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/NOTICE new file mode 100644 index 0000000000..e53b688a9c --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/NOTICE @@ -0,0 +1,10 @@ +Apache Qpid QMF2 Java Broker Management Plugin +Copyright 2012-2014 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +############################################### + +Apache Geronimo JMS 1.1 Spec +Copyright 2003-2008 The Apache Software Foundation diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE new file mode 100644 index 0000000000..92a50821b9 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/dependency-verification/DEPENDENCIES_REFERENCE @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#// ------------------------------------------------------------------ +# TRIMMED 3RD PARTY DEPENDENCY INFORMATION FOR MODIFICATION CHECKS +#// ------------------------------------------------------------------ + + + +From: 'Apache Software Foundation' (http://www.apache.org) + - JMS 1.1 (http://geronimo.apache.org/specs/geronimo-jms_1.1_spec) org.apache.geronimo.specs:geronimo-jms_1.1_spec:jar:1.1.1 + License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) + +From: 'QOS.ch' (http://www.qos.ch) + - SLF4J API Module (http://www.slf4j.org) org.slf4j:slf4j-api:jar:1.6.4 + License: MIT License (http://www.opensource.org/licenses/mit-license.php) + +From: 'The Apache Software Foundation' (http://www.apache.org/) + - Qpid AMQP 0-x JMS Client (http://qpid.apache.org/qpid-java-build/qpid-client) org.apache.qpid:qpid-client:jar + License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) + - Qpid Common (http://qpid.apache.org/qpid-java-build/qpid-common) org.apache.qpid:qpid-common:jar + License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) + - Qpid QMF2 (http://qpid.apache.org/qpid-qmf2-parent/qpid-qmf2) org.apache.qpid:qpid-qmf2:jar + License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) + + + + diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/qpid-broker-plugins-management-qmf2-bin.xml b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/qpid-broker-plugins-management-qmf2-bin.xml new file mode 100644 index 0000000000..29180fd248 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/assembly/qpid-broker-plugins-management-qmf2-bin.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>bin</id> + <formats> + <format>tar.gz</format> + </formats> + <baseDirectory>qpid-broker-plugins-management-qmf2/${project.version}</baseDirectory> + <fileSets> + <fileSet> + <outputDirectory>/</outputDirectory> + <includes> + <include>README.txt</include> + </includes> + <fileMode>0644</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/assembly/</directory> + <outputDirectory>/</outputDirectory> + <includes> + <include>LICENSE</include> + <include>NOTICE</include> + </includes> + <fileMode>0644</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + </fileSets> + <dependencySets> + <dependencySet> + <outputDirectory>/lib</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + </dependencySet> + </dependencySets> +</assembly> + diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java new file mode 100644 index 0000000000..1e10170700 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java @@ -0,0 +1,647 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2; + +// Misc Imports + +import static org.apache.qpid.qmf2.common.WorkItem.WorkItemType.METHOD_CALL; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.qmf2.agent.Agent; +import org.apache.qpid.qmf2.agent.MethodCallParams; +import org.apache.qpid.qmf2.agent.MethodCallWorkItem; +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.WorkItem; +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.Session; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; + +// Simple Logging Facade 4 Java +// QMF2 Imports +// Java Broker model Imports + +/** + * This class implements a QMF2 Agent providing access to the Java broker Management Objects via QMF2 thus + * allowing the Java Broker to be managed in the same way to the C++ Broker. + * <p> + * The intention is for the QmfManagementAgent to conform to the same Management Schema as the C++ Broker + * (e.g. as specified in management-schema.xml) in order to provide maximum cohesion between the + * two Broker implementations, however that's not entirely possible given differences between the underlying + * Management Models. + * <p> + * This Plugin attempts to map properties from the Java org.apache.qpid.server.model.* classes to equivalent + * properties and statistics in the C++ broker's Management Schema rather than expose them "natively", this is + * in order to try and maximise alignment between the two implementations and to try to allow the Java Broker + * to be managed by the Command Line tools used with the C++ Broker such as qpid-config etc. it's also to + * enable the Java Broker to be accessed via the QMF2 REST API and GUI. + * + * @author Fraser Adams + */ +public class QmfManagementAgent implements ConfigurationChangeListener, QmfEventListener +{ + private static final Logger _log = LoggerFactory.getLogger(QmfManagementAgent.class); + + // Set heartbeat interval to 10 seconds. TODO Should probably be config driven, but I *think* that this is + // different than "heartbeat.delay" and "heartbeat.timeoutFactor" currently present in the config? + private static final int HEARTBEAT_INTERVAL = 10; + private Agent _agent = null; + + // The first Connection Object relates to the QmfManagementAgent, we use this flag to avoid mapping that Connection + // to a QMF Object thus hiding it from Consoles. This is done to provide consistency with the C++ Broker which + // also "hides" its own private AMQP Connections, Queues & Bindings. + private boolean agentConnection = true; + + private final Broker<?> _broker; // Passed in by Plugin bootstrapping. + private final String _defaultVirtualHost; // Pulled from the broker attributes. + + /** + * A Map of QmfAgentData keyed by ConfiguredObject. This is mainly used for Management Object "lifecycle management". + * In an ideal world the Agent class could retain all information, but I want to track ConfiguredObject state and + * use that to create and delete QmfAgentData data, which means I need to be able to *find* the QmfAgentData and + * the *official* Agent API doesn't have a public method to query Objects (though the evaluateQuery() method used + * by Query Subscriptions could be used, but it's not really a "public API method" so probably best not to use it) + * Arguably this is what the AgentExternal subclass of Agent is for whereby queries are handled in the Agent + * implementation by handling "(wi.getType() == QUERY)" but the AgentExternal API forces some constructs that are + * actually likely to be less efficient, as an example sending a separate queryResponse() for each object forces a + * look up of a List of QmfAgentData objects keyed by the consoleHandle for each call. There is also the need to + * separately iterate through the List of QmfAgentData objects thus created to create the mapEncoded list needed + * for sending via the QMF2 protocol. + * <p> + * So rather than go through all that faff we simply retain an additional Map as below which allows navigation + * between the ConfiguredObject and QmfAgentData. The subclasses of QmfAgentData will contain references to + * allow navigation back to the concrete subclasses of ConfiguredObject if necessary. + * The capacity of 100 is pretty arbitrary but the default of 16 seems too low for a ManagementAgent. + */ + private Map<ConfiguredObject, QmfAgentData> _objects = new ConcurrentHashMap<ConfiguredObject, QmfAgentData>(100); + + /** + * Constructor. Creates the AMQP Connection to the Broker and starts the QMF2 Agent. + * @param url the Connection URL to be used to construct the AMQP Connection. + * @param broker the root Broker Management Object from which the other Management Objects may be obtained. + * to work without explicitly setting a Virtual Host, which I think is necessary because the C++ Broker and + * the python command line tools aren't currently Virtual Host aware (are they?). The intention is to mark + * queues and exchanges with {@literal [vhost:<vhost-name>/]<object-name>} in other words if we want to add things to + * the non-default Virtual Host prefix their names with {@literal [vhost:<vhost-name>/]}. This approach *ought* to allow + * non-Virtual Host aware command line tools the ability to add queues/exchanges to a particular vhost. + */ + public QmfManagementAgent(final String url, final Broker broker) + { + _broker = broker; + _defaultVirtualHost = broker.getDefaultVirtualHost(); + + try + { + // Create the actual JMS Connection. ConnectionHelper allows us to work with a variety of URL + // formats so we can abstract away from the somewhat complex Java AMQP URL format. + javax.jms.Connection connection = ConnectionHelper.createConnection(url); + if (connection == null) + { + _log.info("QmfManagementAgent Constructor failed due to null AMQP Connection"); + } + else + { + _agent = new Agent(this, HEARTBEAT_INTERVAL); + // Vendor and Product are deliberately set to be the same as for the C++ broker. + _agent.setVendor("apache.org"); + _agent.setProduct("qpidd"); + _agent.setConnection(connection); + + // Register the schema for the Management Objects. These don't have to be completely populated + // the minimum is to register package name and class name for the QmfAgentData. + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Broker.getSchema()); + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Connection.getSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Connection.getClientConnectSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Connection.getClientDisconnectSchema()); + + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Exchange.getSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Exchange.getExchangeDeclareSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Exchange.getExchangeDeleteSchema()); + + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Queue.getSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Queue.getQueueDeclareSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Queue.getQueueDeleteSchema()); + + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Binding.getSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Binding.getBindSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Binding.getUnbindSchema()); + + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getSubscribeSchema()); + _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getUnsubscribeSchema()); + + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Session.getSchema()); + + // Initialise QmfAgentData Objects and track changes to the broker Management Objects. + registerConfigurationChangeListeners(); + } + } + catch (QmfException qmfe) + { + _log.error("QmfException caught in QmfManagementAgent Constructor", qmfe); + _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message. + } + catch (Exception e) + { + _log.error("Exception caught in QmfManagementAgent Constructor", e); + _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message. + } + } + + /** + * Close the QmfManagementAgent clearing the QMF2 Agent and freeing its resources. + */ + public void close() + { + if (isConnected()) + { + _agent.destroy(); + } + } + + /** + * Returns whether the Agent is connected and running. + * @return true if the Agent is connected and running otherwise return false. + */ + public boolean isConnected() + { + return _agent != null; + } + + /** + * This method initialises the initial set of QmfAgentData Objects and tracks changes to the Broker Management + * Objects via the childAdded() method call. + */ + private void registerConfigurationChangeListeners() + { + childAdded(null, _broker); + + if (_log.isDebugEnabled()) + { + _log.debug("Registering model listeners for broker " + _broker); + } + + for (VirtualHostNode<?> vhostNode : _broker.getVirtualHostNodes()) + { + + if (_log.isDebugEnabled()) + { + _log.debug("Considering virtualhostnode " + vhostNode); + } + + VirtualHost<?,?,?> vhost = vhostNode.getVirtualHost(); + + // We don't add QmfAgentData VirtualHost objects. Possibly TODO, but it's a bit awkward at the moment + // because the C++ Broker doesn't *seem* to do much with them and the command line tools such + // as qpid-config don't appear to be VirtualHost aware. A way to stay compatible is to mark queues, + // exchanges etc with [vhost:<vhost-name>/]<object-name> (see Constructor comments). + + if (vhost != null) + { + vhost.addChangeListener(this); + + addListenersForConnectionsAndChildren(vhost); + addListenersForExchangesAndChildren(vhost); + addListenersForQueuesAndChildren(vhost); + } + } + + + if (_log.isDebugEnabled()) + { + _log.debug("Registered model listeners"); + } + + } + + private void addListenersForQueuesAndChildren(final VirtualHost<?, ?, ?> vhost) + { + for (Queue<?> queue : vhost.getQueues()) + { + boolean agentQueue = false; + for (Binding binding : queue.getBindings()) + { + String key = binding.getName(); + if (key.equals("broker") || key.equals("console.request.agent_locate") || + key.startsWith("apache.org:qpidd:")) + { + agentQueue = true; + break; + } + } + + // Don't add QMF related bindings or Queues in registerConfigurationChangeListeners as those will + // relate to the Agent itself and we want to "hide" those to be consistent with the C++ Broker. + if (!agentQueue) + { + childAdded(vhost, queue); + + for (Binding binding : queue.getBindings()) + { + childAdded(queue, binding); + } + + for (Consumer subscription : queue.getChildren(Consumer.class)) + { + childAdded(queue, subscription); + } + } + } + } + + private void addListenersForExchangesAndChildren(final VirtualHost<?, ?, ?> vhost) + { + // The code blocks for adding Bindings (and adding Queues) contain checks to see if what is being added + // relates to Queues or Bindings for the QmfManagementAgent. If they are QmfManagementAgent related + // we avoid registering the Object as a QMF Object, in other words we "hide" QmfManagementAgent QMF Objects. + // This is done to be consistent with the C++ broker which also "hides" its own Connection, Queue & Binding. + for (Exchange<?> exchange : vhost.getExchanges()) + { + childAdded(vhost, exchange); + + for (Binding binding : exchange.getBindings()) + { + String key = binding.getName(); + if (key.equals("broker") || key.equals("console.request.agent_locate") || + key.startsWith("apache.org:qpidd:") || key.startsWith("TempQueue")) + { // Don't add QMF related Bindings in registerConfigurationChangeListeners as those will relate + } // to the Agent and we want to "hide" those. + else + { + childAdded(exchange, binding); + } + } + } + } + + private void addListenersForConnectionsAndChildren(final VirtualHost<?, ?, ?> vhost) + { + for (Connection<?> connection : vhost.getConnections()) + { + childAdded(vhost, connection); + + for (Session<?> session : connection.getSessions()) + { + childAdded(connection, session); + + if (session.getConsumers() != null) + { + for (Consumer subscription : session.getConsumers()) + { + childAdded(session, subscription); + } + } + } + } + } + + + // ************************* ConfigurationChangeListener implementation methods ************************* + + /** + * ConfigurationChangeListener method called when the state is changed (ignored here). + * @param object the object being modified. + * @param oldState the state of the object prior to this method call. + * @param newState the desired state of the object. + */ + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + // no-op + } + + /** + * ConfigurationChangeListener method called when an attribute is set (ignored here). + * @param object the object being modified. + * @param attributeName the name of the object attribute that we want to change. + * @param oldAttributeValue the value of the attribute prior to this method call. + * @param newAttributeValue the desired value of the attribute. + */ + @Override + public void attributeSet(ConfiguredObject object, String attributeName, + Object oldAttributeValue, Object newAttributeValue) + { + // no-op + } + + /** + * ConfigurationChangeListener method called when a child ConfiguredObject is added. + * <p> + * This method checks the type of the child ConfiguredObject that has been added and creates the equivalent + * QMF2 Management Object if one doesn't already exist. In most cases it's a one-to-one mapping, but for + * Binding for example the Binding child is added to both Queue and Exchange so we only create the Binding + * QMF2 Management Object once and add the queueRef and exchangeRef reference properties referencing the Queue + * and Exchange parent Objects respectively, Similarly for Consumer (AKA Subscription). + * <p> + * This method is also responsible for raising the appropriate QMF2 Events when Management Objects are created. + * @param object the parent object that the child is being added to. + * @param child the child object being added. + */ + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + if (_log.isDebugEnabled()) + { + _log.debug("childAdded: " + child.getClass().getSimpleName() + "." + child.getName()); + } + + QmfAgentData data = null; + + // We current don't listen for new virtualhostnodes or new virtualhosts, so any new instances + // of these objects wont be seen through QMF until the Broker is restarted. + + if (child instanceof Broker) + { + data = new org.apache.qpid.server.qmf2.agentdata.Broker((Broker)child); + } + else if (child instanceof Connection) + { + if (!agentConnection && !_objects.containsKey(child)) + { + // If the parent object is the default vhost set it to null so that the Connection ignores it. + VirtualHost vhost = (object.getName().equals(_defaultVirtualHost)) ? null : (VirtualHost)object; + data = new org.apache.qpid.server.qmf2.agentdata.Connection(vhost, (Connection)child); + _objects.put(child, data); + + // Raise a Client Connect Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Connection)data).createClientConnectEvent()); + } + agentConnection = false; // Only ignore the first Connection, which is the one from the Agent. + } + else if (child instanceof Session) + { + if (!_objects.containsKey(child)) + { + QmfAgentData ref = _objects.get(object); // Get the Connection QmfAgentData so we can get connectionRef. + if (ref != null) + { + data = new org.apache.qpid.server.qmf2.agentdata.Session((Session)child, ref.getObjectId()); + _objects.put(child, data); + } + } + } + else if (child instanceof Exchange) + { + if (!_objects.containsKey(child)) + { + // If the parent object is the default vhost set it to null so that the Connection ignores it. + VirtualHost vhost = (object.getName().equals(_defaultVirtualHost)) ? null : (VirtualHost)object; + data = new org.apache.qpid.server.qmf2.agentdata.Exchange(vhost, (Exchange)child); + _objects.put(child, data); + + // Raise an Exchange Declare Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Exchange)data).createExchangeDeclareEvent()); + + } + } + else if (child instanceof Queue) + { + if (!_objects.containsKey(child)) + { + // If the parent object is the default vhost set it to null so that the Connection ignores it. + VirtualHost vhost = (object.getName().equals(_defaultVirtualHost)) ? null : (VirtualHost)object; + data = new org.apache.qpid.server.qmf2.agentdata.Queue(vhost, (Queue)child); + _objects.put(child, data); + + // Raise a Queue Declare Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Queue)data).createQueueDeclareEvent()); + } + } + else if (child instanceof Binding) + { + // Bindings are a little more complex because in QMF bindings contain exchangeRef and queueRef properties + // whereas with the Java Broker model Binding is a child of Queue and Exchange. To cope with this we + // first try to create or retrieve the QMF Binding Object then add either the Queue or Exchange reference + // depending on whether Queue or Exchange was the parent of this addChild() call. + if (!_objects.containsKey(child)) + { + data = new org.apache.qpid.server.qmf2.agentdata.Binding((Binding)child); + _objects.put(child, data); + + String eName = ((Binding)child).getExchange().getName(); + if (!eName.equals("<<default>>")) // Don't send Event for Binding to default direct. + { + // Raise a Bind Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Binding)data).createBindEvent()); + } + } + + org.apache.qpid.server.qmf2.agentdata.Binding binding = + (org.apache.qpid.server.qmf2.agentdata.Binding)_objects.get(child); + + QmfAgentData ref = _objects.get(object); + if (ref != null) + { + if (object instanceof Queue) + { + binding.setQueueRef(ref.getObjectId()); + } + else if (object instanceof Exchange) + { + binding.setExchangeRef(ref.getObjectId()); + } + } + } + else if (child instanceof Consumer) // AKA Subscription + { + // Subscriptions are a little more complex because in QMF Subscriptions contain sessionRef and queueRef + // properties whereas with the Java Broker model Consumer is a child of Queue and Session. To cope with + // this we first try to create or retrieve the QMF Subscription Object then add either the Queue or + // Session reference depending on whether Queue or Session was the parent of this addChild() call. + if (!_objects.containsKey(child)) + { + data = new org.apache.qpid.server.qmf2.agentdata.Subscription((Consumer)child); + _objects.put(child, data); + } + + org.apache.qpid.server.qmf2.agentdata.Subscription subscription = + (org.apache.qpid.server.qmf2.agentdata.Subscription)_objects.get(child); + + QmfAgentData ref = _objects.get(object); + if (ref != null) + { + if (object instanceof Queue) + { + subscription.setQueueRef(ref.getObjectId(), (Queue)object); + // Raise a Subscribe Event - N.B. Need to do it *after* we've set the queueRef. + _agent.raiseEvent(subscription.createSubscribeEvent()); + } + else if (object instanceof Session) + { + subscription.setSessionRef(ref.getObjectId()); + } + } + } + + try + { + // If we've created new QmfAgentData we register it with the Agent. + if (data != null) + { + _agent.addObject(data); + } + } + catch (QmfException qmfe) + { + _log.error("QmfException caught in QmfManagementAgent.addObject()", qmfe); + } + + child.addChangeListener(this); + } + + + /** + * ConfigurationChangeListener method called when a child ConfiguredObject is removed. + * <p> + * This method checks the type of the child ConfiguredObject that has been removed and raises the appropriate + * QMF2 Events, it then destroys the QMF2 Management Object and removes the mapping between child and the QMF Object. + * + * @param object the parent object that the child is being removed from. + * @param child the child object being removed. + */ + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + + if (_log.isDebugEnabled()) + { + _log.debug("childRemoved: " + child.getClass().getSimpleName() + "." + child.getName()); + } + + child.removeChangeListener(this); + + // Look up the associated QmfAgentData and mark it for deletion by the Agent. + QmfAgentData data = _objects.get(child); + + if (data != null) + { + if (child instanceof Connection) + { + // Raise a Client Disconnect Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Connection)data).createClientDisconnectEvent()); + } + else if (child instanceof Session) + { + // no-op, don't need to do anything specific when Session is removed. + } + else if (child instanceof Exchange) + { + // Raise an Exchange Delete Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Exchange)data).createExchangeDeleteEvent()); + } + else if (child instanceof Queue) + { + // Raise a Queue Delete Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Queue)data).createQueueDeleteEvent()); + } + else if (child instanceof Binding) + { + String eName = ((Binding)child).getExchange().getName(); + if (!eName.equals("<<default>>")) // Don't send Event for Unbinding from default direct. + { + // Raise an Unbind Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Binding)data).createUnbindEvent()); + } + } + else if (child instanceof Consumer) + { + // Raise an Unsubscribe Event. + _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Subscription)data).createUnsubscribeEvent()); + } + + data.destroy(); + } + + // Remove the mapping from the internal ConfiguredObject->QmfAgentData Map. + _objects.remove(child); + } + + // ******************************* QmfEventListener implementation method ******************************* + + /** + * Callback method triggered when the underlying QMF2 Agent has WorkItems available for processing. + * The purpose of this method is mainly to handle the METHOD_CALL WorkItem and demultiplex & delegate + * to the invokeMethod() call on the relevant concrete QmfAgentData Object. + * @param wi the WorkItem that has been passed by the QMF2 Agent to be processed here (mainly METHOD_CALL). + */ + @Override + public void onEvent(final WorkItem wi) + { + if (wi.getType() == METHOD_CALL) + { + MethodCallWorkItem item = (MethodCallWorkItem)wi; + MethodCallParams methodCallParams = item.getMethodCallParams(); + + String methodName = methodCallParams.getName(); + ObjectId objectId = methodCallParams.getObjectId(); + + // Look up QmfAgentData by ObjectId from the Agent's internal Object store. + QmfAgentData object = _agent.getObject(objectId); + if (object == null) + { + _agent.raiseException(item.getHandle(), "No object found with ID=" + objectId); + } + else + { + // If we've found a valid QmfAgentData check it's a Broker or Queue and if so call the generic + // invokeMethod on these objects, if not send an Exception as we don't support methods on + // other classes yet. + if (object instanceof org.apache.qpid.server.qmf2.agentdata.Broker) + { + org.apache.qpid.server.qmf2.agentdata.Broker broker = + (org.apache.qpid.server.qmf2.agentdata.Broker) object; + broker.invokeMethod(_agent, item.getHandle(), methodName, methodCallParams.getArgs()); + } + else if (object instanceof org.apache.qpid.server.qmf2.agentdata.Queue) + { + org.apache.qpid.server.qmf2.agentdata.Queue queue = + (org.apache.qpid.server.qmf2.agentdata.Queue) object; + queue.invokeMethod(_agent, item.getHandle(), methodName, methodCallParams.getArgs()); + } + else + { + _agent.raiseException(item.getHandle(), "Unknown Method " + methodName + " on " + + object.getClass().getSimpleName()); + } + } + } + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPlugin.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPlugin.java new file mode 100644 index 0000000000..f9ab9abd0c --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPlugin.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.qmf2; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Plugin; + +@ManagedObject( category = false, type = "MANAGEMENT-QMF2" ) +public interface QmfManagementPlugin<X extends QmfManagementPlugin<X>> extends Plugin<X> +{ + + // attributes + String CONNECTION_URL = "connectionURL"; + + @ManagedAttribute(defaultValue = "amqp://guest:guest@/?brokerlist='tcp://0.0.0.0:5672'") + String getConnectionURL(); +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java new file mode 100644 index 0000000000..52ea0afa10 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java @@ -0,0 +1,242 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2; + +// Misc Imports +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.model.adapter.AbstractPluginAdapter; + +// Simple Logging Facade 4 Java +// Java Broker Management Imports + +/** + * This class is a Qpid Java Broker Plugin which follows the Plugin API added in Qpid 0.22 it implements + * org.apache.qpid.server.model.Plugin and extends org.apache.qpid.server.model.adapter.AbstractPluginAdapter. + * <p> + * This Plugin provides access to the Java Broker Management Objects via QMF2 thus allowing the Java Broker to + * be managed and monitored in the same way as the C++ Broker. + * <p> + * The intention is for the Java Broker QmfManagementPlugin to conform to the same Management Schema as the C++ + * Broker (e.g. as specified in the management-schema.xml) in order to provide maximum cohesion between the + * two Broker implementations, however that's not entirely possible given differences between the underlying + * Management Models. The ultimate aim is to align the Management Models of the two Qpid Brokers and migrate + * to the AMQP 1.0 Management architecture when it becomes available. + * <p> + * This Plugin attempts to map properties from the Java org.apache.qpid.server.model.* classes to equivalent + * properties and statistics in the C++ Broker's Management Schema rather than expose them "natively", this is + * in order to try and maximise alignment between the two implementations and to try to allow the Java Broker + * to be managed by the Command Line tools used with the C++ Broker such as qpid-config etc. it's also to + * enable the Java Broker to be accessed via the QMF2 REST API and GUI. + * <p> + * This class only bootstraps the ManagementPlugin, the actual business logic is run from QmfManagementAgent. + * It's worth also mentioning that this Plugin actually establishes an AMQP Connection to the Broker via JMS. + * As it's a Broker Plugin it could conceivably use the low level Broker internal transport, this would probably + * be a little more efficient, but OTOH by using the JMS based approach I can use the QMF2 Agent code + * directly and implementing a complete QMF2 Agent for the Java Broker becomes "fairly simple" only requiring + * mappings between the org.apache.qpid.server.model.* classes and their QmfAgentData equivalents. + * <p> + * This Plugin requires config to be set, if this is not done the Plugin will not bootstrap. Config may be + * set in $QPID_WORK/config.json as part of the "plugins" config e.g. + * <pre> + * "plugins" : [ { + * "id" : "26887211-842c-3c4a-ab09-b1a1f64de369", + * "name" : "qmf2Management", + * "pluginType" : "MANAGEMENT-QMF2", + * "connectionURL" : "amqp://guest:guest@/?brokerlist='tcp://0.0.0.0:5672'" + * }] + * </pre> + * @author Fraser Adams + */ +public class QmfManagementPluginImpl extends AbstractPluginAdapter<QmfManagementPluginImpl> implements QmfManagementPlugin<QmfManagementPluginImpl> +{ + private static final Logger _log = LoggerFactory.getLogger(QmfManagementPluginImpl.class); + + private static final String OPERATIONAL_LOGGING_NAME = "QMF2"; + + /************* Static initialiser used to implement org.apache.qpid.server.model.Plugin *************/ + + public static final String PLUGIN_TYPE = "MANAGEMENT-QMF2"; + public static final String QMF_DEFAULT_DIRECT = "qmf.default.direct"; + public static final String QMF_DEFAULT_TOPIC = "qmf.default.topic"; + + + /************************************ End of Static initialiser *************************************/ + + private final Broker<?> _broker; // Passed in by Plugin bootstrapping. + private String _defaultVirtualHost; // Pulled from the broker attributes. + + @ManagedAttributeField + private String _connectionURL; // Pulled from the Plugin config. + private QmfManagementAgent _agent; + + /** + * Constructor, called at broker startup by QmfManagementFactory.createInstance(). + * @param attributes a Map containing configuration information for the Plugin. + * @param broker the root Broker Management Object from which the other Management Objects may be obtained. + */ + @ManagedObjectFactoryConstructor + public QmfManagementPluginImpl(Map<String, Object> attributes, Broker broker) + { + super(attributes, broker); + _broker = broker; + } + + @Override + protected void onOpen() + { + super.onOpen(); + _defaultVirtualHost = _broker.getDefaultVirtualHost(); + } + + + /** + * Start the Plugin. Note that we bind the QMF Connection the the default Virtual Host, this is important + * in order to allow C++ or Python QMF Consoles to control the Java Broker, as they know nothing of Virtual + * Hosts and their Connection URL formats don't have a mechanism to specify Virtual Hosts. + * <p> + * Note too that it may be necessary to create the "qmf.default.direct" and "qmf.default.topic" exchanges + * as these don't exist by default on the Java Broker, however we have to check if they already exist + * as attempting to add an Exchange that already exists will cause IllegalArgumentException. + */ + @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) + private void doStart() + { + // Log "QMF2 Management Startup" message. + getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); + + // Wrap main startup logic in a try/catch block catching Exception. The idea is that if anything goes + // wrong with QmfManagementPlugin startup it shouldn't fatally prevent the Broker from starting, though + // clearly QMF2 management will not be available. + try + { + // Iterate through the Virtual Hosts looking for the default Virtual Host. When we find the default + // we create the QMF exchanges then construct the QmfManagementAgent passing it the ConnectionURL. + boolean foundDefaultVirtualHost = false; + + for(VirtualHostNode<?> node : _broker.getVirtualHostNodes()) + { + VirtualHost<?, ?, ?> vhost = node.getVirtualHost(); + + + if (vhost != null && vhost.getName().equals(_defaultVirtualHost)) + { + foundDefaultVirtualHost = true; + + // Create the QMF2 exchanges if necessary. + if (vhost.getChildByName(Exchange.class, QMF_DEFAULT_DIRECT) == null) + { + Map<String, Object> attributes = new HashMap<>(); + attributes.put(Exchange.NAME, QMF_DEFAULT_DIRECT); + attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + attributes.put(Exchange.STATE, State.ACTIVE); + attributes.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + attributes.put(Exchange.DURABLE, true); + vhost.createExchange(attributes); + } + + if (vhost.getChildByName(Exchange.class, QMF_DEFAULT_TOPIC) == null) + { + Map<String, Object> attributes = new HashMap<>(); + attributes.put(Exchange.NAME, QMF_DEFAULT_TOPIC); + attributes.put(Exchange.TYPE, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); + attributes.put(Exchange.STATE, State.ACTIVE); + attributes.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + attributes.put(Exchange.DURABLE, true); + vhost.createExchange(attributes); + } + + // Now create the *real* Agent which maps Broker Management Objects to QmdAgentData Objects. + _agent = new QmfManagementAgent(_connectionURL, _broker); + } + + + } + // If we can't find a defaultVirtualHost we log that fact, the Plugin can't start in this case. + // Question. If defaultVirtualHost isn't configured or it doesn't match the name of one of the actual + // Virtual Hosts should we make the first one we find the de facto default for this Plugin?? + if (!foundDefaultVirtualHost) + { + _log.info("QmfManagementPlugin.start() could not find defaultVirtualHost"); + } + else if (_agent.isConnected()) + { + // Log QMF2 Management Ready message. + getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + } + } + catch (Exception e) // Catch and log any Exception so we avoid Plugin failures stopping Broker startup. + { + _log.error("Exception caught in QmfManagementPlugin.start()", e); + } + } + + /** + * Stop the Plugin, closing the QMF Connection and logging "QMF2 Management Stopped". + */ + @StateTransition( currentState = State.ACTIVE, desiredState = State.STOPPED ) + private void doStop() + { + // When the Plugin state gets set to STOPPED we close the QMF Connection. + if (_agent != null) + { + _agent.close(); + } + + // Log "QMF2 Management Stopped" message (may not get displayed). + getBroker().getEventLogger().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); + } + + @Override + protected void onClose() + { + super.onClose(); + if (_agent != null) + { + _agent.close(); + } + } + + /** + * Accessor to retrieve the connectionURL attribute. + * @return the JMS connectionURL of the Plugin. + */ + public String getConnectionURL() + { + return _connectionURL; + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Binding.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Binding.java new file mode 100644 index 0000000000..e65d2734a7 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Binding.java @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2.agentdata; + +// Misc Imports +import java.util.Collections; +import java.util.Map; + +// Simple Logging Facade 4 Java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// QMF2 Imports +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.SchemaEventClass; +//import org.apache.qpid.qmf2.common.SchemaMethod; +import org.apache.qpid.qmf2.common.SchemaObjectClass; +//import org.apache.qpid.qmf2.common.SchemaProperty; + +/** + * This class provides a concrete implementation of QmfAgentData for the Binding Management Object. + * In general it's possible to use QmfAgentData without sub-classing as it's really a "bean" style class + * that retains its properties in a Map, but in the case of the Java Broker Management Agent it's useful + * to sub-class as we need to map between the properties/statistics as specified in the Java Broker + * management model and those specified in qpid/spec/management-schema.xml which is what the C++ broker + * uses. This class retains a reference to its peer org.apache.qpid.server.model.Binding and does the + * necessary mapping when its mapEncode() method is called (which is used to serialise the QmfAgentData). + * + * @author Fraser Adams + */ +public class Binding extends QmfAgentData +{ + private static final Logger _log = LoggerFactory.getLogger(Binding.class); + + /** + * This static initialiser block initialises the QMF2 Schema information needed by the Agent to find + * QmfAgentData and QmfEvent Objects of a given type. + */ + private static final SchemaObjectClass _schema; + private static final SchemaEventClass _bindSchema; + private static final SchemaEventClass _unbindSchema; + + /** + * Returns the schema for the Binding class. + * @return the SchemaObjectClass for the Binding class. + */ + public static SchemaObjectClass getSchema() + { + return _schema; + } + + /** + * Returns the schema for the Bind Event. + * @return the SchemaEventClass for the Bind Event. + */ + public static SchemaEventClass getBindSchema() + { + return _bindSchema; + } + + /** + * Returns the schema for the Unbind Event. + * @return the SchemaEventClass for the Unbind Event. + */ + public static SchemaEventClass getUnbindSchema() + { + return _unbindSchema; + } + + static + { + // Declare the schema for the QMF2 broker class. + _schema = new SchemaObjectClass("org.apache.qpid.broker", "binding"); + + // TODO + //_schema.addProperty(new SchemaProperty("whatHappened", QmfType.TYPE_STRING)); + + // Declare the schema for the QMF2 bind Event class. + _bindSchema = new SchemaEventClass("org.apache.qpid.broker", "bind"); + + // Declare the schema for the QMF2 unbind Event class. + _unbindSchema = new SchemaEventClass("org.apache.qpid.broker", "unbind"); + } + // End of static initialiser. + + private final org.apache.qpid.server.model.Binding _binding; + + /** + * Constructor. + * @param binding the Binding ConfiguredObject from the broker model. + */ + @SuppressWarnings("unchecked") + public Binding(final org.apache.qpid.server.model.Binding binding) + { + super(getSchema()); + _binding = binding; // Will eventually be used in mapEncode() to retrieve statistics. + setValue("bindingKey", binding.getName()); + + Map<String, Object> arguments = binding.getArguments(); + // Only add arguments property if the bindings have arguments otherwise + // set to empty Map to be consistent with C++ Broker. + if (arguments != null && arguments.size() > 0) + { + setValue("arguments", arguments); + } + else + { + setValue("arguments", Collections.EMPTY_MAP); + } + // origin not implemented in Java Broker - not really sure what the origin property means anyway??? + } + + /** + * Set the exchangeRef property. + * @param exchangeRef the exchangeRef ObjectId. + */ + public void setExchangeRef(final ObjectId exchangeRef) + { + setRefValue("exchangeRef", exchangeRef); + } + + /** + * Set the queueRef property. + * @param queueRef the queueRef ObjectId. + */ + public void setQueueRef(final ObjectId queueRef) + { + setRefValue("queueRef", queueRef); + } + + /** + * Factory method to create a Bind Event Object with timestamp of now. + * @return the newly created Bind Event Object. + */ + public QmfEvent createBindEvent() + { + QmfEvent bind = new QmfEvent(_bindSchema); + bind.setSeverity("info"); + bind.setValue("args", _binding.getArguments()); + bind.setValue("exName", _binding.getExchange().getName()); + bind.setValue("key", _binding.getName()); + bind.setValue("qName", _binding.getQueue().getName()); + // TODO Not sure of a way to get these for Java Broker Exchange. + //bind.setValue("rhost", _connection.getName()); + //bind.setValue("user", getStringValue("authIdentity")); + return bind; + } + + /** + * Factory method to create an Unbind Event Object with timestamp of now. + * @return the newly created Unbind Event Object. + */ + public QmfEvent createUnbindEvent() + { + QmfEvent unbind = new QmfEvent(_unbindSchema); + unbind.setSeverity("info"); + unbind.setValue("exName", _binding.getExchange().getName()); + unbind.setValue("key", _binding.getName()); + unbind.setValue("qName", _binding.getQueue().getName()); + // TODO Not sure of a way to get these for Java Broker Exchange. + //unbind.setValue("rhost", _connection.getName()); + //unbind.setValue("user", getStringValue("authIdentity")); + return unbind; + } + + /** + * This method maps the org.apache.qpid.server.model.Binding to QMF2 broker properties where possible then + * serialises into the underlying Map for transmission via AMQP. This method is called by handleQueryRequest() + * in the org.apache.qpid.qmf2.agent.Agent class implementing the main QMF2 Agent behaviour. + * + * @return the underlying map. + */ + @Override + public Map<String, Object> mapEncode() + { + // Statistics + setValue("msgMatched", _binding.getMatches()); + + update(); // TODO only set update if a statistic has actually changed value. + return super.mapEncode(); + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java new file mode 100644 index 0000000000..5c4dcc6b06 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java @@ -0,0 +1,699 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2.agentdata; + +// Misc Imports + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.qmf2.agent.Agent; +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.Handle; +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfData; +import org.apache.qpid.qmf2.common.SchemaObjectClass; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; + +// Simple Logging Facade 4 Java +// QMF2 Imports +/*import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.QmfType;*/ +// Java Broker model Imports + +/** + * This class provides a concrete implementation of QmfAgentData for the Broker Management Object. + * In general it's possible to use QmfAgentData without sub-classing as it's really a "bean" style class + * that retains its properties in a Map, but in the case of the Java Broker Management Agent it's useful + * to sub-class as we need to map between the properties/statistics as specified in the Java Broker + * management model and those specified in qpid/spec/management-schema.xml which is what the C++ broker + * uses. This class retains a reference to its peer org.apache.qpid.server.model.Broker and does the + * necessary mapping when its mapEncode() method is called (which is used to serialise the QmfAgentData). + * + * @author Fraser Adams + */ +public class Broker extends QmfAgentData +{ + private static final Logger _log = LoggerFactory.getLogger(Broker.class); + + /** + * This static initialiser block initialises the QMF2 Schema information needed by the Agent to find + * QmfAgentData Objects of a given type. + */ + private static final SchemaObjectClass _schema; + public static SchemaObjectClass getSchema() + { + return _schema; + } + + static + { + // Declare the schema for the QMF2 broker class. + _schema = new SchemaObjectClass("org.apache.qpid.broker", "broker"); + + // TODO + //_schema.addProperty(new SchemaProperty("whatHappened", QmfType.TYPE_STRING)); + } + + private final org.apache.qpid.server.model.Broker<?> _broker; // Passed in by Plugin bootstrapping. + private final String _defaultVirtualHost; // Pulled from the broker attributes. + + /** + * This inner class parses the name String that was passed in as a QMF method argument. + * There are a few quirks with this name. In the first instance it may be prefixed with Virtual Host information + * e.g. [vhost:<vhost-name>/]<queue-name> this is because in order to allow Java Broker QMF to work with things like + * qpid-config which are not particularly Virtual Host aware prefixing the names seemed to be the most natural, + * if slightly ugly approach. In addition the way bindings are named is of the form: + * <exchange-name>/<queue-name>[/<binding-key>] so we need a mechanism to parse the relevant information. + * <p> + * N.B. the parsing that takes place in this class makes an assumption that there are no valid exchange or queue + * names that contain a "/". This is probably a reasonable assumption given the way that a binding name is + * constructed, but it's worth recording the restriction here in case such a beast crops up. + * <p> + * This class also provides accessors that allow the Exchange, Queue and Binding ConfiguredObjects for the + * name parsed in the Constructor to be retrieved. This is generally useful because although in QMF the create + * and delete methods are invoked on the Broker object in the Java Broker ConfiguredObject model the underlying + * methods are distributed across a number of different classes. + */ + private class NameParser + { + private String _vhostName; + private VirtualHost<?,?,?> _vhost = null; + private String _exchangeName = ""; + private Exchange<?> _exchange = null; + private String _queueName = ""; + private Queue _queue = null; + private String _bindingKey = ""; + private Binding _binding = null; + + /** + * NameParser Constructor. + * The Constructor actually does the majority of the parsing, the remaining method are largely just accessors. + * + * @param name the name argument that was retrieved from the QMF method inArgs. This will be the exchange name, + * the queue name or the binding name (which is of the form <exchange-name>/<queue-name>[/<binding-key>]) + * exchange and queue names may be prefixed by a Virtual Host name e.g. [vhost:<vhost-name>/]<queue-name> + * @param type the type argument that was retrieved from the QMF method inArgs. Valid types are "exchange, + * "queue" or "binding". + */ + public NameParser(final String name, final String type) + { + boolean malformedVHostName = false; + String[] splitName = name.split("/"); // A slash is used as a separator in a couple of scenarios. + if (name.startsWith("vhost:")) + { + if (splitName.length == 1) // If it starts with vhost: the name should also contain at least one "/". + { + malformedVHostName = true; + _vhostName = name; + } + else + { + _vhostName = splitName[0]; + _vhostName = _vhostName.substring(6, _vhostName.length()); + } + } + else + { + _vhostName = _defaultVirtualHost; + } + + // If the vhostName isn't malformed then try to find the actual Virtual Host that it relates to. + // If it is malformed the vhost stays set to null, which will cause an exception to be returned later. + if (!malformedVHostName) + { + for (VirtualHostNode<?> vhostNode : _broker.getVirtualHostNodes()) + { + VirtualHost<?,?,?> vhost = vhostNode.getVirtualHost(); + if (vhost.getName().equals(_vhostName)) + { + _vhost = vhost; + break; + } + } + } + + // Populate the exchange, queue and binding names. We only populate the names in the constructor + // when we actually want to find the Object associated with the name we do it "on demand" in the + // relevant accessor and cache the result. + if (type.equals("exchange")) + { + _exchangeName = splitName[splitName.length - 1]; + } + else if (type.equals("queue")) + { + _queueName = splitName[splitName.length - 1]; + } + else if (type.equals("binding")) + { // TODO is there a way to make this parse less nasty and a bit more elegant.... + int i = 0; + String vhost1Name = _defaultVirtualHost; // The exchange and queue vhostName need to be the same. + if (splitName[i].startsWith("vhost:")) // Does the exchange name specify a vhost? + { + vhost1Name = splitName[i]; + i++; + } + + if (i < splitName.length) // Extract the exchange name sans vhost part. + { + _exchangeName = splitName[i]; + i++; + } + + String vhost2Name = _defaultVirtualHost; + if (i < splitName.length && splitName[i].startsWith("vhost:")) // Does the queue name specify a vhost? + { + vhost2Name = splitName[i]; + i++; + } + + // If the exchange and queue vhost names differ set _vhost and _vhostName to null which causes + // an exception that says "VirtualHost names for exchange and queue must match.". + if (!vhost2Name.equals(vhost1Name)) + { + _vhost = null; + _vhostName = null; + } + + if (i < splitName.length) // Extract the queue name sans vhost part. + { + _queueName = splitName[i]; + i++; + } + + if (i < splitName.length) // Extract the binding key if present (it's optional). + { + _bindingKey = splitName[i]; + i++; + } + } // End of binding name parse. + } + + // NameParser accessors. + + /** + * Retrieves the name of the Virtual Host that was parsed from the name supplied in the Constructor. + * @return the parsed Virtual Host name (may be an empty String). + */ + public String getVirtualHostName() + { + return _vhostName; + } + + /** + * Retrieves the Virtual Host with the name that was parsed from the name supplied in the Constructor. + * @return the Virtual Host with the name that was parsed from the name supplied in the Constructor (may be null). + */ + public VirtualHost getVirtualHost() + { + return _vhost; + } + + /** + * Retrieves the name of the Exchange that was parsed from the name supplied in the Constructor. + * @return the parsed Exchange name (may be an empty String). + */ + public String getExchangeName() + { + return _exchangeName; + } + + /** + * Retrieves the Exchange with the name that was parsed from the name supplied in the Constructor. + * @return the Exchange with the name that was parsed from the name supplied in the Constructor (may be null). + */ + public Exchange getExchange() + { + // If we've not previously cached the _exchange and the previously parsed Virtual Host isn't null we do a + // look up for the actual Exchange with the name _exchangeName and cache it. + if (_exchange == null && _vhost != null) + { + for (Exchange exchange : _vhost.getExchanges()) + { + if (exchange.getName().equals(_exchangeName)) + { + _exchange = exchange; + break; + } + } + } + + return _exchange; + } + + /** + * Retrieves the name of the Queue that was parsed from the name supplied in the Constructor. + * @return the parsed Queue name (may be an empty String). + */ + public String getQueueName() + { + return _queueName; + } + + /** + * Retrieves the Queue with the name that was parsed from the name supplied in the Constructor. + * @return the Queue with the name that was parsed from the name supplied in the Constructor (may be null). + */ + public Queue getQueue() + { + // If we've not previously cached the _queue and the previously parsed Virtual Host isn't null we do a + // look up for the actual Queue with the name _queueName and cache it. + if (_queue == null && _vhost != null) + { + for (Queue queue : _vhost.getQueues()) + { + if (queue.getName().equals(_queueName)) + { + _queue = queue; + break; + } + } + } + + return _queue; + } + + /** + * Retrieves the name of the Binding that was parsed from the name supplied in the Constructor. + * @return the parsed Binding name (may be an empty String). + */ + public String getBindingKey() + { + return _bindingKey; + } + + /** + * Retrieves the Binding with the name that was parsed from the name supplied in the Constructor. + * @return the Binding with the name that was parsed from the name supplied in the Constructor (may be null). + */ + public Binding getBinding() + { + // In order to retrieve a Binding it's first necessary to get the Exchange (or Queue) ConfiguredObject. + _exchange = getExchange(); // Need to get it via the accessor as it's initialised by lazy evaluation. + + // If we've not previously cached the _binding and the previously retrieved Exchange isn't null we do a + // look up for the actual Binding with the name _bindingKey and cache it. + if (_binding == null && _exchange != null) + { + for (Binding binding : _exchange.getBindings()) + { + if (binding.getName().equals(_bindingKey)) + { + _binding = binding; + break; + } + } + } + + return _binding; + } + } // End of class NameParser + + /** + * Broker Constructor. + * @param broker the root Broker Management Object from which the other Management Objects may be obtained. + */ + public Broker(final org.apache.qpid.server.model.Broker broker) + { + super(getSchema()); + + _broker = broker; + _defaultVirtualHost = broker.getDefaultVirtualHost(); + int amqpPort = 5672; // Default AMQP Port. + + // Search through the available Ports on this Broker looking for the AMQP Port using the TCP Transport + // and record that in amqpPort. N.B. The Java Broker supports several Protocol and Transport types so + // it might be good to return more detailed info, but for QMF the "port" property description for the + // Broker Object says "TCP Port for AMQP Service" so doing anything fancier might break consumers. + // TODO The C++ and Java Brokers should really return consistent information. + for (Port<?> port : _broker.getPorts()) + { + boolean isAMQP = false; + boolean isTCP = false; + + for (Protocol protocol : port.getProtocols()) + { + isAMQP = protocol.isAMQP(); + if (isAMQP) + { + break; + } + } + + for (Transport transport : port.getTransports()) + { + isTCP = (transport == Transport.TCP); + if (isTCP) + { + break; + } + } + + if (isAMQP && isTCP) + { + amqpPort = port.getPort(); + break; + } + } + + // systemRef is ignored in this implementation. + // stagingThreshold is ignored in this implementation (it's deprecated anyway I believe). + + // Use this name to be fairly consistent with C++ broker which uses "amqp-broker". + // N.B. although it's useful to be able to distinguish between C++ and Java brokers note that the + // _object_name in the ObjectId that we set below uses actually uses "amqp-broker" vice "amqp-java-broker", + // this is because qpid-config uses a "hardcoded" ObjectId to invoke methods so we need to use the same name. + setValue("name", "amqp-java-broker"); + setValue("port", amqpPort); + + // workerThreads doesn't *appear* to be configurable in the Java Broker, looks like there's no pool and the + // Threads just grow with the number of Connections? + setValue("workerThreads", 0); + + // maxConns doesn't *appear* to be configurable in the Java Broker. + setValue("maxConns", 0); + + // The Java Broker ServerSocket seems to be created in org.apache.qpid.transport.network.io.IoNetworkTransport + // In AcceptingThread. The does't appear to use any configuration for ServerSocket, which suggests that the + // backlog is the default value which is assumed to be 10. + setValue("connBacklog", 10); + + // "Technically" this isn't quite the same as for the C++ broker, which pushes management data to a particular + // topic - the subscription "qmf.default.topic/agent.ind.#" grabs that plus heartbeats for the C++ broker. + // This Agent allows the use of the QMF2 Query Subscriptions (which the C++ broker does not!! - the Console + // class fakes this client side for the C++ broker. TODO make sure that the Console does not fake for Java broker. + setValue("mgmtPublish", true); + setValue("mgmtPubInterval", 10); + + setValue("version", org.apache.qpid.common.QpidProperties.getReleaseVersion()); + setValue("dataDir", System.getProperty("QPID_WORK")); + + // ObjectId needs to be set here in Broker because the QMF2 version of qpid-config uses a hardcoded + // _object_name of "org.apache.qpid.broker:broker:amqp-broker" in the _object_id that it sets. + // It *shouldn't* do this and should really use the _object_id of the broker object returned by + // getObjects("broker"), but it does. The following line causes the Agent to use the explicit + // ObjectId below rather than constructing its own, which fixes the qpid-config issue. + // Note we use "amqp-broker" in the ObjectId to be compatible with qpid-config but we set the actual + // name to "amqp-java-broker" as it's useful to be able to distinguish between C++ and Java Brokers. + setObjectId(new ObjectId("", "org.apache.qpid.broker:broker:amqp-broker", 0)); + } + + /** + * This helper method checks the supplied properties Map for the "alternate-exchange" property, if it is present + * the property is removed from the map and the alternate exchange is parsed to recover the Virtual Host name + * and the actual alternate exchange name. If the alternate exchange Virtual Host name is not the same as the + * supplied vhostName this method returns "invalid" otherwise it returns the alternate exchange name or null. + * + * @param vhostName the Virtual Host name that we want to compare the alternate exchange's Virtual Host name with. + * @param properties a Map of properties that might contain "alternate-exchange". + * @return the alternate exchange name if present, null if not present or "invalid" if the Virtual Host name that + * was parsed from the alternate exchange doesn't match the name supplied in the vhostName parameter. + */ + private String parseAlternateExchange(String vhostName, Map<String, Object> properties) + { + String alternateExchange = null; + Object property = properties.get("alternate-exchange"); + if (property != null && property instanceof String) // Alternate exchange has been specified. + { + alternateExchange = property.toString(); + properties.remove("alternate-exchange"); + + String altExVhostName = _defaultVirtualHost; + String[] splitName = alternateExchange.split("/"); + if (alternateExchange.startsWith("vhost:")) + { + altExVhostName = splitName[0]; + altExVhostName = altExVhostName.substring(6, altExVhostName.length()); + } + + // If the Virtual Hosts differ raise an exception and return. + if (!altExVhostName.equals(vhostName)) + { + return "invalid"; + } + } + + return alternateExchange; + } + + /** + * This method acts as a single entry point for QMF methods invoked on the Broker Object. + * + * @param agent the org.apache.qpid.qmf2.agent.Agent instance that we call methodResponse() and raiseException() on. + * @param handle the reply handle used by methodResponse() and raiseException(). + * @param methodName the name of the QMF method being invoked. + * @param inArgs a Map of input arguments wrapped in a QmfData Object. + */ + @SuppressWarnings("unchecked") + public void invokeMethod(Agent agent, Handle handle, String methodName, QmfData inArgs) + { + if (methodName.equals("create") || methodName.equals("delete")) + { + QmfData outArgs = new QmfData(); + + String name = inArgs.getStringValue("name"); + String type = inArgs.getStringValue("type"); + + NameParser nameParser = new NameParser(name, type); + String vhostName = nameParser.getVirtualHostName(); + VirtualHost vhost = nameParser.getVirtualHost(); + + if (vhost == null) + { + if (vhostName == null) + { + agent.raiseException(handle, "VirtualHost names for exchange and queue must match."); + } + else + { + agent.raiseException(handle, "VirtualHost " + vhostName + " not found."); + } + } + else + { + if (methodName.equals("create")) // method = create + { + try + { + //boolean strict = inArgs.getBooleanValue("strict"); + Map<String, Object> properties = inArgs.getValue("properties"); + + boolean durable = false; + Object property = properties.get("durable"); + if (property != null && property instanceof Boolean) + { + Boolean durableProperty = (Boolean)property; + durable = durableProperty.booleanValue(); + properties.remove("durable"); + } + + if (type.equals("exchange")) // create exchange. + { +/* +System.out.println("Create Exchange"); +System.out.println("vhostName = " + vhostName); +System.out.println("exchange name = " + nameParser.getExchangeName()); +System.out.println("properties = " + properties); +*/ + String exchangeType = ""; + property = properties.get("exchange-type"); + if (property != null && property instanceof String) + { + exchangeType = property.toString(); + properties.remove("exchange-type"); + } + + String alternateExchange = parseAlternateExchange(vhostName, properties); + if (alternateExchange != null && alternateExchange.equals("invalid")) + { + agent.raiseException(handle, "Alternate Exchange must belong to the same Virtual Host as the Exchange being added."); + return; + } + + // Note that for Qpid 0.20 the "qpid.msg_sequence=1" and "qpid.ive=1" properties are + // not suppored, indeed no exchange properties seem to be supported yet. + Map<String,Object> attributes = new HashMap<>(); + attributes.put(Exchange.NAME, nameParser.getExchangeName()); + attributes.put(Exchange.STATE, State.ACTIVE); + attributes.put(Exchange.DURABLE, durable); + attributes.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + attributes.put(Exchange.TYPE, exchangeType); + attributes.put(Exchange.ALTERNATE_EXCHANGE, alternateExchange); + + + vhost.createExchange(attributes); + + } // End of create exchange. + else if (type.equals("queue")) // create queue. + { +/* +System.out.println("Create Queue"); +System.out.println("vhostName = " + vhostName); +System.out.println("queue name = " + nameParser.getQueueName()); +System.out.println("properties = " + properties); +*/ + + // TODO Try to map from the QMF create queue properties to the closest equivalents on + // the Java Broker. Unfortunately there are a *lot* of frustrating little differences. + + + String alternateExchange = parseAlternateExchange(vhostName, properties); + if (alternateExchange != null && alternateExchange.equals("invalid")) + { + agent.raiseException(handle, "Alternate Exchange must belong to the same Virtual Host as the Queue being added."); + return; + } + + // I don't *think* that it make sense to allow setting exclusive or autoDelete to + // a queue created from config. + Map<String,Object> attributes = new HashMap<String,Object>(properties); + attributes.put(Queue.NAME, nameParser.getQueueName()); + attributes.put(Queue.DURABLE, durable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + + + // Set the queue's alternateExchange, which is just a little bit involved...... + // The queue.setAttribute() method needs an org.apache.qpid.server.model.Exchange instance + // not just a name, so we look up org.apache.qpid.server.qmf2.agentdata.Exchange by ID + // and get its associated org.apache.qpid.server.model.Exchange. We can do a look up by ID + // because we needed to use ObjectIds that were based on names in order to allow qpid-config + // to work, so we may as well make use of this convenience here too. + if (alternateExchange != null) + { + ObjectId objectId = + new ObjectId("", "org.apache.qpid.broker:exchange:" + alternateExchange, 0); + + // Look up Exchange QmfAgentData by ObjectId from the Agent's internal Object store. + QmfAgentData object = agent.getObject(objectId); + if (object != null) + { + org.apache.qpid.server.qmf2.agentdata.Exchange ex = + (org.apache.qpid.server.qmf2.agentdata.Exchange)object; + + Exchange altEx = ex.getExchange(); + attributes.put(Queue.ALTERNATE_EXCHANGE, altEx.getId()); + } + } + Queue queue = vhost.createQueue(attributes); + } + else if (type.equals("binding")) // create binding. + { + Exchange exchange = nameParser.getExchange(); + if (exchange == null) + { + agent.raiseException(handle, "Cannot create binding on Exchange " + + nameParser.getExchangeName()); + return; + } + else + { + Map<String, Object> attributes = Collections.emptyMap(); + exchange.createBinding(nameParser.getBindingKey(), nameParser.getQueue(), + properties, attributes); + } + } + + agent.methodResponse(methodName, handle, outArgs, null); + } + catch (Exception e) + { + agent.raiseException(handle, e.getMessage()); + } + } + else // method = delete + { + try + { + if (type.equals("exchange")) // delete exchange. + { + Exchange exchange = nameParser.getExchange(); + if (exchange != null) + { + exchange.delete(); + } + } + else if (type.equals("queue")) // delete queue. + { + Queue queue = nameParser.getQueue(); + if (queue != null) + { + queue.deleteAndReturnCount(); + } + } + else if (type.equals("binding")) // delete binding. + { + Binding binding = nameParser.getBinding(); + if (binding != null) + { + binding.delete(); + } + } + + agent.methodResponse(methodName, handle, outArgs, null); + } + catch (Exception e) + { + agent.raiseException(handle, e.getMessage()); + } + } + } + } + else // If methodName is not create or delete. + { + agent.raiseException(handle, methodName + " not yet implemented on Broker."); + } + } // End of invokeMethod. + + /** + * This method maps the org.apache.qpid.server.model.Broker to QMF2 broker properties where possible then + * serialises into the underlying Map for transmission via AMQP. This method is called by handleQueryRequest() + * in the org.apache.qpid.qmf2.agent.Agent class implementing the main QMF2 Agent behaviour. + * + * @return the underlying map. + */ + @Override + public Map<String, Object> mapEncode() + { + update(); // Need to do update before setting uptime in order to get the latest getUpdateTime() value. + + // Not sure if there's an "official" broker uptime anywhere, but as the QmfManagementAgent is created when + // the broker is and the Broker object is created then too the following approach should be good enough. + setValue("uptime", getUpdateTime() - getCreateTime()); + + return super.mapEncode(); + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Connection.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Connection.java new file mode 100644 index 0000000000..0dc9ba4975 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Connection.java @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2.agentdata; + +// Misc Imports +import java.util.Map; + +// Simple Logging Facade 4 Java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// QMF2 Imports +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.SchemaEventClass; +//import org.apache.qpid.qmf2.common.SchemaMethod; +import org.apache.qpid.qmf2.common.SchemaObjectClass; +//import org.apache.qpid.qmf2.common.SchemaProperty; + +/** + * This class provides a concrete implementation of QmfAgentData for the Connection Management Object. + * In general it's possible to use QmfAgentData without sub-classing as it's really a "bean" style class + * that retains its properties in a Map, but in the case of the Java Broker Management Agent it's useful + * to sub-class as we need to map between the properties/statistics as specified in the Java Broker + * management model and those specified in qpid/spec/management-schema.xml which is what the C++ broker + * uses. This class retains a reference to its peer org.apache.qpid.server.model.Connection and does the + * necessary mapping when its mapEncode() method is called (which is used to serialise the QmfAgentData). + * + * @author Fraser Adams + */ +public class Connection extends QmfAgentData +{ + private static final Logger _log = LoggerFactory.getLogger(Connection.class); + + /** + * This static initialiser block initialises the QMF2 Schema information needed by the Agent to find + * QmfAgentData and QmfEvent Objects of a given type. + */ + private static final SchemaObjectClass _schema; + private static final SchemaEventClass _clientConnectSchema; + private static final SchemaEventClass _clientDisconnectSchema; + + /** + * Returns the schema for the Connection class. + * @return the SchemaObjectClass for the Connection class. + */ + public static SchemaObjectClass getSchema() + { + return _schema; + } + + /** + * Returns the schema for the Client Connect Event. + * @return the SchemaEventClass for the Client Connect Event. + */ + public static SchemaEventClass getClientConnectSchema() + { + return _clientConnectSchema; + } + + /** + * Returns the schema for the Client Disconnect Event. + * @return the SchemaEventClass for the Client Disconnect Event. + */ + public static SchemaEventClass getClientDisconnectSchema() + { + return _clientDisconnectSchema; + } + + static + { + // Declare the schema for the QMF2 connection class. + _schema = new SchemaObjectClass("org.apache.qpid.broker", "connection"); + + // TODO + //_schema.addProperty(new SchemaProperty("whatHappened", QmfType.TYPE_STRING)); + + // Declare the schema for the QMF2 clientConnect Event class. + _clientConnectSchema = new SchemaEventClass("org.apache.qpid.broker", "clientConnect"); + + // Declare the schema for the QMF2 clientDisconnect Event class. + _clientDisconnectSchema = new SchemaEventClass("org.apache.qpid.broker", "clientDisconnect"); + } + // End of static initialiser. + + private final org.apache.qpid.server.model.Connection _connection; + + /** + * Constructor. + * @param vhost the parent VirtualHost ConfiguredObject from the broker model. + * @param connection the Connection ConfiguredObject from the broker model. + */ + public Connection(final org.apache.qpid.server.model.VirtualHost vhost, + final org.apache.qpid.server.model.Connection connection) + { + super(getSchema()); + _connection = connection; // Will eventually be used to retrieve statistics (when useful ones get populated). + String vhostName = (vhost == null) ? "" : "vhost:" + vhost.getName() + "/"; + String address = vhostName + _connection.getName(); + + // TODO vhostRef - currently just use its name to try and get things working with standard command line tools. + + setValue("address", address); + setValue("incoming", connection.isIncoming()); + + // Although not implemented in Java Broker it's reasonable for them to be false + setValue("SystemConnection", false); // Is the S in System really a capital? not implemented in Java Broker + setValue("userProxyAuth", false); // Not implemented in Java Broker + setValue("federationLink", false); // Not implemented in Java Broker + setValue("authIdentity", (connection.getPrincipal() == null ? "unknown" : connection.getPrincipal())); + setValue("remoteProcessName", (connection.getRemoteProcessName() == null ? + "unknown" : connection.getRemoteProcessName())); + setValue("remotePid", (connection.getRemoteProcessPid() == null ? + "unknown" : connection.getRemoteProcessPid())); + setValue("remoteParentPid", "unknown"); // remoteProcessName not supported in Java Broker + + // shadow Not implemented in Java Broker + // saslMechanism Not implemented in Java Broker + // saslSsf Not implemented in Java Broker + // protocol Not implemented in Java Broker + } + + /** + * Factory method to create a Client Connect Event Object with timestamp of now. + * @return the newly created Client Connect Event Object. + */ + public QmfEvent createClientConnectEvent() + { + QmfEvent clientConnect = new QmfEvent(_clientConnectSchema); + clientConnect.setSeverity("info"); + // TODO Set properties Map - can't really get much info from the org.apache.qpid.server.model.Connection yet. + clientConnect.setValue("rhost", _connection.getName()); + clientConnect.setValue("user", getStringValue("authIdentity")); + return clientConnect; + } + + /** + * Factory method to create a Client Disconnect Event Object with timestamp of now. + * @return the newly created Client Disconnect Event Object. + */ + public QmfEvent createClientDisconnectEvent() + { + QmfEvent clientDisconnect = new QmfEvent(_clientDisconnectSchema); + clientDisconnect.setSeverity("info"); + // TODO Set properties Map - can't really get much info from the org.apache.qpid.server.model.Connection yet. + clientDisconnect.setValue("rhost", _connection.getName()); + clientDisconnect.setValue("user", getStringValue("authIdentity")); + return clientDisconnect; + } + + /** + * This method maps the org.apache.qpid.server.model.Connection to QMF2 connection properties where possible then + * serialises into the underlying Map for transmission via AMQP. This method is called by handleQueryRequest() + * in the org.apache.qpid.qmf2.agent.Agent class implementing the main QMF2 Agent behaviour. + * + * @return the underlying map. + */ + @Override + public Map<String, Object> mapEncode() + { + // Statistics + // closing Not implemented in Java Broker + setValue("framesFromClient", 0); // framesFromClient Not implemented in Java Broker + setValue("framesToClient", 0); // framesToClient Not implemented in Java Broker + setValue("bytesFromClient", _connection.getBytesIn()); + setValue("bytesToClient", _connection.getBytesOut()); + setValue("msgsFromClient", _connection.getMessagesIn()); + setValue("msgsToClient", _connection.getMessagesOut()); + + update(); // TODO only set update if statistics change. + return super.mapEncode(); + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Exchange.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Exchange.java new file mode 100644 index 0000000000..b5e1f662db --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Exchange.java @@ -0,0 +1,256 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2.agentdata; + +// Misc Imports +import java.util.Collections; +import java.util.Map; + +// Simple Logging Facade 4 Java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// QMF2 Imports +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.SchemaEventClass; +//import org.apache.qpid.qmf2.common.SchemaMethod; +import org.apache.qpid.qmf2.common.SchemaObjectClass; +//import org.apache.qpid.qmf2.common.SchemaProperty; + +import org.apache.qpid.server.model.LifetimePolicy; + +/** + * This class provides a concrete implementation of QmfAgentData for the Exchange Management Object. + * In general it's possible to use QmfAgentData without sub-classing as it's really a "bean" style class + * that retains its properties in a Map, but in the case of the Java Broker Management Agent it's useful + * to sub-class as we need to map between the properties/statistics as specified in the Java Broker + * management model and those specified in qpid/spec/management-schema.xml which is what the C++ broker + * uses. This class retains a reference to its peer org.apache.qpid.server.model.Exchange and does the + * necessary mapping when its mapEncode() method is called (which is used to serialise the QmfAgentData). + * + * @author Fraser Adams + */ +public class Exchange extends QmfAgentData +{ + private static final Logger _log = LoggerFactory.getLogger(Exchange.class); + + /** + * This static initialiser block initialises the QMF2 Schema information needed by the Agent to find + * QmfAgentData and QmfEvent Objects of a given type. + */ + private static final SchemaObjectClass _schema; + private static final SchemaEventClass _exchangeDeclareSchema; + private static final SchemaEventClass _exchangeDeleteSchema; + + /** + * Returns the schema for the Exchange class. + * @return the SchemaObjectClass for the Exchange class. + */ + public static SchemaObjectClass getSchema() + { + return _schema; + } + + /** + * Returns the schema for the Exchange Declare Event. + * @return the SchemaEventClass for the Exchange Declare Event. + */ + public static SchemaEventClass getExchangeDeclareSchema() + { + return _exchangeDeclareSchema; + } + + /** + * Returns the schema for the Exchange Delete Event. + * @return the SchemaEventClass for the Exchange Delete Event. + */ + public static SchemaEventClass getExchangeDeleteSchema() + { + return _exchangeDeleteSchema; + } + + static + { + // Declare the schema for the QMF2 broker class. + _schema = new SchemaObjectClass("org.apache.qpid.broker", "exchange"); + + // TODO + //_schema.addProperty(new SchemaProperty("whatHappened", QmfType.TYPE_STRING)); + + // Declare the schema for the QMF2 exchangeDeclare Event class. + _exchangeDeclareSchema = new SchemaEventClass("org.apache.qpid.broker", "exchangeDeclare"); + + // Declare the schema for the QMF2 exchangeDelete Event class. + _exchangeDeleteSchema = new SchemaEventClass("org.apache.qpid.broker", "exchangeDelete"); + } + // End of static initialiser. + + private final org.apache.qpid.server.model.Exchange _exchange; + private String _name; + + /** + * Constructor. + * @param vhost the parent VirtualHost ConfiguredObject from the broker model. + * @param exchange the Exchange ConfiguredObject from the broker model. + */ + public Exchange(final org.apache.qpid.server.model.VirtualHost vhost, + final org.apache.qpid.server.model.Exchange exchange) + { + super(getSchema()); + _exchange = exchange; + + _name = _exchange.getName(); + _name = (_name.equals("<<default>>")) ? "" : _name; + + if (vhost == null) + { // Note we include an empty vhost name in the compare key to make sure things get sorted properly. + setCompareKey("vhost:/" + _name); + } + else + { + _name = "vhost:" + vhost.getName() + "/" + _name; + setCompareKey(_name); + } + + // In the Java Broker LifetimePolicy may be PERMANENT, DELETE_ON_CONNECTION_CLOSE, + // DELETE_ON_SESSION_END, DELETE_ON_NO_OUTBOUND_LINKS, DELETE_ON_NO_LINKS, IN_USE + // We map these to a boolean value to be consistent with the C++ Broker QMF value. + // TODO The C++ and Java Brokers should really return consistent information. + LifetimePolicy lifetimePolicy = _exchange.getLifetimePolicy(); + boolean autoDelete = (lifetimePolicy != LifetimePolicy.PERMANENT) ? true : false; + + // TODO vhostRef - currently just use its name to try and get things working with standard command line tools. + + setValue("name", _name); + setValue("type", _exchange.getType()); + setValue("durable", _exchange.isDurable()); + setValue("autoDelete", autoDelete); + + // TODO figure out mapping from Java Broker model to QMF exchange arguments. + // Set to empty Map for now to be consistent with C++ broker. + setValue("arguments", Collections.EMPTY_MAP); + + // ObjectId needs to be set here in Exchange because the QMF2 version of qpid-config uses a hardcoded + // _object_name as below in the _object_id that it sets in the getExchange() call and in exchangeRef. + // It *shouldn't* do this and should really use the _object_id of the exchange object returned by + // getObjects("exchange"), but it does. The following line causes the Agent to use the explicit + // ObjectId below rather than constructing its own, which fixes the qpid-config issue. + setObjectId(new ObjectId("", "org.apache.qpid.broker:exchange:" + _name, 0)); + } + + /** + * Get the peer org.apache.qpid.server.model.Exchange instance. This is mainly used when creating an Alternate + * Exchange on a Queue as the underlying method requires an org.apache.qpid.server.model.Exchange. + */ + public org.apache.qpid.server.model.Exchange getExchange() + { + return _exchange; + } + + /** + * Factory method to create an Exchange Declare Event Object with timestamp of now. + * @return the newly created Exchange Declare Event Object. + */ + public QmfEvent createExchangeDeclareEvent() + { + QmfEvent exchangeDeclare = new QmfEvent(_exchangeDeclareSchema); + exchangeDeclare.setSeverity("info"); + exchangeDeclare.setValue("altEx", ""); // Java Broker can't set Alternate Exchange on Exchange + exchangeDeclare.setValue("args", Collections.EMPTY_MAP); + exchangeDeclare.setValue("autoDel", getBooleanValue("autoDelete")); + exchangeDeclare.setValue("disp", "created"); + exchangeDeclare.setValue("durable", getBooleanValue("durable")); + exchangeDeclare.setValue("exName", _name); + exchangeDeclare.setValue("exType", getStringValue("type")); + // TODO Not sure of a way to get these for Java Broker Exchange. + //exchangeDeclare.setValue("rhost", _connection.getName()); + //exchangeDeclare.setValue("user", getStringValue("authIdentity")); + return exchangeDeclare; + } + + /** + * Factory method to create an Exchange Delete Event Object with timestamp of now. + * @return the newly created Exchange Delete Event Object. + */ + public QmfEvent createExchangeDeleteEvent() + { + QmfEvent exchangeDelete = new QmfEvent(_exchangeDeleteSchema); + exchangeDelete.setSeverity("info"); + exchangeDelete.setValue("exName", _name); + // TODO Not sure of a way to get these for Java Broker Exchange. + //exchangeDelete.setValue("rhost", _connection.getName()); + //exchangeDelete.setValue("user", getStringValue("authIdentity")); + return exchangeDelete; + } + + /** + * This method maps the org.apache.qpid.server.model.Exchange to QMF2 broker properties where possible then + * serialises into the underlying Map for transmission via AMQP. This method is called by handleQueryRequest() + * in the org.apache.qpid.qmf2.agent.Agent class implementing the main QMF2 Agent behaviour. + * + * @return the underlying map. + */ + @Override + public Map<String, Object> mapEncode() + { + // Statistics + long msgReceives = _exchange.getMessagesIn(); + long msgDrops = _exchange.getMessagesDropped(); + long msgRoutes = msgReceives - msgDrops; + + long byteReceives = _exchange.getBytesIn(); + long byteDrops = _exchange.getBytesDropped(); + long byteRoutes = byteReceives - byteDrops; + + setValue("producerCount", _exchange.getPublishers().size()); + + // We have to modify the value of bindingCount for Exchange because the QmfManagementAgent "hides" the + // QMF Objects that relate to its own AMQP Connection/Queues/Bindings so the bindingCount for default direct + // qmf.default.direct and qmf.default.topic is different to the actual number of QMF bindings. + long bindingCount = _exchange.getBindingCount(); + if (_name.equals("")) + { + bindingCount -= 3; + } + else if (_name.equals("qmf.default.direct")) + { + bindingCount -= 2; + } + else if (_name.equals("qmf.default.topic")) + { + bindingCount -= 1; + } + setValue("bindingCount", bindingCount); + + setValue("msgReceives", msgReceives); + setValue("msgDrops", msgDrops); + setValue("msgRoutes", msgRoutes); + setValue("byteReceives", byteReceives); + setValue("byteDrops", byteDrops); + setValue("byteRoutes", byteRoutes); + + update(); // TODO only set update if a statistic has actually changed value. + return super.mapEncode(); + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Queue.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Queue.java new file mode 100644 index 0000000000..aad85e66e9 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Queue.java @@ -0,0 +1,297 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2.agentdata; + +// Misc Imports +import java.util.Collections; +import java.util.Map; + +// Simple Logging Facade 4 Java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// QMF2 Imports +import org.apache.qpid.qmf2.agent.Agent; +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.Handle; +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfData; +import org.apache.qpid.qmf2.common.SchemaEventClass; +//import org.apache.qpid.qmf2.common.SchemaMethod; +import org.apache.qpid.qmf2.common.SchemaObjectClass; +//import org.apache.qpid.qmf2.common.SchemaProperty; + +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; + +/** + * This class provides a concrete implementation of QmfAgentData for the Queue Management Object. + * In general it's possible to use QmfAgentData without sub-classing as it's really a "bean" style class + * that retains its properties in a Map, but in the case of the Java Broker Management Agent it's useful + * to sub-class as we need to map between the properties/statistics as specified in the Java Broker + * management model and those specified in qpid/spec/management-schema.xml which is what the C++ broker + * uses. This class retains a reference to its peer org.apache.qpid.server.model.Queue and does the + * necessary mapping when its mapEncode() method is called (which is used to serialise the QmfAgentData). + * + * @author Fraser Adams + */ +public class Queue extends QmfAgentData +{ + private static final Logger _log = LoggerFactory.getLogger(Queue.class); + + /** + * This static initialiser block initialises the QMF2 Schema information needed by the Agent to find + * QmfAgentData and QmfEvent Objects of a given type. + */ + private static final SchemaObjectClass _schema; + private static final SchemaEventClass _queueDeclareSchema; + private static final SchemaEventClass _queueDeleteSchema; + + /** + * Returns the schema for the Queue class. + * @return the SchemaObjectClass for the Queue class. + */ + public static SchemaObjectClass getSchema() + { + return _schema; + } + + /** + * Returns the schema for the Queue Declare Event. + * @return the SchemaEventClass for the Queue Declare Event. + */ + public static SchemaEventClass getQueueDeclareSchema() + { + return _queueDeclareSchema; + } + + /** + * Returns the schema for the Queue Delete Event. + * @return the SchemaEventClass for the Queue Delete Event. + */ + public static SchemaEventClass getQueueDeleteSchema() + { + return _queueDeleteSchema; + } + + static + { + // Declare the schema for the QMF2 broker class. + _schema = new SchemaObjectClass("org.apache.qpid.broker", "queue"); + + // TODO + //_schema.addProperty(new SchemaProperty("whatHappened", QmfType.TYPE_STRING)); + + // Declare the schema for the QMF2 queueDeclare Event class. + _queueDeclareSchema = new SchemaEventClass("org.apache.qpid.broker", "queueDeclare"); + + // Declare the schema for the QMF2 queueDelete Event class. + _queueDeleteSchema = new SchemaEventClass("org.apache.qpid.broker", "queueDelete"); + } + // End of static initialiser. + + private final org.apache.qpid.server.model.Queue _queue; + private String _vhostName = ""; + private ObjectId _alternateExchange = null; + private String _alternateExchangeName = ""; + + /** + * Constructor. + * @param vhost the parent VirtualHost ConfiguredObject from the broker model. + * @param queue the Queue ConfiguredObject from the broker model. + */ + public Queue(final org.apache.qpid.server.model.VirtualHost vhost, + final org.apache.qpid.server.model.Queue queue) + { + super(getSchema()); + _queue = queue; + + String name = _queue.getName(); + + if (vhost == null) + { // Note we include an empty vhost name in the compare key to make sure things get sorted properly. + setCompareKey("vhost:/" + name); + } + else + { + _vhostName = "vhost:" + vhost.getName() + "/"; + name = _vhostName + name; + setCompareKey(name); + } + + // In the Java Broker LifetimePolicy may be PERMANENT, DELETE_ON_CONNECTION_CLOSE, + // DELETE_ON_SESSION_END, DELETE_ON_NO_OUTBOUND_LINKS, DELETE_ON_NO_LINKS, IN_USE + // We map these to a boolean value to be consistent with the C++ Broker QMF value. + // TODO The C++ and Java Brokers should really return consistent information. + LifetimePolicy lifetimePolicy = _queue.getLifetimePolicy(); + boolean autoDelete = (lifetimePolicy != LifetimePolicy.PERMANENT) ? true : false; + + // In the Java Broker exclusivity may be NONE, SESSION, CONNECTION, CONTAINER, PRINCIPAL, LINK + // We map these to a boolean value to be consistent with the C++ Broker QMF value. + // TODO The C++ and Java Brokers should really return consistent information. + ExclusivityPolicy exclusivityPolicy = _queue.getExclusive(); + boolean exclusive = (exclusivityPolicy != ExclusivityPolicy.NONE) ? true : false; + + // TODO vhostRef - currently just use its name to try and get things working with standard command line tools. + + setValue("name", name); + setValue("durable", _queue.isDurable()); + setValue("autoDelete", autoDelete); + setValue("exclusive", exclusive); + + // altExchange needs to be set later, done in mapEncode() for convenience, because it isn't set during + // Queue construction in the Java Broker. + + // TODO figure out mapping from Java Broker model to QMF queue arguments. + // Set to empty Map for now to be consistent with C++ broker. + setValue("arguments", Collections.EMPTY_MAP); + + // ObjectId needs to be set here in Queue because the QMF2 version of qpid-config uses a hardcoded + // _object_name as below in the _object_id that it sets in the getQueue() call and in queueRef. + // It *shouldn't* do this and should really use the _object_id of the queue object returned by + // getObjects("queue"), but it does. The following line causes the Agent to use the explicit + // ObjectId below rather than constructing its own, which fixes the qpid-config issue. + setObjectId(new ObjectId("", "org.apache.qpid.broker:queue:" + name, 0)); + } + + /** + * TODO + * + */ + public void invokeMethod(Agent agent, Handle handle, String methodName, QmfData inArgs) + { + /*if (methodName.equals("purge")) + { + //broker.create(inArgs); + } + else if (methodName.equals("reroute")) + { + //broker.create(inArgs); + } + else*/ + { + agent.raiseException(handle, methodName + " not yet implemented on Queue."); + } + } + + /** + * Factory method to create a Queue Declare Event Object with timestamp of now. + * @return the newly created Queue Declare Event Object. + */ + public QmfEvent createQueueDeclareEvent() + { + QmfEvent queueDeclare = new QmfEvent(_queueDeclareSchema); + queueDeclare.setSeverity("info"); + // TODO the _alternateExchangeName gets set some time after the Constructor - how do I get its value for + // the queueDeclareEvent???!!! + queueDeclare.setValue("altEx", _alternateExchangeName); + queueDeclare.setValue("args", Collections.EMPTY_MAP); // TODO + queueDeclare.setValue("autoDel", getBooleanValue("autoDelete")); + queueDeclare.setValue("disp", "created"); + queueDeclare.setValue("durable", getBooleanValue("durable")); + queueDeclare.setValue("excl", getBooleanValue("exclusive")); + queueDeclare.setValue("qName", getStringValue("name")); + // TODO Not sure of a way to get these for Java Broker Exchange. + //queueDeclare.setValue("rhost", _connection.getName()); + //queueDeclare.setValue("user", getStringValue("authIdentity")); + return queueDeclare; + } + + /** + * Factory method to create a Queue Delete Event Object with timestamp of now. + * @return the newly created Queue Delete Event Object. + */ + public QmfEvent createQueueDeleteEvent() + { + QmfEvent queueDelete = new QmfEvent(_queueDeleteSchema); + queueDelete.setSeverity("info"); + queueDelete.setValue("qName", getStringValue("name")); + // TODO Not sure of a way to get these for Java Broker Exchange. + //queueDelete.setValue("rhost", _connection.getName()); + //queueDelete.setValue("user", getStringValue("authIdentity")); + return queueDelete; + } + + /** + * This method maps the org.apache.qpid.server.model.Queue to QMF2 broker properties where possible then + * serialises into the underlying Map for transmission via AMQP. This method is called by handleQueryRequest() + * in the org.apache.qpid.qmf2.agent.Agent class implementing the main QMF2 Agent behaviour. + * + * @return the underlying map. + */ + @Override + public Map<String, Object> mapEncode() + { + // Set the altExchange reference if an alternateExchange exists and hasn't already been set. + // Not sure how to set this closer to the Constructor. At the moment the _alternateExchangeName gets set + // too late to populate the "altEx" property of the queueDeclareEvent. + if (_alternateExchange == null) + { + Exchange altEx = _queue.getAlternateExchange(); + if (altEx != null) + { + _alternateExchangeName = _vhostName + altEx.getName(); + _alternateExchange = new ObjectId("", "org.apache.qpid.broker:exchange:" + _alternateExchangeName, 0); + setRefValue("altExchange", _alternateExchange); + } + } + + // Statistics + setValue("msgTotalEnqueues", _queue.getTotalEnqueuedMessages()); + setValue("msgTotalDequeues", _queue.getTotalDequeuedMessages()); + // msgTxnEnqueues not implemented in Java Broker + // msgTxnDequeues not implemented in Java Broker + setValue("msgPersistEnqueues", _queue.getPersistentEnqueuedMessages()); + setValue("msgPersistDequeues", _queue.getPersistentDequeuedMessages()); + setValue("msgDepth", _queue.getQueueDepthMessages()); + setValue("byteDepth", _queue.getQueueDepthBytes()); + setValue("byteTotalEnqueues", _queue.getTotalEnqueuedBytes()); + setValue("byteTotalDequeues", _queue.getTotalDequeuedBytes()); + // byteTxnEnqueues not implemented in Java Broker + // byteTxnDequeues not implemented in Java Broker + setValue("bytePersistEnqueues", _queue.getPersistentEnqueuedBytes()); + setValue("bytePersistDequeues", _queue.getPersistentDequeuedBytes()); + + // Flow-to-disk Statistics not implemented in Java Broker + // releases & acquires not implemented in Java Broker + // discardsTtl (discardsTtlMessages) not implemented in Java Broker + // discardsRing not implemented in Java Broker + // discardsLvq not implemented in Java Broker + // discardsOverflow not implemented in Java Broker + // discardsSubscriber not implemented in Java Broker + // discardsPurge not implemented in Java Broker + // reroutes not implemented in Java Broker + + setValue("consumerCount", _queue.getConsumerCount()); + setValue("bindingCount", _queue.getBindingCount()); + setValue("unackedMessages", _queue.getUnacknowledgedMessages()); + + setValue("messageLatency", "Not yet implemented"); + // flowStopped not implemented in Java Broker + // flowStoppedCount not implemented in Java Broker + + update(); // TODO only update if statistics have actually changed. + return super.mapEncode(); + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Session.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Session.java new file mode 100644 index 0000000000..ea7bece7c0 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Session.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2.agentdata; + +// Misc Imports +import java.util.Collections; +import java.util.Map; + +// Simple Logging Facade 4 Java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// QMF2 Imports +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.ObjectId; +//import org.apache.qpid.qmf2.common.SchemaEventClass; +//import org.apache.qpid.qmf2.common.SchemaMethod; +import org.apache.qpid.qmf2.common.SchemaObjectClass; +//import org.apache.qpid.qmf2.common.SchemaProperty; + +/** + * This class provides a concrete implementation of QmfAgentData for the Session Management Object. + * In general it's possible to use QmfAgentData without sub-classing as it's really a "bean" style class + * that retains its properties in a Map, but in the case of the Java Broker Management Agent it's useful + * to sub-class as we need to map between the properties/statistics as specified in the Java Broker + * management model and those specified in qpid/spec/management-schema.xml which is what the C++ broker + * uses. This class retains a reference to its peer org.apache.qpid.server.model.Consumer and does the + * necessary mapping when its mapEncode() method is called (which is used to serialise the QmfAgentData). + * + * @author Fraser Adams + */ +public class Session extends QmfAgentData +{ + private static final Logger _log = LoggerFactory.getLogger(Session.class); + + /** + * This static initialiser block initialises the QMF2 Schema information needed by the Agent to find + * QmfAgentData and QmfEvent Objects of a given type. + */ + private static final SchemaObjectClass _schema; + + /** + * Returns the schema for the Session class. + * @return the SchemaObjectClass for the Session class. + */ + public static SchemaObjectClass getSchema() + { + return _schema; + } + + static + { + // Declare the schema for the QMF2 session class. + _schema = new SchemaObjectClass("org.apache.qpid.broker", "session"); + + // TODO + //_schema.addProperty(new SchemaProperty("whatHappened", QmfType.TYPE_STRING)); + } + // End of static initialiser. + + private final org.apache.qpid.server.model.Session _session; + + /** + * Constructor. + * @param session the Session ConfiguredObject from the broker model. + * @param connectionRef the ObjectId of the Connection Object that is the parent of the Session. + */ + public Session(final org.apache.qpid.server.model.Session session, final ObjectId connectionRef) + { + super(getSchema()); + _session = session; + + setValue("name", session.getId()); // Use ID to be consistent with C++ Broker. + setValue("channelId", session.getName()); // The Java Broker name uses the channelId. + setRefValue("connectionRef", connectionRef); + } + + /** + * This method maps the org.apache.qpid.server.model.Session to QMF2 subscribe properties where possible then + * serialises into the underlying Map for transmission via AMQP. This method is called by handleQueryRequest() + * in the org.apache.qpid.qmf2.agent.Agent class implementing the main QMF2 Agent behaviour. + * + * @return the underlying map. + */ + @Override + public Map<String, Object> mapEncode() + { + // Statistics + setValue("unackedMessages", _session.getUnacknowledgedMessages()); + setValue("TxnStarts", _session.getLocalTransactionBegins()); + setValue("TxnRejects", _session.getLocalTransactionRollbacks()); + + update(); // TODO Only Update if statistics have changes. + + return super.mapEncode(); + } +} diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Subscription.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Subscription.java new file mode 100644 index 0000000000..bdfe6cca37 --- /dev/null +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Subscription.java @@ -0,0 +1,211 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.qmf2.agentdata; + +// Misc Imports +import java.util.Collections; +import java.util.Map; + +// Simple Logging Facade 4 Java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// QMF2 Imports +import org.apache.qpid.qmf2.agent.QmfAgentData; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.SchemaEventClass; +//import org.apache.qpid.qmf2.common.SchemaMethod; +import org.apache.qpid.qmf2.common.SchemaObjectClass; +//import org.apache.qpid.qmf2.common.SchemaProperty; + +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.Queue; + +/** + * This class provides a concrete implementation of QmfAgentData for the Subscription Management Object. + * In general it's possible to use QmfAgentData without sub-classing as it's really a "bean" style class + * that retains its properties in a Map, but in the case of the Java Broker Management Agent it's useful + * to sub-class as we need to map between the properties/statistics as specified in the Java Broker + * management model and those specified in qpid/spec/management-schema.xml which is what the C++ broker + * uses. This class retains a reference to its peer org.apache.qpid.server.model.Consumer and does the + * necessary mapping when its mapEncode() method is called (which is used to serialise the QmfAgentData). + * + * @author Fraser Adams + */ +public class Subscription extends QmfAgentData +{ + private static final Logger _log = LoggerFactory.getLogger(Subscription.class); + + /** + * This static initialiser block initialises the QMF2 Schema information needed by the Agent to find + * QmfAgentData and QmfEvent Objects of a given type. + */ + private static final SchemaObjectClass _schema; + private static final SchemaEventClass _subscribeSchema; + private static final SchemaEventClass _unsubscribeSchema; + + /** + * Returns the schema for the Subscription class. + * @return the SchemaObjectClass for the Subscription class. + */ + public static SchemaObjectClass getSchema() + { + return _schema; + } + + /** + * Returns the schema for the Subscribe Event. + * @return the SchemaEventClass for the Subscribe Event. + */ + public static SchemaEventClass getSubscribeSchema() + { + return _subscribeSchema; + } + + /** + * Returns the schema for the Unsubscribe Event. + * @return the SchemaEventClass for the Unsubscribe Event. + */ + public static SchemaEventClass getUnsubscribeSchema() + { + return _unsubscribeSchema; + } + + static + { + // Declare the schema for the QMF2 subscription class. + _schema = new SchemaObjectClass("org.apache.qpid.broker", "subscription"); + + // TODO + //_schema.addProperty(new SchemaProperty("whatHappened", QmfType.TYPE_STRING)); + + // Declare the schema for the QMF2 subscribe Event class. + _subscribeSchema = new SchemaEventClass("org.apache.qpid.broker", "subscribe"); + + // Declare the schema for the QMF2 unsubscribe Event class. + _unsubscribeSchema = new SchemaEventClass("org.apache.qpid.broker", "unsubscribe"); + } + // End of static initialiser. + + private final org.apache.qpid.server.model.Consumer _subscription; + + private boolean _exclusive = false; + private String _qName = ""; + + /** + * Constructor. + * @param subscription the Consumer ConfiguredObject from the broker model. + */ + public Subscription(final org.apache.qpid.server.model.Consumer subscription) + { + super(getSchema()); + _subscription = subscription; // Will eventually be used in mapEncode() to retrieve statistics. + + setValue("name", subscription.getName()); + setValue("browsing", false); // TODO not supported in Java Broker. + setValue("acknowledged", true); // TODO not supported in Java Broker. + setValue("creditMode", "WINDOW"); // TODO not supported in Java Broker. + } + + /** + * Set the sessionRef property. + * @param sessionRef the sessionRef ObjectId. + */ + public void setSessionRef(final ObjectId sessionRef) + { + setRefValue("sessionRef", sessionRef); + } + + /** + * Set the queueRef property. + * @param queueRef the queueRef ObjectId. + */ + public void setQueueRef(final ObjectId queueRef, final Queue queue) + { + setRefValue("queueRef", queueRef); + + // Unfortunately the org.apache.qpid.server.model.Consumer doesn't yet allow access to its associated Queue + // so we pass a reference ourselves when we do setQueueRef. This is because some Subscription properties + // are *actually" related to the associated Queue. + _qName = queue.getName(); + + // In the Java Broker exclusivity may be NONE, SESSION, CONNECTION, CONTAINER, PRINCIPAL, LINK + // We map these to a boolean value to be consistent with the C++ Broker QMF values. + // TODO The C++ and Java Brokers should really return consistent information. + ExclusivityPolicy exclusivityPolicy = queue.getExclusive(); + _exclusive = (exclusivityPolicy != ExclusivityPolicy.NONE) ? true : false; + } + + /** + * Factory method to create a Subscribe Event Object with timestamp of now. + * @return the newly created Subscribe Event Object. + */ + public QmfEvent createSubscribeEvent() + { + QmfEvent subscribe = new QmfEvent(_subscribeSchema); + subscribe.setSeverity("info"); + subscribe.setValue("args", Collections.EMPTY_MAP); + subscribe.setValue("dest", getStringValue("name")); + subscribe.setValue("excl", _exclusive); + subscribe.setValue("qName", _qName); + // TODO Not sure of a way to get these for Java Broker Subscription. + //subscribe.setValue("rhost", _connection.getName()); + //subscribe.setValue("user", getStringValue("authIdentity")); + return subscribe; + } + + /** + * Factory method to create an Unsubscribe Event Object with timestamp of now. + * @return the newly created Unsubscribe Event Object. + */ + public QmfEvent createUnsubscribeEvent() + { + QmfEvent unsubscribe = new QmfEvent(_unsubscribeSchema); + unsubscribe.setSeverity("info"); + unsubscribe.setValue("dest", getStringValue("name")); + // TODO Not sure of a way to get these for Java Broker Subscription. + //unsubscribe.setValue("rhost", _connection.getName()); + //unsubscribe.setValue("user", getStringValue("authIdentity")); + return unsubscribe; + } + + /** + * This method maps the org.apache.qpid.server.model.Consumer to QMF2 subscribe properties where possible then + * serialises into the underlying Map for transmission via AMQP. This method is called by handleQueryRequest() + * in the org.apache.qpid.qmf2.agent.Agent class implementing the main QMF2 Agent behaviour. + * + * @return the underlying map. + */ + @Override + public Map<String, Object> mapEncode() + { + // Statistics + setValue("delivered", _subscription.getMessagesOut()); + + setValue("exclusive", _exclusive); + + update(); // TODO Only Update if statistics have changes. + return super.mapEncode(); + } +} |