diff options
20 files changed, 1451 insertions, 577 deletions
diff --git a/java/broker/bin/qpid-server.bat b/java/broker/bin/qpid-server.bat index ad3a20f459..b839cc72d8 100644 --- a/java/broker/bin/qpid-server.bat +++ b/java/broker/bin/qpid-server.bat @@ -63,6 +63,6 @@ goto loop :runCommand
set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%"\bin\java -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+"%JAVA_HOME%\bin\java" -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
:end
diff --git a/java/broker/pom.xml b/java/broker/pom.xml index f6dd171214..5f4c490fd4 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -34,6 +34,7 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> + <amqj.logging.level>warn</amqj.logging.level> </properties> <dependencies> @@ -53,7 +54,7 @@ <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> </dependency> - <dependency> + <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> </dependency> @@ -85,7 +86,6 @@ <build> <plugins> - <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>javacc-maven-plugin</artifactId> @@ -116,7 +116,7 @@ </property> <property> <name>amqj.logging.level</name> - <value>WARN</value> + <value>${amqj.logging.level}</value> </property> <property> <name>log4j.configuration</name> diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 6dc97f9e48..ffd0e5d8ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -197,4 +197,34 @@ public class DestNameExchange extends AbstractExchange } } } + + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + final List<AMQQueue> queues = _index.get(routingKey); + return queues != null && queues.contains(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + final List<AMQQueue> queues = _index.get(routingKey); + return queues != null && !queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + Map<String, List<AMQQueue>> bindings = _index.getBindingsMap(); + for (List<AMQQueue> queues : bindings.values()) + { + if (queues.contains(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_index.getBindingsMap().isEmpty(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index a692f9ebca..139307488e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -126,10 +126,11 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; assert routingKey != null; + _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); // if we got null back, no previous value was associated with the specified routing key hence @@ -159,6 +160,8 @@ public class DestWildExchange extends AbstractExchange // TODO: add support for the immediate flag if (queues == null) { + _logger.warn("No queues found for routing key " + routingKey); + _logger.warn("Routing map contains: " + _routingKey2queues); //todo Check for valid topic - mritchie return; } @@ -172,7 +175,37 @@ public class DestWildExchange extends AbstractExchange } } - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + List<AMQQueue> queues = _routingKey2queues.get(routingKey); + return queues != null && queues.contains(queue); + } + + + public boolean isBound(String routingKey) throws AMQException + { + List<AMQQueue> queues = _routingKey2queues.get(routingKey); + return queues != null && !queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + for (List<AMQQueue> queues : _routingKey2queues.values()) + { + if (queues.contains(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_routingKey2queues.isEmpty(); + } + + public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException { assert queue != null; assert routingKey != null; @@ -190,6 +223,10 @@ public class DestWildExchange extends AbstractExchange throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey); } + if (queues.isEmpty()) + { + _routingKey2queues.remove(queues); + } } protected ExchangeMBean createMBean() throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 787d0eddfd..824e85dc5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,4 +47,30 @@ public interface Exchange void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException; void route(AMQMessage message) throws AMQException; + + /** + * Determines whether a message would be isBound to a particular queue using a specific routing key + * @param routingKey + * @param queue + * @return + * @throws AMQException + */ + boolean isBound(String routingKey, AMQQueue queue) throws AMQException; + + /** + * Determines whether a message is routing to any queue using a specific routing key + * @param routingKey + * @return + * @throws AMQException + */ + boolean isBound(String routingKey) throws AMQException; + + boolean isBound(AMQQueue queue) throws AMQException; + + /** + * Returns true if this exchange has at least one binding associated with it. + * @return + * @throws AMQException + */ + boolean hasBindings() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index ccb2211a55..8c4df68dea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -206,10 +206,48 @@ public class HeadersExchange extends AbstractExchange } if (!delivered) { - _logger.warn("Exchange " + getName() + ": message not routable."); + + String msg = "Exchange " + getName() + ": message not routable."; + + if (payload.getPublishBody().mandatory) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } } + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + return isBound(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + return hasBindings(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + for (Registration r : _bindings) + { + if (r.queue.equals(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_bindings.isEmpty(); + } + protected Map getHeaders(ContentHeaderBody contentHeaderFrame) { //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java index fb48729c9e..485c4739bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQQueue; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -37,7 +38,7 @@ class Index private ConcurrentMap<String, List<AMQQueue>> _index = new ConcurrentHashMap<String, List<AMQQueue>>(); - boolean add(String key, AMQQueue queue) + synchronized boolean add(String key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if(queues == null) @@ -61,7 +62,7 @@ class Index } } - boolean remove(String key, AMQQueue queue) + synchronized boolean remove(String key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if (queues != null) @@ -83,6 +84,6 @@ class Index Map<String, List<AMQQueue>> getBindingsMap() { - return _index; + return new HashMap<String, List<AMQQueue>>(_index); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java index 5c106de6f2..23e7355f17 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.filter; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; @@ -35,7 +36,7 @@ public class FilterManagerFactory //fixme move to a common class so it can be refered to from client code. private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector"; - public static FilterManager createManager(FieldTable filters) + public static FilterManager createManager(FieldTable filters) throws AMQException { FilterManager manager = null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index e54fdb944d..86caf171fb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -23,6 +23,9 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.filter.jms.selector.SelectorParser; import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.log4j.Logger; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; @@ -33,13 +36,13 @@ import javax.jms.JMSException; public class JMSSelectorFilter implements MessageFilter { - private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); + private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); // LoggerFactory.getLogger(JMSSelectorFilter.class); private String _selector; private BooleanExpression _matcher; - public JMSSelectorFilter(String selector) + public JMSSelectorFilter(String selector) throws AMQException { _selector = selector; _logger.info("Created JMSSelectorFilter with selector:" + _selector); @@ -53,8 +56,8 @@ public class JMSSelectorFilter implements MessageFilter catch (InvalidSelectorException e) { // fixme - // Will have to throw this back to the client... in the future - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + // Is this the correct way of throwing exception + throw new AMQInvalidSelectorException(e.getMessage()); } } @@ -64,7 +67,7 @@ public class JMSSelectorFilter implements MessageFilter try { boolean match = _matcher.matches(message); - _logger.info(message + " match(" + match + ") selector:" + _selector); + _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector); return match; } catch (JMSException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index c4c995540d..bf282020ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -21,10 +21,12 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.log4j.Logger; @@ -68,14 +71,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue); - if(queue == null) + if (queue == null) { _log.info("No queue for '" + body.queue + "'"); } try { - String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.filter); - if(!body.nowait) + String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments); + if (!body.nowait) { session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag)); } @@ -83,10 +86,19 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic //now allow queue to start async processing of any backlog of messages queue.deliverAsync(); } - catch(ConsumerTagNotUniqueException e) + catch (AMQInvalidSelectorException ise) + { + _log.info("Closing connection due to invalid selector"); + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(), + ise.getMessage(), BasicConsumeBody.CLASS_ID, + BasicConsumeBody.METHOD_ID)); + } + catch (ConsumerTagNotUniqueException e) { String msg = "Non-unique consumer tag, '" + body.consumerTag + "'"; - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID)); + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, + BasicConsumeBody.CLASS_ID, + BasicConsumeBody.METHOD_ID)); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java new file mode 100644 index 0000000000..5aaf78d6b7 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -0,0 +1,163 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ExchangeBoundBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +/** + * @author Apache Software Foundation + */ +public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody> +{ + private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler(); + + public static final int OK = 0; + + public static final int EXCHANGE_NOT_FOUND = 1; + + public static final int QUEUE_NOT_FOUND = 2; + + public static final int NO_BINDINGS = 3; + + public static final int QUEUE_NOT_BOUND = 4; + + public static final int NO_QUEUE_BOUND_WITH_RK = 5; + + public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; + + public static ExchangeBoundHandler getInstance() + { + return _instance; + } + + private ExchangeBoundHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, + AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException + { + ExchangeBoundBody body = evt.getMethod(); + + String exchangeName = body.exchange; + String queueName = body.queue; + String routingKey = body.routingKey; + if (exchangeName == null) + { + throw new AMQException("Exchange exchange must not be null"); + } + Exchange exchange = exchangeRegistry.getExchange(exchangeName); + AMQFrame response; + if (exchange == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND, + "Exchange " + exchangeName + " not found"); + } + else if (routingKey == null) + { + if (queueName == null) + { + if (exchange.hasBindings()) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null); + } + } + else + { + AMQQueue queue = queueRegistry.getQueue(queueName); + if (queue == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, + "Queue " + queueName + " not found"); + } + else + { + if (exchange.isBound(queue)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND, + "Queue " + queueName + " not bound to exchange " + + exchangeName); + } + } + } + } + else if (queueName != null) + { + AMQQueue queue = queueRegistry.getQueue(queueName); + if (queue == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, + "Queue " + queueName + " not found"); + } + else + { + if (exchange.isBound(body.routingKey, queue)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, + null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, + "Queue " + queueName + + " not bound with routing key " + + body.routingKey + " to exchange " + + exchangeName); + } + } + } + else + { + if (exchange.isBound(body.routingKey)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, + null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + NO_QUEUE_BOUND_WITH_RK, + "No queue bound with routing key " + + body.routingKey + " to exchange " + + exchangeName); + } + } + protocolSession.writeFrame(response); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1aa62dbfa4..c38f7f630b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -40,9 +40,6 @@ import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.QueueRegistry; @@ -50,24 +47,12 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.Notification; -import javax.management.monitor.MonitorNotification; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; import javax.security.sasl.SaslServer; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Date; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; @@ -93,7 +78,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private AMQCodecFactory _codecFactory; - private ManagedAMQProtocolSession _managedObject; + private AMQProtocolSessionMBean _managedObject; private SaslServer _saslServer; @@ -102,11 +87,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private Object _lastSent; private boolean _closed; - + // maximum number of channels this session should have private long _maxNoOfChannels = 1000; /* AMQP Version for this session */ - private byte _major; private byte _minor; @@ -115,190 +99,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _managedObject; } - /** - * This class implements the management interface (is an MBean). In order to - * make more attributes, operations and notifications available over JMX simply - * augment the ManagedConnection interface and add the appropriate implementation here. - */ - @MBeanDescription("Management Bean for an AMQ Broker Connection") - private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection - { - private String _name = null; - //openmbean data types for representing the channel attributes - private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; - private String[] _indexNames = {_channelAtttibuteNames[0]}; - private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; - private CompositeType _channelType = null; // represents the data type for channel data - private TabularType _channelsType = null; // Data type for list of channels type - private TabularDataSupport _channelsList = null; - - @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public ManagedAMQProtocolSession() throws JMException - { - super(ManagedConnection.class, ManagedConnection.TYPE); - init(); - } - - /** - * initialises the openmbean data types - */ - private void init() throws OpenDataException - { - String remote = getRemoteAddress(); - remote = "anonymous".equals(remote) ? remote + hashCode() : remote; - _name = jmxEncode(new StringBuffer(remote), 0).toString(); - _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, - _channelAtttibuteNames, _channelAttributeTypes); - _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); - } - - public Date getLastIoTime() - { - return new Date(_minaProtocolSession.getLastIoTime()); - } - - public String getRemoteAddress() - { - return _minaProtocolSession.getRemoteAddress().toString(); - } - - public Long getWrittenBytes() - { - return _minaProtocolSession.getWrittenBytes(); - } - - public Long getReadBytes() - { - return _minaProtocolSession.getReadBytes(); - } - - public Long getMaximumNumberOfChannels() - { - return _maxNoOfChannels; - } - - public void setMaximumNumberOfChannels(Long value) - { - _maxNoOfChannels = value; - } - - public String getObjectInstanceName() - { - return _name; - } - - public void commitTransactions(int channelId) throws JMException - { - try - { - AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); - } - if (channel.isTransactional()) - { - channel.commit(); - } - } - catch(AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - public void rollbackTransactions(int channelId) throws JMException - { - try - { - AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); - } - if (channel.isTransactional()) - { - channel.rollback(); - } - } - catch(AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * Creates the list of channels in tabular form from the _channelMap. - * @return list of channels in tabular form. - * @throws OpenDataException - */ - public TabularData channels() throws OpenDataException - { - _channelsList = new TabularDataSupport(_channelsType); - - for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet()) - { - AMQChannel channel = entry.getValue(); - Object[] itemValues = {channel.getChannelId(), channel.isTransactional(), - (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null, - channel.getUnacknowledgedMessageMap().size()}; - - CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues); - _channelsList.put(channelData); - } - - return _channelsList; - } - - public void closeChannel(int id) throws Exception - { - try - { - AMQMinaProtocolSession.this.closeChannel(id); - } - catch (AMQException ex) - { - throw new Exception(ex.toString()); - } - } - - public void closeConnection() throws Exception - { - try - { - AMQMinaProtocolSession.this.closeSession(); - } - catch (AMQException ex) - { - throw new Exception(ex.toString()); - } - } - - @Override - public MBeanNotificationInfo[] getNotificationInfo() - { - String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; - String name = MonitorNotification.class.getName(); - String description = "Channel count has reached threshold value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - - return new MBeanNotificationInfo[] {info1}; - } - - private void checkForNotification() - { - int channelsCount = _channelMap.size(); - if (channelsCount >= getMaximumNumberOfChannels()) - { - Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), - "Channel count (" + channelsCount + ") has reached the threshold value"); - - _broadcaster.sendNotification(n); - } - } - - } // End of MBean class public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) @@ -322,11 +122,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _managedObject.register(); } - private ManagedAMQProtocolSession createMBean() throws AMQException + private AMQProtocolSessionMBean createMBean() throws AMQException { try { - return new ManagedAMQProtocolSession(); + return new AMQProtocolSessionMBean(this); } catch(JMException ex) { @@ -335,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } + public IoSession getIOSession() + { + return _minaProtocolSession; + } + public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession) { return (AMQProtocolSession) minaProtocolSession.getAttachment(); @@ -495,6 +300,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _contextKey = contextKey; } + public List<AMQChannel> getChannels() + { + return new ArrayList<AMQChannel>(_channelMap.values()); + } + public AMQChannel getChannel(int channelId) throws AMQException { return _channelMap.get(channelId); @@ -503,7 +313,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void addChannel(AMQChannel channel) { _channelMap.put(channel.getChannelId(), channel); - _managedObject.checkForNotification(); + checkForNotification(); + } + + private void checkForNotification() + { + int channelsCount = _channelMap.size(); + if (channelsCount >= _maxNoOfChannels) + { + _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); + } + } + + public Long getMaximumNumberOfChannels() + { + return _maxNoOfChannels; + } + + public void setMaximumNumberOfChannels(Long value) + { + _maxNoOfChannels = value; + } + + public void commitTransactions(AMQChannel channel) throws AMQException + { + if (channel != null && channel.isTransactional()) + { + channel.commit(); + } + } + + public void rollbackTransactions(AMQChannel channel) throws AMQException + { + if (channel != null && channel.isTransactional()) + { + channel.rollback(); + } } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java new file mode 100644 index 0000000000..a47d462810 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -0,0 +1,241 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.monitor.MonitorNotification; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.util.Date; +import java.util.List; + +/** + * This MBean class implements the management interface. In order to make more attributes, operations and notifications + * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here. + */ +@MBeanDescription("Management Bean for an AMQ Broker Connection") +public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection +{ + private AMQMinaProtocolSession _session = null; + private String _name = null; + //openmbean data types for representing the channel attributes + private String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; + private String[] _indexNames = {_channelAtttibuteNames[0]}; + private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; + private CompositeType _channelType = null; // represents the data type for channel data + private TabularType _channelsType = null; // Data type for list of channels type + + @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") + public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException + { + super(ManagedConnection.class, ManagedConnection.TYPE); + _session = session; + init(); + } + + /** + * initialises the openmbean data types + */ + private void init() throws OpenDataException + { + String remote = getRemoteAddress(); + remote = "anonymous".equals(remote) ? remote + hashCode() : remote; + _name = jmxEncode(new StringBuffer(remote), 0).toString(); + _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, + _channelAtttibuteNames, _channelAttributeTypes); + _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); + } + + public Date getLastIoTime() + { + return new Date(_session.getIOSession().getLastIoTime()); + } + + public String getRemoteAddress() + { + return _session.getIOSession().getRemoteAddress().toString(); + } + + public Long getWrittenBytes() + { + return _session.getIOSession().getWrittenBytes(); + } + + public Long getReadBytes() + { + return _session.getIOSession().getReadBytes(); + } + + public Long getMaximumNumberOfChannels() + { + return _session.getMaximumNumberOfChannels(); + } + + public void setMaximumNumberOfChannels(Long value) + { + _session.setMaximumNumberOfChannels(value); + } + + public String getObjectInstanceName() + { + return _name; + } + + /** + * commits transactions for a transactional channel + * + * @param channelId + * @throws JMException if channel with given id doesn't exist or if commit fails + */ + public void commitTransactions(int channelId) throws JMException + { + try + { + AMQChannel channel = _session.getChannel(channelId); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); + } + _session.commitTransactions(channel); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * rollsback the transactions for a transactional channel + * + * @param channelId + * @throws JMException if channel with given id doesn't exist or if rollback fails + */ + public void rollbackTransactions(int channelId) throws JMException + { + try + { + AMQChannel channel = _session.getChannel(channelId); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); + } + _session.rollbackTransactions(channel); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * Creates the list of channels in tabular form from the _channelMap. + * + * @return list of channels in tabular form. + * @throws OpenDataException + */ + public TabularData channels() throws OpenDataException + { + TabularDataSupport channelsList = new TabularDataSupport(_channelsType); + List<AMQChannel> list = _session.getChannels(); + + for (AMQChannel channel : list) + { + Object[] itemValues = {channel.getChannelId(), channel.isTransactional(), + (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null, + channel.getUnacknowledgedMessageMap().size()}; + + CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues); + channelsList.put(channelData); + } + + return channelsList; + } + + /** + * @see AMQMinaProtocolSession#closeChannel(int) + */ + public void closeChannel(int id) throws JMException + { + try + { + AMQChannel channel = _session.getChannel(id); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + id + ") does not exist"); + } + + _session.closeChannel(id); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * closes the connection. The administrator can use this management operation to close connection to free up + * resources. + * @throws JMException + */ + public void closeConnection() throws JMException + { + try + { + _session.closeSession(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + @Override + public MBeanNotificationInfo[] getNotificationInfo() + { + String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String name = MonitorNotification.class.getName(); + String description = "Channel count has reached threshold value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); + + return new MBeanNotificationInfo[]{info1}; + } + + public void notifyClients(String notificationMsg) + { + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + _broadcaster.sendNotification(n); + } + +} // End of MBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 3d08805cab..7fff7b9adf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,16 +21,9 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -38,15 +31,8 @@ import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.Notification; -import javax.management.monitor.MonitorNotification; -import javax.management.openmbean.*; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.concurrent.Executor; /** @@ -110,7 +96,7 @@ public class AMQQueue implements Managable, Comparable * max allowed number of messages on a queue. */ private Integer _maxMessageCount = 10000; - + /** * max queue depth(KB) for the queue */ @@ -126,322 +112,6 @@ public class AMQQueue implements Managable, Comparable return _name.compareTo(((AMQQueue) o).getName()); } - /** - * MBean class for AMQQueue. It implements all the management features exposed - * for an AMQQueue. - */ - @MBeanDescription("Management Interface for AMQQueue") - private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue - { - private String _queueName = null; - // OpenMBean data types for viewMessages method - private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"}; - private String[] _msgAttributeIndex = {_msgAttributeNames[0]}; - private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. - private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. - private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - - // OpenMBean data types for viewMessageContent method - private CompositeType _msgContentType = null; - private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"}; - private OpenType[] _msgContentAttributeTypes = new OpenType[4]; - - @MBeanConstructor("Creates an MBean exposing an AMQQueue") - public AMQQueueMBean() throws JMException - { - super(ManagedQueue.class, ManagedQueue.TYPE); - init(); - } - - /** - * initialises the openmbean data types - */ - private void init() throws OpenDataException - { - _queueName = jmxEncode(new StringBuffer(_name), 0).toString(); - _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id - _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType - _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding - _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content - _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, - _msgContentAttributes, _msgContentAttributeTypes); - - _msgAttributeTypes[0] = SimpleType.LONG; // For message id - _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes - _msgAttributeTypes[2] = SimpleType.LONG; // For size - _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered - - _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); - _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); - } - - public String getObjectInstanceName() - { - return _queueName; - } - - public String getName() - { - return _name; - } - - public boolean isDurable() - { - return _durable; - } - - public String getOwner() - { - return _owner; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - - public Integer getMessageCount() - { - return _deliveryMgr.getQueueMessageCount(); - } - - public Long getMaximumMessageSize() - { - return _maxMessageSize; - } - - public void setMaximumMessageSize(Long value) - { - _maxMessageSize = value; - } - - public Integer getConsumerCount() - { - return _subscribers.size(); - } - - public Integer getActiveConsumerCount() - { - return _subscribers.getWeight(); - } - - public Long getReceivedMessageCount() - { - return _totalMessagesReceived; - } - - public Integer getMaximumMessageCount() - { - return _maxMessageCount; - } - - public void setMaximumMessageCount(Integer value) - { - _maxMessageCount = value; - } - - public Long getMaximumQueueDepth() - { - return _maxQueueDepth; - } - - // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(Long value) - { - _maxQueueDepth = value; - } - - /** - * returns the size of messages(KB) in the queue. - */ - public Long getQueueDepth() - { - List<AMQMessage> list = _deliveryMgr.getMessages(); - if (list.size() == 0) - { - return 0l; - } - - long queueDepth = 0; - for (AMQMessage message : list) - { - queueDepth = queueDepth + getMessageSize(message); - } - return (long) Math.round(queueDepth / 1000); - } - - /** - * returns size of message in bytes - */ - private long getMessageSize(AMQMessage msg) - { - if (msg == null) - { - return 0l; - } - - return msg.getContentHeaderBody().bodySize; - } - - /** - * Checks if there is any notification to be send to the listeners - */ - private void checkForNotification(AMQMessage msg) - { - // Check for threshold message count - Integer msgCount = getMessageCount(); - if (msgCount >= getMaximumMessageCount()) - { - notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); - } - - // Check for threshold message size - long messageSize = getMessageSize(msg); - if (messageSize >= _maxMessageSize) - { - notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); - } - - // Check for threshold queue depth in bytes - long queueDepth = getQueueDepth(); - if (queueDepth >= _maxQueueDepth) - { - notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); - } - } - - /** - * Sends the notification to the listeners - */ - private void notifyClients(String notificationMsg) - { - Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); - - _broadcaster.sendNotification(n); - } - - public void deleteMessageFromTop() throws JMException - { - try - { - _deliveryMgr.removeAMessageFromTop(); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - public void clearQueue() throws JMException - { - try - { - _deliveryMgr.clearAllMessages(); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * returns message content as byte array and related attributes for the given message id. - */ - public CompositeData viewMessageContent(long msgId) throws JMException - { - List<AMQMessage> list = _deliveryMgr.getMessages(); - AMQMessage msg = null; - for (AMQMessage message : list) - { - if (message.getMessageId() == msgId) - { - msg = message; - break; - } - } - - if (msg == null) - { - throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); - } - // get message content - List<ContentBody> cBodies = msg.getContentBodies(); - List<Byte> msgContent = new ArrayList<Byte>(); - for (ContentBody body : cBodies) - { - if (body.getSize() != 0) - { - ByteBuffer slice = body.payload.slice(); - for (int j = 0; j < slice.limit(); j++) - { - msgContent.add(slice.get()); - } - } - } - - // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = headerProperties.getContentType(); - String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); - Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; - - return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); - } - - /** - * Returns the header contents of the messages stored in this queue in tabular form. - */ - public TabularData viewMessages(int beginIndex, int endIndex) throws JMException - { - if ((beginIndex > endIndex) || (beginIndex < 1)) - { - throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex + - "\nFrom Index should be greater than 0 and less than To Index"); - } - - List<AMQMessage> list = _deliveryMgr.getMessages(); - TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - - // Create the tabular list of message header contents - for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) - { - AMQMessage msg = list.get(i - 1); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; - List<String> headerAttribsList = new ArrayList<String>(); - headerAttribsList.add("App Id=" + headerProperties.getAppId()); - headerAttribsList.add("MimeType=" + headerProperties.getContentType()); - headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId()); - headerAttribsList.add("Encoding=" + headerProperties.getEncoding()); - headerAttribsList.add(headerProperties.toString()); - - Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]), - headerBody.bodySize, msg.isRedelivered()}; - CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); - _messageList.put(messageData); - } - - return _messageList; - } - - /** - * returns Notifications sent by this MBean. - */ - @Override - public MBeanNotificationInfo[] getNotificationInfo() - { - String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; - String name = MonitorNotification.class.getName(); - String description = "Either Message count or Queue depth or Message size has reached threshold high value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - - return new MBeanNotificationInfo[]{info1}; - } - - } // End of AMQMBean class - public AMQQueue(String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) throws AMQException @@ -523,33 +193,32 @@ public class AMQQueue implements Managable, Comparable { if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) { - _logger.warn("Using ConcurrentSelectorDeliveryManager"); + _logger.info("Using ConcurrentSelectorDeliveryManager"); _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) { - _logger.warn("Using ConcurrentDeliveryManager"); + _logger.info("Using ConcurrentDeliveryManager"); _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); } else { - _logger.warn("Using SynchronizedDeliveryManager"); + _logger.info("Using SynchronizedDeliveryManager"); _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); } } else { - _logger.warn("Using SynchronizedDeliveryManager"); + _logger.info("Using SynchronizedDeliveryManager"); _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); } - } private AMQQueueMBean createMBean() throws AMQException { try { - return new AMQQueueMBean(); + return new AMQQueueMBean(this); } catch (JMException ex) { @@ -582,16 +251,112 @@ public class AMQQueue implements Managable, Comparable return _autoDelete; } + /** + * @return no of messages(undelivered) on the queue. + */ public int getMessageCount() { return _deliveryMgr.getQueueMessageCount(); } + /** + * @return List of messages(undelivered) on the queue. + */ + public List<AMQMessage> getMessagesOnTheQueue() + { + return _deliveryMgr.getMessages(); + } + + /** + * @param messageId + * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. + */ + public AMQMessage getMessageOnTheQueue(long messageId) + { + List<AMQMessage> list = getMessagesOnTheQueue(); + AMQMessage msg = null; + for (AMQMessage message : list) + { + if (message.getMessageId() == messageId) + { + msg = message; + break; + } + } + + return msg; + } + + /** + * @return MBean object associated with this Queue + */ public ManagedObject getManagedObject() { return _managedObject; } + public Long getMaximumMessageSize() + { + return _maxMessageSize; + } + + public void setMaximumMessageSize(Long value) + { + _maxMessageSize = value; + } + + public Integer getConsumerCount() + { + return _subscribers.size(); + } + + public Integer getActiveConsumerCount() + { + return _subscribers.getWeight(); + } + + public Long getReceivedMessageCount() + { + return _totalMessagesReceived; + } + + public Integer getMaximumMessageCount() + { + return _maxMessageCount; + } + + public void setMaximumMessageCount(Integer value) + { + _maxMessageCount = value; + } + + public Long getMaximumQueueDepth() + { + return _maxQueueDepth; + } + + // Sets the queue depth, the max queue size + public void setMaximumQueueDepth(Long value) + { + _maxQueueDepth = value; + } + + /** + * Removes the AMQMessage from the top of the queue. + */ + public void deleteMessageFromTop() throws AMQException + { + _deliveryMgr.removeAMessageFromTop(); + } + + /** + * removes all the messages from the queue. + */ + public void clearQueue() throws AMQException + { + _deliveryMgr.clearAllMessages(); + } + public void bind(String routingKey, Exchange exchange) { _bindings.addBinding(routingKey, exchange); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java new file mode 100644 index 0000000000..54dd366d71 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -0,0 +1,343 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.mina.common.ByteBuffer; + +import javax.management.openmbean.*; +import javax.management.JMException; +import javax.management.Notification; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.OperationsException; +import javax.management.monitor.MonitorNotification; +import java.util.List; +import java.util.ArrayList; + +/** + * MBean class for AMQQueue. It implements all the management features exposed + * for an AMQQueue. + */ +@MBeanDescription("Management Interface for AMQQueue") +public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue +{ + private AMQQueue _queue = null; + private String _queueName = null; + // OpenMBean data types for viewMessages method + private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"}; + private String[] _msgAttributeIndex = {_msgAttributeNames[0]}; + private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. + private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. + private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. + + // OpenMBean data types for viewMessageContent method + private CompositeType _msgContentType = null; + private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"}; + private OpenType[] _msgContentAttributeTypes = new OpenType[4]; + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") + public AMQQueueMBean(AMQQueue queue) throws JMException + { + super(ManagedQueue.class, ManagedQueue.TYPE); + _queue = queue; + _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString(); + init(); + } + + /** + * initialises the openmbean data types + */ + private void init() throws OpenDataException + { + _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id + _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType + _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding + _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content + _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, + _msgContentAttributes, _msgContentAttributeTypes); + + _msgAttributeTypes[0] = SimpleType.LONG; // For message id + _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes + _msgAttributeTypes[2] = SimpleType.LONG; // For size + _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered + + _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); + _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); + } + + public String getObjectInstanceName() + { + return _queueName; + } + + public String getName() + { + return _queueName; + } + + public boolean isDurable() + { + return _queue.isDurable(); + } + + public String getOwner() + { + return _queue.getOwner(); + } + + public boolean isAutoDelete() + { + return _queue.isAutoDelete(); + } + + public Integer getMessageCount() + { + return _queue.getMessageCount(); + } + + public Long getMaximumMessageSize() + { + return _queue.getMaximumMessageSize(); + } + + public void setMaximumMessageSize(Long value) + { + _queue.setMaximumMessageSize(value); + } + + public Integer getConsumerCount() + { + return _queue.getConsumerCount(); + } + + public Integer getActiveConsumerCount() + { + return _queue.getActiveConsumerCount(); + } + + public Long getReceivedMessageCount() + { + return _queue.getReceivedMessageCount(); + } + + public Integer getMaximumMessageCount() + { + return _queue.getMaximumMessageCount(); + } + + public void setMaximumMessageCount(Integer value) + { + _queue.setMaximumMessageCount(value); + } + + public Long getMaximumQueueDepth() + { + return _queue.getMaximumQueueDepth(); + } + + public void setMaximumQueueDepth(Long value) + { + _queue.setMaximumQueueDepth(value); + } + + /** + * returns the size of messages(KB) in the queue. + */ + public Long getQueueDepth() + { + List<AMQMessage> list = _queue.getMessagesOnTheQueue(); + if (list.size() == 0) + { + return 0l; + } + + long queueDepth = 0; + for (AMQMessage message : list) + { + queueDepth = queueDepth + getMessageSize(message); + } + return (long) Math.round(queueDepth / 1000); + } + + /** + * returns size of message in bytes + */ + private long getMessageSize(AMQMessage msg) + { + if (msg == null) + { + return 0l; + } + + return msg.getContentHeaderBody().bodySize; + } + + /** + * Checks if there is any notification to be send to the listeners + */ + public void checkForNotification(AMQMessage msg) + { + // Check for threshold message count + Integer msgCount = getMessageCount(); + if (msgCount >= getMaximumMessageCount()) + { + notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); + } + + // Check for threshold message size + long messageSize = getMessageSize(msg); + if (messageSize >= _queue.getMaximumMessageSize()) + { + notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); + } + + // Check for threshold queue depth in bytes + long queueDepth = getQueueDepth(); + if (queueDepth >= _queue.getMaximumQueueDepth()) + { + notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); + } + } + + /** + * Sends the notification to the listeners + */ + private void notifyClients(String notificationMsg) + { + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + + _broadcaster.sendNotification(n); + } + + /** + * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop() + */ + public void deleteMessageFromTop() throws JMException + { + try + { + _queue.deleteMessageFromTop(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * @see org.apache.qpid.server.queue.AMQQueue#clearQueue() + */ + public void clearQueue() throws JMException + { + try + { + _queue.clearQueue(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * returns message content as byte array and related attributes for the given message id. + */ + public CompositeData viewMessageContent(long msgId) throws JMException + { + AMQMessage msg = _queue.getMessageOnTheQueue(msgId); + if (msg == null) + { + throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); + } + // get message content + List<ContentBody> cBodies = msg.getContentBodies(); + List<Byte> msgContent = new ArrayList<Byte>(); + for (ContentBody body : cBodies) + { + if (body.getSize() != 0) + { + ByteBuffer slice = body.payload.slice(); + for (int j = 0; j < slice.limit(); j++) + { + msgContent.add(slice.get()); + } + } + } + + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = headerProperties.getContentType(); + String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; + + return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); + } + + /** + * Returns the header contents of the messages stored in this queue in tabular form. + */ + public TabularData viewMessages(int beginIndex, int endIndex) throws JMException + { + if ((beginIndex > endIndex) || (beginIndex < 1)) + { + throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex + + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); + } + + List<AMQMessage> list = _queue.getMessagesOnTheQueue(); + TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); + + // Create the tabular list of message header contents + for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) + { + AMQMessage msg = list.get(i - 1); + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + String[] headerAttributes = headerProperties.toString().split(","); + Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; + CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); + _messageList.put(messageData); + } + + return _messageList; + } + + /** + * returns Notifications sent by this MBean. + */ + @Override + public MBeanNotificationInfo[] getNotificationInfo() + { + String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String name = MonitorNotification.class.getName(); + String description = "Either Message count or Queue depth or Message size has reached threshold high value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); + + return new MBeanNotificationInfo[]{info1}; + } + +} // End of AMQMBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index dde76e5ba8..f9c8898182 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -281,7 +281,12 @@ public class ConcurrentDeliveryManager implements DeliveryManager //are we already running? if so, don't re-run if (_processing.compareAndSet(false, true)) { - executor.execute(asyncDelivery); + // Do we need this? + // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. + //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) + { + executor.execute(asyncDelivery); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java new file mode 100644 index 0000000000..d8bb6e1948 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -0,0 +1,352 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; +import org.apache.qpid.configuration.Configured; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.configuration.Configurator; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * Manages delivery of messages on behalf of a queue + */ +public class ConcurrentSelectorDeliveryManager implements DeliveryManager +{ + private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); + + @Configured(path = "advanced.compressBufferOnQueue", + defaultValue = "false") + public boolean compressBufferOnQueue; + /** + * Holds any queued messages + */ + private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + //private int _messageCount; + /** + * Ensures that only one asynchronous task is running for this manager at + * any time. + */ + private final AtomicBoolean _processing = new AtomicBoolean(); + /** + * The subscriptions on the queue to whom messages are delivered + */ + private final SubscriptionManager _subscriptions; + + /** + * A reference to the queue we are delivering messages for. We need this to be able + * to pass the code that handles acknowledgements a handle on the queue. + */ + private final AMQQueue _queue; + + + /** + * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced + * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered + * via the async thread. + * <p/> + * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. + */ + private ReentrantLock _lock = new ReentrantLock(); + + + ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) + { + + //Set values from configuration + Configurator.configure(this); + + if (compressBufferOnQueue) + { + _log.warn("Compressing Buffers on queue."); + } + + _subscriptions = subscriptions; + _queue = queue; + } + + + private boolean addMessageToQueue(AMQMessage msg) + { + // Shrink the ContentBodies to their actual size to save memory. + if (compressBufferOnQueue) + { + Iterator it = msg.getContentBodies().iterator(); + while (it.hasNext()) + { + ContentBody cb = (ContentBody) it.next(); + cb.reduceBufferToFit(); + } + } + + _messages.offer(msg); + + return true; + } + + + public boolean hasQueuedMessages() + { + _lock.lock(); + try + { + return !_messages.isEmpty(); + } + finally + { + _lock.unlock(); + } + } + + public int getQueueMessageCount() + { + return getMessageCount(); + } + + /** + * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. + * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. + * + * @return int the number of messages in the delivery queue. + */ + private int getMessageCount() + { + return _messages.size(); + } + + + public synchronized List<AMQMessage> getMessages() + { + return new ArrayList<AMQMessage>(_messages); + } + + public synchronized void removeAMessageFromTop() throws AMQException + { + AMQMessage msg = poll(); + if (msg != null) + { + msg.dequeue(_queue); + } + } + + public synchronized void clearAllMessages() throws AMQException + { + AMQMessage msg = poll(); + while (msg != null) + { + msg.dequeue(_queue); + msg = poll(); + } + } + + + private AMQMessage getNextMessage(Queue<AMQMessage> messages) + { + AMQMessage message = messages.peek(); + + while (message != null && message.taken()) + { + //remove the already taken message + messages.poll(); + // try the next message + message = messages.peek(); + } + return message; + } + + public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue) + { + AMQMessage message = null; + try + { + message = getNextMessage(messageQueue); + + // message will be null if we have no messages in the messageQueue. + if (message == null) + { + return; + } + _log.info("Async Delivery Message:" + message + " to :" + sub); + + sub.send(message, queue); + message.setDeliveredToConsumer(); + + //remove sent message from our queue. + messageQueue.poll(); + } + catch (FailedDequeueException e) + { + message.release(); + _log.error("Unable to deliver message as dequeue failed: " + e, e); + } + } + + /** + * Only one thread should ever execute this method concurrently, but + * it can do so while other threads invoke deliver(). + */ + private void processQueue() + { + // Continue to process delivery while we haveSubscribers and messages + boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); + + while (hasSubscribers && hasQueuedMessages()) + { + for (Subscription sub : _subscriptions.getSubscriptions()) + { + if (!sub.isSuspended()) + { + if (sub.hasFilters()) + { + sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue); + } + else + { + sendNextMessage(sub, _messages, _queue); + } + + hasSubscribers = true; + } + else + { + hasSubscribers = false; + } + } + } + } + + private AMQMessage poll() + { + return _messages.poll(); + } + + public void deliver(String name, AMQMessage msg) throws FailedDequeueException + { + _log.info("deliver :" + msg); + + //Check if we have someone to deliver the message to. + _lock.lock(); + try + { + Subscription s = _subscriptions.nextSubscriber(msg); + + if (s == null) //no-one can take the message right now. + { + _log.info("Testing Message(" + msg + ") for Queued Delivery"); + if (!msg.isImmediate()) + { + addMessageToQueue(msg); + + //release lock now message is on queue. + _lock.unlock(); + + //Pre Deliver to all subscriptions + _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); + for (Subscription sub : _subscriptions.getSubscriptions()) + { + + // stop if the message gets delivered whilst PreDelivering if we have a shared queue. + if (_queue.isShared() && msg.getDeliveredToConsumer()) + { + _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered."); + continue; + } + + // Only give the message to those that want them. + if (sub.hasFilters() && sub.hasInterest(msg)) + { + sub.enqueueForPreDelivery(msg); + } + } + } + } + else + { + //release lock now + _lock.unlock(); + + _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s); + //Deliver the message + s.send(msg, _queue); + msg.setDeliveredToConsumer(); + } + } + finally + { + //ensure lock is released + if (_lock.isLocked()) + { + _lock.unlock(); + } + } + } + + Runner asyncDelivery = new Runner(); + + private class Runner implements Runnable + { + public void run() + { + boolean running = true; + while (running) + { + processQueue(); + + //Check that messages have not been added since we did our last peek(); + // Synchronize with the thread that adds to the queue. + // If the queue is still empty then we can exit + + if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) + { + running = false; + _processing.set(false); + } + } + } + } + + public void processAsync(Executor executor) + { + _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get()); + + if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) + { + //are we already running? if so, don't re-run + if (_processing.compareAndSet(false, true)) + { + executor.execute(asyncDelivery); + } + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 7105a3f30f..79b0593f69 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -118,6 +118,12 @@ public class SubscriptionImpl implements Subscription } } + public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, + String consumerTag) + throws AMQException + { + this(channel, protocolSession, consumerTag, false); + } public boolean equals(Object o) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index 301182a313..49b0111b67 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -206,7 +206,12 @@ class SynchronizedDeliveryManager implements DeliveryManager //are we already running? if so, don't re-run if (_processing.compareAndSet(false, true)) { - executor.execute(new Runner()); + // Do we need this? + // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. + //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) + { + executor.execute(new Runner()); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 5e88ff7f2d..7a13884136 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -109,6 +109,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance()); frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance()); + frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance()); frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance()); frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance()); frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance()); |