diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-01 11:43:02 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-01 11:43:02 +0000 |
commit | 7dd86708f3542c03118ded5b8db74328cef71c84 (patch) | |
tree | 7febec6a59ae1b95499ba45a75004475d083bd77 | |
parent | 36d2c895515960b4fa4c864f2816963ad63bab2e (diff) | |
download | qpid-python-7dd86708f3542c03118ded5b8db74328cef71c84.tar.gz |
QPID-6423 : [Java Broker] Allow plugin of custom user queue reports using the REST API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1663082 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 1217 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 4e340c7b72..69920ff488 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -69,6 +69,7 @@ import org.apache.qpid.server.management.plugin.servlet.rest.LogoutServlet; import org.apache.qpid.server.management.plugin.servlet.rest.MessageContentServlet; import org.apache.qpid.server.management.plugin.servlet.rest.MessageServlet; import org.apache.qpid.server.management.plugin.servlet.rest.MetaDataServlet; +import org.apache.qpid.server.management.plugin.servlet.rest.QueueReportServlet; import org.apache.qpid.server.management.plugin.servlet.rest.RestServlet; import org.apache.qpid.server.management.plugin.servlet.rest.SaslServlet; import org.apache.qpid.server.management.plugin.servlet.rest.StructureServlet; @@ -304,6 +305,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem root.addServlet(new ServletHolder(new StructureServlet()), "/service/structure"); root.addServlet(new ServletHolder(new MessageServlet()), "/service/message/*"); root.addServlet(new ServletHolder(new MessageContentServlet()), "/service/message-content/*"); + root.addServlet(new ServletHolder(new QueueReportServlet()), "/service/queuereport/*"); + root.addServlet(new ServletHolder(new LogRecordsServlet()), "/service/logrecords"); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java new file mode 100644 index 0000000000..d842de3f1b --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +public abstract class QueueBinaryReport extends QueueReport<byte[]> +{ + public QueueBinaryReport() + { + } +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java new file mode 100644 index 0000000000..23b24aaf8d --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java @@ -0,0 +1,161 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; + +/** + * <p> + * The QueueReport class provides an extension point for installations to provide custom management reporting on + * queues through the REST API. + * </p> + * + * <p> + * A custom QueueReport must extend either {@link org.apache.qpid.server.management.plugin.report.QueueTextReport} + * or {@link org.apache.qpid.server.management.plugin.report.QueueBinaryReport}. The report implementation must + * define a {@link #getName() name} which is unique amongst all installed reports. The report class must be present + * in the classpath of the broker, and a provider-configuration file named + * org.apache.qpid.server.management.plugin.report.QueueReport must be added in the resource directory + * META-INF/services directory with the binary name of the implementation (as described in + * {@link java.util.ServiceLoader ServiceLoader}). + * </p> + * + * <h3>Running reports</h3> + * <p> + * The report can be run using the URL: + * {@code http://<broker>/service/queuereport/<virtualhost name>/<queue name>/<report name>[?param1=x¶m2=y...]} + * </p> + * + * <h4>Report Parameters</h4> + * + * <p> + * Reports can take parameters from the query string of the HTTP request. For every parameter in the query string + * the system will look for a setter on the report object with either a String or String[] parameter. Thus if + * the query string contains {@code foo=bar}, then the system will look for a setter {@code setFoo(String value)} or + * {@code setFoo(String[] value)}. If the same parameter occurs multiple times in the query string then only the + * array variant of the setter will be called. + * </p> + * <p> + * Setters for the parameters are guaranteed to be called before the first message is added to the report. + * </p> + * + * <p> + * NOTE: In order to comply with the requirements of the {@link java.util.ServiceLoader ServiceLoader} api, all + * implementations of QueueReport MUST provide a public no-args constructor. + * </p> + * @param <T> + */ +public abstract class QueueReport<T> +{ + private Queue<?> _queue; + + QueueReport() + { + + } + + /** + * Gets the name of the report. + * <p> + * The name of the report must be unique amongst all installed implementations. The name of the report + * is examined by the Qpid immediately upon construction. The name should not change during + * the lifetime of the object (the value is only meaningful to the system at the time of initial construction) + * and all instances of the same concrete implementation should have the same name. + * </p> + * @return the name of the report + */ + public abstract String getName(); + + /** + * Get the name of the queue against which the report is being run. + * + * @return the name of the queue + */ + public final String getQueueName() + { + return _queue.getName(); + } + + final void setQueue(final Queue<?> queue) + { + _queue = queue; + } + + /** + * Get the name of the virtual host against which the report is being run. + * + * @return the name of the virtual host + */ + public final String getVirtualHostName() + { + return _queue.getParent(VirtualHost.class).getName(); + } + + /** + * + * The value returned by getContentType() will be used to set the Content-Type HTTP header field + * + * @return the value to use for the content-type HTTP header field + */ + public abstract String getContentType(); + + /** + * Called by the system to add a message to the report. + * + * <p> + * The method is called by the system for every message on the queue, or until {@link #isComplete()} returns true. + * </p> + * @param reportableMessage the message to add to the report + */ + public abstract void addMessage(final ReportableMessage reportableMessage); + + /** + * Informs the system if the report is complete (i.e. does not need to report on any more messages). + * + * <p> + * This method will be called by the system after each message is {@link #addMessage(ReportableMessage) added} + * to the report. If a report is only interested in some messages, and can determine that the addition of more + * messages will not vary the content of the report, then it can return true. + * </p> + * <p> + * If this method always returns false, then all messages from the queue will be added to the report. + * </p> + * <p> + * NOTE: Retrieving content or properties of the message may require it to be reloaded from disk, and so care + * should be taken by reports to only access properties/content of the message if it is going to be required + * for the report production. + * </p> + * + * @return true if the report does not want to report on any more messages in the queue + */ + public abstract boolean isComplete(); + + /** + * Called by the system to get the content of the report to retrun to the user. + * <p> + * The system guarantees to only call this method once + * </p> + * @return the report content. + */ + public abstract T getReport(); + +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java new file mode 100644 index 0000000000..09bc5c4229 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +public abstract class QueueTextReport extends QueueReport<String> +{ + public QueueTextReport() + { + } +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java new file mode 100644 index 0000000000..2a05cfc9a1 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java @@ -0,0 +1,408 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryVisitor; + +public class ReportRunner<T> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ReportRunner.class); + + private static final Set<Class> IMMUTABLE_CLASSES = new HashSet<>(Arrays.<Class>asList( + Boolean.class, + Byte.class, + Short.class, + Character.class, + Integer.class, + Long.class, + Float.class, + Double.class, + UUID.class, + Date.class, + String.class + )); + + private ReportRunner(final QueueReport<T> report) + { + _report = report; + } + + public boolean isBinaryReport() + { + return _report instanceof QueueBinaryReport; + } + + public static ReportRunner<?> createRunner(final String reportName, final Map<String, String[]> parameterMap) + { + QueueReport<?> report = getReport(reportName); + setReportParameters(report, parameterMap); + return new ReportRunner<>(report); + } + + private static void setReportParameters(final QueueReport<?> report, final Map<String, String[]> parameterMap) + { + if(parameterMap != null && !parameterMap.isEmpty()) + { + Class<? extends QueueReport> clazz = report.getClass(); + for(Map.Entry<String,String[]> entry : parameterMap.entrySet()) + { + String key = entry.getKey(); + String[] value = entry.getValue(); + if(isValidName(key)) + { + + StringBuilder setterName = new StringBuilder("set"); + setterName.append(key.substring(0,1).toUpperCase()); + if(key.length()>1) + { + setterName.append(key.substring(1)); + } + Method method = null; + try + { + + if (value == null || value.length == 0 || value.length == 1) + { + try + { + method = clazz.getMethod(setterName.toString(), String.class); + method.invoke(report, value == null || value.length == 0 ? null : value[0]); + } + catch (NoSuchMethodException | IllegalAccessException e) + { + method = null; + } + } + if (method == null) + { + try + { + method = clazz.getMethod(setterName.toString(), String[].class); + method.invoke(report, new Object[] { value }); + } + catch (NoSuchMethodException | IllegalAccessException e) + { + LOGGER.info("Unknown parameter '" + + key + + "' (no setter) for report " + + report.getName()); + } + } + } + catch (InvocationTargetException e) + { + LOGGER.info("Error setting parameter '" + key + "' for report " + report.getName(), e); + } + } + else + { + LOGGER.info("Invalid parameter name '" + key + "' running report " + report.getName()); + } + } + } + } + + private static boolean isValidName(final String key) + { + if(key != null && key.length() != 0) + { + if(Character.isJavaIdentifierStart(key.charAt(0))) + { + for(int i = 1; i < key.length(); i++) + { + if(!Character.isJavaIdentifierPart(key.charAt(i))) + { + return false; + } + } + return true; + } + + } + return false; + + } + + private static QueueReport<?> getReport(final String reportName) + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + for (final QueueReport report : ServiceLoader.load(QueueReport.class, classLoader)) + { + if (report.getName().equals(reportName)) + { + try + { + return report.getClass().newInstance(); + } + catch (InstantiationException | IllegalAccessException e) + { + // can't happen as by definition must have public noargs constructor + } + } + } + throw new IllegalArgumentException("Unknown report: " + reportName); + } + + public String getContentType() + { + return _report.getContentType(); + } + + + private static class ReportVisitor implements QueueEntryVisitor + { + + private final QueueReport _report; + + public ReportVisitor(final QueueReport report) + { + _report = report; + } + + @Override + public boolean visit(final QueueEntry entry) + { + _report.addMessage(convertMessage(entry.getMessage())); + return _report.isComplete(); + } + + + } + + + private static ReportableMessage convertMessage(final ServerMessage message) + { + return new ReportableMessage() + { + @Override + public String getInitialRoutingAddress() + { + return message.getInitialRoutingAddress(); + } + + @Override + public ReportableMessageHeader getMessageHeader() + { + return convertMessageHeader(message.getMessageHeader()); + } + + @Override + public ByteBuffer getContent() + { + ByteBuffer content = message.getContent(0, (int) getSize()); + + return content.asReadOnlyBuffer(); + } + + @Override + public boolean isPersistent() + { + return message.isPersistent(); + } + + @Override + public long getSize() + { + return message.getSize(); + } + + @Override + public long getExpiration() + { + return message.getExpiration(); + } + + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public long getArrivalTime() + { + return message.getArrivalTime(); + } + }; + } + + private static ReportableMessageHeader convertMessageHeader(final AMQMessageHeader messageHeader) + { + return new ReportableMessageHeader() + { + @Override + public String getCorrelationId() + { + return messageHeader.getCorrelationId(); + } + + @Override + public long getExpiration() + { + return messageHeader.getExpiration(); + } + + @Override + public String getUserId() + { + return messageHeader.getUserId(); + } + + @Override + public String getAppId() + { + return messageHeader.getAppId(); + } + + @Override + public String getMessageId() + { + return messageHeader.getMessageId(); + } + + @Override + public String getMimeType() + { + return messageHeader.getMimeType(); + } + + @Override + public String getEncoding() + { + return messageHeader.getEncoding(); + } + + @Override + public byte getPriority() + { + return messageHeader.getPriority(); + } + + @Override + public long getTimestamp() + { + return messageHeader.getTimestamp(); + } + + @Override + public String getType() + { + return messageHeader.getType(); + } + + @Override + public String getReplyTo() + { + return messageHeader.getReplyTo(); + } + + @Override + public Object getHeader(final String name) + { + return makeImmutable(messageHeader.getHeader(name)); + } + + @Override + public boolean containsHeaders(final Set<String> names) + { + return messageHeader.containsHeaders(names); + } + + @Override + public boolean containsHeader(final String name) + { + return messageHeader.containsHeader(name); + } + + @Override + public Collection<String> getHeaderNames() + { + return Collections.unmodifiableCollection(messageHeader.getHeaderNames()); + } + }; + } + + private static Object makeImmutable(final Object value) + { + if(value == null || IMMUTABLE_CLASSES.contains(value.getClass())) + { + return value; + } + else if(value instanceof byte[]) + { + return ByteBuffer.wrap((byte[])value).asReadOnlyBuffer(); + } + else if(value instanceof List) + { + List orig = (List) value; + List<Object> copy = new ArrayList<>(orig.size()); + for(Object element : orig) + { + copy.add(makeImmutable(element)); + } + return copy; + } + else if(value instanceof Map) + { + Map<?,?> orig = (Map<?,?>) value; + LinkedHashMap<Object,Object> copy = new LinkedHashMap<>(); + for(Map.Entry<?,?> entry : orig.entrySet()) + { + copy.put(makeImmutable(entry.getKey()),makeImmutable(entry.getValue())); + } + return copy; + } + else return null; + } + + private final QueueReport<T> _report; + + public final T runReport(Queue<?> queue) + { + _report.setQueue(queue); + ReportVisitor visitor = new ReportVisitor(_report); + queue.visit(visitor); + return _report.getReport(); + } +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java new file mode 100644 index 0000000000..00b6c4abeb --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.nio.ByteBuffer; + +public interface ReportableMessage +{ + String getInitialRoutingAddress(); + + ReportableMessageHeader getMessageHeader(); + + public ByteBuffer getContent(); + + boolean isPersistent(); + + long getSize(); + + long getExpiration(); + + long getMessageNumber(); + + long getArrivalTime(); +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java new file mode 100644 index 0000000000..e78415f8d0 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.util.Collection; +import java.util.Set; + +public interface ReportableMessageHeader +{ + String getCorrelationId(); + + long getExpiration(); + + String getUserId(); + + String getAppId(); + + String getMessageId(); + + String getMimeType(); + + String getEncoding(); + + byte getPriority(); + + long getTimestamp(); + + String getType(); + + String getReplyTo(); + + Object getHeader(String name); + + boolean containsHeaders(Set<String> names); + + boolean containsHeader(String name); + + Collection<String> getHeaderNames(); + +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java new file mode 100644 index 0000000000..2b3def2dab --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.servlet.rest; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.qpid.server.management.plugin.report.ReportRunner; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; + +public class QueueReportServlet extends AbstractServlet +{ + @Override + protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse response) throws + IOException, + ServletException + { + String[] pathInfoElements = getPathInfoElements(request); + if(pathInfoElements != null && pathInfoElements.length == 3) + { + Queue<?> queue = getQueueFromRequest(request); + ReportRunner<?> reportRunner = ReportRunner.createRunner(pathInfoElements[2],request.getParameterMap()); + Object output = reportRunner.runReport(queue); + response.setContentType(reportRunner.getContentType()); + if(reportRunner.isBinaryReport()) + { + response.getOutputStream().write((byte[])output); + } + else + { + response.getWriter().write((String)output); + } + } + else + { + throw new IllegalArgumentException("Invalid path is specified"); + } + + } + + private Queue<?> getQueueFromRequest(HttpServletRequest request) + { + String[] pathInfoElements = getPathInfoElements(request); + if(pathInfoElements == null || pathInfoElements.length < 2) + { + throw new IllegalArgumentException("Invalid path is specified"); + } + String vhostName = pathInfoElements[0]; + String queueName = pathInfoElements[1]; + + VirtualHost<?,?,?> vhost = getBroker().findVirtualHostByName(vhostName); + if (vhost == null) + { + throw new IllegalArgumentException("Could not find virtual host with name '" + vhostName + "'"); + } + + Queue queueFromVirtualHost = getQueueFromVirtualHost(queueName, vhost); + if (queueFromVirtualHost == null) + { + throw new IllegalArgumentException("Could not find queue with name '" + queueName + "' on virtual host '" + vhost.getName() + "'"); + } + return queueFromVirtualHost; + } + + private Queue getQueueFromVirtualHost(String queueName, VirtualHost<?,?,?> vhost) + { + Queue queue = null; + + for(Queue<?> q : vhost.getQueues()) + { + + if(q.getName().equals(queueName)) + { + queue = q; + break; + } + } + return queue; + } + +} diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java new file mode 100644 index 0000000000..38432a26f4 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.test.utils.QpidTestCase; + +public class ReportRunnerTest extends QpidTestCase +{ + public void testTextReportCountsMessages() + { + ReportRunner<String> runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, + Collections.<String, String[]>emptyMap()); + Queue queue = createMockQueue(); + assertEquals("There are 0 messages on the queue.", runner.runReport(queue)); + + runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, + Collections.<String, String[]>emptyMap()); + Queue queue1 = createMockQueue(mock(ServerMessage.class)); + assertEquals("There are 1 messages on the queue.", runner.runReport(queue1)); + + runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, + Collections.<String, String[]>emptyMap()); + Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class)); + assertEquals("There are 2 messages on the queue.", runner.runReport(queue2)); + } + + public void testTextReportSingleStringParam() + { + Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class)); + + Map<String, String[]> parameterMap = new HashMap<>(); + parameterMap.put("stringParam", new String[]{"hello world"}); + ReportRunner<String> runner = + (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, parameterMap); + assertEquals("There are 2 messages on the queue. stringParam = hello world.", runner.runReport(queue2)); + } + + public void testTextReportSingleStringArrayParam() + { + Queue queue = createMockQueue(); + + Map<String, String[]> parameterMap = new HashMap<>(); + parameterMap.put("stringArrayParam", new String[] { "hello world", "goodbye"}); + ReportRunner<String> runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, parameterMap); + assertEquals("There are 0 messages on the queue. stringArrayParam = [hello world, goodbye].", runner.runReport(queue)); + + } + + + public void testTextReportBothParams() + { + Queue queue = createMockQueue(); + + Map<String, String[]> parameterMap = new HashMap<>(); + parameterMap.put("stringParam", new String[]{"hello world"}); + parameterMap.put("stringArrayParam", new String[] { "hello world", "goodbye"}); + ReportRunner<String> runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME, parameterMap); + assertEquals("There are 0 messages on the queue. stringParam = hello world. stringArrayParam = [hello world, goodbye].", runner.runReport(queue)); + + } + + public void testInvalidReportName() + { + try + { + ReportRunner.createRunner("unknown", Collections.<String, String[]>emptyMap()); + fail("Unknown report name should throw exception"); + } + catch(IllegalArgumentException e) + { + assertEquals("Unknown report: unknown", e.getMessage()); + } + } + + public void testBinaryReportWithLimit() throws Exception + { + Queue queue = createMockQueue(createMessageWithAppProperties(Collections.<String,Object>singletonMap("key",1)), + createMessageWithAppProperties(Collections.<String,Object>singletonMap("key",2)), + createMessageWithAppProperties(Collections.<String, Object>singletonMap("key", 3)), + createMessageWithAppProperties(Collections.<String, Object>singletonMap("key", 4))); + Map<String, String[]> parameterMap = new HashMap<>(); + parameterMap.put("propertyName", new String[]{"key"}); + parameterMap.put("limit", new String[] { "3" }); + + ReportRunner<byte[]> runner = (ReportRunner<byte[]>) ReportRunner.createRunner(TestBinaryReport.NAME, parameterMap); + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ObjectOutputStream objects = new ObjectOutputStream(bytes); + objects.writeObject(Integer.valueOf(1)); + objects.writeObject(Integer.valueOf(2)); + objects.writeObject(Integer.valueOf(3)); + objects.flush(); + byte[] expected = bytes.toByteArray(); + byte[] actual = runner.runReport(queue); + assertTrue("Output not as expected", Arrays.equals(expected, actual)); + } + + private ServerMessage<?> createMessageWithAppProperties(final Map<String,Object> props) + { + ServerMessage<?> message = mock(ServerMessage.class); + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(message.getMessageHeader()).thenReturn(header); + final ArgumentCaptor<String> headerNameCaptor = ArgumentCaptor.forClass(String.class); + when(header.getHeader(headerNameCaptor.capture())).thenAnswer(new Answer<Object>() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + String header = headerNameCaptor.getValue(); + return props.get(header); + } + }); + when(header.getHeaderNames()).thenReturn(props.keySet()); + return message; + } + + private Queue createMockQueue(final ServerMessage<?>... messages) + { + final AMQQueue queue = mock(AMQQueue.class); + final ArgumentCaptor<QueueEntryVisitor> captor = ArgumentCaptor.forClass(QueueEntryVisitor.class); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + QueueEntryVisitor visitor = captor.getValue(); + for(ServerMessage<?> message : messages) + { + if(visitor.visit(makeEntry(queue, message))) + { + break; + } + } + return null; + } + }).when(queue).visit(captor.capture()); + return queue; + } + + private QueueEntry makeEntry(final AMQQueue queue, final ServerMessage<?> message) + { + QueueEntry entry = mock(QueueEntry.class); + when(entry.getQueue()).thenReturn(queue); + when(entry.getMessage()).thenReturn(message); + return entry; + } +} diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestBinaryReport.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestBinaryReport.java new file mode 100644 index 0000000000..fc5e93631e --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestBinaryReport.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +public class TestBinaryReport extends QueueBinaryReport +{ + + + private int _limit; + private String _propertyName; + private int _count; + private final ByteArrayOutputStream _bytesOutputStream = new ByteArrayOutputStream(); + private final ObjectOutputStream _objectOutputStream; + public static final String NAME = "testBinary"; + + public TestBinaryReport() + { + try + { + _objectOutputStream = new ObjectOutputStream(_bytesOutputStream); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + ; + } + + @Override + public String getName() + { + return NAME; + } + + @Override + public String getContentType() + { + return "application/octet-stream"; + } + + @Override + public void addMessage(final ReportableMessage reportableMessage) + { + if(_propertyName != null) + { + Object value = reportableMessage.getMessageHeader().getHeader(_propertyName); + if(value != null) + { + try + { + _objectOutputStream.writeObject(value); + } + catch (IOException e) + { + // ignore + } + } + } + _count++; + } + + @Override + public boolean isComplete() + { + return _limit != 0 && _count >= _limit; + } + + @Override + public byte[] getReport() + { + try + { + _objectOutputStream.flush(); + + return _bytesOutputStream.toByteArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public void setLimit(final String limit) + { + _limit = Integer.parseInt(limit); + } + + public void setPropertyName(final String propertyName) + { + this._propertyName = propertyName; + } +} diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestTextReport.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestTextReport.java new file mode 100644 index 0000000000..7f9e1e2962 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/TestTextReport.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.util.Arrays; + +public class TestTextReport extends QueueTextReport +{ + public static final String NAME = "testText"; + private int _count; + private String _stringParam; + private String[] _stringArrayParam; + + @Override + public String getName() + { + return NAME; + } + + @Override + public String getContentType() + { + return "text/plain"; + } + + @Override + public void addMessage(final ReportableMessage reportableMessage) + { + _count++; + } + + @Override + public boolean isComplete() + { + return false; + } + + @Override + public String getReport() + { + StringBuilder result = new StringBuilder("There are " + _count + " messages on the queue."); + if(_stringParam != null) + { + result.append(" stringParam = " + _stringParam + "."); + } + if(_stringArrayParam != null) + { + result.append(" stringArrayParam = " + Arrays.asList(_stringArrayParam) + "."); + } + return result.toString(); + } + + @SuppressWarnings("unused") + public void setStringParam(final String value) + { + _stringParam = value; + } + + @SuppressWarnings("unused") + public void setStringArrayParam(final String[] value) + { + _stringArrayParam = value; + } + + +} diff --git a/qpid/java/broker-plugins/management-http/src/test/resources/META-INF/services/org.apache.qpid.server.management.plugin.report.QueueReport b/qpid/java/broker-plugins/management-http/src/test/resources/META-INF/services/org.apache.qpid.server.management.plugin.report.QueueReport new file mode 100644 index 0000000000..7d25ec4378 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/test/resources/META-INF/services/org.apache.qpid.server.management.plugin.report.QueueReport @@ -0,0 +1,2 @@ +org.apache.qpid.server.management.plugin.report.TestTextReport +org.apache.qpid.server.management.plugin.report.TestBinaryReport |