summaryrefslogtreecommitdiff
path: root/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java')
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java361
1 files changed, 361 insertions, 0 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
new file mode 100644
index 0000000000..bd7d305184
--- /dev/null
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.domain.services;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.api.Message;
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.management.Names;
+import org.apache.qpid.management.configuration.QpidDatasource;
+import org.apache.qpid.management.domain.model.QpidMethod;
+import org.apache.qpid.management.domain.model.type.Binary;
+import org.apache.qpid.management.messages.MethodInvocationRequestMessage;
+import org.apache.qpid.management.messages.SchemaRequestMessage;
+import org.apache.qpid.nclient.util.MessageListener;
+import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Qpid Broker facade.
+ *
+ * @author Andrea Gazzarini
+ */
+public class QpidService implements SessionListener
+{
+ private final static Logger LOGGER = Logger.get(QpidService.class);
+
+ private UUID _brokerId;
+ private Connection _connection;
+ private Session _session;
+ private Map<String,MessagePartListenerAdapter> _listeners;
+
+ /**
+ * Builds a new service with the given connection data.
+ *
+ * @param connectionData the connection data of the broker.
+ */
+ public QpidService(UUID brokerId)
+ {
+ this._brokerId = brokerId;
+ }
+
+ /**
+ * Estabilishes a connection with the broker.
+ *
+ * @throws QpidException in case of connection failure.
+ */
+ public void connect() throws Exception
+ {
+ _connection = QpidDatasource.getInstance().getConnection(_brokerId);
+ _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
+ _session = _connection.createSession(0);
+ _session.setSessionListener(this);
+ }
+
+ public void opened(Session ssn) {}
+
+ public void resumed(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
+ if (l == null)
+ {
+ LOGGER.error("unhandled message: %s", xfr);
+ }
+ else
+ {
+ l.messageTransfer(xfr);
+ }
+ }
+
+
+ public void exception(Session ssn, SessionException exc)
+ {
+
+ }
+
+ public void closed(Session ssn) {}
+
+ /**
+ * All the previously entered outstanding commands are asynchronous.
+ * Synchronous behavior is achieved through invoking this method.
+ */
+ public void sync()
+ {
+ _session.sync();
+ }
+
+ /**
+ * Closes communication with broker.
+ */
+ public void close()
+ {
+ try
+ {
+ _session.close();
+ _session = null;
+ _listeners = null;
+ } catch (Exception e)
+ {
+ }
+ try
+ {
+ _connection.close();
+ _connection = null;
+ } catch (Exception e)
+ {
+ }
+ }
+
+ /**
+ * Associate a message listener with a destination therefore creating a new subscription.
+ *
+ * @param queueName The name of the queue that the subscriber is receiving messages from.
+ * @param destinationName the name of the destination, or delivery tag, for the subscriber.
+ * @param listener the listener for this destination.
+ *
+ * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
+ */
+ public void createSubscription(String queueName, String destinationName, MessageListener listener)
+ {
+ _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
+ _session.messageSubscribe
+ (queueName,
+ destinationName,
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+
+ _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
+ _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
+
+ LOGGER.debug(Messages.QMAN_200025_SUBSCRIPTION_DECLARED,queueName,destinationName);
+ }
+
+ /**
+ * Removes a previously declared consumer from the broker.
+ *
+ * @param destinationName the name of the destination, or delivery tag, for the subscriber.
+ * @see Session#messageCancel(String, Option...)
+ */
+ public void removeSubscription(String destinationName)
+ {
+ _session.messageCancel(destinationName);
+ LOGGER.debug(Messages.QMAN_200026_SUBSCRIPTION_REMOVED,destinationName);
+ }
+
+ /**
+ * Declares a queue on the broker with the given name.
+ *
+ * @param queueName the name of the declared queue.
+ * @see Session#queueDeclare(String, String, java.util.Map, Option...)
+ */
+ public void declareQueue(String queueName)
+ {
+ _session.queueDeclare(queueName, null, null);
+ LOGGER.debug(Messages.QMAN_200027_QUEUE_DECLARED,queueName);
+ }
+
+ /**
+ * Removes the queue with the given name from the broker.
+ *
+ * @param queueName the name of the queue.
+ * @see Session#queueDelete(String, Option...)
+ */
+ public void deleteQueue(String queueName)
+ {
+ _session.queueDelete(queueName);
+ LOGGER.debug(Messages.QMAN_200028_QUEUE_REMOVED,queueName);
+ }
+
+ /**
+ * Binds (on the broker) a queue with an exchange.
+ *
+ * @param queueName the name of the queue to bind.
+ * @param exchangeName the exchange name.
+ * @param routingKey the routing key used for the binding.
+ * @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
+ */
+ public void declareBinding(String queueName, String exchangeName, String routingKey)
+ {
+ _session.exchangeBind(queueName, exchangeName, routingKey, null);
+ LOGGER.debug(Messages.QMAN_200029_BINDING_DECLARED,routingKey,queueName,exchangeName);
+ }
+
+ /**
+ * Removes a previously declare binding between an exchange and a queue.
+ *
+ * @param queueName the name of the queue.
+ * @param exchangeName the name of the exchange.
+ * @param routingKey the routing key used for binding.
+ */
+ public void declareUnbinding(String queueName, String exchangeName, String routingKey)
+ {
+ _session.exchangeUnbind(queueName, exchangeName, routingKey);
+ LOGGER.debug(Messages.QMAN_200030_BINDING_REMOVED,routingKey,queueName,exchangeName);
+ }
+
+ /**
+ * Sends a command message with the given data on the management queue.
+ *
+ * @param messageData the command message content.
+ */
+
+ /**
+ * Requests a schema for the given package.class.hash.
+ *
+ * @param packageName the package name.
+ * @param className the class name.
+ * @param schemaHash the schema hash.
+ * @throws IOException when the schema request cannot be sent.
+ */
+ public void requestSchema(final String packageName, final String className, final Binary schemaHash) throws IOException
+ {
+ Message message = new SchemaRequestMessage()
+ {
+ @Override
+ protected String className ()
+ {
+ return className;
+ }
+
+ @Override
+ protected String packageName ()
+ {
+ return packageName;
+ }
+
+ @Override
+ protected Binary schemaHash ()
+ {
+ return schemaHash;
+ }
+ };
+
+ sendMessage(message);
+ }
+
+ /**
+ * Invokes an operation on a broker object instance.
+ *
+ * @param packageName the package name.
+ * @param className the class name.
+ * @param schemaHash the schema hash of the corresponding class.
+ * @param objectId the object instance identifier.
+ * @param parameters the parameters for this invocation.
+ * @param method the method (definition) invoked.
+ * @param bankId the object bank identifier.
+ * @param brokerId the broker identifier.
+ * @return the sequence number used for this message.
+ * @throws MethodInvocationException when the invoked method returns an error code.
+ * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation.
+ */
+ public void invoke(
+ final String packageName,
+ final String className,
+ final Binary schemaHash,
+ final Binary objectId,
+ final Object[] parameters,
+ final QpidMethod method,
+ final int sequenceNumber,
+ final long bankId,
+ final long brokerId) throws MethodInvocationException, UnableToComplyException
+ {
+ Message message = new MethodInvocationRequestMessage(bankId, brokerId)
+ {
+
+ @Override
+ protected int sequenceNumber ()
+ {
+ return sequenceNumber;
+ }
+
+ protected Binary objectId() {
+ return objectId;
+ }
+
+ protected String packageName()
+ {
+ return packageName;
+ }
+
+ protected String className()
+ {
+ return className;
+ }
+
+ @Override
+ protected QpidMethod method ()
+ {
+ return method;
+ }
+
+ @Override
+ protected Object[] parameters ()
+ {
+ return parameters;
+ }
+
+ @Override
+ protected Binary schemaHash ()
+ {
+ return schemaHash;
+ }
+ };
+
+ try {
+ sendMessage(message);
+ sync();
+ } catch(Exception exception) {
+ throw new UnableToComplyException(exception);
+ }
+ }
+
+ /**
+ * Sends a command message.
+ *
+ * @param message the command message.
+ * @throws IOException when the message cannot be sent.
+ */
+ public void sendMessage(Message message) throws IOException
+ {
+ _session.messageTransfer(
+ Names.MANAGEMENT_EXCHANGE,
+ MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ message.getHeader(),
+ message.readData());
+ }
+}