summaryrefslogtreecommitdiff
path: root/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/SubscriptionManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/SubscriptionManager.java')
-rw-r--r--qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/SubscriptionManager.java255
1 files changed, 255 insertions, 0 deletions
diff --git a/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/SubscriptionManager.java b/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/SubscriptionManager.java
new file mode 100644
index 0000000000..6e3e9e91b5
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/console/SubscriptionManager.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.console;
+
+// Simple Logging Facade 4 Java
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Misc Imports
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimerTask;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.QmfQuery;
+
+/**
+ * A SubscriptionManager represents a running Subscription on the Console.
+ * <p>
+ * The main reason we have SubscriptionManagers as TimerTasks is to enable proper cleanup of the references stored in
+ * the subscriptionByHandle and subscriptionById Maps. Ideally these will be cleaned up by a client calling
+ * cancelSubscription but we can't rely on that as the client may forget or the Agent may not respond.
+ * <p>
+ * The SubscriptionManager acts like a client/Console side representation of a Subscription running on an Agent.
+ * As mentioned above its primary purpose is to enable references to Subscriptions maintained by the Console to
+ * be cleaned up should the Subscription time out rather than being cancelled, however as a side effect it is
+ * used to enable emulation of Subscriptions to the broker ManagementAgent, which does not yet natively implement
+ * Subscription.
+ * <p>
+ * To emulate Subscriptions the Console receives the periodic _data indications pushed by the ManagementAgent. The
+ * Console then iterates through Subscriptions referencing the broker Agent and evaluates their queries against
+ * the QmfConsoleData returned by the _data indication. Any QmfConsoleData that match the query are passed to the
+ * client application with the consoleHandle of the matching Subscription.
+ * <p>
+ * The following diagram illustrates the Subscription relationships with the Console and local Agent proxy.
+ * <p>
+ * <img src="doc-files/Subscriptions.png"/>
+ *
+ * @author Fraser Adams
+ */
+public final class SubscriptionManager extends TimerTask
+{
+ private static final Logger _log = LoggerFactory.getLogger(SubscriptionManager.class);
+
+ private final Agent _agent;
+ private long _startTime = System.currentTimeMillis();
+ private String _subscriptionId;
+ private String _consoleHandle;
+ private String _replyHandle;
+ private QmfQuery _query;
+ private long _duration = 0;
+ private long _interval = 0;
+ private boolean _waiting = true;
+
+ /**
+ * Construct a Console side proxy of a Subscription. Primarily to manage references to the Subscription.
+ *
+ * @param agent the Agent from which the Subscription has been requested
+ * @param query the QmfQuery that the Subscription will run
+ * @param consoleHandle the handle that uniquely identifies the Subscription
+ * @param interval the interval between subscription updates
+ * @param duration the duration of the subscription (assuming it doesn't get refreshed)
+ */
+ SubscriptionManager(final Agent agent, final QmfQuery query, final String consoleHandle,
+ final String replyHandle, final long interval, final long duration)
+ {
+ _agent = agent;
+ _query = query;
+ _consoleHandle = consoleHandle;
+ _replyHandle = replyHandle;
+ _interval = interval;
+ _duration = duration;
+ _log.debug("Creating SubscriptionManager {}, on Agent {}",_consoleHandle, _agent.getName());
+ }
+
+ /**
+ * This method gets called periodically by the Timer scheduling this TimerTask.
+ * <p>
+ * First a check is made to see if the Subscription has expired, if it has then it is cancelled.
+ */
+ public void run()
+ {
+ long elapsed = (long)Math.round((System.currentTimeMillis() - _startTime)/1000.0f);
+ if (elapsed >= _duration || !_agent.isActive())
+ {
+ _log.debug("Subscription {} has expired, removing", _subscriptionId);
+ // The Subscription has expired so cancel it
+ cancel();
+ }
+ }
+
+ /**
+ * Causes the current thread to wait until it is signalled or times out.
+ * <p>
+ * This method is primarily used as a means to enable a synchronous call to createSubscription().
+ * For most synchronous calls we simply use the receive() call on the synchronous session, but we can't do that
+ * for createSubscription() as we specifically need to use the replyTo on the asynchronous session as once
+ * subscriptions are created the results are asynchronously pushed. This means we have to get the response to
+ * createSession() on the asynchronous replyTo then signal the (blocked) main thread that the response has
+ * been received.
+ *
+ * @param timeout the maximum time to wait to be signalled.
+ */
+ public synchronized void await(final long timeout)
+ {
+ while (_waiting)
+ {
+ long _startTime = System.currentTimeMillis();
+ try
+ {
+ wait(timeout);
+ }
+ catch (InterruptedException ie)
+ {
+ continue;
+ }
+ // Measure elapsed time to test against spurious wakeups and ensure we really have timed out
+ long elapsedTime = (System.currentTimeMillis() - _startTime);
+ if (elapsedTime >= timeout)
+ {
+ break;
+ }
+ }
+ _waiting = true;
+ }
+
+ /**
+ * Wakes up all waiting threads.
+ */
+ public synchronized void signal()
+ {
+ _waiting = false;
+ notifyAll();
+ }
+
+ /**
+ * Refresh the subscription by zeroing its elapsed time.
+ */
+ public void refresh()
+ {
+ _log.debug("Refreshing Subscription {}", _subscriptionId);
+ _startTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Cancel the Subscription, tidying references up and cancelling the TimerTask.
+ */
+ @Override
+ public boolean cancel()
+ {
+ _log.debug("Cancelling Subscription {}, {}", _consoleHandle, _subscriptionId);
+ _agent.removeSubscription(this);
+ signal(); // Just in case anything is blocking on this Subscription.
+ return super.cancel(); // Cancel the TimerTask
+ }
+
+ /**
+ * Set the SubscriptionId.
+ * @param subscriptionId the new SubscriptionId of this Subscription.
+ */
+ public void setSubscriptionId(final String subscriptionId)
+ {
+ _subscriptionId = subscriptionId;
+ }
+
+ /**
+ * return the SubscriptionId of this Subscription.
+ * @return the SubscriptionId of this Subscription.
+ */
+ public String getSubscriptionId()
+ {
+ return _subscriptionId;
+ }
+
+ /**
+ * Return the consoleHandle of this Subscription.
+ * @return the consoleHandle of this Subscription.
+ */
+ public String getConsoleHandle()
+ {
+ return _consoleHandle;
+ }
+
+ /**
+ * Return the replyHandle of this Subscription.
+ * @return the replyHandle of this Subscription.
+ */
+ public String getReplyHandle()
+ {
+ return _replyHandle;
+ }
+
+ /**
+ * Return the Agent running this Subscription.
+ * @return the Agent running this Subscription.
+ */
+ public Agent getAgent()
+ {
+ return _agent;
+ }
+
+ /**
+ * Set the Subscription lifetime in seconds.
+ *
+ * @param duration the new Subscription lifetime in seconds
+ */
+ public void setDuration(final long duration)
+ {
+ _duration = duration;
+ }
+
+ /**
+ * Return The Subscription's QmfQuery.
+ * @return The Subscription's QmfQuery.
+ */
+ public QmfQuery getQuery()
+ {
+ return _query;
+ }
+
+ /**
+ * Create a Map encoded version.
+ * <p>
+ * When we do a synchronous createSubscription the Subscription itself holds the info needed to populate
+ * the SubscriptionParams result. We encode the info in a Map to pass to the SubscribeParams Constructor
+ */
+ public Map<String, Object> mapEncode()
+ {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("_interval", _interval);
+ map.put("_duration", _duration);
+ map.put("_subscription_id", _subscriptionId);
+ return map;
+ }
+}
+
+