summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-01 11:43:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-01 11:43:02 +0000
commit7dd86708f3542c03118ded5b8db74328cef71c84 (patch)
tree7febec6a59ae1b95499ba45a75004475d083bd77
parent36d2c895515960b4fa4c864f2816963ad63bab2e (diff)
downloadqpid-python-7dd86708f3542c03118ded5b8db74328cef71c84.tar.gz
QPID-6423 : [Java Broker] Allow plugin of custom user queue reports using the REST API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1663082 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java3
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java28
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java161
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java28
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java408
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java42
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java58
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java103
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java186
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestBinaryReport.java114
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestTextReport.java84
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/resources/META-INF/services/org.apache.qpid.server.management.plugin.report.QueueReport2
12 files changed, 1217 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index 4e340c7b72..69920ff488 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -69,6 +69,7 @@ import org.apache.qpid.server.management.plugin.servlet.rest.LogoutServlet;
import org.apache.qpid.server.management.plugin.servlet.rest.MessageContentServlet;
import org.apache.qpid.server.management.plugin.servlet.rest.MessageServlet;
import org.apache.qpid.server.management.plugin.servlet.rest.MetaDataServlet;
+import org.apache.qpid.server.management.plugin.servlet.rest.QueueReportServlet;
import org.apache.qpid.server.management.plugin.servlet.rest.RestServlet;
import org.apache.qpid.server.management.plugin.servlet.rest.SaslServlet;
import org.apache.qpid.server.management.plugin.servlet.rest.StructureServlet;
@@ -304,6 +305,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
root.addServlet(new ServletHolder(new StructureServlet()), "/service/structure");
root.addServlet(new ServletHolder(new MessageServlet()), "/service/message/*");
root.addServlet(new ServletHolder(new MessageContentServlet()), "/service/message-content/*");
+ root.addServlet(new ServletHolder(new QueueReportServlet()), "/service/queuereport/*");
+
root.addServlet(new ServletHolder(new LogRecordsServlet()), "/service/logrecords");
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java
new file mode 100644
index 0000000000..d842de3f1b
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+public abstract class QueueBinaryReport extends QueueReport<byte[]>
+{
+ public QueueBinaryReport()
+ {
+ }
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java
new file mode 100644
index 0000000000..23b24aaf8d
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+
+/**
+ * <p>
+ * The QueueReport class provides an extension point for installations to provide custom management reporting on
+ * queues through the REST API.
+ * </p>
+ *
+ * <p>
+ * A custom QueueReport must extend either {@link org.apache.qpid.server.management.plugin.report.QueueTextReport}
+ * or {@link org.apache.qpid.server.management.plugin.report.QueueBinaryReport}. The report implementation must
+ * define a {@link #getName() name} which is unique amongst all installed reports. The report class must be present
+ * in the classpath of the broker, and a provider-configuration file named
+ * org.apache.qpid.server.management.plugin.report.QueueReport must be added in the resource directory
+ * META-INF/services directory with the binary name of the implementation (as described in
+ * {@link java.util.ServiceLoader ServiceLoader}).
+ * </p>
+ *
+ * <h3>Running reports</h3>
+ * <p>
+ * The report can be run using the URL:
+ * {@code http://<broker>/service/queuereport/<virtualhost name>/<queue name>/<report name>[?param1=x&param2=y...]}
+ * </p>
+ *
+ * <h4>Report Parameters</h4>
+ *
+ * <p>
+ * Reports can take parameters from the query string of the HTTP request. For every parameter in the query string
+ * the system will look for a setter on the report object with either a String or String[] parameter. Thus if
+ * the query string contains {@code foo=bar}, then the system will look for a setter {@code setFoo(String value)} or
+ * {@code setFoo(String[] value)}. If the same parameter occurs multiple times in the query string then only the
+ * array variant of the setter will be called.
+ * </p>
+ * <p>
+ * Setters for the parameters are guaranteed to be called before the first message is added to the report.
+ * </p>
+ *
+ * <p>
+ * NOTE: In order to comply with the requirements of the {@link java.util.ServiceLoader ServiceLoader} api, all
+ * implementations of QueueReport MUST provide a public no-args constructor.
+ * </p>
+ * @param <T>
+ */
+public abstract class QueueReport<T>
+{
+ private Queue<?> _queue;
+
+ QueueReport()
+ {
+
+ }
+
+ /**
+ * Gets the name of the report.
+ * <p>
+ * The name of the report must be unique amongst all installed implementations. The name of the report
+ * is examined by the Qpid immediately upon construction. The name should not change during
+ * the lifetime of the object (the value is only meaningful to the system at the time of initial construction)
+ * and all instances of the same concrete implementation should have the same name.
+ * </p>
+ * @return the name of the report
+ */
+ public abstract String getName();
+
+ /**
+ * Get the name of the queue against which the report is being run.
+ *
+ * @return the name of the queue
+ */
+ public final String getQueueName()
+ {
+ return _queue.getName();
+ }
+
+ final void setQueue(final Queue<?> queue)
+ {
+ _queue = queue;
+ }
+
+ /**
+ * Get the name of the virtual host against which the report is being run.
+ *
+ * @return the name of the virtual host
+ */
+ public final String getVirtualHostName()
+ {
+ return _queue.getParent(VirtualHost.class).getName();
+ }
+
+ /**
+ *
+ * The value returned by getContentType() will be used to set the Content-Type HTTP header field
+ *
+ * @return the value to use for the content-type HTTP header field
+ */
+ public abstract String getContentType();
+
+ /**
+ * Called by the system to add a message to the report.
+ *
+ * <p>
+ * The method is called by the system for every message on the queue, or until {@link #isComplete()} returns true.
+ * </p>
+ * @param reportableMessage the message to add to the report
+ */
+ public abstract void addMessage(final ReportableMessage reportableMessage);
+
+ /**
+ * Informs the system if the report is complete (i.e. does not need to report on any more messages).
+ *
+ * <p>
+ * This method will be called by the system after each message is {@link #addMessage(ReportableMessage) added}
+ * to the report. If a report is only interested in some messages, and can determine that the addition of more
+ * messages will not vary the content of the report, then it can return true.
+ * </p>
+ * <p>
+ * If this method always returns false, then all messages from the queue will be added to the report.
+ * </p>
+ * <p>
+ * NOTE: Retrieving content or properties of the message may require it to be reloaded from disk, and so care
+ * should be taken by reports to only access properties/content of the message if it is going to be required
+ * for the report production.
+ * </p>
+ *
+ * @return true if the report does not want to report on any more messages in the queue
+ */
+ public abstract boolean isComplete();
+
+ /**
+ * Called by the system to get the content of the report to retrun to the user.
+ * <p>
+ * The system guarantees to only call this method once
+ * </p>
+ * @return the report content.
+ */
+ public abstract T getReport();
+
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java
new file mode 100644
index 0000000000..09bc5c4229
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+public abstract class QueueTextReport extends QueueReport<String>
+{
+ public QueueTextReport()
+ {
+ }
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
new file mode 100644
index 0000000000..2a05cfc9a1
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
@@ -0,0 +1,408 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryVisitor;
+
+public class ReportRunner<T>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReportRunner.class);
+
+ private static final Set<Class> IMMUTABLE_CLASSES = new HashSet<>(Arrays.<Class>asList(
+ Boolean.class,
+ Byte.class,
+ Short.class,
+ Character.class,
+ Integer.class,
+ Long.class,
+ Float.class,
+ Double.class,
+ UUID.class,
+ Date.class,
+ String.class
+ ));
+
+ private ReportRunner(final QueueReport<T> report)
+ {
+ _report = report;
+ }
+
+ public boolean isBinaryReport()
+ {
+ return _report instanceof QueueBinaryReport;
+ }
+
+ public static ReportRunner<?> createRunner(final String reportName, final Map<String, String[]> parameterMap)
+ {
+ QueueReport<?> report = getReport(reportName);
+ setReportParameters(report, parameterMap);
+ return new ReportRunner<>(report);
+ }
+
+ private static void setReportParameters(final QueueReport<?> report, final Map<String, String[]> parameterMap)
+ {
+ if(parameterMap != null && !parameterMap.isEmpty())
+ {
+ Class<? extends QueueReport> clazz = report.getClass();
+ for(Map.Entry<String,String[]> entry : parameterMap.entrySet())
+ {
+ String key = entry.getKey();
+ String[] value = entry.getValue();
+ if(isValidName(key))
+ {
+
+ StringBuilder setterName = new StringBuilder("set");
+ setterName.append(key.substring(0,1).toUpperCase());
+ if(key.length()>1)
+ {
+ setterName.append(key.substring(1));
+ }
+ Method method = null;
+ try
+ {
+
+ if (value == null || value.length == 0 || value.length == 1)
+ {
+ try
+ {
+ method = clazz.getMethod(setterName.toString(), String.class);
+ method.invoke(report, value == null || value.length == 0 ? null : value[0]);
+ }
+ catch (NoSuchMethodException | IllegalAccessException e)
+ {
+ method = null;
+ }
+ }
+ if (method == null)
+ {
+ try
+ {
+ method = clazz.getMethod(setterName.toString(), String[].class);
+ method.invoke(report, new Object[] { value });
+ }
+ catch (NoSuchMethodException | IllegalAccessException e)
+ {
+ LOGGER.info("Unknown parameter '"
+ + key
+ + "' (no setter) for report "
+ + report.getName());
+ }
+ }
+ }
+ catch (InvocationTargetException e)
+ {
+ LOGGER.info("Error setting parameter '" + key + "' for report " + report.getName(), e);
+ }
+ }
+ else
+ {
+ LOGGER.info("Invalid parameter name '" + key + "' running report " + report.getName());
+ }
+ }
+ }
+ }
+
+ private static boolean isValidName(final String key)
+ {
+ if(key != null && key.length() != 0)
+ {
+ if(Character.isJavaIdentifierStart(key.charAt(0)))
+ {
+ for(int i = 1; i < key.length(); i++)
+ {
+ if(!Character.isJavaIdentifierPart(key.charAt(i)))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ }
+ return false;
+
+ }
+
+ private static QueueReport<?> getReport(final String reportName)
+ {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ for (final QueueReport report : ServiceLoader.load(QueueReport.class, classLoader))
+ {
+ if (report.getName().equals(reportName))
+ {
+ try
+ {
+ return report.getClass().newInstance();
+ }
+ catch (InstantiationException | IllegalAccessException e)
+ {
+ // can't happen as by definition must have public noargs constructor
+ }
+ }
+ }
+ throw new IllegalArgumentException("Unknown report: " + reportName);
+ }
+
+ public String getContentType()
+ {
+ return _report.getContentType();
+ }
+
+
+ private static class ReportVisitor implements QueueEntryVisitor
+ {
+
+ private final QueueReport _report;
+
+ public ReportVisitor(final QueueReport report)
+ {
+ _report = report;
+ }
+
+ @Override
+ public boolean visit(final QueueEntry entry)
+ {
+ _report.addMessage(convertMessage(entry.getMessage()));
+ return _report.isComplete();
+ }
+
+
+ }
+
+
+ private static ReportableMessage convertMessage(final ServerMessage message)
+ {
+ return new ReportableMessage()
+ {
+ @Override
+ public String getInitialRoutingAddress()
+ {
+ return message.getInitialRoutingAddress();
+ }
+
+ @Override
+ public ReportableMessageHeader getMessageHeader()
+ {
+ return convertMessageHeader(message.getMessageHeader());
+ }
+
+ @Override
+ public ByteBuffer getContent()
+ {
+ ByteBuffer content = message.getContent(0, (int) getSize());
+
+ return content.asReadOnlyBuffer();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return message.isPersistent();
+ }
+
+ @Override
+ public long getSize()
+ {
+ return message.getSize();
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return message.getExpiration();
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public long getArrivalTime()
+ {
+ return message.getArrivalTime();
+ }
+ };
+ }
+
+ private static ReportableMessageHeader convertMessageHeader(final AMQMessageHeader messageHeader)
+ {
+ return new ReportableMessageHeader()
+ {
+ @Override
+ public String getCorrelationId()
+ {
+ return messageHeader.getCorrelationId();
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return messageHeader.getExpiration();
+ }
+
+ @Override
+ public String getUserId()
+ {
+ return messageHeader.getUserId();
+ }
+
+ @Override
+ public String getAppId()
+ {
+ return messageHeader.getAppId();
+ }
+
+ @Override
+ public String getMessageId()
+ {
+ return messageHeader.getMessageId();
+ }
+
+ @Override
+ public String getMimeType()
+ {
+ return messageHeader.getMimeType();
+ }
+
+ @Override
+ public String getEncoding()
+ {
+ return messageHeader.getEncoding();
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return messageHeader.getPriority();
+ }
+
+ @Override
+ public long getTimestamp()
+ {
+ return messageHeader.getTimestamp();
+ }
+
+ @Override
+ public String getType()
+ {
+ return messageHeader.getType();
+ }
+
+ @Override
+ public String getReplyTo()
+ {
+ return messageHeader.getReplyTo();
+ }
+
+ @Override
+ public Object getHeader(final String name)
+ {
+ return makeImmutable(messageHeader.getHeader(name));
+ }
+
+ @Override
+ public boolean containsHeaders(final Set<String> names)
+ {
+ return messageHeader.containsHeaders(names);
+ }
+
+ @Override
+ public boolean containsHeader(final String name)
+ {
+ return messageHeader.containsHeader(name);
+ }
+
+ @Override
+ public Collection<String> getHeaderNames()
+ {
+ return Collections.unmodifiableCollection(messageHeader.getHeaderNames());
+ }
+ };
+ }
+
+ private static Object makeImmutable(final Object value)
+ {
+ if(value == null || IMMUTABLE_CLASSES.contains(value.getClass()))
+ {
+ return value;
+ }
+ else if(value instanceof byte[])
+ {
+ return ByteBuffer.wrap((byte[])value).asReadOnlyBuffer();
+ }
+ else if(value instanceof List)
+ {
+ List orig = (List) value;
+ List<Object> copy = new ArrayList<>(orig.size());
+ for(Object element : orig)
+ {
+ copy.add(makeImmutable(element));
+ }
+ return copy;
+ }
+ else if(value instanceof Map)
+ {
+ Map<?,?> orig = (Map<?,?>) value;
+ LinkedHashMap<Object,Object> copy = new LinkedHashMap<>();
+ for(Map.Entry<?,?> entry : orig.entrySet())
+ {
+ copy.put(makeImmutable(entry.getKey()),makeImmutable(entry.getValue()));
+ }
+ return copy;
+ }
+ else return null;
+ }
+
+ private final QueueReport<T> _report;
+
+ public final T runReport(Queue<?> queue)
+ {
+ _report.setQueue(queue);
+ ReportVisitor visitor = new ReportVisitor(_report);
+ queue.visit(visitor);
+ return _report.getReport();
+ }
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java
new file mode 100644
index 0000000000..00b6c4abeb
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+import java.nio.ByteBuffer;
+
+public interface ReportableMessage
+{
+ String getInitialRoutingAddress();
+
+ ReportableMessageHeader getMessageHeader();
+
+ public ByteBuffer getContent();
+
+ boolean isPersistent();
+
+ long getSize();
+
+ long getExpiration();
+
+ long getMessageNumber();
+
+ long getArrivalTime();
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java
new file mode 100644
index 0000000000..e78415f8d0
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+import java.util.Collection;
+import java.util.Set;
+
+public interface ReportableMessageHeader
+{
+ String getCorrelationId();
+
+ long getExpiration();
+
+ String getUserId();
+
+ String getAppId();
+
+ String getMessageId();
+
+ String getMimeType();
+
+ String getEncoding();
+
+ byte getPriority();
+
+ long getTimestamp();
+
+ String getType();
+
+ String getReplyTo();
+
+ Object getHeader(String name);
+
+ boolean containsHeaders(Set<String> names);
+
+ boolean containsHeader(String name);
+
+ Collection<String> getHeaderNames();
+
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java
new file mode 100644
index 0000000000..2b3def2dab
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.servlet.rest;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.qpid.server.management.plugin.report.ReportRunner;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+
+public class QueueReportServlet extends AbstractServlet
+{
+ @Override
+ protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse response) throws
+ IOException,
+ ServletException
+ {
+ String[] pathInfoElements = getPathInfoElements(request);
+ if(pathInfoElements != null && pathInfoElements.length == 3)
+ {
+ Queue<?> queue = getQueueFromRequest(request);
+ ReportRunner<?> reportRunner = ReportRunner.createRunner(pathInfoElements[2],request.getParameterMap());
+ Object output = reportRunner.runReport(queue);
+ response.setContentType(reportRunner.getContentType());
+ if(reportRunner.isBinaryReport())
+ {
+ response.getOutputStream().write((byte[])output);
+ }
+ else
+ {
+ response.getWriter().write((String)output);
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid path is specified");
+ }
+
+ }
+
+ private Queue<?> getQueueFromRequest(HttpServletRequest request)
+ {
+ String[] pathInfoElements = getPathInfoElements(request);
+ if(pathInfoElements == null || pathInfoElements.length < 2)
+ {
+ throw new IllegalArgumentException("Invalid path is specified");
+ }
+ String vhostName = pathInfoElements[0];
+ String queueName = pathInfoElements[1];
+
+ VirtualHost<?,?,?> vhost = getBroker().findVirtualHostByName(vhostName);
+ if (vhost == null)
+ {
+ throw new IllegalArgumentException("Could not find virtual host with name '" + vhostName + "'");
+ }
+
+ Queue queueFromVirtualHost = getQueueFromVirtualHost(queueName, vhost);
+ if (queueFromVirtualHost == null)
+ {
+ throw new IllegalArgumentException("Could not find queue with name '" + queueName + "' on virtual host '" + vhost.getName() + "'");
+ }
+ return queueFromVirtualHost;
+ }
+
+ private Queue getQueueFromVirtualHost(String queueName, VirtualHost<?,?,?> vhost)
+ {
+ Queue queue = null;
+
+ for(Queue<?> q : vhost.getQueues())
+ {
+
+ if(q.getName().equals(queueName))
+ {
+ queue = q;
+ break;
+ }
+ }
+ return queue;
+ }
+
+}
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
new file mode 100644
index 0000000000..38432a26f4
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class ReportRunnerTest extends QpidTestCase
+{
+ public void testTextReportCountsMessages()
+ {
+ ReportRunner<String> runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME,
+ Collections.<String, String[]>emptyMap());
+ Queue queue = createMockQueue();
+ assertEquals("There are 0 messages on the queue.", runner.runReport(queue));
+
+ runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME,
+ Collections.<String, String[]>emptyMap());
+ Queue queue1 = createMockQueue(mock(ServerMessage.class));
+ assertEquals("There are 1 messages on the queue.", runner.runReport(queue1));
+
+ runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME,
+ Collections.<String, String[]>emptyMap());
+ Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class));
+ assertEquals("There are 2 messages on the queue.", runner.runReport(queue2));
+ }
+
+ public void testTextReportSingleStringParam()
+ {
+ Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class));
+
+ Map<String, String[]> parameterMap = new HashMap<>();
+ parameterMap.put("stringParam", new String[]{"hello world"});
+ ReportRunner<String> runner =
+ (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, parameterMap);
+ assertEquals("There are 2 messages on the queue. stringParam = hello world.", runner.runReport(queue2));
+ }
+
+ public void testTextReportSingleStringArrayParam()
+ {
+ Queue queue = createMockQueue();
+
+ Map<String, String[]> parameterMap = new HashMap<>();
+ parameterMap.put("stringArrayParam", new String[] { "hello world", "goodbye"});
+ ReportRunner<String> runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, parameterMap);
+ assertEquals("There are 0 messages on the queue. stringArrayParam = [hello world, goodbye].", runner.runReport(queue));
+
+ }
+
+
+ public void testTextReportBothParams()
+ {
+ Queue queue = createMockQueue();
+
+ Map<String, String[]> parameterMap = new HashMap<>();
+ parameterMap.put("stringParam", new String[]{"hello world"});
+ parameterMap.put("stringArrayParam", new String[] { "hello world", "goodbye"});
+ ReportRunner<String> runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, parameterMap);
+ assertEquals("There are 0 messages on the queue. stringParam = hello world. stringArrayParam = [hello world, goodbye].", runner.runReport(queue));
+
+ }
+
+ public void testInvalidReportName()
+ {
+ try
+ {
+ ReportRunner.createRunner("unknown", Collections.<String, String[]>emptyMap());
+ fail("Unknown report name should throw exception");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unknown report: unknown", e.getMessage());
+ }
+ }
+
+ public void testBinaryReportWithLimit() throws Exception
+ {
+ Queue queue = createMockQueue(createMessageWithAppProperties(Collections.<String,Object>singletonMap("key",1)),
+ createMessageWithAppProperties(Collections.<String,Object>singletonMap("key",2)),
+ createMessageWithAppProperties(Collections.<String, Object>singletonMap("key", 3)),
+ createMessageWithAppProperties(Collections.<String, Object>singletonMap("key", 4)));
+ Map<String, String[]> parameterMap = new HashMap<>();
+ parameterMap.put("propertyName", new String[]{"key"});
+ parameterMap.put("limit", new String[] { "3" });
+
+ ReportRunner<byte[]> runner = (ReportRunner<byte[]>) ReportRunner.createRunner(TestBinaryReport.NAME, parameterMap);
+
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ ObjectOutputStream objects = new ObjectOutputStream(bytes);
+ objects.writeObject(Integer.valueOf(1));
+ objects.writeObject(Integer.valueOf(2));
+ objects.writeObject(Integer.valueOf(3));
+ objects.flush();
+ byte[] expected = bytes.toByteArray();
+ byte[] actual = runner.runReport(queue);
+ assertTrue("Output not as expected", Arrays.equals(expected, actual));
+ }
+
+ private ServerMessage<?> createMessageWithAppProperties(final Map<String,Object> props)
+ {
+ ServerMessage<?> message = mock(ServerMessage.class);
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(message.getMessageHeader()).thenReturn(header);
+ final ArgumentCaptor<String> headerNameCaptor = ArgumentCaptor.forClass(String.class);
+ when(header.getHeader(headerNameCaptor.capture())).thenAnswer(new Answer<Object>()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ String header = headerNameCaptor.getValue();
+ return props.get(header);
+ }
+ });
+ when(header.getHeaderNames()).thenReturn(props.keySet());
+ return message;
+ }
+
+ private Queue createMockQueue(final ServerMessage<?>... messages)
+ {
+ final AMQQueue queue = mock(AMQQueue.class);
+ final ArgumentCaptor<QueueEntryVisitor> captor = ArgumentCaptor.forClass(QueueEntryVisitor.class);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ QueueEntryVisitor visitor = captor.getValue();
+ for(ServerMessage<?> message : messages)
+ {
+ if(visitor.visit(makeEntry(queue, message)))
+ {
+ break;
+ }
+ }
+ return null;
+ }
+ }).when(queue).visit(captor.capture());
+ return queue;
+ }
+
+ private QueueEntry makeEntry(final AMQQueue queue, final ServerMessage<?> message)
+ {
+ QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getQueue()).thenReturn(queue);
+ when(entry.getMessage()).thenReturn(message);
+ return entry;
+ }
+}
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestBinaryReport.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestBinaryReport.java
new file mode 100644
index 0000000000..fc5e93631e
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestBinaryReport.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+public class TestBinaryReport extends QueueBinaryReport
+{
+
+
+ private int _limit;
+ private String _propertyName;
+ private int _count;
+ private final ByteArrayOutputStream _bytesOutputStream = new ByteArrayOutputStream();
+ private final ObjectOutputStream _objectOutputStream;
+ public static final String NAME = "testBinary";
+
+ public TestBinaryReport()
+ {
+ try
+ {
+ _objectOutputStream = new ObjectOutputStream(_bytesOutputStream);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ ;
+ }
+
+ @Override
+ public String getName()
+ {
+ return NAME;
+ }
+
+ @Override
+ public String getContentType()
+ {
+ return "application/octet-stream";
+ }
+
+ @Override
+ public void addMessage(final ReportableMessage reportableMessage)
+ {
+ if(_propertyName != null)
+ {
+ Object value = reportableMessage.getMessageHeader().getHeader(_propertyName);
+ if(value != null)
+ {
+ try
+ {
+ _objectOutputStream.writeObject(value);
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+ }
+ }
+ _count++;
+ }
+
+ @Override
+ public boolean isComplete()
+ {
+ return _limit != 0 && _count >= _limit;
+ }
+
+ @Override
+ public byte[] getReport()
+ {
+ try
+ {
+ _objectOutputStream.flush();
+
+ return _bytesOutputStream.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setLimit(final String limit)
+ {
+ _limit = Integer.parseInt(limit);
+ }
+
+ public void setPropertyName(final String propertyName)
+ {
+ this._propertyName = propertyName;
+ }
+}
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestTextReport.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestTextReport.java
new file mode 100644
index 0000000000..7f9e1e2962
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestTextReport.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.plugin.report;
+
+import java.util.Arrays;
+
+public class TestTextReport extends QueueTextReport
+{
+ public static final String NAME = "testText";
+ private int _count;
+ private String _stringParam;
+ private String[] _stringArrayParam;
+
+ @Override
+ public String getName()
+ {
+ return NAME;
+ }
+
+ @Override
+ public String getContentType()
+ {
+ return "text/plain";
+ }
+
+ @Override
+ public void addMessage(final ReportableMessage reportableMessage)
+ {
+ _count++;
+ }
+
+ @Override
+ public boolean isComplete()
+ {
+ return false;
+ }
+
+ @Override
+ public String getReport()
+ {
+ StringBuilder result = new StringBuilder("There are " + _count + " messages on the queue.");
+ if(_stringParam != null)
+ {
+ result.append(" stringParam = " + _stringParam + ".");
+ }
+ if(_stringArrayParam != null)
+ {
+ result.append(" stringArrayParam = " + Arrays.asList(_stringArrayParam) + ".");
+ }
+ return result.toString();
+ }
+
+ @SuppressWarnings("unused")
+ public void setStringParam(final String value)
+ {
+ _stringParam = value;
+ }
+
+ @SuppressWarnings("unused")
+ public void setStringArrayParam(final String[] value)
+ {
+ _stringArrayParam = value;
+ }
+
+
+}
diff --git a/qpid/java/broker-plugins/management-http/src/test/resources/META-INF/services/org.apache.qpid.server.management.plugin.report.QueueReport b/qpid/java/broker-plugins/management-http/src/test/resources/META-INF/services/org.apache.qpid.server.management.plugin.report.QueueReport
new file mode 100644
index 0000000000..7d25ec4378
--- /dev/null
+++ b/qpid/java/broker-plugins/management-http/src/test/resources/META-INF/services/org.apache.qpid.server.management.plugin.report.QueueReport
@@ -0,0 +1,2 @@
+org.apache.qpid.server.management.plugin.report.TestTextReport
+org.apache.qpid.server.management.plugin.report.TestBinaryReport