summaryrefslogtreecommitdiff
path: root/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java')
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java249
1 files changed, 249 insertions, 0 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java
new file mode 100644
index 0000000000..a6f5ab90cc
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.domain.services;
+
+import java.util.UUID;
+
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.management.Names;
+import org.apache.qpid.management.configuration.BrokerConnectionData;
+import org.apache.qpid.management.configuration.Configuration;
+import org.apache.qpid.management.domain.model.DomainModel;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * This is the Object representation of a management client.
+ * According to specification : "A software component that is separate from the messaging broker, connected to the
+ * management broker via an AMQP connection, which allows any software component to be managed remotely by QPID."
+ *
+ * @author Andrea Gazzarini
+ */
+public final class ManagementClient
+{
+ private final static Logger LOGGER = Logger.get(ManagementClient.class);
+
+ private final String _managementQueueName;
+ private final String _methodReplyQueueName;
+
+ private DomainModel _domainModel;
+ private QpidService _service;
+
+ private final BrokerConnectionData _connectionData;
+
+ /**
+ * Builds a new <code>ManagementClient</code> with the given identifier and connection data.
+ *
+ * @param brokerId the broker identifier.
+ * @param connectionData the broker connection data (host, port, etc...)
+ */
+ ManagementClient(UUID brokerId,BrokerConnectionData connectionData)
+ {
+ _connectionData = connectionData;
+ _service = new QpidService(brokerId);
+ _domainModel = new DomainModel(brokerId);
+ _managementQueueName = Configuration.getInstance().getManagementQueueName();
+ _methodReplyQueueName = Configuration.getInstance().getMethodReplyQueueName();
+ }
+
+ @Override
+ public String toString()
+ {
+ return _connectionData.toString();
+ }
+
+ /**
+ * Returns the connection data associated with this management client.
+ *
+ * @return the connection data associated with this management client.
+ */
+ public BrokerConnectionData getBrokerConnectionData()
+ {
+ return _connectionData;
+ }
+
+ /**
+ * Establishing initial communication Between Client and Broker.
+ * According to specification :
+ * "Communication is established between the management client and management agent using normal AMQP procedures.
+ * The client creates a connection to the broker and then establishes a session with its corresponding channel.
+ * Two private queues are then declared.
+ * A management queue is declared and bound to the qpid.management exchange with "mgmt.#" as routing key; in that
+ * way all management-related messages sent to the exchange will be received by this client.
+ * When a client successfully binds to the qpid.management exchange, the management agent schedules a schema
+ * broadcast to be sent to the exchange.
+ * The agent will publish, via the exchange, a description of the schema for all manageable objects in its control. That
+ * schema is therefore received by this service and it will be part of service's domain model."
+ *
+ * @throws StartupFailureException when this management client cannot perform startup operations due to an error.
+ */
+ void estabilishFirstConnectionWithBroker() throws StartupFailureException{
+ try {
+ connectWithBroker();
+
+ createAndBindMethodReplyQueue();
+ createAndBindManagementQueue();
+
+ registerConsumerOnManagementQueue();
+ registerConsumerOnMethodReplyQueue();
+
+ synchronize();
+ } catch(Exception exception)
+ {
+ try {
+ _service.close();
+ } catch(Exception ignore)
+ {
+ }
+ throw new StartupFailureException(exception);
+ }
+ }
+
+ /**
+ * Shutdown procedure for this management client.
+ */
+ void shutdown ()
+ {
+ LOGGER.info(Messages.QMAN_000011_SHUTDOWN_INITIATED,_domainModel.getBrokerId());
+
+ removeMethodReplyConsumer();
+ destroyAndUnbingMethodReplyQueue();
+
+ removeManagementConsumer();
+ destroyAndUnbingManagementQueue();
+
+ _domainModel.releaseResources();
+
+ _service.close();
+
+ LOGGER.info(Messages.QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN,_domainModel.getBrokerId());
+ }
+
+ /**
+ * Registers a consumer (that is, a listener) on the method-reply queue.
+ */
+ private void registerConsumerOnMethodReplyQueue ()
+ {
+ BrokerMessageListener methodReplyChannelListener = new BrokerMessageListener(_domainModel);
+ methodReplyChannelListener.setHandlers(Configuration.getInstance().getMethodReplyQueueHandlers());
+ _service.createSubscription(_methodReplyQueueName, _methodReplyQueueName, methodReplyChannelListener);
+
+ LOGGER.info(Messages.QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED, _domainModel.getBrokerId());
+ }
+
+ /**
+ * Registers a consumer (listener) on the management queue.
+ */
+ private void registerConsumerOnManagementQueue ()
+ {
+ BrokerMessageListener managementChannelListener = new BrokerMessageListener(_domainModel);
+ managementChannelListener.setHandlers(Configuration.getInstance().getManagementQueueHandlers());
+ _service.createSubscription(_managementQueueName, _managementQueueName, managementChannelListener);
+
+ LOGGER.info(Messages.QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED, _domainModel.getBrokerId());
+ }
+
+ /**
+ * Declares a management queue and bound it to the "qpid.management" exchange with "mgmt.#" as routing key;
+ */
+ private void createAndBindManagementQueue ()
+ {
+ _service.declareQueue(_managementQueueName);
+ _service.declareBinding(
+ _managementQueueName,
+ Names.MANAGEMENT_EXCHANGE,
+ Names.MANAGEMENT_ROUTING_KEY);
+
+ LOGGER.info(Messages.QMAN_000015_MANAGEMENT_QUEUE_DECLARED,_managementQueueName,_domainModel.getBrokerId());
+ }
+
+ /**
+ * Declares a private queue for receiving method replies (after method invocations).
+ * This queue is bound to the amq.direct exchange using a routing key equal to the name of the queue.
+ */
+ private void createAndBindMethodReplyQueue ()
+ {
+ _service.declareQueue(_methodReplyQueueName);
+ _service.declareBinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
+
+ LOGGER.info(Messages.QMAN_000016_METHOD_REPLY_QUEUE_DECLARED,_methodReplyQueueName, _domainModel.getBrokerId());
+ }
+
+ /**
+ * Removes the method-reply queue consumer.
+ */
+ private void removeMethodReplyConsumer()
+ {
+ _service.removeSubscription(_methodReplyQueueName);
+
+ LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_methodReplyQueueName,_domainModel.getBrokerId());
+ }
+
+ /**
+ * Unbind the method reply queue and after that destroy it from remote broker.
+ */
+ private void destroyAndUnbingMethodReplyQueue()
+ {
+ _service.declareUnbinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
+ _service.deleteQueue(_methodReplyQueueName);
+
+ LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED,_methodReplyQueueName,_domainModel.getBrokerId());
+ }
+
+ /**
+ * Removes the management queue consumer.
+ */
+ private void removeManagementConsumer()
+ {
+ _service.removeSubscription(_managementQueueName);
+
+ LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_managementQueueName,_domainModel.getBrokerId());
+ }
+
+ /**
+ * Unbind the management queue and after that destroy it from remote broker.
+ */
+ private void destroyAndUnbingManagementQueue()
+ {
+ _service.declareUnbinding(_managementQueueName, Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY);
+ _service.deleteQueue(_managementQueueName);
+
+ LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED, _managementQueueName,_domainModel.getBrokerId());
+ }
+
+ /**
+ * Connects this client with the broker.
+ *
+ * @throws QpidException when it's not possibile to connect with the broker.
+ */
+ private void connectWithBroker() throws Exception
+ {
+ _service.connect();
+ }
+
+ /**
+ * All the Management client commands are asynchronous.
+ * Synchronous behavior is achieved through invoking the sync method.
+ */
+ private void synchronize()
+ {
+ _service.sync();
+ }
+}