diff options
Diffstat (limited to 'qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java')
-rw-r--r-- | qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java | 287 |
1 files changed, 287 insertions, 0 deletions
diff --git a/qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java b/qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java new file mode 100644 index 0000000000..435372a0d6 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java @@ -0,0 +1,287 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.restapi; + +// Misc Imports +import java.util.TimerTask; + +// JMS Imports +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + +// Simple Logging Facade 4 Java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.BlockingNotifier; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.util.ConnectionHelper; + +/** + * Contains a Connection object under a "leasehold agreement" whereby the Connection (and associated Sessions and QMF + * Consoles) will expire after a period of time. + * <p> + * The idea here is to allow a user to create multiple Connection instances (for example to monitor multiple brokers) + * but by using the lease metaphor we can expire instances that haven't been used for some predetermined period. + * Using the leashold agreement means that we don't have to rely on users explicitly deleting Connections that they + * are no longer interested in, because obviously we can't rely on that :-) + * + * @author Fraser Adams + */ +public final class ConnectionProxy extends TimerTask implements ExceptionListener +{ + private static final Logger _log = LoggerFactory.getLogger(ConnectionProxy.class); + + private static final int MAX_WORKITEM_QUEUE_SIZE = 20; // Maximum number of items allowed on WorkItem queue. + + // Connections expire after 20 minutes of no use. + private static final int TIMEOUT_THRESHOLD = (20*60000)/ConnectionStore.PING_PERIOD; + + // Connections expire after 1 minute if they have never been dereferenced. + private static final int UNUSED_THRESHOLD = 60000/ConnectionStore.PING_PERIOD; + + private Connection _connection; + private Console _console; + private boolean _connected; + private int _expireCount; + private final ConnectionStore _store; + private final String _name; + private final String _url; + private final String _connectionOptions; + private final boolean _disableEvents; + + /** + * Actually create the Qpid Connection and QMF2 Console specified in the Constructor. + */ + private synchronized void createConnection() + { + //System.out.println("ConnectionProxy createConnection() name: " + _name + ", thread: " + Thread.currentThread().getId() + ", creating connection to " + _url + ", options " + _connectionOptions); + try + { + _connection = ConnectionHelper.createConnection(_url, _connectionOptions); + if (_connection != null) + { + _connection.setExceptionListener(this); + + // N.B. creating a Console with a notifier causes the internal WorkQueue to get populated, so care must + // be taken to manage its size. In a normal Console application the application would only declare this + // if there was an intention to retrieve work items, but in a fairly general REST API we can't guarantee + // that clients will. ConsoleLease acts to make the WorkQueue "circular" by deleting items from the + // front of the WorkQueue if it exceeds a particular size. + if (_disableEvents) + { + _console = new Console(_name, null, null, null); + _console.disableEvents(); + } + else + { + BlockingNotifier notifier = new BlockingNotifier(); + _console = new Console(_name, null, notifier, null); + } + _console.addConnection(_connection); + _connected = true; + _expireCount = UNUSED_THRESHOLD; + notifyAll(); + } + } + catch (Exception ex) + { + _log.info("Exception {} caught in ConnectionProxy constructor.", ex.getMessage()); + _connected = false; + } + } + + /** + * This method blocks until the Connection has been created. + */ + public synchronized void waitForConnection() + { + while (!_connected) + { + try + { + wait(); + } + catch (InterruptedException ie) + { + continue; + } + } + } + + /** + * This method blocks until the Connection has been created or timeout expires (or wait has been interrupted). + * @param timeout the maximum time in milliseconds to wait for notification of the connection's availability. + */ + public synchronized void waitForConnection(long timeout) + { + try + { + wait(timeout); + } + catch (InterruptedException ie) + { // Ignore + } + } + + /** + * Construct a Proxy to the specified Qpid Connection with the supplied name to be stored in the specified store. + * @param store The ConnectionStore that we want to store this ConnectionProxy in. + * @param name A unique name for the Connection that we want to create. + * @param url A Connection URL using one of the forms supported by {@link org.apache.qpid.qmf2.util.ConnectionHelper}. + * @param connectionOptions A set of connection options in the form supported by {@link org.apache.qpid.qmf2.util.ConnectionHelper}. + * @param disableEvents if true create a QMF Console Connection that can only perform synchronous + * operations like getObjects() and cannot do asynchronous things like Agent discovery or receive Events. + */ + public ConnectionProxy(final ConnectionStore store, final String name, + final String url, final String connectionOptions, final boolean disableEvents) + { + _connected = false; + _store = store; + _name = name; + _url = url; + _connectionOptions = connectionOptions; + _disableEvents = disableEvents; + } + + /** + * The exception listener for the underlying Qpid Connection. This is used to trigger the ConnectionProxy internal + * reconnect logic. N.B. ConnectionProxy uses its own reconnection logic for two reasons: firstly the Qpid auto + * retry mechanism has some undesireable and unreliable behaviours prior to Qpid version 0.16 and secondly the + * Qpid auto retry mechanism is transparent whereas we actually <b>want</b> to detect connection failures in the REST + * API so that we can report failures back to the client. + * @param jmse The JMSException that has caused onException to be triggered. + */ + public void onException(JMSException jmse) + { + _log.info("ConnectionProxy onException {}", jmse.getMessage()); + _connected = false; + } + + /** + * This method is called periodically by {@link org.apache.qpid.restapi.ConnectionStore} to carry out a number + * of housekeeping tasks. It checks if the Qpid Connection is still connected and if not it attempts to reconnect + * it also checks whether the Connection "lease" has run out and if it has it tidies up the Connection. Finally + * it restricts the size of the QMF2 WorkItem queue as the REST API has no control over whether a client is or + * is not interested in being notified of QMF2 Events. + */ + public void run() + { + if (_connected) + { + //System.out.println("ConnectionProxy name: " + _name + ", thread: " + Thread.currentThread().getId() + ", WorkItem count = " + _console.getWorkitemCount()); + + while (_console.getWorkitemCount() > MAX_WORKITEM_QUEUE_SIZE) + { + _console.getNextWorkitem(); + } + + _expireCount--; + //System.out.println("ConnectionProxy name: " + _name + ", thread: " + Thread.currentThread().getId() + ", expireCount = " + _expireCount); + if (_expireCount == 0) + { + _store.delete(_name); + } + } + else + { + createConnection(); + } + } + + /** + * Stops scheduled housekeeping, destroys any attached QMF2 Console instances then closes the Qpid Connection. + */ + public synchronized void close() + { + //System.out.println("ConnectionProxy close() name: " + _name + ", thread: " + Thread.currentThread().getId() + ", expireCount = " + _expireCount); + + cancel(); + + try + { + _console.destroy(); + _connection.close(); + } + catch (Exception e) + { // Log and Ignore + _log.info("ConnectionProxy close() caught Exception {}", e.getMessage()); + } + } + + /** + * Retrieves the QMF2 Console that we've associated with this Connection. + * @return The QMF2 Console that we've associated with this Connection. + */ + public Console getConsole() + { + _expireCount = TIMEOUT_THRESHOLD; + return _console; + } + + /** + * Returns whether or not the Connection is currently connected to the broker. This is used by the REST API to + * tell any clients about the Connection state. + * @return true if currently connected or false if not. + */ + public boolean isConnected() + { + _expireCount = TIMEOUT_THRESHOLD; + return _connected; + } + + /** + * Returns the Connection URL String used to create the Connection. + * @return The Connection URL String used to create the Connection. + */ + public String getUrl() + { + _expireCount = TIMEOUT_THRESHOLD; + return _url; + } + + /** + * Returns the Connection options String used to create the Connection. + * @return The Connection options String used to create the Connection. + */ + public String getConnectionOptions() + { + _expireCount = TIMEOUT_THRESHOLD; + return _connectionOptions; + } + + /** + * Returns a String representation of a ConnectionProxy. + * @return The String representation of this ConnectionProxy Object. + */ + @Override + public String toString() + { + // The reason we use JSON.toMap on the string is because it is fairly tolerant and doesn't need pure JSON + // if we then call JSON.fromMap we get a pure JSON String. + return "{" + "\"url\":\"" + _url + "\",\"connectionOptions\":" + + JSON.fromMap(JSON.toMap(_connectionOptions)) + "}"; + } +} + |