diff options
Diffstat (limited to 'qpid/java/broker-plugins/management-http/src/main')
8 files changed, 831 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 4e340c7b72..69920ff488 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -69,6 +69,7 @@ import org.apache.qpid.server.management.plugin.servlet.rest.LogoutServlet; import org.apache.qpid.server.management.plugin.servlet.rest.MessageContentServlet; import org.apache.qpid.server.management.plugin.servlet.rest.MessageServlet; import org.apache.qpid.server.management.plugin.servlet.rest.MetaDataServlet; +import org.apache.qpid.server.management.plugin.servlet.rest.QueueReportServlet; import org.apache.qpid.server.management.plugin.servlet.rest.RestServlet; import org.apache.qpid.server.management.plugin.servlet.rest.SaslServlet; import org.apache.qpid.server.management.plugin.servlet.rest.StructureServlet; @@ -304,6 +305,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem root.addServlet(new ServletHolder(new StructureServlet()), "/service/structure"); root.addServlet(new ServletHolder(new MessageServlet()), "/service/message/*"); root.addServlet(new ServletHolder(new MessageContentServlet()), "/service/message-content/*"); + root.addServlet(new ServletHolder(new QueueReportServlet()), "/service/queuereport/*"); + root.addServlet(new ServletHolder(new LogRecordsServlet()), "/service/logrecords"); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java new file mode 100644 index 0000000000..d842de3f1b --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueBinaryReport.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +public abstract class QueueBinaryReport extends QueueReport<byte[]> +{ + public QueueBinaryReport() + { + } +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java new file mode 100644 index 0000000000..23b24aaf8d --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueReport.java @@ -0,0 +1,161 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; + +/** + * <p> + * The QueueReport class provides an extension point for installations to provide custom management reporting on + * queues through the REST API. + * </p> + * + * <p> + * A custom QueueReport must extend either {@link org.apache.qpid.server.management.plugin.report.QueueTextReport} + * or {@link org.apache.qpid.server.management.plugin.report.QueueBinaryReport}. The report implementation must + * define a {@link #getName() name} which is unique amongst all installed reports. The report class must be present + * in the classpath of the broker, and a provider-configuration file named + * org.apache.qpid.server.management.plugin.report.QueueReport must be added in the resource directory + * META-INF/services directory with the binary name of the implementation (as described in + * {@link java.util.ServiceLoader ServiceLoader}). + * </p> + * + * <h3>Running reports</h3> + * <p> + * The report can be run using the URL: + * {@code http://<broker>/service/queuereport/<virtualhost name>/<queue name>/<report name>[?param1=x¶m2=y...]} + * </p> + * + * <h4>Report Parameters</h4> + * + * <p> + * Reports can take parameters from the query string of the HTTP request. For every parameter in the query string + * the system will look for a setter on the report object with either a String or String[] parameter. Thus if + * the query string contains {@code foo=bar}, then the system will look for a setter {@code setFoo(String value)} or + * {@code setFoo(String[] value)}. If the same parameter occurs multiple times in the query string then only the + * array variant of the setter will be called. + * </p> + * <p> + * Setters for the parameters are guaranteed to be called before the first message is added to the report. + * </p> + * + * <p> + * NOTE: In order to comply with the requirements of the {@link java.util.ServiceLoader ServiceLoader} api, all + * implementations of QueueReport MUST provide a public no-args constructor. + * </p> + * @param <T> + */ +public abstract class QueueReport<T> +{ + private Queue<?> _queue; + + QueueReport() + { + + } + + /** + * Gets the name of the report. + * <p> + * The name of the report must be unique amongst all installed implementations. The name of the report + * is examined by the Qpid immediately upon construction. The name should not change during + * the lifetime of the object (the value is only meaningful to the system at the time of initial construction) + * and all instances of the same concrete implementation should have the same name. + * </p> + * @return the name of the report + */ + public abstract String getName(); + + /** + * Get the name of the queue against which the report is being run. + * + * @return the name of the queue + */ + public final String getQueueName() + { + return _queue.getName(); + } + + final void setQueue(final Queue<?> queue) + { + _queue = queue; + } + + /** + * Get the name of the virtual host against which the report is being run. + * + * @return the name of the virtual host + */ + public final String getVirtualHostName() + { + return _queue.getParent(VirtualHost.class).getName(); + } + + /** + * + * The value returned by getContentType() will be used to set the Content-Type HTTP header field + * + * @return the value to use for the content-type HTTP header field + */ + public abstract String getContentType(); + + /** + * Called by the system to add a message to the report. + * + * <p> + * The method is called by the system for every message on the queue, or until {@link #isComplete()} returns true. + * </p> + * @param reportableMessage the message to add to the report + */ + public abstract void addMessage(final ReportableMessage reportableMessage); + + /** + * Informs the system if the report is complete (i.e. does not need to report on any more messages). + * + * <p> + * This method will be called by the system after each message is {@link #addMessage(ReportableMessage) added} + * to the report. If a report is only interested in some messages, and can determine that the addition of more + * messages will not vary the content of the report, then it can return true. + * </p> + * <p> + * If this method always returns false, then all messages from the queue will be added to the report. + * </p> + * <p> + * NOTE: Retrieving content or properties of the message may require it to be reloaded from disk, and so care + * should be taken by reports to only access properties/content of the message if it is going to be required + * for the report production. + * </p> + * + * @return true if the report does not want to report on any more messages in the queue + */ + public abstract boolean isComplete(); + + /** + * Called by the system to get the content of the report to retrun to the user. + * <p> + * The system guarantees to only call this method once + * </p> + * @return the report content. + */ + public abstract T getReport(); + +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java new file mode 100644 index 0000000000..09bc5c4229 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/QueueTextReport.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +public abstract class QueueTextReport extends QueueReport<String> +{ + public QueueTextReport() + { + } +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java new file mode 100644 index 0000000000..2a05cfc9a1 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java @@ -0,0 +1,408 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryVisitor; + +public class ReportRunner<T> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ReportRunner.class); + + private static final Set<Class> IMMUTABLE_CLASSES = new HashSet<>(Arrays.<Class>asList( + Boolean.class, + Byte.class, + Short.class, + Character.class, + Integer.class, + Long.class, + Float.class, + Double.class, + UUID.class, + Date.class, + String.class + )); + + private ReportRunner(final QueueReport<T> report) + { + _report = report; + } + + public boolean isBinaryReport() + { + return _report instanceof QueueBinaryReport; + } + + public static ReportRunner<?> createRunner(final String reportName, final Map<String, String[]> parameterMap) + { + QueueReport<?> report = getReport(reportName); + setReportParameters(report, parameterMap); + return new ReportRunner<>(report); + } + + private static void setReportParameters(final QueueReport<?> report, final Map<String, String[]> parameterMap) + { + if(parameterMap != null && !parameterMap.isEmpty()) + { + Class<? extends QueueReport> clazz = report.getClass(); + for(Map.Entry<String,String[]> entry : parameterMap.entrySet()) + { + String key = entry.getKey(); + String[] value = entry.getValue(); + if(isValidName(key)) + { + + StringBuilder setterName = new StringBuilder("set"); + setterName.append(key.substring(0,1).toUpperCase()); + if(key.length()>1) + { + setterName.append(key.substring(1)); + } + Method method = null; + try + { + + if (value == null || value.length == 0 || value.length == 1) + { + try + { + method = clazz.getMethod(setterName.toString(), String.class); + method.invoke(report, value == null || value.length == 0 ? null : value[0]); + } + catch (NoSuchMethodException | IllegalAccessException e) + { + method = null; + } + } + if (method == null) + { + try + { + method = clazz.getMethod(setterName.toString(), String[].class); + method.invoke(report, new Object[] { value }); + } + catch (NoSuchMethodException | IllegalAccessException e) + { + LOGGER.info("Unknown parameter '" + + key + + "' (no setter) for report " + + report.getName()); + } + } + } + catch (InvocationTargetException e) + { + LOGGER.info("Error setting parameter '" + key + "' for report " + report.getName(), e); + } + } + else + { + LOGGER.info("Invalid parameter name '" + key + "' running report " + report.getName()); + } + } + } + } + + private static boolean isValidName(final String key) + { + if(key != null && key.length() != 0) + { + if(Character.isJavaIdentifierStart(key.charAt(0))) + { + for(int i = 1; i < key.length(); i++) + { + if(!Character.isJavaIdentifierPart(key.charAt(i))) + { + return false; + } + } + return true; + } + + } + return false; + + } + + private static QueueReport<?> getReport(final String reportName) + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + for (final QueueReport report : ServiceLoader.load(QueueReport.class, classLoader)) + { + if (report.getName().equals(reportName)) + { + try + { + return report.getClass().newInstance(); + } + catch (InstantiationException | IllegalAccessException e) + { + // can't happen as by definition must have public noargs constructor + } + } + } + throw new IllegalArgumentException("Unknown report: " + reportName); + } + + public String getContentType() + { + return _report.getContentType(); + } + + + private static class ReportVisitor implements QueueEntryVisitor + { + + private final QueueReport _report; + + public ReportVisitor(final QueueReport report) + { + _report = report; + } + + @Override + public boolean visit(final QueueEntry entry) + { + _report.addMessage(convertMessage(entry.getMessage())); + return _report.isComplete(); + } + + + } + + + private static ReportableMessage convertMessage(final ServerMessage message) + { + return new ReportableMessage() + { + @Override + public String getInitialRoutingAddress() + { + return message.getInitialRoutingAddress(); + } + + @Override + public ReportableMessageHeader getMessageHeader() + { + return convertMessageHeader(message.getMessageHeader()); + } + + @Override + public ByteBuffer getContent() + { + ByteBuffer content = message.getContent(0, (int) getSize()); + + return content.asReadOnlyBuffer(); + } + + @Override + public boolean isPersistent() + { + return message.isPersistent(); + } + + @Override + public long getSize() + { + return message.getSize(); + } + + @Override + public long getExpiration() + { + return message.getExpiration(); + } + + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public long getArrivalTime() + { + return message.getArrivalTime(); + } + }; + } + + private static ReportableMessageHeader convertMessageHeader(final AMQMessageHeader messageHeader) + { + return new ReportableMessageHeader() + { + @Override + public String getCorrelationId() + { + return messageHeader.getCorrelationId(); + } + + @Override + public long getExpiration() + { + return messageHeader.getExpiration(); + } + + @Override + public String getUserId() + { + return messageHeader.getUserId(); + } + + @Override + public String getAppId() + { + return messageHeader.getAppId(); + } + + @Override + public String getMessageId() + { + return messageHeader.getMessageId(); + } + + @Override + public String getMimeType() + { + return messageHeader.getMimeType(); + } + + @Override + public String getEncoding() + { + return messageHeader.getEncoding(); + } + + @Override + public byte getPriority() + { + return messageHeader.getPriority(); + } + + @Override + public long getTimestamp() + { + return messageHeader.getTimestamp(); + } + + @Override + public String getType() + { + return messageHeader.getType(); + } + + @Override + public String getReplyTo() + { + return messageHeader.getReplyTo(); + } + + @Override + public Object getHeader(final String name) + { + return makeImmutable(messageHeader.getHeader(name)); + } + + @Override + public boolean containsHeaders(final Set<String> names) + { + return messageHeader.containsHeaders(names); + } + + @Override + public boolean containsHeader(final String name) + { + return messageHeader.containsHeader(name); + } + + @Override + public Collection<String> getHeaderNames() + { + return Collections.unmodifiableCollection(messageHeader.getHeaderNames()); + } + }; + } + + private static Object makeImmutable(final Object value) + { + if(value == null || IMMUTABLE_CLASSES.contains(value.getClass())) + { + return value; + } + else if(value instanceof byte[]) + { + return ByteBuffer.wrap((byte[])value).asReadOnlyBuffer(); + } + else if(value instanceof List) + { + List orig = (List) value; + List<Object> copy = new ArrayList<>(orig.size()); + for(Object element : orig) + { + copy.add(makeImmutable(element)); + } + return copy; + } + else if(value instanceof Map) + { + Map<?,?> orig = (Map<?,?>) value; + LinkedHashMap<Object,Object> copy = new LinkedHashMap<>(); + for(Map.Entry<?,?> entry : orig.entrySet()) + { + copy.put(makeImmutable(entry.getKey()),makeImmutable(entry.getValue())); + } + return copy; + } + else return null; + } + + private final QueueReport<T> _report; + + public final T runReport(Queue<?> queue) + { + _report.setQueue(queue); + ReportVisitor visitor = new ReportVisitor(_report); + queue.visit(visitor); + return _report.getReport(); + } +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java new file mode 100644 index 0000000000..00b6c4abeb --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessage.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.nio.ByteBuffer; + +public interface ReportableMessage +{ + String getInitialRoutingAddress(); + + ReportableMessageHeader getMessageHeader(); + + public ByteBuffer getContent(); + + boolean isPersistent(); + + long getSize(); + + long getExpiration(); + + long getMessageNumber(); + + long getArrivalTime(); +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java new file mode 100644 index 0000000000..e78415f8d0 --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportableMessageHeader.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.report; + +import java.util.Collection; +import java.util.Set; + +public interface ReportableMessageHeader +{ + String getCorrelationId(); + + long getExpiration(); + + String getUserId(); + + String getAppId(); + + String getMessageId(); + + String getMimeType(); + + String getEncoding(); + + byte getPriority(); + + long getTimestamp(); + + String getType(); + + String getReplyTo(); + + Object getHeader(String name); + + boolean containsHeaders(Set<String> names); + + boolean containsHeader(String name); + + Collection<String> getHeaderNames(); + +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java new file mode 100644 index 0000000000..2b3def2dab --- /dev/null +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/QueueReportServlet.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.plugin.servlet.rest; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.qpid.server.management.plugin.report.ReportRunner; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; + +public class QueueReportServlet extends AbstractServlet +{ + @Override + protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse response) throws + IOException, + ServletException + { + String[] pathInfoElements = getPathInfoElements(request); + if(pathInfoElements != null && pathInfoElements.length == 3) + { + Queue<?> queue = getQueueFromRequest(request); + ReportRunner<?> reportRunner = ReportRunner.createRunner(pathInfoElements[2],request.getParameterMap()); + Object output = reportRunner.runReport(queue); + response.setContentType(reportRunner.getContentType()); + if(reportRunner.isBinaryReport()) + { + response.getOutputStream().write((byte[])output); + } + else + { + response.getWriter().write((String)output); + } + } + else + { + throw new IllegalArgumentException("Invalid path is specified"); + } + + } + + private Queue<?> getQueueFromRequest(HttpServletRequest request) + { + String[] pathInfoElements = getPathInfoElements(request); + if(pathInfoElements == null || pathInfoElements.length < 2) + { + throw new IllegalArgumentException("Invalid path is specified"); + } + String vhostName = pathInfoElements[0]; + String queueName = pathInfoElements[1]; + + VirtualHost<?,?,?> vhost = getBroker().findVirtualHostByName(vhostName); + if (vhost == null) + { + throw new IllegalArgumentException("Could not find virtual host with name '" + vhostName + "'"); + } + + Queue queueFromVirtualHost = getQueueFromVirtualHost(queueName, vhost); + if (queueFromVirtualHost == null) + { + throw new IllegalArgumentException("Could not find queue with name '" + queueName + "' on virtual host '" + vhost.getName() + "'"); + } + return queueFromVirtualHost; + } + + private Queue getQueueFromVirtualHost(String queueName, VirtualHost<?,?,?> vhost) + { + Queue queue = null; + + for(Queue<?> q : vhost.getQueues()) + { + + if(q.getName().equals(queueName)) + { + queue = q; + break; + } + } + return queue; + } + +} |