summaryrefslogtreecommitdiff
path: root/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java')
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java502
1 files changed, 502 insertions, 0 deletions
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
new file mode 100644
index 0000000000..6e7bc1d935
--- /dev/null
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.management.plugin.servlet.rest;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.subscription.Subscription;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+public class MessageServlet extends AbstractServlet
+{
+ private static final Logger LOGGER = Logger.getLogger(MessageServlet.class);
+
+ public MessageServlet()
+ {
+ super();
+ }
+
+ public MessageServlet(Broker broker)
+ {
+ super(broker);
+ }
+
+ @Override
+ protected void onGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+
+ if(request.getPathInfo() != null && request.getPathInfo().length()>0 && request.getPathInfo().substring(1).split("/").length > 2)
+ {
+ getMessageContent(request, response);
+ }
+ else
+ {
+ getMessageList(request, response);
+ }
+
+ }
+
+ private void getMessageContent(HttpServletRequest request, HttpServletResponse response) throws IOException
+ {
+ Queue queue = getQueueFromRequest(request);
+ String path[] = request.getPathInfo().substring(1).split("/");
+ MessageFinder messageFinder = new MessageFinder(Long.parseLong(path[2]));
+ queue.visit(messageFinder);
+
+ response.setStatus(HttpServletResponse.SC_OK);
+
+ response.setHeader("Cache-Control","no-cache");
+ response.setHeader("Pragma","no-cache");
+ response.setDateHeader ("Expires", 0);
+
+ final PrintWriter writer = response.getWriter();
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ mapper.writeValue(writer, messageFinder.getMessageObject());
+ }
+
+ private void getMessageList(HttpServletRequest request, HttpServletResponse response) throws IOException
+ {
+ Queue queue = getQueueFromRequest(request);
+
+ int first = -1;
+ int last = -1;
+ String range = request.getHeader("Range");
+ if(range != null)
+ {
+ String[] boundaries = range.split("=")[1].split("-");
+ first = Integer.parseInt(boundaries[0]);
+ last = Integer.parseInt(boundaries[1]);
+ }
+ final MessageCollector messageCollector = new MessageCollector(first, last);
+ queue.visit(messageCollector);
+
+ response.setContentType("application/json");
+ final List<Map<String, Object>> messages = messageCollector.getMessages();
+ int queueSize = ((Number) queue.getStatistics().getStatistic(Queue.QUEUE_DEPTH_MESSAGES)).intValue();
+ String min = messages.isEmpty() ? "0" : messages.get(0).get("position").toString();
+ String max = messages.isEmpty() ? "0" : messages.get(messages.size()-1).get("position").toString();
+ response.setHeader("Content-Range", (min + "-" + max + "/" + queueSize));
+ response.setStatus(HttpServletResponse.SC_OK);
+
+ response.setHeader("Cache-Control","no-cache");
+ response.setHeader("Pragma","no-cache");
+ response.setDateHeader ("Expires", 0);
+
+ final PrintWriter writer = response.getWriter();
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ mapper.writeValue(writer, messages);
+ }
+
+ private Queue getQueueFromRequest(HttpServletRequest request)
+ {
+ List<String> names = new ArrayList<String>();
+ // TODO - validation that there is a vhost and queue and only those in the path
+ if(request.getPathInfo() != null && request.getPathInfo().length()>0)
+ {
+ String path = request.getPathInfo().substring(1);
+ names.addAll(Arrays.asList(path.split("/")));
+ }
+ String vhostName = names.get(0);
+ String queueName = names.get(1);
+
+ VirtualHost vhost = null;
+
+ for(VirtualHost vh : getBroker().getVirtualHosts())
+ {
+ if(vh.getName().equals(vhostName))
+ {
+ vhost = vh;
+ break;
+ }
+ }
+
+ return getQueueFromVirtualHost(queueName, vhost);
+ }
+
+ private Queue getQueueFromVirtualHost(String queueName, VirtualHost vhost)
+ {
+ Queue queue = null;
+
+ for(Queue q : vhost.getQueues())
+ {
+
+ if(q.getName().equals(queueName))
+ {
+ queue = q;
+ break;
+ }
+ }
+ return queue;
+ }
+
+ private abstract static class QueueEntryTransaction implements VirtualHost.TransactionalOperation
+ {
+ private final Queue _sourceQueue;
+ private final List _messageIds;
+
+ protected QueueEntryTransaction(Queue sourceQueue, List messageIds)
+ {
+ _sourceQueue = sourceQueue;
+ _messageIds = messageIds;
+ }
+
+
+ public void withinTransaction(final VirtualHost.Transaction txn)
+ {
+
+ _sourceQueue.visit(new QueueEntryVisitor()
+ {
+
+ public boolean visit(final QueueEntry entry)
+ {
+ final ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ final long messageId = message.getMessageNumber();
+ if (_messageIds.remove(messageId) || (messageId <= (long) Integer.MAX_VALUE
+ && _messageIds.remove(Integer.valueOf((int)messageId))))
+ {
+ updateEntry(entry, txn);
+ }
+ }
+ return _messageIds.isEmpty();
+ }
+ });
+ }
+
+
+ protected abstract void updateEntry(QueueEntry entry, VirtualHost.Transaction txn);
+ }
+
+ private static class MoveTransaction extends QueueEntryTransaction
+ {
+ private final Queue _destinationQueue;
+
+ public MoveTransaction(Queue sourceQueue, List<Long> messageIds, Queue destinationQueue)
+ {
+ super(sourceQueue, messageIds);
+ _destinationQueue = destinationQueue;
+ }
+
+ protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
+ {
+ txn.move(entry, _destinationQueue);
+ }
+ }
+
+ private static class CopyTransaction extends QueueEntryTransaction
+ {
+ private final Queue _destinationQueue;
+
+ public CopyTransaction(Queue sourceQueue, List<Long> messageIds, Queue destinationQueue)
+ {
+ super(sourceQueue, messageIds);
+ _destinationQueue = destinationQueue;
+ }
+
+ protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
+ {
+ txn.copy(entry, _destinationQueue);
+ }
+ }
+
+ private static class DeleteTransaction extends QueueEntryTransaction
+ {
+ public DeleteTransaction(Queue sourceQueue, List<Long> messageIds)
+ {
+ super(sourceQueue, messageIds);
+ }
+
+ protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
+ {
+ txn.dequeue(entry);
+ }
+ }
+
+
+
+ private class MessageCollector implements QueueEntryVisitor
+ {
+ private final int _first;
+ private final int _last;
+ private int _position = -1;
+ private final List<Map<String, Object>> _messages = new ArrayList<Map<String, Object>>();
+
+ private MessageCollector(int first, int last)
+ {
+ _first = first;
+ _last = last;
+ }
+
+
+ public boolean visit(QueueEntry entry)
+ {
+
+ _position++;
+ if((_first == -1 || _position >= _first) && (_last == -1 || _position <= _last))
+ {
+ final Map<String, Object> messageObject = convertToObject(entry, false);
+ messageObject.put("position", _position);
+ _messages.add(messageObject);
+ }
+ return _last != -1 && _position > _last;
+ }
+
+ public List<Map<String, Object>> getMessages()
+ {
+ return _messages;
+ }
+ }
+
+
+ private class MessageFinder implements QueueEntryVisitor
+ {
+ private final long _messageNumber;
+ private Map<String, Object> _messageObject;
+
+ private MessageFinder(long messageNumber)
+ {
+ _messageNumber = messageNumber;
+ }
+
+
+ public boolean visit(QueueEntry entry)
+ {
+ ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ if(_messageNumber == message.getMessageNumber())
+ {
+ MessageReference reference = message.newReference();
+ _messageObject = convertToObject(entry, true);
+ reference.release();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public Map<String, Object> getMessageObject()
+ {
+ return _messageObject;
+ }
+ }
+
+ private Map<String, Object> convertToObject(QueueEntry entry, boolean includeContent)
+ {
+ Map<String, Object> object = new LinkedHashMap<String, Object>();
+ object.put("size", entry.getSize());
+ object.put("deliveryCount", entry.getDeliveryCount());
+ object.put("state",entry.isAvailable()
+ ? "Available"
+ : entry.isAcquired()
+ ? "Acquired"
+ : "");
+ final Subscription deliveredSubscription = entry.getDeliveredSubscription();
+ object.put("deliveredTo", deliveredSubscription == null ? null : deliveredSubscription.getSubscriptionID());
+ ServerMessage message = entry.getMessage();
+
+ if(message != null)
+ {
+ convertMessageProperties(object, message);
+ if(includeContent)
+ {
+ convertMessageHeaders(object, message);
+ }
+ }
+
+ return object;
+ }
+
+ private void convertMessageProperties(Map<String, Object> object, ServerMessage message)
+ {
+ object.put("id", message.getMessageNumber());
+ object.put("arrivalTime",message.getArrivalTime());
+ object.put("persistent", message.isPersistent());
+
+ final AMQMessageHeader messageHeader = message.getMessageHeader();
+ if(messageHeader != null)
+ {
+ addIfPresent(object, "messageId", messageHeader.getMessageId());
+ addIfPresent(object, "expirationTime", messageHeader.getExpiration());
+ addIfPresent(object, "applicationId", messageHeader.getAppId());
+ addIfPresent(object, "correlationId", messageHeader.getCorrelationId());
+ addIfPresent(object, "encoding", messageHeader.getEncoding());
+ addIfPresent(object, "mimeType", messageHeader.getMimeType());
+ addIfPresent(object, "priority", messageHeader.getPriority());
+ addIfPresent(object, "replyTo", messageHeader.getReplyTo());
+ addIfPresent(object, "timestamp", messageHeader.getTimestamp());
+ addIfPresent(object, "type", messageHeader.getType());
+ addIfPresent(object, "userId", messageHeader.getUserId());
+ }
+
+ }
+
+ private void addIfPresent(Map<String, Object> object, String name, Object property)
+ {
+ if(property != null)
+ {
+ object.put(name, property);
+ }
+ }
+
+ private void convertMessageHeaders(Map<String, Object> object, ServerMessage message)
+ {
+ final AMQMessageHeader messageHeader = message.getMessageHeader();
+ if(messageHeader != null)
+ {
+ Map<String, Object> headers = new HashMap<String,Object>();
+ for(String headerName : messageHeader.getHeaderNames())
+ {
+ headers.put(headerName, messageHeader.getHeader(headerName));
+ }
+ object.put("headers", headers);
+ }
+ }
+
+ /*
+ * POST moves or copies messages to the given queue from a queue specified in the posted JSON data
+ */
+ @Override
+ protected void onPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+
+ try
+ {
+ final Queue sourceQueue = getQueueFromRequest(request);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ @SuppressWarnings("unchecked")
+ Map<String,Object> providedObject = mapper.readValue(request.getInputStream(), LinkedHashMap.class);
+
+ String destQueueName = (String) providedObject.get("destinationQueue");
+ Boolean move = (Boolean) providedObject.get("move");
+
+ final VirtualHost vhost = sourceQueue.getParent(VirtualHost.class);
+
+ boolean isMoveTransaction = move != null && Boolean.valueOf(move);
+
+ // FIXME: added temporary authorization check until we introduce management layer
+ // and review current ACL rules to have common rules for all management interfaces
+ String methodName = isMoveTransaction? "moveMessages":"copyMessages";
+ if (isQueueUpdateMethodAuthorized(methodName, vhost.getName()))
+ {
+ final Queue destinationQueue = getQueueFromVirtualHost(destQueueName, vhost);
+ final List messageIds = new ArrayList((List) providedObject.get("messages"));
+ QueueEntryTransaction txn =
+ isMoveTransaction
+ ? new MoveTransaction(sourceQueue, messageIds, destinationQueue)
+ : new CopyTransaction(sourceQueue, messageIds, destinationQueue);
+ vhost.executeTransaction(txn);
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+ else
+ {
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+ }
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Failure to perform message opertion", e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+
+ /*
+ * DELETE removes messages from the queue
+ */
+ @Override
+ protected void onDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+
+ final Queue sourceQueue = getQueueFromRequest(request);
+
+ final VirtualHost vhost = sourceQueue.getParent(VirtualHost.class);
+
+
+ final List<Long> messageIds = new ArrayList<Long>();
+ for(String idStr : request.getParameterValues("id"))
+ {
+ messageIds.add(Long.valueOf(idStr));
+ }
+
+ // FIXME: added temporary authorization check until we introduce management layer
+ // and review current ACL rules to have common rules for all management interfaces
+ if (isQueueUpdateMethodAuthorized("deleteMessages", vhost.getName()))
+ {
+ vhost.executeTransaction(new DeleteTransaction(sourceQueue, messageIds));
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+ else
+ {
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+ }
+
+ }
+
+ private boolean isQueueUpdateMethodAuthorized(String methodName, String virtualHost)
+ {
+ SecurityManager securityManager = getSecurityManager(virtualHost);
+ return securityManager.authoriseMethod(Operation.UPDATE, "VirtualHost.Queue", methodName);
+ }
+
+ private SecurityManager getSecurityManager(String virtualHost)
+ {
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ SecurityManager security;
+ if (virtualHost == null)
+ {
+ security = appRegistry.getSecurityManager();
+ }
+ else
+ {
+ security = appRegistry.getVirtualHostRegistry().getVirtualHost(virtualHost).getSecurityManager();
+ }
+ return security;
+ }
+
+}