summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/bin/qpid-server.bat2
-rw-r--r--java/broker/pom.xml6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java163
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java259
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java241
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java439
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java343
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java352
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java1
20 files changed, 1451 insertions, 577 deletions
diff --git a/java/broker/bin/qpid-server.bat b/java/broker/bin/qpid-server.bat
index ad3a20f459..b839cc72d8 100644
--- a/java/broker/bin/qpid-server.bat
+++ b/java/broker/bin/qpid-server.bat
@@ -63,6 +63,6 @@ goto loop
:runCommand
set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%"\bin\java -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+"%JAVA_HOME%\bin\java" -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
:end
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index f6dd171214..5f4c490fd4 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -34,6 +34,7 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
+ <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
@@ -53,7 +54,7 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
@@ -85,7 +86,6 @@
<build>
<plugins>
-
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
@@ -116,7 +116,7 @@
</property>
<property>
<name>amqj.logging.level</name>
- <value>WARN</value>
+ <value>${amqj.logging.level}</value>
</property>
<property>
<name>log4j.configuration</name>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 6dc97f9e48..ffd0e5d8ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -197,4 +197,34 @@ public class DestNameExchange extends AbstractExchange
}
}
}
+
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
+ for (List<AMQQueue> queues : bindings.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_index.getBindingsMap().isEmpty();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index a692f9ebca..139307488e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -126,10 +126,11 @@ public class DestWildExchange extends AbstractExchange
} // End of MBean class
- public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
// if we got null back, no previous value was associated with the specified routing key hence
@@ -159,6 +160,8 @@ public class DestWildExchange extends AbstractExchange
// TODO: add support for the immediate flag
if (queues == null)
{
+ _logger.warn("No queues found for routing key " + routingKey);
+ _logger.warn("Routing map contains: " + _routingKey2queues);
//todo Check for valid topic - mritchie
return;
}
@@ -172,7 +175,37 @@ public class DestWildExchange extends AbstractExchange
}
}
- public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (List<AMQQueue> queues : _routingKey2queues.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_routingKey2queues.isEmpty();
+ }
+
+ public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
{
assert queue != null;
assert routingKey != null;
@@ -190,6 +223,10 @@ public class DestWildExchange extends AbstractExchange
throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey);
}
+ if (queues.isEmpty())
+ {
+ _routingKey2queues.remove(queues);
+ }
}
protected ExchangeMBean createMBean() throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 787d0eddfd..824e85dc5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,4 +47,30 @@ public interface Exchange
void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException;
void route(AMQMessage message) throws AMQException;
+
+ /**
+ * Determines whether a message would be isBound to a particular queue using a specific routing key
+ * @param routingKey
+ * @param queue
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey, AMQQueue queue) throws AMQException;
+
+ /**
+ * Determines whether a message is routing to any queue using a specific routing key
+ * @param routingKey
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey) throws AMQException;
+
+ boolean isBound(AMQQueue queue) throws AMQException;
+
+ /**
+ * Returns true if this exchange has at least one binding associated with it.
+ * @return
+ * @throws AMQException
+ */
+ boolean hasBindings() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index ccb2211a55..8c4df68dea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -206,10 +206,48 @@ public class HeadersExchange extends AbstractExchange
}
if (!delivered)
{
- _logger.warn("Exchange " + getName() + ": message not routable.");
+
+ String msg = "Exchange " + getName() + ": message not routable.";
+
+ if (payload.getPublishBody().mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else
+ {
+ _logger.warn(msg);
+ }
+
}
}
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ return isBound(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ return hasBindings();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (Registration r : _bindings)
+ {
+ if (r.queue.equals(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_bindings.isEmpty();
+ }
+
protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
index fb48729c9e..485c4739bd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import java.util.List;
import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,7 +38,7 @@ class Index
private ConcurrentMap<String, List<AMQQueue>> _index
= new ConcurrentHashMap<String, List<AMQQueue>>();
- boolean add(String key, AMQQueue queue)
+ synchronized boolean add(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if(queues == null)
@@ -61,7 +62,7 @@ class Index
}
}
- boolean remove(String key, AMQQueue queue)
+ synchronized boolean remove(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if (queues != null)
@@ -83,6 +84,6 @@ class Index
Map<String, List<AMQQueue>> getBindingsMap()
{
- return _index;
+ return new HashMap<String, List<AMQQueue>>(_index);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
index 5c106de6f2..23e7355f17 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.filter;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class FilterManagerFactory
//fixme move to a common class so it can be refered to from client code.
private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector";
- public static FilterManager createManager(FieldTable filters)
+ public static FilterManager createManager(FieldTable filters) throws AMQException
{
FilterManager manager = null;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index e54fdb944d..86caf171fb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.filter.jms.selector.SelectorParser;
import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.log4j.Logger;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
@@ -33,13 +36,13 @@ import javax.jms.JMSException;
public class JMSSelectorFilter implements MessageFilter
{
- private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
// LoggerFactory.getLogger(JMSSelectorFilter.class);
private String _selector;
private BooleanExpression _matcher;
- public JMSSelectorFilter(String selector)
+ public JMSSelectorFilter(String selector) throws AMQException
{
_selector = selector;
_logger.info("Created JMSSelectorFilter with selector:" + _selector);
@@ -53,8 +56,8 @@ public class JMSSelectorFilter implements MessageFilter
catch (InvalidSelectorException e)
{
// fixme
- // Will have to throw this back to the client... in the future
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ // Is this the correct way of throwing exception
+ throw new AMQInvalidSelectorException(e.getMessage());
}
}
@@ -64,7 +67,7 @@ public class JMSSelectorFilter implements MessageFilter
try
{
boolean match = _matcher.matches(message);
- _logger.info(message + " match(" + match + ") selector:" + _selector);
+ _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
return match;
}
catch (JMSException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index c4c995540d..bf282020ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -21,10 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -68,14 +71,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
- if(queue == null)
+ if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.filter);
- if(!body.nowait)
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+ if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
}
@@ -83,10 +86,19 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
}
- catch(ConsumerTagNotUniqueException e)
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
+ }
+ catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+ BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
new file mode 100644
index 0000000000..5aaf78d6b7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeBoundBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
+{
+ private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+
+ public static final int OK = 0;
+
+ public static final int EXCHANGE_NOT_FOUND = 1;
+
+ public static final int QUEUE_NOT_FOUND = 2;
+
+ public static final int NO_BINDINGS = 3;
+
+ public static final int QUEUE_NOT_BOUND = 4;
+
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
+ public static ExchangeBoundHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ {
+ ExchangeBoundBody body = evt.getMethod();
+
+ String exchangeName = body.exchange;
+ String queueName = body.queue;
+ String routingKey = body.routingKey;
+ if (exchangeName == null)
+ {
+ throw new AMQException("Exchange exchange must not be null");
+ }
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ AMQFrame response;
+ if (exchange == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
+ "Exchange " + exchangeName + " not found");
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+ }
+ }
+ else
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
+ "Queue " + queueName + " not bound to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey, queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
+ "Queue " + queueName +
+ " not bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ NO_QUEUE_BOUND_WITH_RK,
+ "No queue bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ protocolSession.writeFrame(response);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1aa62dbfa4..c38f7f630b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -40,9 +40,6 @@ import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -50,24 +47,12 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
import javax.security.sasl.SaslServer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Date;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -93,7 +78,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private AMQCodecFactory _codecFactory;
- private ManagedAMQProtocolSession _managedObject;
+ private AMQProtocolSessionMBean _managedObject;
private SaslServer _saslServer;
@@ -102,11 +87,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private Object _lastSent;
private boolean _closed;
-
+ // maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
-
private byte _major;
private byte _minor;
@@ -115,190 +99,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _managedObject;
}
- /**
- * This class implements the management interface (is an MBean). In order to
- * make more attributes, operations and notifications available over JMX simply
- * augment the ManagedConnection interface and add the appropriate implementation here.
- */
- @MBeanDescription("Management Bean for an AMQ Broker Connection")
- private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
- {
- private String _name = null;
- //openmbean data types for representing the channel attributes
- private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
- private String[] _indexNames = {_channelAtttibuteNames[0]};
- private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
- private CompositeType _channelType = null; // represents the data type for channel data
- private TabularType _channelsType = null; // Data type for list of channels type
- private TabularDataSupport _channelsList = null;
-
- @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public ManagedAMQProtocolSession() throws JMException
- {
- super(ManagedConnection.class, ManagedConnection.TYPE);
- init();
- }
-
- /**
- * initialises the openmbean data types
- */
- private void init() throws OpenDataException
- {
- String remote = getRemoteAddress();
- remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
- _name = jmxEncode(new StringBuffer(remote), 0).toString();
- _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
- _channelAtttibuteNames, _channelAttributeTypes);
- _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
- }
-
- public Date getLastIoTime()
- {
- return new Date(_minaProtocolSession.getLastIoTime());
- }
-
- public String getRemoteAddress()
- {
- return _minaProtocolSession.getRemoteAddress().toString();
- }
-
- public Long getWrittenBytes()
- {
- return _minaProtocolSession.getWrittenBytes();
- }
-
- public Long getReadBytes()
- {
- return _minaProtocolSession.getReadBytes();
- }
-
- public Long getMaximumNumberOfChannels()
- {
- return _maxNoOfChannels;
- }
-
- public void setMaximumNumberOfChannels(Long value)
- {
- _maxNoOfChannels = value;
- }
-
- public String getObjectInstanceName()
- {
- return _name;
- }
-
- public void commitTransactions(int channelId) throws JMException
- {
- try
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
- }
- if (channel.isTransactional())
- {
- channel.commit();
- }
- }
- catch(AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- public void rollbackTransactions(int channelId) throws JMException
- {
- try
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
- }
- if (channel.isTransactional())
- {
- channel.rollback();
- }
- }
- catch(AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * Creates the list of channels in tabular form from the _channelMap.
- * @return list of channels in tabular form.
- * @throws OpenDataException
- */
- public TabularData channels() throws OpenDataException
- {
- _channelsList = new TabularDataSupport(_channelsType);
-
- for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
- {
- AMQChannel channel = entry.getValue();
- Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
- (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
- channel.getUnacknowledgedMessageMap().size()};
-
- CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
- _channelsList.put(channelData);
- }
-
- return _channelsList;
- }
-
- public void closeChannel(int id) throws Exception
- {
- try
- {
- AMQMinaProtocolSession.this.closeChannel(id);
- }
- catch (AMQException ex)
- {
- throw new Exception(ex.toString());
- }
- }
-
- public void closeConnection() throws Exception
- {
- try
- {
- AMQMinaProtocolSession.this.closeSession();
- }
- catch (AMQException ex)
- {
- throw new Exception(ex.toString());
- }
- }
-
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
- String name = MonitorNotification.class.getName();
- String description = "Channel count has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[] {info1};
- }
-
- private void checkForNotification()
- {
- int channelsCount = _channelMap.size();
- if (channelsCount >= getMaximumNumberOfChannels())
- {
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(),
- "Channel count (" + channelsCount + ") has reached the threshold value");
-
- _broadcaster.sendNotification(n);
- }
- }
-
- } // End of MBean class
public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
AMQCodecFactory codecFactory)
@@ -322,11 +122,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_managedObject.register();
}
- private ManagedAMQProtocolSession createMBean() throws AMQException
+ private AMQProtocolSessionMBean createMBean() throws AMQException
{
try
{
- return new ManagedAMQProtocolSession();
+ return new AMQProtocolSessionMBean(this);
}
catch(JMException ex)
{
@@ -335,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
+ public IoSession getIOSession()
+ {
+ return _minaProtocolSession;
+ }
+
public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
{
return (AMQProtocolSession) minaProtocolSession.getAttachment();
@@ -495,6 +300,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_contextKey = contextKey;
}
+ public List<AMQChannel> getChannels()
+ {
+ return new ArrayList<AMQChannel>(_channelMap.values());
+ }
+
public AMQChannel getChannel(int channelId) throws AMQException
{
return _channelMap.get(channelId);
@@ -503,7 +313,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void addChannel(AMQChannel channel)
{
_channelMap.put(channel.getChannelId(), channel);
- _managedObject.checkForNotification();
+ checkForNotification();
+ }
+
+ private void checkForNotification()
+ {
+ int channelsCount = _channelMap.size();
+ if (channelsCount >= _maxNoOfChannels)
+ {
+ _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
+ }
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _maxNoOfChannels;
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _maxNoOfChannels = value;
+ }
+
+ public void commitTransactions(AMQChannel channel) throws AMQException
+ {
+ if (channel != null && channel.isTransactional())
+ {
+ channel.commit();
+ }
+ }
+
+ public void rollbackTransactions(AMQChannel channel) throws AMQException
+ {
+ if (channel != null && channel.isTransactional())
+ {
+ channel.rollback();
+ }
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
new file mode 100644
index 0000000000..a47d462810
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -0,0 +1,241 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * This MBean class implements the management interface. In order to make more attributes, operations and notifications
+ * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
+ */
+@MBeanDescription("Management Bean for an AMQ Broker Connection")
+public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+{
+ private AMQMinaProtocolSession _session = null;
+ private String _name = null;
+ //openmbean data types for representing the channel attributes
+ private String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
+ private String[] _indexNames = {_channelAtttibuteNames[0]};
+ private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
+ private CompositeType _channelType = null; // represents the data type for channel data
+ private TabularType _channelsType = null; // Data type for list of channels type
+
+ @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+ public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException
+ {
+ super(ManagedConnection.class, ManagedConnection.TYPE);
+ _session = session;
+ init();
+ }
+
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
+ {
+ String remote = getRemoteAddress();
+ remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
+ _name = jmxEncode(new StringBuffer(remote), 0).toString();
+ _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
+ _channelAtttibuteNames, _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
+ }
+
+ public Date getLastIoTime()
+ {
+ return new Date(_session.getIOSession().getLastIoTime());
+ }
+
+ public String getRemoteAddress()
+ {
+ return _session.getIOSession().getRemoteAddress().toString();
+ }
+
+ public Long getWrittenBytes()
+ {
+ return _session.getIOSession().getWrittenBytes();
+ }
+
+ public Long getReadBytes()
+ {
+ return _session.getIOSession().getReadBytes();
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _session.getMaximumNumberOfChannels();
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _session.setMaximumNumberOfChannels(value);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name;
+ }
+
+ /**
+ * commits transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if commit fails
+ */
+ public void commitTransactions(int channelId) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ _session.commitTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * rollsback the transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if rollback fails
+ */
+ public void rollbackTransactions(int channelId) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ _session.rollbackTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * Creates the list of channels in tabular form from the _channelMap.
+ *
+ * @return list of channels in tabular form.
+ * @throws OpenDataException
+ */
+ public TabularData channels() throws OpenDataException
+ {
+ TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
+ List<AMQChannel> list = _session.getChannels();
+
+ for (AMQChannel channel : list)
+ {
+ Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
+ (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
+ channel.getUnacknowledgedMessageMap().size()};
+
+ CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
+ channelsList.put(channelData);
+ }
+
+ return channelsList;
+ }
+
+ /**
+ * @see AMQMinaProtocolSession#closeChannel(int)
+ */
+ public void closeChannel(int id) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(id);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + id + ") does not exist");
+ }
+
+ _session.closeChannel(id);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * closes the connection. The administrator can use this management operation to close connection to free up
+ * resources.
+ * @throws JMException
+ */
+ public void closeConnection() throws JMException
+ {
+ try
+ {
+ _session.closeSession();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String name = MonitorNotification.class.getName();
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[]{info1};
+ }
+
+ public void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ _broadcaster.sendNotification(n);
+ }
+
+} // End of MBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 3d08805cab..7fff7b9adf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,16 +21,9 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,15 +31,8 @@ import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.*;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.Executor;
/**
@@ -110,7 +96,7 @@ public class AMQQueue implements Managable, Comparable
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -126,322 +112,6 @@ public class AMQQueue implements Managable, Comparable
return _name.compareTo(((AMQQueue) o).getName());
}
- /**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
- */
- @MBeanDescription("Management Interface for AMQQueue")
- private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
- {
- private String _queueName = null;
- // OpenMBean data types for viewMessages method
- private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
- private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
- private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
- // OpenMBean data types for viewMessageContent method
- private CompositeType _msgContentType = null;
- private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
- private OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
- @MBeanConstructor("Creates an MBean exposing an AMQQueue")
- public AMQQueueMBean() throws JMException
- {
- super(ManagedQueue.class, ManagedQueue.TYPE);
- init();
- }
-
- /**
- * initialises the openmbean data types
- */
- private void init() throws OpenDataException
- {
- _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
-
- _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
- }
-
- public String getObjectInstanceName()
- {
- return _queueName;
- }
-
- public String getName()
- {
- return _name;
- }
-
- public boolean isDurable()
- {
- return _durable;
- }
-
- public String getOwner()
- {
- return _owner;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
- public Integer getMessageCount()
- {
- return _deliveryMgr.getQueueMessageCount();
- }
-
- public Long getMaximumMessageSize()
- {
- return _maxMessageSize;
- }
-
- public void setMaximumMessageSize(Long value)
- {
- _maxMessageSize = value;
- }
-
- public Integer getConsumerCount()
- {
- return _subscribers.size();
- }
-
- public Integer getActiveConsumerCount()
- {
- return _subscribers.getWeight();
- }
-
- public Long getReceivedMessageCount()
- {
- return _totalMessagesReceived;
- }
-
- public Integer getMaximumMessageCount()
- {
- return _maxMessageCount;
- }
-
- public void setMaximumMessageCount(Integer value)
- {
- _maxMessageCount = value;
- }
-
- public Long getMaximumQueueDepth()
- {
- return _maxQueueDepth;
- }
-
- // Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
- {
- _maxQueueDepth = value;
- }
-
- /**
- * returns the size of messages(KB) in the queue.
- */
- public Long getQueueDepth()
- {
- List<AMQMessage> list = _deliveryMgr.getMessages();
- if (list.size() == 0)
- {
- return 0l;
- }
-
- long queueDepth = 0;
- for (AMQMessage message : list)
- {
- queueDepth = queueDepth + getMessageSize(message);
- }
- return (long) Math.round(queueDepth / 1000);
- }
-
- /**
- * returns size of message in bytes
- */
- private long getMessageSize(AMQMessage msg)
- {
- if (msg == null)
- {
- return 0l;
- }
-
- return msg.getContentHeaderBody().bodySize;
- }
-
- /**
- * Checks if there is any notification to be send to the listeners
- */
- private void checkForNotification(AMQMessage msg)
- {
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
-
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _maxMessageSize)
- {
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
-
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
- if (queueDepth >= _maxQueueDepth)
- {
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
- }
- }
-
- /**
- * Sends the notification to the listeners
- */
- private void notifyClients(String notificationMsg)
- {
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
-
- _broadcaster.sendNotification(n);
- }
-
- public void deleteMessageFromTop() throws JMException
- {
- try
- {
- _deliveryMgr.removeAMessageFromTop();
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- public void clearQueue() throws JMException
- {
- try
- {
- _deliveryMgr.clearAllMessages();
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * returns message content as byte array and related attributes for the given message id.
- */
- public CompositeData viewMessageContent(long msgId) throws JMException
- {
- List<AMQMessage> list = _deliveryMgr.getMessages();
- AMQMessage msg = null;
- for (AMQMessage message : list)
- {
- if (message.getMessageId() == msgId)
- {
- msg = message;
- break;
- }
- }
-
- if (msg == null)
- {
- throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
- }
- // get message content
- List<ContentBody> cBodies = msg.getContentBodies();
- List<Byte> msgContent = new ArrayList<Byte>();
- for (ContentBody body : cBodies)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.payload.slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
-
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = headerProperties.getContentType();
- String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
-
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
-
- /**
- * Returns the header contents of the messages stored in this queue in tabular form.
- */
- public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
- {
- if ((beginIndex > endIndex) || (beginIndex < 1))
- {
- throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex +
- "\nFrom Index should be greater than 0 and less than To Index");
- }
-
- List<AMQMessage> list = _deliveryMgr.getMessages();
- TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
-
- // Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
- {
- AMQMessage msg = list.get(i - 1);
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
- List<String> headerAttribsList = new ArrayList<String>();
- headerAttribsList.add("App Id=" + headerProperties.getAppId());
- headerAttribsList.add("MimeType=" + headerProperties.getContentType());
- headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId());
- headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
- headerAttribsList.add(headerProperties.toString());
-
- Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]),
- headerBody.bodySize, msg.isRedelivered()};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
- }
-
- return _messageList;
- }
-
- /**
- * returns Notifications sent by this MBean.
- */
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
- String name = MonitorNotification.class.getName();
- String description = "Either Message count or Queue depth or Message size has reached threshold high value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[]{info1};
- }
-
- } // End of AMQMBean class
-
public AMQQueue(String name, boolean durable, String owner,
boolean autoDelete, QueueRegistry queueRegistry)
throws AMQException
@@ -523,33 +193,32 @@ public class AMQQueue implements Managable, Comparable
{
if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
{
- _logger.warn("Using ConcurrentSelectorDeliveryManager");
+ _logger.info("Using ConcurrentSelectorDeliveryManager");
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
{
- _logger.warn("Using ConcurrentDeliveryManager");
+ _logger.info("Using ConcurrentDeliveryManager");
_deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
}
else
{
- _logger.warn("Using SynchronizedDeliveryManager");
+ _logger.info("Using SynchronizedDeliveryManager");
_deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
}
}
else
{
- _logger.warn("Using SynchronizedDeliveryManager");
+ _logger.info("Using SynchronizedDeliveryManager");
_deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
}
-
}
private AMQQueueMBean createMBean() throws AMQException
{
try
{
- return new AMQQueueMBean();
+ return new AMQQueueMBean(this);
}
catch (JMException ex)
{
@@ -582,16 +251,112 @@ public class AMQQueue implements Managable, Comparable
return _autoDelete;
}
+ /**
+ * @return no of messages(undelivered) on the queue.
+ */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
+ /**
+ * @return List of messages(undelivered) on the queue.
+ */
+ public List<AMQMessage> getMessagesOnTheQueue()
+ {
+ return _deliveryMgr.getMessages();
+ }
+
+ /**
+ * @param messageId
+ * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
+ */
+ public AMQMessage getMessageOnTheQueue(long messageId)
+ {
+ List<AMQMessage> list = getMessagesOnTheQueue();
+ AMQMessage msg = null;
+ for (AMQMessage message : list)
+ {
+ if (message.getMessageId() == messageId)
+ {
+ msg = message;
+ break;
+ }
+ }
+
+ return msg;
+ }
+
+ /**
+ * @return MBean object associated with this Queue
+ */
public ManagedObject getManagedObject()
{
return _managedObject;
}
+ public Long getMaximumMessageSize()
+ {
+ return _maxMessageSize;
+ }
+
+ public void setMaximumMessageSize(Long value)
+ {
+ _maxMessageSize = value;
+ }
+
+ public Integer getConsumerCount()
+ {
+ return _subscribers.size();
+ }
+
+ public Integer getActiveConsumerCount()
+ {
+ return _subscribers.getWeight();
+ }
+
+ public Long getReceivedMessageCount()
+ {
+ return _totalMessagesReceived;
+ }
+
+ public Integer getMaximumMessageCount()
+ {
+ return _maxMessageCount;
+ }
+
+ public void setMaximumMessageCount(Integer value)
+ {
+ _maxMessageCount = value;
+ }
+
+ public Long getMaximumQueueDepth()
+ {
+ return _maxQueueDepth;
+ }
+
+ // Sets the queue depth, the max queue size
+ public void setMaximumQueueDepth(Long value)
+ {
+ _maxQueueDepth = value;
+ }
+
+ /**
+ * Removes the AMQMessage from the top of the queue.
+ */
+ public void deleteMessageFromTop() throws AMQException
+ {
+ _deliveryMgr.removeAMessageFromTop();
+ }
+
+ /**
+ * removes all the messages from the queue.
+ */
+ public void clearQueue() throws AMQException
+ {
+ _deliveryMgr.clearAllMessages();
+ }
+
public void bind(String routingKey, Exchange exchange)
{
_bindings.addBinding(routingKey, exchange);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
new file mode 100644
index 0000000000..54dd366d71
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.Notification;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.OperationsException;
+import javax.management.monitor.MonitorNotification;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * MBean class for AMQQueue. It implements all the management features exposed
+ * for an AMQQueue.
+ */
+@MBeanDescription("Management Interface for AMQQueue")
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+{
+ private AMQQueue _queue = null;
+ private String _queueName = null;
+ // OpenMBean data types for viewMessages method
+ private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
+ private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+ private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+ private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
+
+ // OpenMBean data types for viewMessageContent method
+ private CompositeType _msgContentType = null;
+ private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
+
+ @MBeanConstructor("Creates an MBean exposing an AMQQueue")
+ public AMQQueueMBean(AMQQueue queue) throws JMException
+ {
+ super(ManagedQueue.class, ManagedQueue.TYPE);
+ _queue = queue;
+ _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
+ init();
+ }
+
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
+ {
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
+ _msgContentAttributes, _msgContentAttributeTypes);
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+
+ _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _queueName;
+ }
+
+ public String getName()
+ {
+ return _queueName;
+ }
+
+ public boolean isDurable()
+ {
+ return _queue.isDurable();
+ }
+
+ public String getOwner()
+ {
+ return _queue.getOwner();
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _queue.isAutoDelete();
+ }
+
+ public Integer getMessageCount()
+ {
+ return _queue.getMessageCount();
+ }
+
+ public Long getMaximumMessageSize()
+ {
+ return _queue.getMaximumMessageSize();
+ }
+
+ public void setMaximumMessageSize(Long value)
+ {
+ _queue.setMaximumMessageSize(value);
+ }
+
+ public Integer getConsumerCount()
+ {
+ return _queue.getConsumerCount();
+ }
+
+ public Integer getActiveConsumerCount()
+ {
+ return _queue.getActiveConsumerCount();
+ }
+
+ public Long getReceivedMessageCount()
+ {
+ return _queue.getReceivedMessageCount();
+ }
+
+ public Integer getMaximumMessageCount()
+ {
+ return _queue.getMaximumMessageCount();
+ }
+
+ public void setMaximumMessageCount(Integer value)
+ {
+ _queue.setMaximumMessageCount(value);
+ }
+
+ public Long getMaximumQueueDepth()
+ {
+ return _queue.getMaximumQueueDepth();
+ }
+
+ public void setMaximumQueueDepth(Long value)
+ {
+ _queue.setMaximumQueueDepth(value);
+ }
+
+ /**
+ * returns the size of messages(KB) in the queue.
+ */
+ public Long getQueueDepth()
+ {
+ List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ if (list.size() == 0)
+ {
+ return 0l;
+ }
+
+ long queueDepth = 0;
+ for (AMQMessage message : list)
+ {
+ queueDepth = queueDepth + getMessageSize(message);
+ }
+ return (long) Math.round(queueDepth / 1000);
+ }
+
+ /**
+ * returns size of message in bytes
+ */
+ private long getMessageSize(AMQMessage msg)
+ {
+ if (msg == null)
+ {
+ return 0l;
+ }
+
+ return msg.getContentHeaderBody().bodySize;
+ }
+
+ /**
+ * Checks if there is any notification to be send to the listeners
+ */
+ public void checkForNotification(AMQMessage msg)
+ {
+ // Check for threshold message count
+ Integer msgCount = getMessageCount();
+ if (msgCount >= getMaximumMessageCount())
+ {
+ notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
+ }
+
+ // Check for threshold message size
+ long messageSize = getMessageSize(msg);
+ if (messageSize >= _queue.getMaximumMessageSize())
+ {
+ notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
+ }
+
+ // Check for threshold queue depth in bytes
+ long queueDepth = getQueueDepth();
+ if (queueDepth >= _queue.getMaximumQueueDepth())
+ {
+ notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ }
+ }
+
+ /**
+ * Sends the notification to the listeners
+ */
+ private void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+
+ _broadcaster.sendNotification(n);
+ }
+
+ /**
+ * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop()
+ */
+ public void deleteMessageFromTop() throws JMException
+ {
+ try
+ {
+ _queue.deleteMessageFromTop();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * @see org.apache.qpid.server.queue.AMQQueue#clearQueue()
+ */
+ public void clearQueue() throws JMException
+ {
+ try
+ {
+ _queue.clearQueue();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * returns message content as byte array and related attributes for the given message id.
+ */
+ public CompositeData viewMessageContent(long msgId) throws JMException
+ {
+ AMQMessage msg = _queue.getMessageOnTheQueue(msgId);
+ if (msg == null)
+ {
+ throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
+ }
+ // get message content
+ List<ContentBody> cBodies = msg.getContentBodies();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ for (ContentBody body : cBodies)
+ {
+ if (body.getSize() != 0)
+ {
+ ByteBuffer slice = body.payload.slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
+ }
+ }
+
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = headerProperties.getContentType();
+ String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+ }
+
+ /**
+ * Returns the header contents of the messages stored in this queue in tabular form.
+ */
+ public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
+ {
+ if ((beginIndex > endIndex) || (beginIndex < 1))
+ {
+ throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex +
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
+ }
+
+ List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
+
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ {
+ AMQMessage msg = list.get(i - 1);
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ String[] headerAttributes = headerProperties.toString().split(",");
+ Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+ _messageList.put(messageData);
+ }
+
+ return _messageList;
+ }
+
+ /**
+ * returns Notifications sent by this MBean.
+ */
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String name = MonitorNotification.class.getName();
+ String description = "Either Message count or Queue depth or Message size has reached threshold high value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[]{info1};
+ }
+
+} // End of AMQMBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index dde76e5ba8..f9c8898182 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -281,7 +281,12 @@ public class ConcurrentDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
- executor.execute(asyncDelivery);
+ // Do we need this?
+ // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
+ //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
+ {
+ executor.execute(asyncDelivery);
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
new file mode 100644
index 0000000000..d8bb6e1948
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -0,0 +1,352 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+public class ConcurrentSelectorDeliveryManager implements DeliveryManager
+{
+ private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
+
+ @Configured(path = "advanced.compressBufferOnQueue",
+ defaultValue = "false")
+ public boolean compressBufferOnQueue;
+ /**
+ * Holds any queued messages
+ */
+ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ //private int _messageCount;
+ /**
+ * Ensures that only one asynchronous task is running for this manager at
+ * any time.
+ */
+ private final AtomicBoolean _processing = new AtomicBoolean();
+ /**
+ * The subscriptions on the queue to whom messages are delivered
+ */
+ private final SubscriptionManager _subscriptions;
+
+ /**
+ * A reference to the queue we are delivering messages for. We need this to be able
+ * to pass the code that handles acknowledgements a handle on the queue.
+ */
+ private final AMQQueue _queue;
+
+
+ /**
+ * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
+ * via the async thread.
+ * <p/>
+ * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ */
+ private ReentrantLock _lock = new ReentrantLock();
+
+
+ ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+ {
+
+ //Set values from configuration
+ Configurator.configure(this);
+
+ if (compressBufferOnQueue)
+ {
+ _log.warn("Compressing Buffers on queue.");
+ }
+
+ _subscriptions = subscriptions;
+ _queue = queue;
+ }
+
+
+ private boolean addMessageToQueue(AMQMessage msg)
+ {
+ // Shrink the ContentBodies to their actual size to save memory.
+ if (compressBufferOnQueue)
+ {
+ Iterator it = msg.getContentBodies().iterator();
+ while (it.hasNext())
+ {
+ ContentBody cb = (ContentBody) it.next();
+ cb.reduceBufferToFit();
+ }
+ }
+
+ _messages.offer(msg);
+
+ return true;
+ }
+
+
+ public boolean hasQueuedMessages()
+ {
+ _lock.lock();
+ try
+ {
+ return !_messages.isEmpty();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public int getQueueMessageCount()
+ {
+ return getMessageCount();
+ }
+
+ /**
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+ * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ *
+ * @return int the number of messages in the delivery queue.
+ */
+ private int getMessageCount()
+ {
+ return _messages.size();
+ }
+
+
+ public synchronized List<AMQMessage> getMessages()
+ {
+ return new ArrayList<AMQMessage>(_messages);
+ }
+
+ public synchronized void removeAMessageFromTop() throws AMQException
+ {
+ AMQMessage msg = poll();
+ if (msg != null)
+ {
+ msg.dequeue(_queue);
+ }
+ }
+
+ public synchronized void clearAllMessages() throws AMQException
+ {
+ AMQMessage msg = poll();
+ while (msg != null)
+ {
+ msg.dequeue(_queue);
+ msg = poll();
+ }
+ }
+
+
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+ {
+ AMQMessage message = messages.peek();
+
+ while (message != null && message.taken())
+ {
+ //remove the already taken message
+ messages.poll();
+ // try the next message
+ message = messages.peek();
+ }
+ return message;
+ }
+
+ public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue)
+ {
+ AMQMessage message = null;
+ try
+ {
+ message = getNextMessage(messageQueue);
+
+ // message will be null if we have no messages in the messageQueue.
+ if (message == null)
+ {
+ return;
+ }
+ _log.info("Async Delivery Message:" + message + " to :" + sub);
+
+ sub.send(message, queue);
+ message.setDeliveredToConsumer();
+
+ //remove sent message from our queue.
+ messageQueue.poll();
+ }
+ catch (FailedDequeueException e)
+ {
+ message.release();
+ _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ }
+ }
+
+ /**
+ * Only one thread should ever execute this method concurrently, but
+ * it can do so while other threads invoke deliver().
+ */
+ private void processQueue()
+ {
+ // Continue to process delivery while we haveSubscribers and messages
+ boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+
+ while (hasSubscribers && hasQueuedMessages())
+ {
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ if (!sub.isSuspended())
+ {
+ if (sub.hasFilters())
+ {
+ sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue);
+ }
+ else
+ {
+ sendNextMessage(sub, _messages, _queue);
+ }
+
+ hasSubscribers = true;
+ }
+ else
+ {
+ hasSubscribers = false;
+ }
+ }
+ }
+ }
+
+ private AMQMessage poll()
+ {
+ return _messages.poll();
+ }
+
+ public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ {
+ _log.info("deliver :" + msg);
+
+ //Check if we have someone to deliver the message to.
+ _lock.lock();
+ try
+ {
+ Subscription s = _subscriptions.nextSubscriber(msg);
+
+ if (s == null) //no-one can take the message right now.
+ {
+ _log.info("Testing Message(" + msg + ") for Queued Delivery");
+ if (!msg.isImmediate())
+ {
+ addMessageToQueue(msg);
+
+ //release lock now message is on queue.
+ _lock.unlock();
+
+ //Pre Deliver to all subscriptions
+ _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+
+ // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
+ if (_queue.isShared() && msg.getDeliveredToConsumer())
+ {
+ _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered.");
+ continue;
+ }
+
+ // Only give the message to those that want them.
+ if (sub.hasFilters() && sub.hasInterest(msg))
+ {
+ sub.enqueueForPreDelivery(msg);
+ }
+ }
+ }
+ }
+ else
+ {
+ //release lock now
+ _lock.unlock();
+
+ _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s);
+ //Deliver the message
+ s.send(msg, _queue);
+ msg.setDeliveredToConsumer();
+ }
+ }
+ finally
+ {
+ //ensure lock is released
+ if (_lock.isLocked())
+ {
+ _lock.unlock();
+ }
+ }
+ }
+
+ Runner asyncDelivery = new Runner();
+
+ private class Runner implements Runnable
+ {
+ public void run()
+ {
+ boolean running = true;
+ while (running)
+ {
+ processQueue();
+
+ //Check that messages have not been added since we did our last peek();
+ // Synchronize with the thread that adds to the queue.
+ // If the queue is still empty then we can exit
+
+ if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
+ {
+ running = false;
+ _processing.set(false);
+ }
+ }
+ }
+ }
+
+ public void processAsync(Executor executor)
+ {
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+
+ if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+ {
+ //are we already running? if so, don't re-run
+ if (_processing.compareAndSet(false, true))
+ {
+ executor.execute(asyncDelivery);
+ }
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 7105a3f30f..79b0593f69 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -118,6 +118,12 @@ public class SubscriptionImpl implements Subscription
}
}
+ public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+ String consumerTag)
+ throws AMQException
+ {
+ this(channel, protocolSession, consumerTag, false);
+ }
public boolean equals(Object o)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index 301182a313..49b0111b67 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -206,7 +206,12 @@ class SynchronizedDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
- executor.execute(new Runner());
+ // Do we need this?
+ // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
+ //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
+ {
+ executor.execute(new Runner());
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 5e88ff7f2d..7a13884136 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -109,6 +109,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());