diff options
Diffstat (limited to 'qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java')
-rw-r--r-- | qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java | 361 |
1 files changed, 361 insertions, 0 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java new file mode 100644 index 0000000000..bd7d305184 --- /dev/null +++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -0,0 +1,361 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.domain.services; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.api.Message; +import org.apache.qpid.management.Messages; +import org.apache.qpid.management.Names; +import org.apache.qpid.management.configuration.QpidDatasource; +import org.apache.qpid.management.domain.model.QpidMethod; +import org.apache.qpid.management.domain.model.type.Binary; +import org.apache.qpid.management.messages.MethodInvocationRequestMessage; +import org.apache.qpid.management.messages.SchemaRequestMessage; +import org.apache.qpid.nclient.util.MessageListener; +import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.util.Logger; + +/** + * Qpid Broker facade. + * + * @author Andrea Gazzarini + */ +public class QpidService implements SessionListener +{ + private final static Logger LOGGER = Logger.get(QpidService.class); + + private UUID _brokerId; + private Connection _connection; + private Session _session; + private Map<String,MessagePartListenerAdapter> _listeners; + + /** + * Builds a new service with the given connection data. + * + * @param connectionData the connection data of the broker. + */ + public QpidService(UUID brokerId) + { + this._brokerId = brokerId; + } + + /** + * Estabilishes a connection with the broker. + * + * @throws QpidException in case of connection failure. + */ + public void connect() throws Exception + { + _connection = QpidDatasource.getInstance().getConnection(_brokerId); + _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>(); + _session = _connection.createSession(0); + _session.setSessionListener(this); + } + + public void opened(Session ssn) {} + + public void resumed(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + MessagePartListenerAdapter l = _listeners.get(xfr.getDestination()); + if (l == null) + { + LOGGER.error("unhandled message: %s", xfr); + } + else + { + l.messageTransfer(xfr); + } + } + + + public void exception(Session ssn, SessionException exc) + { + + } + + public void closed(Session ssn) {} + + /** + * All the previously entered outstanding commands are asynchronous. + * Synchronous behavior is achieved through invoking this method. + */ + public void sync() + { + _session.sync(); + } + + /** + * Closes communication with broker. + */ + public void close() + { + try + { + _session.close(); + _session = null; + _listeners = null; + } catch (Exception e) + { + } + try + { + _connection.close(); + _connection = null; + } catch (Exception e) + { + } + } + + /** + * Associate a message listener with a destination therefore creating a new subscription. + * + * @param queueName The name of the queue that the subscriber is receiving messages from. + * @param destinationName the name of the destination, or delivery tag, for the subscriber. + * @param listener the listener for this destination. + * + * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...) + */ + public void createSubscription(String queueName, String destinationName, MessageListener listener) + { + _listeners.put(destinationName, new MessagePartListenerAdapter(listener)); + _session.messageSubscribe + (queueName, + destinationName, + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + + _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); + + LOGGER.debug(Messages.QMAN_200025_SUBSCRIPTION_DECLARED,queueName,destinationName); + } + + /** + * Removes a previously declared consumer from the broker. + * + * @param destinationName the name of the destination, or delivery tag, for the subscriber. + * @see Session#messageCancel(String, Option...) + */ + public void removeSubscription(String destinationName) + { + _session.messageCancel(destinationName); + LOGGER.debug(Messages.QMAN_200026_SUBSCRIPTION_REMOVED,destinationName); + } + + /** + * Declares a queue on the broker with the given name. + * + * @param queueName the name of the declared queue. + * @see Session#queueDeclare(String, String, java.util.Map, Option...) + */ + public void declareQueue(String queueName) + { + _session.queueDeclare(queueName, null, null); + LOGGER.debug(Messages.QMAN_200027_QUEUE_DECLARED,queueName); + } + + /** + * Removes the queue with the given name from the broker. + * + * @param queueName the name of the queue. + * @see Session#queueDelete(String, Option...) + */ + public void deleteQueue(String queueName) + { + _session.queueDelete(queueName); + LOGGER.debug(Messages.QMAN_200028_QUEUE_REMOVED,queueName); + } + + /** + * Binds (on the broker) a queue with an exchange. + * + * @param queueName the name of the queue to bind. + * @param exchangeName the exchange name. + * @param routingKey the routing key used for the binding. + * @see Session#exchangeBind(String, String, String, java.util.Map, Option...) + */ + public void declareBinding(String queueName, String exchangeName, String routingKey) + { + _session.exchangeBind(queueName, exchangeName, routingKey, null); + LOGGER.debug(Messages.QMAN_200029_BINDING_DECLARED,routingKey,queueName,exchangeName); + } + + /** + * Removes a previously declare binding between an exchange and a queue. + * + * @param queueName the name of the queue. + * @param exchangeName the name of the exchange. + * @param routingKey the routing key used for binding. + */ + public void declareUnbinding(String queueName, String exchangeName, String routingKey) + { + _session.exchangeUnbind(queueName, exchangeName, routingKey); + LOGGER.debug(Messages.QMAN_200030_BINDING_REMOVED,routingKey,queueName,exchangeName); + } + + /** + * Sends a command message with the given data on the management queue. + * + * @param messageData the command message content. + */ + + /** + * Requests a schema for the given package.class.hash. + * + * @param packageName the package name. + * @param className the class name. + * @param schemaHash the schema hash. + * @throws IOException when the schema request cannot be sent. + */ + public void requestSchema(final String packageName, final String className, final Binary schemaHash) throws IOException + { + Message message = new SchemaRequestMessage() + { + @Override + protected String className () + { + return className; + } + + @Override + protected String packageName () + { + return packageName; + } + + @Override + protected Binary schemaHash () + { + return schemaHash; + } + }; + + sendMessage(message); + } + + /** + * Invokes an operation on a broker object instance. + * + * @param packageName the package name. + * @param className the class name. + * @param schemaHash the schema hash of the corresponding class. + * @param objectId the object instance identifier. + * @param parameters the parameters for this invocation. + * @param method the method (definition) invoked. + * @param bankId the object bank identifier. + * @param brokerId the broker identifier. + * @return the sequence number used for this message. + * @throws MethodInvocationException when the invoked method returns an error code. + * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation. + */ + public void invoke( + final String packageName, + final String className, + final Binary schemaHash, + final Binary objectId, + final Object[] parameters, + final QpidMethod method, + final int sequenceNumber, + final long bankId, + final long brokerId) throws MethodInvocationException, UnableToComplyException + { + Message message = new MethodInvocationRequestMessage(bankId, brokerId) + { + + @Override + protected int sequenceNumber () + { + return sequenceNumber; + } + + protected Binary objectId() { + return objectId; + } + + protected String packageName() + { + return packageName; + } + + protected String className() + { + return className; + } + + @Override + protected QpidMethod method () + { + return method; + } + + @Override + protected Object[] parameters () + { + return parameters; + } + + @Override + protected Binary schemaHash () + { + return schemaHash; + } + }; + + try { + sendMessage(message); + sync(); + } catch(Exception exception) { + throw new UnableToComplyException(exception); + } + } + + /** + * Sends a command message. + * + * @param message the command message. + * @throws IOException when the message cannot be sent. + */ + public void sendMessage(Message message) throws IOException + { + _session.messageTransfer( + Names.MANAGEMENT_EXCHANGE, + MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + message.getHeader(), + message.readData()); + } +} |