summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-18 13:09:22 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-18 13:09:22 +0000
commite787eba0d8b10f61e39fa7a6d8a6f12a00977d36 (patch)
treebd8d2071178861af03a0d72aeb2d7fd4e08b39c8
parent67175fa993f364f8debf0fd8cc3b15633a022bcf (diff)
downloadqpid-python-e787eba0d8b10f61e39fa7a6d8a6f12a00977d36.tar.gz
FilterManagerFactory/JMSSelectorFilter.java - Added throw AMQInvalidSelectorException to be passed back to BasicConsumeMethodHandler.java and then to the Client.
Addition of ConcurrentSelectorDeliveryManager.java. This uses the additional methods on Subscription (hasFilters, hasInterest) to implement selectors. Trunk Merges git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/jmsselectors@488264 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/bin/qpid-server.bat2
-rw-r--r--java/broker/pom.xml6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java163
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java259
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java241
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java439
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java343
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java352
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java1
20 files changed, 1451 insertions, 577 deletions
diff --git a/java/broker/bin/qpid-server.bat b/java/broker/bin/qpid-server.bat
index ad3a20f459..b839cc72d8 100644
--- a/java/broker/bin/qpid-server.bat
+++ b/java/broker/bin/qpid-server.bat
@@ -63,6 +63,6 @@ goto loop
:runCommand
set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%"\bin\java -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+"%JAVA_HOME%\bin\java" -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
:end
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index f6dd171214..5f4c490fd4 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -34,6 +34,7 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
+ <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
@@ -53,7 +54,7 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
@@ -85,7 +86,6 @@
<build>
<plugins>
-
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
@@ -116,7 +116,7 @@
</property>
<property>
<name>amqj.logging.level</name>
- <value>WARN</value>
+ <value>${amqj.logging.level}</value>
</property>
<property>
<name>log4j.configuration</name>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 6dc97f9e48..ffd0e5d8ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -197,4 +197,34 @@ public class DestNameExchange extends AbstractExchange
}
}
}
+
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
+ for (List<AMQQueue> queues : bindings.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_index.getBindingsMap().isEmpty();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index a692f9ebca..139307488e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -126,10 +126,11 @@ public class DestWildExchange extends AbstractExchange
} // End of MBean class
- public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
// if we got null back, no previous value was associated with the specified routing key hence
@@ -159,6 +160,8 @@ public class DestWildExchange extends AbstractExchange
// TODO: add support for the immediate flag
if (queues == null)
{
+ _logger.warn("No queues found for routing key " + routingKey);
+ _logger.warn("Routing map contains: " + _routingKey2queues);
//todo Check for valid topic - mritchie
return;
}
@@ -172,7 +175,37 @@ public class DestWildExchange extends AbstractExchange
}
}
- public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (List<AMQQueue> queues : _routingKey2queues.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_routingKey2queues.isEmpty();
+ }
+
+ public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
{
assert queue != null;
assert routingKey != null;
@@ -190,6 +223,10 @@ public class DestWildExchange extends AbstractExchange
throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey);
}
+ if (queues.isEmpty())
+ {
+ _routingKey2queues.remove(queues);
+ }
}
protected ExchangeMBean createMBean() throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 787d0eddfd..824e85dc5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,4 +47,30 @@ public interface Exchange
void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException;
void route(AMQMessage message) throws AMQException;
+
+ /**
+ * Determines whether a message would be isBound to a particular queue using a specific routing key
+ * @param routingKey
+ * @param queue
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey, AMQQueue queue) throws AMQException;
+
+ /**
+ * Determines whether a message is routing to any queue using a specific routing key
+ * @param routingKey
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey) throws AMQException;
+
+ boolean isBound(AMQQueue queue) throws AMQException;
+
+ /**
+ * Returns true if this exchange has at least one binding associated with it.
+ * @return
+ * @throws AMQException
+ */
+ boolean hasBindings() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index ccb2211a55..8c4df68dea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -206,10 +206,48 @@ public class HeadersExchange extends AbstractExchange
}
if (!delivered)
{
- _logger.warn("Exchange " + getName() + ": message not routable.");
+
+ String msg = "Exchange " + getName() + ": message not routable.";
+
+ if (payload.getPublishBody().mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else
+ {
+ _logger.warn(msg);
+ }
+
}
}
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ return isBound(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ return hasBindings();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (Registration r : _bindings)
+ {
+ if (r.queue.equals(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_bindings.isEmpty();
+ }
+
protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
index fb48729c9e..485c4739bd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import java.util.List;
import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,7 +38,7 @@ class Index
private ConcurrentMap<String, List<AMQQueue>> _index
= new ConcurrentHashMap<String, List<AMQQueue>>();
- boolean add(String key, AMQQueue queue)
+ synchronized boolean add(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if(queues == null)
@@ -61,7 +62,7 @@ class Index
}
}
- boolean remove(String key, AMQQueue queue)
+ synchronized boolean remove(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if (queues != null)
@@ -83,6 +84,6 @@ class Index
Map<String, List<AMQQueue>> getBindingsMap()
{
- return _index;
+ return new HashMap<String, List<AMQQueue>>(_index);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
index 5c106de6f2..23e7355f17 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.filter;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class FilterManagerFactory
//fixme move to a common class so it can be refered to from client code.
private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector";
- public static FilterManager createManager(FieldTable filters)
+ public static FilterManager createManager(FieldTable filters) throws AMQException
{
FilterManager manager = null;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index e54fdb944d..86caf171fb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.filter.jms.selector.SelectorParser;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.log4j.Logger;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
@@ -33,13 +36,13 @@ import javax.jms.JMSException;
public class JMSSelectorFilter implements MessageFilter
{
- private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
// LoggerFactory.getLogger(JMSSelectorFilter.class);
private String _selector;
private BooleanExpression _matcher;
- public JMSSelectorFilter(String selector)
+ public JMSSelectorFilter(String selector) throws AMQException
{
_selector = selector;
_logger.info("Created JMSSelectorFilter with selector:" + _selector);
@@ -53,8 +56,8 @@ public class JMSSelectorFilter implements MessageFilter
catch (InvalidSelectorException e)
{
// fixme
- // Will have to throw this back to the client... in the future
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ // Is this the correct way of throwing exception
+ throw new AMQInvalidSelectorException(e.getMessage());
}
}
@@ -64,7 +67,7 @@ public class JMSSelectorFilter implements MessageFilter
try
{
boolean match = _matcher.matches(message);
- _logger.info(message + " match(" + match + ") selector:" + _selector);
+ _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
return match;
}
catch (JMSException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index c4c995540d..bf282020ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -21,10 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -68,14 +71,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
- if(queue == null)
+ if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.filter);
- if(!body.nowait)
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+ if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
}
@@ -83,10 +86,19 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
}
- catch(ConsumerTagNotUniqueException e)
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
+ }
+ catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+ BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
new file mode 100644
index 0000000000..5aaf78d6b7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeBoundBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
+{
+ private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+
+ public static final int OK = 0;
+
+ public static final int EXCHANGE_NOT_FOUND = 1;
+
+ public static final int QUEUE_NOT_FOUND = 2;
+
+ public static final int NO_BINDINGS = 3;
+
+ public static final int QUEUE_NOT_BOUND = 4;
+
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
+ public static ExchangeBoundHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ {
+ ExchangeBoundBody body = evt.getMethod();
+
+ String exchangeName = body.exchange;
+ String queueName = body.queue;
+ String routingKey = body.routingKey;
+ if (exchangeName == null)
+ {
+ throw new AMQException("Exchange exchange must not be null");
+ }
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ AMQFrame response;
+ if (exchange == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
+ "Exchange " + exchangeName + " not found");
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+ }
+ }
+ else
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
+ "Queue " + queueName + " not bound to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey, queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
+ "Queue " + queueName +
+ " not bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ NO_QUEUE_BOUND_WITH_RK,
+ "No queue bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ protocolSession.writeFrame(response);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1aa62dbfa4..c38f7f630b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -40,9 +40,6 @@ import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -50,24 +47,12 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
import javax.security.sasl.SaslServer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Date;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -93,7 +78,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private AMQCodecFactory _codecFactory;
- private ManagedAMQProtocolSession _managedObject;
+ private AMQProtocolSessionMBean _managedObject;
private SaslServer _saslServer;
@@ -102,11 +87,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private Object _lastSent;
private boolean _closed;
-
+ // maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
-
private byte _major;
private byte _minor;
@@ -115,190 +99,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _managedObject;
}
- /**
- * This class implements the management interface (is an MBean). In order to
- * make more attributes, operations and notifications available over JMX simply
- * augment the ManagedConnection interface and add the appropriate implementation here.
- */
- @MBeanDescription("Management Bean for an AMQ Broker Connection")
- private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
- {
- private String _name = null;
- //openmbean data types for representing the channel attributes
- private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
- private String[] _indexNames = {_channelAtttibuteNames[0]};
- private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
- private CompositeType _channelType = null; // represents the data type for channel data
- private TabularType _channelsType = null; // Data type for list of channels type
- private TabularDataSupport _channelsList = null;
-
- @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public ManagedAMQProtocolSession() throws JMException
- {
- super(ManagedConnection.class, ManagedConnection.TYPE);
- init();
- }
-
- /**
- * initialises the openmbean data types
- */
- private void init() throws OpenDataException
- {
- String remote = getRemoteAddress();
- remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
- _name = jmxEncode(new StringBuffer(remote), 0).toString();
- _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
- _channelAtttibuteNames, _channelAttributeTypes);
- _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
- }
-
- public Date getLastIoTime()
- {
- return new Date(_minaProtocolSession.getLastIoTime());
- }
-
- public String getRemoteAddress()
- {
- return _minaProtocolSession.getRemoteAddress().toString();
- }
-
- public Long getWrittenBytes()
- {
- return _minaProtocolSession.getWrittenBytes();
- }
-
- public Long getReadBytes()
- {
- return _minaProtocolSession.getReadBytes();
- }
-
- public Long getMaximumNumberOfChannels()
- {
- return _maxNoOfChannels;
- }
-
- public void setMaximumNumberOfChannels(Long value)
- {
- _maxNoOfChannels = value;
- }
-
- public String getObjectInstanceName()
- {
- return _name;
- }
-
- public void commitTransactions(int channelId) throws JMException
- {
- try
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
- }
- if (channel.isTransactional())
- {
- channel.commit();
- }
- }
- catch(AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- public void rollbackTransactions(int channelId) throws JMException
- {
- try
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
- }
- if (channel.isTransactional())
- {
- channel.rollback();
- }
- }
- catch(AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * Creates the list of channels in tabular form from the _channelMap.
- * @return list of channels in tabular form.
- * @throws OpenDataException
- */
- public TabularData channels() throws OpenDataException
- {
- _channelsList = new TabularDataSupport(_channelsType);
-
- for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
- {
- AMQChannel channel = entry.getValue();
- Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
- (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
- channel.getUnacknowledgedMessageMap().size()};
-
- CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
- _channelsList.put(channelData);
- }
-
- return _channelsList;
- }
-
- public void closeChannel(int id) throws Exception
- {
- try
- {
- AMQMinaProtocolSession.this.closeChannel(id);
- }
- catch (AMQException ex)
- {
- throw new Exception(ex.toString());
- }
- }
-
- public void closeConnection() throws Exception
- {
- try
- {
- AMQMinaProtocolSession.this.closeSession();
- }
- catch (AMQException ex)
- {
- throw new Exception(ex.toString());
- }
- }
-
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
- String name = MonitorNotification.class.getName();
- String description = "Channel count has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[] {info1};
- }
-
- private void checkForNotification()
- {
- int channelsCount = _channelMap.size();
- if (channelsCount >= getMaximumNumberOfChannels())
- {
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(),
- "Channel count (" + channelsCount + ") has reached the threshold value");
-
- _broadcaster.sendNotification(n);
- }
- }
-
- } // End of MBean class
public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
AMQCodecFactory codecFactory)
@@ -322,11 +122,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_managedObject.register();
}
- private ManagedAMQProtocolSession createMBean() throws AMQException
+ private AMQProtocolSessionMBean createMBean() throws AMQException
{
try
{
- return new ManagedAMQProtocolSession();
+ return new AMQProtocolSessionMBean(this);
}
catch(JMException ex)
{
@@ -335,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
+ public IoSession getIOSession()
+ {
+ return _minaProtocolSession;
+ }
+
public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
{
return (AMQProtocolSession) minaProtocolSession.getAttachment();
@@ -495,6 +300,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_contextKey = contextKey;
}
+ public List<AMQChannel> getChannels()
+ {
+ return new ArrayList<AMQChannel>(_channelMap.values());
+ }
+
public AMQChannel getChannel(int channelId) throws AMQException
{
return _channelMap.get(channelId);
@@ -503,7 +313,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void addChannel(AMQChannel channel)
{
_channelMap.put(channel.getChannelId(), channel);
- _managedObject.checkForNotification();
+ checkForNotification();
+ }
+
+ private void checkForNotification()
+ {
+ int channelsCount = _channelMap.size();
+ if (channelsCount >= _maxNoOfChannels)
+ {
+ _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
+ }
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _maxNoOfChannels;
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _maxNoOfChannels = value;
+ }
+
+ public void commitTransactions(AMQChannel channel) throws AMQException
+ {
+ if (channel != null && channel.isTransactional())
+ {
+ channel.commit();
+ }
+ }
+
+ public void rollbackTransactions(AMQChannel channel) throws AMQException
+ {
+ if (channel != null && channel.isTransactional())
+ {
+ channel.rollback();
+ }
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
new file mode 100644
index 0000000000..a47d462810
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -0,0 +1,241 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * This MBean class implements the management interface. In order to make more attributes, operations and notifications
+ * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
+ */
+@MBeanDescription("Management Bean for an AMQ Broker Connection")
+public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+{
+ private AMQMinaProtocolSession _session = null;
+ private String _name = null;
+ //openmbean data types for representing the channel attributes
+ private String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
+ private String[] _indexNames = {_channelAtttibuteNames[0]};
+ private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
+ private CompositeType _channelType = null; // represents the data type for channel data
+ private TabularType _channelsType = null; // Data type for list of channels type
+
+ @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+ public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException
+ {
+ super(ManagedConnection.class, ManagedConnection.TYPE);
+ _session = session;
+ init();
+ }
+
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
+ {
+ String remote = getRemoteAddress();
+ remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
+ _name = jmxEncode(new StringBuffer(remote), 0).toString();
+ _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
+ _channelAtttibuteNames, _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
+ }
+
+ public Date getLastIoTime()
+ {
+ return new Date(_session.getIOSession().getLastIoTime());
+ }
+
+ public String getRemoteAddress()
+ {
+ return _session.getIOSession().getRemoteAddress().toString();
+ }
+
+ public Long getWrittenBytes()
+ {
+ return _session.getIOSession().getWrittenBytes();
+ }
+
+ public Long getReadBytes()
+ {
+ return _session.getIOSession().getReadBytes();
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _session.getMaximumNumberOfChannels();
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _session.setMaximumNumberOfChannels(value);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name;
+ }
+
+ /**
+ * commits transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if commit fails
+ */
+ public void commitTransactions(int channelId) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ _session.commitTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * rollsback the transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if rollback fails
+ */
+ public void rollbackTransactions(int channelId) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ _session.rollbackTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * Creates the list of channels in tabular form from the _channelMap.
+ *
+ * @return list of channels in tabular form.
+ * @throws OpenDataException
+ */
+ public TabularData channels() throws OpenDataException
+ {
+ TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
+ List<AMQChannel> list = _session.getChannels();
+
+ for (AMQChannel channel : list)
+ {
+ Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
+ (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
+ channel.getUnacknowledgedMessageMap().size()};
+
+ CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
+ channelsList.put(channelData);
+ }
+
+ return channelsList;
+ }
+
+ /**
+ * @see AMQMinaProtocolSession#closeChannel(int)
+ */
+ public void closeChannel(int id) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(id);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + id + ") does not exist");
+ }
+
+ _session.closeChannel(id);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * closes the connection. The administrator can use this management operation to close connection to free up
+ * resources.
+ * @throws JMException
+ */
+ public void closeConnection() throws JMException
+ {
+ try
+ {
+ _session.closeSession();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String name = MonitorNotification.class.getName();
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[]{info1};
+ }
+
+ public void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ _broadcaster.sendNotification(n);
+ }
+
+} // End of MBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 3d08805cab..7fff7b9adf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,16 +21,9 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,15 +31,8 @@ import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.*;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.Executor;
/**
@@ -110,7 +96,7 @@ public class AMQQueue implements Managable, Comparable
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -126,322 +112,6 @@ public class AMQQueue implements Managable, Comparable
return _name.compareTo(((AMQQueue) o).getName());
}
- /**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
- */
- @MBeanDescription("Management Interface for AMQQueue")
- private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
- {
- private String _queueName = null;
- // OpenMBean data types for viewMessages method
- private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
- private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
- private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
- // OpenMBean data types for viewMessageContent method
- private CompositeType _msgContentType = null;
- private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
- private OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
- @MBeanConstructor("Creates an MBean exposing an AMQQueue")
- public AMQQueueMBean() throws JMException
- {
- super(ManagedQueue.class, ManagedQueue.TYPE);
- init();
- }
-
- /**
- * initialises the openmbean data types
- */
- private void init() throws OpenDataException
- {
- _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
-
- _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
- }
-
- public String getObjectInstanceName()
- {
- return _queueName;
- }
-
- public String getName()
- {
- return _name;
- }
-
- public boolean isDurable()
- {
- return _durable;
- }
-
- public String getOwner()
- {
- return _owner;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
- public Integer getMessageCount()
- {
- return _deliveryMgr.getQueueMessageCount();
- }
-
- public Long getMaximumMessageSize()
- {
- return _maxMessageSize;
- }
-
- public void setMaximumMessageSize(Long value)
- {
- _maxMessageSize = value;
- }
-
- public Integer getConsumerCount()
- {
- return _subscribers.size();
- }
-
- public Integer getActiveConsumerCount()
- {
- return _subscribers.getWeight();
- }
-
- public Long getReceivedMessageCount()
- {
- return _totalMessagesReceived;
- }
-
- public Integer getMaximumMessageCount()
- {
- return _maxMessageCount;
- }
-
- public void setMaximumMessageCount(Integer value)
- {
- _maxMessageCount = value;
- }
-
- public Long getMaximumQueueDepth()
- {
- return _maxQueueDepth;
- }
-
- // Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
- {
- _maxQueueDepth = value;
- }
-
- /**
- * returns the size of messages(KB) in the queue.
- */
- public Long getQueueDepth()
- {
- List<AMQMessage> list = _deliveryMgr.getMessages();
- if (list.size() == 0)
- {
- return 0l;
- }
-
- long queueDepth = 0;
- for (AMQMessage message : list)
- {
- queueDepth = queueDepth + getMessageSize(message);
- }
- return (long) Math.round(queueDepth / 1000);
- }
-
- /**
- * returns size of message in bytes
- */
- private long getMessageSize(AMQMessage msg)
- {
- if (msg == null)
- {
- return 0l;
- }
-
- return msg.getContentHeaderBody().bodySize;
- }
-
- /**
- * Checks if there is any notification to be send to the listeners
- */
- private void checkForNotification(AMQMessage msg)
- {
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
-
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _maxMessageSize)
- {
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
-
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
- if (queueDepth >= _maxQueueDepth)
- {
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
- }
- }
-
- /**
- * Sends the notification to the listeners
- */
- private void notifyClients(String notificationMsg)
- {
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
-
- _broadcaster.sendNotification(n);
- }
-
- public void deleteMessageFromTop() throws JMException
- {
- try
- {
- _deliveryMgr.removeAMessageFromTop();
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- public void clearQueue() throws JMException
- {
- try
- {
- _deliveryMgr.clearAllMessages();
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * returns message content as byte array and related attributes for the given message id.
- */
- public CompositeData viewMessageContent(long msgId) throws JMException
- {
- List<AMQMessage> list = _deliveryMgr.getMessages();
- AMQMessage msg = null;
- for (AMQMessage message : list)
- {
- if (message.getMessageId() == msgId)
- {
- msg = message;
- break;
- }
- }
-
- if (msg == null)
- {
- throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
- }
- // get message content
- List<ContentBody> cBodies = msg.getContentBodies();
- List<Byte> msgContent = new ArrayList<Byte>();
- for (ContentBody body : cBodies)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.payload.slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
-
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = headerProperties.getContentType();
- String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
-
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
-
- /**
- * Returns the header contents of the messages stored in this queue in tabular form.
- */
- public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
- {
- if ((beginIndex > endIndex) || (beginIndex < 1))
- {
- throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex +
- "\nFrom Index should be greater than 0 and less than To Index");
- }
-
- List<AMQMessage> list = _deliveryMgr.getMessages();
- TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
-
- // Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
- {
- AMQMessage msg = list.get(i - 1);
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
- List<String> headerAttribsList = new ArrayList<String>();
- headerAttribsList.add("App Id=" + headerProperties.getAppId());
- headerAttribsList.add("MimeType=" + headerProperties.getContentType());
- headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId());
- headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
- headerAttribsList.add(headerProperties.toString());
-
- Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]),
- headerBody.bodySize, msg.isRedelivered()};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
- }
-
- return _messageList;
- }
-
- /**
- * returns Notifications sent by this MBean.
- */
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
- String name = MonitorNotification.class.getName();
- String description = "Either Message count or Queue depth or Message size has reached threshold high value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[]{info1};
- }
-
- } // End of AMQMBean class
-
public AMQQueue(String name, boolean durable, String owner,
boolean autoDelete, QueueRegistry queueRegistry)
throws AMQException
@@ -523,33 +193,32 @@ public class AMQQueue implements Managable, Comparable
{
if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
{
- _logger.warn("Using ConcurrentSelectorDeliveryManager");
+ _logger.info("Using ConcurrentSelectorDeliveryManager");
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
{
- _logger.warn("Using ConcurrentDeliveryManager");
+ _logger.info("Using ConcurrentDeliveryManager");
_deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
}
else
{
- _logger.warn("Using SynchronizedDeliveryManager");
+ _logger.info("Using SynchronizedDeliveryManager");
_deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
}
}
else
{
- _logger.warn("Using SynchronizedDeliveryManager");
+ _logger.info("Using SynchronizedDeliveryManager");
_deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
}
-
}
private AMQQueueMBean createMBean() throws AMQException
{
try
{
- return new AMQQueueMBean();
+ return new AMQQueueMBean(this);
}
catch (JMException ex)
{
@@ -582,16 +251,112 @@ public class AMQQueue implements Managable, Comparable
return _autoDelete;
}
+ /**
+ * @return no of messages(undelivered) on the queue.
+ */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
+ /**
+ * @return List of messages(undelivered) on the queue.
+ */
+ public List<AMQMessage> getMessagesOnTheQueue()
+ {
+ return _deliveryMgr.getMessages();
+ }
+
+ /**
+ * @param messageId
+ * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
+ */
+ public AMQMessage getMessageOnTheQueue(long messageId)
+ {
+ List<AMQMessage> list = getMessagesOnTheQueue();
+ AMQMessage msg = null;
+ for (AMQMessage message : list)
+ {
+ if (message.getMessageId() == messageId)
+ {
+ msg = message;
+ break;
+ }
+ }
+
+ return msg;
+ }
+
+ /**
+ * @return MBean object associated with this Queue
+ */
public ManagedObject getManagedObject()
{
return _managedObject;
}
+ public Long getMaximumMessageSize()
+ {
+ return _maxMessageSize;
+ }
+
+ public void setMaximumMessageSize(Long value)
+ {
+ _maxMessageSize = value;
+ }
+
+ public Integer getConsumerCount()
+ {
+ return _subscribers.size();
+ }
+
+ public Integer getActiveConsumerCount()
+ {
+ return _subscribers.getWeight();
+ }
+
+ public Long getReceivedMessageCount()
+ {
+ return _totalMessagesReceived;
+ }
+
+ public Integer getMaximumMessageCount()
+ {
+ return _maxMessageCount;
+ }
+
+ public void setMaximumMessageCount(Integer value)
+ {
+ _maxMessageCount = value;
+ }
+
+ public Long getMaximumQueueDepth()
+ {
+ return _maxQueueDepth;
+ }
+
+ // Sets the queue depth, the max queue size
+ public void setMaximumQueueDepth(Long value)
+ {
+ _maxQueueDepth = value;
+ }
+
+ /**
+ * Removes the AMQMessage from the top of the queue.
+ */
+ public void deleteMessageFromTop() throws AMQException
+ {
+ _deliveryMgr.removeAMessageFromTop();
+ }
+
+ /**
+ * removes all the messages from the queue.
+ */
+ public void clearQueue() throws AMQException
+ {
+ _deliveryMgr.clearAllMessages();
+ }
+
public void bind(String routingKey, Exchange exchange)
{
_bindings.addBinding(routingKey, exchange);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
new file mode 100644
index 0000000000..54dd366d71
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.Notification;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.OperationsException;
+import javax.management.monitor.MonitorNotification;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * MBean class for AMQQueue. It implements all the management features exposed
+ * for an AMQQueue.
+ */
+@MBeanDescription("Management Interface for AMQQueue")
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+{
+ private AMQQueue _queue = null;
+ private String _queueName = null;
+ // OpenMBean data types for viewMessages method
+ private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
+ private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+ private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+ private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
+
+ // OpenMBean data types for viewMessageContent method
+ private CompositeType _msgContentType = null;
+ private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
+
+ @MBeanConstructor("Creates an MBean exposing an AMQQueue")
+ public AMQQueueMBean(AMQQueue queue) throws JMException
+ {
+ super(ManagedQueue.class, ManagedQueue.TYPE);
+ _queue = queue;
+ _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
+ init();
+ }
+
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
+ {
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
+ _msgContentAttributes, _msgContentAttributeTypes);
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+
+ _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _queueName;
+ }
+
+ public String getName()
+ {
+ return _queueName;
+ }
+
+ public boolean isDurable()
+ {
+ return _queue.isDurable();
+ }
+
+ public String getOwner()
+ {
+ return _queue.getOwner();
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _queue.isAutoDelete();
+ }
+
+ public Integer getMessageCount()
+ {
+ return _queue.getMessageCount();
+ }
+
+ public Long getMaximumMessageSize()
+ {
+ return _queue.getMaximumMessageSize();
+ }
+
+ public void setMaximumMessageSize(Long value)
+ {
+ _queue.setMaximumMessageSize(value);
+ }
+
+ public Integer getConsumerCount()
+ {
+ return _queue.getConsumerCount();
+ }
+
+ public Integer getActiveConsumerCount()
+ {
+ return _queue.getActiveConsumerCount();
+ }
+
+ public Long getReceivedMessageCount()
+ {
+ return _queue.getReceivedMessageCount();
+ }
+
+ public Integer getMaximumMessageCount()
+ {
+ return _queue.getMaximumMessageCount();
+ }
+
+ public void setMaximumMessageCount(Integer value)
+ {
+ _queue.setMaximumMessageCount(value);
+ }
+
+ public Long getMaximumQueueDepth()
+ {
+ return _queue.getMaximumQueueDepth();
+ }
+
+ public void setMaximumQueueDepth(Long value)
+ {
+ _queue.setMaximumQueueDepth(value);
+ }
+
+ /**
+ * returns the size of messages(KB) in the queue.
+ */
+ public Long getQueueDepth()
+ {
+ List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ if (list.size() == 0)
+ {
+ return 0l;
+ }
+
+ long queueDepth = 0;
+ for (AMQMessage message : list)
+ {
+ queueDepth = queueDepth + getMessageSize(message);
+ }
+ return (long) Math.round(queueDepth / 1000);
+ }
+
+ /**
+ * returns size of message in bytes
+ */
+ private long getMessageSize(AMQMessage msg)
+ {
+ if (msg == null)
+ {
+ return 0l;
+ }
+
+ return msg.getContentHeaderBody().bodySize;
+ }
+
+ /**
+ * Checks if there is any notification to be send to the listeners
+ */
+ public void checkForNotification(AMQMessage msg)
+ {
+ // Check for threshold message count
+ Integer msgCount = getMessageCount();
+ if (msgCount >= getMaximumMessageCount())
+ {
+ notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
+ }
+
+ // Check for threshold message size
+ long messageSize = getMessageSize(msg);
+ if (messageSize >= _queue.getMaximumMessageSize())
+ {
+ notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
+ }
+
+ // Check for threshold queue depth in bytes
+ long queueDepth = getQueueDepth();
+ if (queueDepth >= _queue.getMaximumQueueDepth())
+ {
+ notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ }
+ }
+
+ /**
+ * Sends the notification to the listeners
+ */
+ private void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+
+ _broadcaster.sendNotification(n);
+ }
+
+ /**
+ * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop()
+ */
+ public void deleteMessageFromTop() throws JMException
+ {
+ try
+ {
+ _queue.deleteMessageFromTop();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * @see org.apache.qpid.server.queue.AMQQueue#clearQueue()
+ */
+ public void clearQueue() throws JMException
+ {
+ try
+ {
+ _queue.clearQueue();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * returns message content as byte array and related attributes for the given message id.
+ */
+ public CompositeData viewMessageContent(long msgId) throws JMException
+ {
+ AMQMessage msg = _queue.getMessageOnTheQueue(msgId);
+ if (msg == null)
+ {
+ throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
+ }
+ // get message content
+ List<ContentBody> cBodies = msg.getContentBodies();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ for (ContentBody body : cBodies)
+ {
+ if (body.getSize() != 0)
+ {
+ ByteBuffer slice = body.payload.slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
+ }
+ }
+
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = headerProperties.getContentType();
+ String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+ }
+
+ /**
+ * Returns the header contents of the messages stored in this queue in tabular form.
+ */
+ public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
+ {
+ if ((beginIndex > endIndex) || (beginIndex < 1))
+ {
+ throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex +
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
+ }
+
+ List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
+
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ {
+ AMQMessage msg = list.get(i - 1);
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ String[] headerAttributes = headerProperties.toString().split(",");
+ Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+ _messageList.put(messageData);
+ }
+
+ return _messageList;
+ }
+
+ /**
+ * returns Notifications sent by this MBean.
+ */
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String name = MonitorNotification.class.getName();
+ String description = "Either Message count or Queue depth or Message size has reached threshold high value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[]{info1};
+ }
+
+} // End of AMQMBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index dde76e5ba8..f9c8898182 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -281,7 +281,12 @@ public class ConcurrentDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
- executor.execute(asyncDelivery);
+ // Do we need this?
+ // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
+ //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
+ {
+ executor.execute(asyncDelivery);
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
new file mode 100644
index 0000000000..d8bb6e1948
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -0,0 +1,352 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+public class ConcurrentSelectorDeliveryManager implements DeliveryManager
+{
+ private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
+
+ @Configured(path = "advanced.compressBufferOnQueue",
+ defaultValue = "false")
+ public boolean compressBufferOnQueue;
+ /**
+ * Holds any queued messages
+ */
+ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ //private int _messageCount;
+ /**
+ * Ensures that only one asynchronous task is running for this manager at
+ * any time.
+ */
+ private final AtomicBoolean _processing = new AtomicBoolean();
+ /**
+ * The subscriptions on the queue to whom messages are delivered
+ */
+ private final SubscriptionManager _subscriptions;
+
+ /**
+ * A reference to the queue we are delivering messages for. We need this to be able
+ * to pass the code that handles acknowledgements a handle on the queue.
+ */
+ private final AMQQueue _queue;
+
+
+ /**
+ * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
+ * via the async thread.
+ * <p/>
+ * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ */
+ private ReentrantLock _lock = new ReentrantLock();
+
+
+ ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+ {
+
+ //Set values from configuration
+ Configurator.configure(this);
+
+ if (compressBufferOnQueue)
+ {
+ _log.warn("Compressing Buffers on queue.");
+ }
+
+ _subscriptions = subscriptions;
+ _queue = queue;
+ }
+
+
+ private boolean addMessageToQueue(AMQMessage msg)
+ {
+ // Shrink the ContentBodies to their actual size to save memory.
+ if (compressBufferOnQueue)
+ {
+ Iterator it = msg.getContentBodies().iterator();
+ while (it.hasNext())
+ {
+ ContentBody cb = (ContentBody) it.next();
+ cb.reduceBufferToFit();
+ }
+ }
+
+ _messages.offer(msg);
+
+ return true;
+ }
+
+
+ public boolean hasQueuedMessages()
+ {
+ _lock.lock();
+ try
+ {
+ return !_messages.isEmpty();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public int getQueueMessageCount()
+ {
+ return getMessageCount();
+ }
+
+ /**
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+ * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ *
+ * @return int the number of messages in the delivery queue.
+ */
+ private int getMessageCount()
+ {
+ return _messages.size();
+ }
+
+
+ public synchronized List<AMQMessage> getMessages()
+ {
+ return new ArrayList<AMQMessage>(_messages);
+ }
+
+ public synchronized void removeAMessageFromTop() throws AMQException
+ {
+ AMQMessage msg = poll();
+ if (msg != null)
+ {
+ msg.dequeue(_queue);
+ }
+ }
+
+ public synchronized void clearAllMessages() throws AMQException
+ {
+ AMQMessage msg = poll();
+ while (msg != null)
+ {
+ msg.dequeue(_queue);
+ msg = poll();
+ }
+ }
+
+
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+ {
+ AMQMessage message = messages.peek();
+
+ while (message != null && message.taken())
+ {
+ //remove the already taken message
+ messages.poll();
+ // try the next message
+ message = messages.peek();
+ }
+ return message;
+ }
+
+ public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue)
+ {
+ AMQMessage message = null;
+ try
+ {
+ message = getNextMessage(messageQueue);
+
+ // message will be null if we have no messages in the messageQueue.
+ if (message == null)
+ {
+ return;
+ }
+ _log.info("Async Delivery Message:" + message + " to :" + sub);
+
+ sub.send(message, queue);
+ message.setDeliveredToConsumer();
+
+ //remove sent message from our queue.
+ messageQueue.poll();
+ }
+ catch (FailedDequeueException e)
+ {
+ message.release();
+ _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ }
+ }
+
+ /**
+ * Only one thread should ever execute this method concurrently, but
+ * it can do so while other threads invoke deliver().
+ */
+ private void processQueue()
+ {
+ // Continue to process delivery while we haveSubscribers and messages
+ boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+
+ while (hasSubscribers && hasQueuedMessages())
+ {
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ if (!sub.isSuspended())
+ {
+ if (sub.hasFilters())
+ {
+ sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue);
+ }
+ else
+ {
+ sendNextMessage(sub, _messages, _queue);
+ }
+
+ hasSubscribers = true;
+ }
+ else
+ {
+ hasSubscribers = false;
+ }
+ }
+ }
+ }
+
+ private AMQMessage poll()
+ {
+ return _messages.poll();
+ }
+
+ public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ {
+ _log.info("deliver :" + msg);
+
+ //Check if we have someone to deliver the message to.
+ _lock.lock();
+ try
+ {
+ Subscription s = _subscriptions.nextSubscriber(msg);
+
+ if (s == null) //no-one can take the message right now.
+ {
+ _log.info("Testing Message(" + msg + ") for Queued Delivery");
+ if (!msg.isImmediate())
+ {
+ addMessageToQueue(msg);
+
+ //release lock now message is on queue.
+ _lock.unlock();
+
+ //Pre Deliver to all subscriptions
+ _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+
+ // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
+ if (_queue.isShared() && msg.getDeliveredToConsumer())
+ {
+ _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered.");
+ continue;
+ }
+
+ // Only give the message to those that want them.
+ if (sub.hasFilters() && sub.hasInterest(msg))
+ {
+ sub.enqueueForPreDelivery(msg);
+ }
+ }
+ }
+ }
+ else
+ {
+ //release lock now
+ _lock.unlock();
+
+ _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s);
+ //Deliver the message
+ s.send(msg, _queue);
+ msg.setDeliveredToConsumer();
+ }
+ }
+ finally
+ {
+ //ensure lock is released
+ if (_lock.isLocked())
+ {
+ _lock.unlock();
+ }
+ }
+ }
+
+ Runner asyncDelivery = new Runner();
+
+ private class Runner implements Runnable
+ {
+ public void run()
+ {
+ boolean running = true;
+ while (running)
+ {
+ processQueue();
+
+ //Check that messages have not been added since we did our last peek();
+ // Synchronize with the thread that adds to the queue.
+ // If the queue is still empty then we can exit
+
+ if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
+ {
+ running = false;
+ _processing.set(false);
+ }
+ }
+ }
+ }
+
+ public void processAsync(Executor executor)
+ {
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+
+ if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+ {
+ //are we already running? if so, don't re-run
+ if (_processing.compareAndSet(false, true))
+ {
+ executor.execute(asyncDelivery);
+ }
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 7105a3f30f..79b0593f69 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -118,6 +118,12 @@ public class SubscriptionImpl implements Subscription
}
}
+ public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+ String consumerTag)
+ throws AMQException
+ {
+ this(channel, protocolSession, consumerTag, false);
+ }
public boolean equals(Object o)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index 301182a313..49b0111b67 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -206,7 +206,12 @@ class SynchronizedDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
- executor.execute(new Runner());
+ // Do we need this?
+ // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
+ //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
+ {
+ executor.execute(new Runner());
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 5e88ff7f2d..7a13884136 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -109,6 +109,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());