summaryrefslogtreecommitdiff
path: root/qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java')
-rw-r--r--qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java287
1 files changed, 287 insertions, 0 deletions
diff --git a/qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java b/qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java
new file mode 100644
index 0000000000..435372a0d6
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-rest/src/main/java/org/apache/qpid/restapi/ConnectionProxy.java
@@ -0,0 +1,287 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.restapi;
+
+// Misc Imports
+import java.util.TimerTask;
+
+// JMS Imports
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+// Simple Logging Facade 4 Java
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.BlockingNotifier;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+
+/**
+ * Contains a Connection object under a "leasehold agreement" whereby the Connection (and associated Sessions and QMF
+ * Consoles) will expire after a period of time.
+ * <p>
+ * The idea here is to allow a user to create multiple Connection instances (for example to monitor multiple brokers)
+ * but by using the lease metaphor we can expire instances that haven't been used for some predetermined period.
+ * Using the leashold agreement means that we don't have to rely on users explicitly deleting Connections that they
+ * are no longer interested in, because obviously we can't rely on that :-)
+ *
+ * @author Fraser Adams
+ */
+public final class ConnectionProxy extends TimerTask implements ExceptionListener
+{
+ private static final Logger _log = LoggerFactory.getLogger(ConnectionProxy.class);
+
+ private static final int MAX_WORKITEM_QUEUE_SIZE = 20; // Maximum number of items allowed on WorkItem queue.
+
+ // Connections expire after 20 minutes of no use.
+ private static final int TIMEOUT_THRESHOLD = (20*60000)/ConnectionStore.PING_PERIOD;
+
+ // Connections expire after 1 minute if they have never been dereferenced.
+ private static final int UNUSED_THRESHOLD = 60000/ConnectionStore.PING_PERIOD;
+
+ private Connection _connection;
+ private Console _console;
+ private boolean _connected;
+ private int _expireCount;
+ private final ConnectionStore _store;
+ private final String _name;
+ private final String _url;
+ private final String _connectionOptions;
+ private final boolean _disableEvents;
+
+ /**
+ * Actually create the Qpid Connection and QMF2 Console specified in the Constructor.
+ */
+ private synchronized void createConnection()
+ {
+ //System.out.println("ConnectionProxy createConnection() name: " + _name + ", thread: " + Thread.currentThread().getId() + ", creating connection to " + _url + ", options " + _connectionOptions);
+ try
+ {
+ _connection = ConnectionHelper.createConnection(_url, _connectionOptions);
+ if (_connection != null)
+ {
+ _connection.setExceptionListener(this);
+
+ // N.B. creating a Console with a notifier causes the internal WorkQueue to get populated, so care must
+ // be taken to manage its size. In a normal Console application the application would only declare this
+ // if there was an intention to retrieve work items, but in a fairly general REST API we can't guarantee
+ // that clients will. ConsoleLease acts to make the WorkQueue "circular" by deleting items from the
+ // front of the WorkQueue if it exceeds a particular size.
+ if (_disableEvents)
+ {
+ _console = new Console(_name, null, null, null);
+ _console.disableEvents();
+ }
+ else
+ {
+ BlockingNotifier notifier = new BlockingNotifier();
+ _console = new Console(_name, null, notifier, null);
+ }
+ _console.addConnection(_connection);
+ _connected = true;
+ _expireCount = UNUSED_THRESHOLD;
+ notifyAll();
+ }
+ }
+ catch (Exception ex)
+ {
+ _log.info("Exception {} caught in ConnectionProxy constructor.", ex.getMessage());
+ _connected = false;
+ }
+ }
+
+ /**
+ * This method blocks until the Connection has been created.
+ */
+ public synchronized void waitForConnection()
+ {
+ while (!_connected)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException ie)
+ {
+ continue;
+ }
+ }
+ }
+
+ /**
+ * This method blocks until the Connection has been created or timeout expires (or wait has been interrupted).
+ * @param timeout the maximum time in milliseconds to wait for notification of the connection's availability.
+ */
+ public synchronized void waitForConnection(long timeout)
+ {
+ try
+ {
+ wait(timeout);
+ }
+ catch (InterruptedException ie)
+ { // Ignore
+ }
+ }
+
+ /**
+ * Construct a Proxy to the specified Qpid Connection with the supplied name to be stored in the specified store.
+ * @param store The ConnectionStore that we want to store this ConnectionProxy in.
+ * @param name A unique name for the Connection that we want to create.
+ * @param url A Connection URL using one of the forms supported by {@link org.apache.qpid.qmf2.util.ConnectionHelper}.
+ * @param connectionOptions A set of connection options in the form supported by {@link org.apache.qpid.qmf2.util.ConnectionHelper}.
+ * @param disableEvents if true create a QMF Console Connection that can only perform synchronous
+ * operations like getObjects() and cannot do asynchronous things like Agent discovery or receive Events.
+ */
+ public ConnectionProxy(final ConnectionStore store, final String name,
+ final String url, final String connectionOptions, final boolean disableEvents)
+ {
+ _connected = false;
+ _store = store;
+ _name = name;
+ _url = url;
+ _connectionOptions = connectionOptions;
+ _disableEvents = disableEvents;
+ }
+
+ /**
+ * The exception listener for the underlying Qpid Connection. This is used to trigger the ConnectionProxy internal
+ * reconnect logic. N.B. ConnectionProxy uses its own reconnection logic for two reasons: firstly the Qpid auto
+ * retry mechanism has some undesireable and unreliable behaviours prior to Qpid version 0.16 and secondly the
+ * Qpid auto retry mechanism is transparent whereas we actually <b>want</b> to detect connection failures in the REST
+ * API so that we can report failures back to the client.
+ * @param jmse The JMSException that has caused onException to be triggered.
+ */
+ public void onException(JMSException jmse)
+ {
+ _log.info("ConnectionProxy onException {}", jmse.getMessage());
+ _connected = false;
+ }
+
+ /**
+ * This method is called periodically by {@link org.apache.qpid.restapi.ConnectionStore} to carry out a number
+ * of housekeeping tasks. It checks if the Qpid Connection is still connected and if not it attempts to reconnect
+ * it also checks whether the Connection "lease" has run out and if it has it tidies up the Connection. Finally
+ * it restricts the size of the QMF2 WorkItem queue as the REST API has no control over whether a client is or
+ * is not interested in being notified of QMF2 Events.
+ */
+ public void run()
+ {
+ if (_connected)
+ {
+ //System.out.println("ConnectionProxy name: " + _name + ", thread: " + Thread.currentThread().getId() + ", WorkItem count = " + _console.getWorkitemCount());
+
+ while (_console.getWorkitemCount() > MAX_WORKITEM_QUEUE_SIZE)
+ {
+ _console.getNextWorkitem();
+ }
+
+ _expireCount--;
+ //System.out.println("ConnectionProxy name: " + _name + ", thread: " + Thread.currentThread().getId() + ", expireCount = " + _expireCount);
+ if (_expireCount == 0)
+ {
+ _store.delete(_name);
+ }
+ }
+ else
+ {
+ createConnection();
+ }
+ }
+
+ /**
+ * Stops scheduled housekeeping, destroys any attached QMF2 Console instances then closes the Qpid Connection.
+ */
+ public synchronized void close()
+ {
+ //System.out.println("ConnectionProxy close() name: " + _name + ", thread: " + Thread.currentThread().getId() + ", expireCount = " + _expireCount);
+
+ cancel();
+
+ try
+ {
+ _console.destroy();
+ _connection.close();
+ }
+ catch (Exception e)
+ { // Log and Ignore
+ _log.info("ConnectionProxy close() caught Exception {}", e.getMessage());
+ }
+ }
+
+ /**
+ * Retrieves the QMF2 Console that we've associated with this Connection.
+ * @return The QMF2 Console that we've associated with this Connection.
+ */
+ public Console getConsole()
+ {
+ _expireCount = TIMEOUT_THRESHOLD;
+ return _console;
+ }
+
+ /**
+ * Returns whether or not the Connection is currently connected to the broker. This is used by the REST API to
+ * tell any clients about the Connection state.
+ * @return true if currently connected or false if not.
+ */
+ public boolean isConnected()
+ {
+ _expireCount = TIMEOUT_THRESHOLD;
+ return _connected;
+ }
+
+ /**
+ * Returns the Connection URL String used to create the Connection.
+ * @return The Connection URL String used to create the Connection.
+ */
+ public String getUrl()
+ {
+ _expireCount = TIMEOUT_THRESHOLD;
+ return _url;
+ }
+
+ /**
+ * Returns the Connection options String used to create the Connection.
+ * @return The Connection options String used to create the Connection.
+ */
+ public String getConnectionOptions()
+ {
+ _expireCount = TIMEOUT_THRESHOLD;
+ return _connectionOptions;
+ }
+
+ /**
+ * Returns a String representation of a ConnectionProxy.
+ * @return The String representation of this ConnectionProxy Object.
+ */
+ @Override
+ public String toString()
+ {
+ // The reason we use JSON.toMap on the string is because it is fairly tolerant and doesn't need pure JSON
+ // if we then call JSON.fromMap we get a pure JSON String.
+ return "{" + "\"url\":\"" + _url + "\",\"connectionOptions\":" +
+ JSON.fromMap(JSON.toMap(_connectionOptions)) + "}";
+ }
+}
+