diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-02-21 10:09:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-02-21 10:09:03 +0000 |
commit | 3047c0ec2d581f4b51c77fec84fbf0bec8599573 (patch) | |
tree | 7ba966b95105a3576cf2fc9150b6b9dd322f4b14 /java/broker/src | |
parent | 3aed99f65d795c234faa9b584182cf3ea8c67b4a (diff) | |
download | qpid-python-3047c0ec2d581f4b51c77fec84fbf0bec8599573.tar.gz |
QPID-790 : Performance Improvements
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629731 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
22 files changed, 624 insertions, 310 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 10184a79e5..3cb50d1d12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -75,7 +75,7 @@ public class AMQChannel * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out */ - private AtomicLong _deliveryTag = new AtomicLong(0); + private long _deliveryTag = 0; /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ private AMQQueue _defaultQueue; @@ -99,8 +99,6 @@ public class AMQChannel private final AtomicBoolean _suspended = new AtomicBoolean(false); - private final MessageRouter _exchanges; - private TransactionalContext _txnContext, _nonTransactedContext; /** @@ -124,7 +122,7 @@ public class AMQChannel public boolean ENABLE_JMSXUserID; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { //Set values from configuration @@ -136,7 +134,7 @@ public class AMQChannel _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; - _exchanges = exchanges; + // by default the session is non-transactional _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } @@ -297,7 +295,7 @@ public class AMQChannel public long getNextDeliveryTag() { - return _deliveryTag.incrementAndGet(); + return ++_deliveryTag; } public int getNextConsumerTag() @@ -969,16 +967,19 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) throws AMQException { - for (RequiredDeliveryException bouncedMessage : _returnMessages) + if(!_returnMessages.isEmpty()) { - AMQMessage message = bouncedMessage.getAMQMessage(); - session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + for (RequiredDeliveryException bouncedMessage : _returnMessages) + { + AMQMessage message = bouncedMessage.getAMQMessage(); + session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); - message.decrementReference(_storeContext); - } + message.decrementReference(_storeContext); + } - _returnMessages.clear(); + _returnMessages.clear(); + } } public boolean wouldSuspend(AMQMessage msg) diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index ab9f40b31d..d8a8cfb6d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -35,6 +35,7 @@ import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.FixedSizeByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQException; @@ -275,7 +276,7 @@ public class Main // once more testing of the performance of the simple allocator has been done if (!connectorConfig.enablePooledAllocator) { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } int port = connectorConfig.port; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index b6b6ee39ce..12347c0278 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -212,10 +212,9 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for (AMQQueue q : queues) - { - payload.enqueue(q); - } + payload.enqueue(queues); + + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 19172b98f3..dbe7a8938a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortStringTokenizer; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -40,11 +41,7 @@ import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -84,12 +81,21 @@ public class DestWildExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); - private static final String TOPIC_SEPARATOR = "."; - private static final String AMQP_STAR = "*"; - private static final String AMQP_HASH = "#"; + private static final byte TOPIC_SEPARATOR = (byte)'.'; + private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); + private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); + private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#"); + private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized = + new ConcurrentHashMap<AMQShortString, AMQShortString[]>(); + private static final byte HASH_BYTE = (byte)'#'; + private static final byte STAR_BYTE = (byte)'*'; /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") @@ -107,7 +113,7 @@ public class DestWildExchange extends AbstractExchange public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet()) + for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet()) { AMQShortString key = entry.getKey(); List<String> queueList = new ArrayList<String>(); @@ -156,27 +162,75 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); + _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey); - _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition - List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); + List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); + + + + + + + // if we got null back, no previous value was associated with the specified routing key hence // we need to read back the new value just put into the map if (queueList == null) { - queueList = _routingKey2queues.get(routingKey); + queueList = _bindingKey2queues.get(rKey); } + + if (!queueList.contains(queue)) { queueList.add(queue); + + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString routingKey = normalize(rKey); + List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); + + if(queueList2 == null) + { + queueList2 = _wildCardBindingKey2queues.get(routingKey); + AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR); + + ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens()); + + while (keyTok.hasMoreTokens()) + { + keyTokList.add(keyTok.nextToken()); + } + + _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()])); + } + queueList2.add(queue); + + } + else + { + List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); + if(queueList2 == null) + { + queueList2 = _simpleBindingKey2queues.get(rKey); + } + queueList2.add(queue); + + } + + + + } else if (_logger.isDebugEnabled()) { - _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); + _logger.debug("Queue " + queue + " is already registered with routing key " + rKey); } + + } private AMQShortString normalize(AMQShortString routingKey) @@ -186,53 +240,58 @@ public class DestWildExchange extends AbstractExchange routingKey = AMQShortString.EMPTY_STRING; } - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - List<String> _subscription = new ArrayList<String>(); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>(); while (routingTokens.hasMoreTokens()) { - _subscription.add(routingTokens.nextToken()); + subscriptionList.add(routingTokens.nextToken()); } - int size = _subscription.size(); + int size = subscriptionList.size(); for (int index = 0; index < size; index++) { // if there are more levels if ((index + 1) < size) { - if (_subscription.get(index).equals(AMQP_HASH)) + if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN)) { - if (_subscription.get(index + 1).equals(AMQP_HASH)) + if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN)) { // we don't need #.# delete this one - _subscription.remove(index); + subscriptionList.remove(index); size--; // redo this normalisation index--; } - if (_subscription.get(index + 1).equals(AMQP_STAR)) + if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN)) { // we don't want #.* swap to *.# // remove it and put it in at index + 1 - _subscription.add(index + 1, _subscription.remove(index)); + subscriptionList.add(index + 1, subscriptionList.remove(index)); } } } // if we have more levels } - StringBuilder sb = new StringBuilder(); - for (String s : _subscription) + + AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); +/* + StringBuilder sb = new StringBuilder(); + for (AMQShortString s : subscriptionList) { sb.append(s); sb.append(TOPIC_SEPARATOR); } sb.deleteCharAt(sb.length() - 1); +*/ - return new AMQShortString(sb.toString()); + return normalizedString; } public void route(AMQMessage payload) throws AMQException @@ -254,19 +313,14 @@ public class DestWildExchange extends AbstractExchange else { _logger.warn("No queues found for routing key " + routingKey); - _logger.warn("Routing map contains: " + _routingKey2queues); + _logger.warn("Routing map contains: " + _bindingKey2queues); return; } } - for (AMQQueue q : queues) - { - // TODO: modify code generator to add clone() method then clone the deliver body - // without this addition we have a race condition - we will be modifying the body - // before the encoder has encoded the body for delivery - payload.enqueue(q); - } + payload.enqueue(queues); + } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) @@ -276,21 +330,21 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); + List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && queues.contains(queue); } public boolean isBound(AMQShortString routingKey) { - List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); + List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) { - for (List<AMQQueue> queues : _routingKey2queues.values()) + for (List<AMQQueue> queues : _bindingKey2queues.values()) { if (queues.contains(queue)) { @@ -303,7 +357,7 @@ public class DestWildExchange extends AbstractExchange public boolean hasBindings() { - return !_routingKey2queues.isEmpty(); + return !_bindingKey2queues.isEmpty(); } public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException @@ -311,13 +365,11 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); - - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + List<AMQQueue> queues = _bindingKey2queues.get(rKey); if (queues == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey + ". No queue was registered with that _routing key"); + + " with routing key " + rKey + ". No queue was registered with that _routing key"); } @@ -325,12 +377,39 @@ public class DestWildExchange extends AbstractExchange if (!removedQ) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey); + + " with routing key " + rKey); + } + + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString bindingKey = normalize(rKey); + List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _wildCardBindingKey2queues.remove(bindingKey); + _bindingKey2Tokenized.remove(bindingKey); + } + } + else + { + List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _simpleBindingKey2queues.remove(rKey); + } + + } + + + if (queues.isEmpty()) { - _routingKey2queues.remove(routingKey); + _bindingKey2queues.remove(rKey); } } @@ -349,117 +428,167 @@ public class DestWildExchange extends AbstractExchange public Map<AMQShortString, List<AMQQueue>> getBindings() { - return _routingKey2queues; + return _bindingKey2queues; } private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) { - List<AMQQueue> list = new LinkedList<AMQQueue>(); - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - ArrayList<String> routingkeyList = new ArrayList<String>(); + List<AMQQueue> list = null; - while (routingTokens.hasMoreTokens()) + if(!_wildCardBindingKey2queues.isEmpty()) { - String next = routingTokens.nextToken(); - if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH)) - { - continue; - } - routingkeyList.add(next); - } - for (AMQShortString queue : _routingKey2queues.keySet()) - { - StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + final int routingTokensCount = routingTokens.countTokens(); - ArrayList<String> queueList = new ArrayList<String>(); - while (queTok.hasMoreTokens()) + AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount]; + + if(routingTokensCount == 1) { - queueList.add(queTok.nextToken()); + routingkeyTokens[0] =routingKey; } + else + { - int depth = 0; - boolean matching = true; - boolean done = false; - int routingskip = 0; - int queueskip = 0; - while (matching && !done) - { - if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) + int token = 0; + while (routingTokens.hasMoreTokens()) { - done = true; - // if it was the routing key that ran out of digits - if (routingkeyList.size() == (depth + routingskip)) - { - if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = - queueList.get(depth + queueskip).equals(AMQP_HASH) - && (queueList.size() == (depth + queueskip + 1)); - } - } - else if (routingkeyList.size() > (depth + routingskip)) + AMQShortString next = routingTokens.nextToken(); + /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH)) { - // There is still more routing key to check - matching = false; + continue; } + */ - continue; + routingkeyTokens[token++] = next; } + } + for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet()) + { + + AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey); + + + boolean matching = true; + boolean done = false; - // if the values on the two topics don't match - if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip))) + int depthPlusRoutingSkip = 0; + int depthPlusQueueSkip = 0; + + final int bindingKeyTokensCount = bindingKeyTokens.length; + + while (matching && !done) { - if (queueList.get(depth + queueskip).equals(AMQP_STAR)) + + if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip)) { - depth++; + done = true; + + // if it was the routing key that ran out of digits + if (routingTokensCount == depthPlusRoutingSkip) + { + if (bindingKeyTokensCount > depthPlusQueueSkip) + { // a hash and it is the last entry + matching = + bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN) + && (bindingKeyTokensCount == (depthPlusQueueSkip + 1)); + } + } + else if (routingTokensCount > depthPlusRoutingSkip) + { + // There is still more routing key to check + matching = false; + } continue; } - else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) + + // if the values on the two topics don't match + if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip])) { - // Is this a # at the end - if (queueList.size() == (depth + queueskip + 1)) + if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN)) { - done = true; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; continue; } - - // otherwise # in the middle - while (routingkeyList.size() > (depth + routingskip)) + else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)) { - if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) + // Is this a # at the end + if (bindingKeyTokensCount == (depthPlusQueueSkip + 1)) + { + done = true; + + continue; + } + + // otherwise # in the middle + while (routingTokensCount > depthPlusRoutingSkip) { - queueskip++; - depth++; + if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1])) + { + depthPlusQueueSkip += 2; + depthPlusRoutingSkip++; + + break; + } - break; + depthPlusRoutingSkip++; } - routingskip++; + continue; } - continue; + matching = false; } - matching = false; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; } - depth++; + if (matching) + { + if(list == null) + { + list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey)); + } + else + { + list.addAll(_wildCardBindingKey2queues.get(bindingKey)); + } + } } - if (matching) + } + if(!_simpleBindingKey2queues.isEmpty()) + { + List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey); + if(list == null) + { + if(queues == null) + { + list = Collections.EMPTY_LIST; + } + else + { + list = new ArrayList<AMQQueue>(queues); + } + } + else if(queues != null) { - list.addAll(_routingKey2queues.get(queue)); + list.addAll(queues); } + } return list; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 57ae2bb6d4..e7c887f306 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -42,6 +42,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -205,10 +206,8 @@ public class FanoutExchange extends AbstractExchange _logger.debug("Publishing message to queue " + _queues);
}
- for (AMQQueue q : _queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(new ArrayList(_queues));
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 3f604480b9..054674aed4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -55,8 +54,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(), - virtualHost.getExchangeRegistry()); + final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore() + ); session.addChannel(channel); ChannelOpenOkBody response; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 98c77d8d32..d7a879180a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -100,8 +100,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -151,8 +151,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -247,14 +247,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index b14f03e617..646ef43826 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -12,11 +12,14 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -47,10 +50,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
+
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -60,8 +62,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if(bodyCount == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -75,9 +77,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+ CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
//
@@ -86,7 +88,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -95,6 +97,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
@@ -106,8 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
if(bodyCount == 0)
@@ -126,9 +135,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -137,7 +146,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -147,7 +156,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -158,23 +167,53 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQDataBlock returnBlock = new DeferredDataBlock()
+ final AMQBody returnBlock = new AMQBody()
{
- protected AMQDataBlock createAMQDataBlock()
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
{
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
isRedelivered,
exchangeName,
routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
}
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
};
return returnBlock;
}
@@ -225,8 +264,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter {
AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
//
@@ -236,14 +274,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
@@ -272,4 +309,64 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 0fe6d3636e..143ee5fa40 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -208,27 +208,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _logger.debug("Frame Received: " + frame); } + + + body.handle(channelId, this); - if (body instanceof AMQMethodBody) - { - methodFrameReceived(channelId, (AMQMethodBody) body); - } - else if (body instanceof ContentHeaderBody) - { - contentHeaderReceived(channelId, (ContentHeaderBody) body); - } - else if (body instanceof ContentBody) - { - contentBodyReceived(channelId, (ContentBody) body); - } - else if (body instanceof HeartbeatBody) - { - // NO OP - } - else - { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); - } } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -271,7 +254,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) { final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); @@ -365,7 +348,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } catch (Exception e) { - _stateManager.error(e); + for (AMQMethodListener listener : _frameListeners) { listener.error(e); @@ -375,7 +358,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); @@ -384,13 +367,18 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } - private void contentBodyReceived(int channelId, ContentBody body) throws AMQException + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentBody(body, this); } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + // NO - OP + } + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 543e043bed..db5d882f51 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -30,6 +30,7 @@ import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.codec.QpidProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; @@ -82,7 +83,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter createSession(protocolSession, _applicationRegistry, codecFactory); _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory); + final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). getConfiguredObject(ConnectorConfiguration.class); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 80158779b2..5e79ab46b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -83,7 +83,7 @@ public class AMQMessage private long _expiration; - private final int hashcode = System.identityHashCode(this); + private Exchange _exchange; private static final boolean SYNCED_CLOCKS = @@ -92,7 +92,7 @@ public class AMQMessage public String debugIdentity() { - return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; + return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } public void setExpiration() @@ -141,6 +141,11 @@ public class AMQMessage _exchange.route(this); } + public void enqueue(final List<AMQQueue> queues) + { + _transientMessageData.setDestinationQueues(queues); + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -664,7 +669,7 @@ public class AMQMessage } finally { - destinationQueues.clear(); + // Remove refence for routing process . Reference count should now == delivered queue count decrementReference(storeContext); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index e1c1de29bd..4a0121700c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -37,9 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -158,6 +156,8 @@ public class AMQQueue implements Managable, Comparable public AtomicLong _totalMessagesReceived = new AtomicLong(); + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + + // This ensure that the notification checks for the configured alerts are created. + setMaximumMessageAge(_maximumMessageAge); + setMaximumMessageCount(_maximumMessageCount); + setMaximumMessageSize(_maximumMessageSize); + setMaximumQueueDepth(_maximumQueueDepth); + } private AMQQueueMBean createMBean() throws AMQException @@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable } } - public AMQShortString getName() + public final AMQShortString getName() { return _name; } @@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageSize; } - public void setMaximumMessageSize(long value) + public void setMaximumMessageSize(final long maximumMessageSize) { - _maximumMessageSize = value; + _maximumMessageSize = maximumMessageSize; + if(maximumMessageSize == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); + } } public int getConsumerCount() @@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageCount; } - public void setMaximumMessageCount(long value) + public void setMaximumMessageCount(final long maximumMessageCount) { - _maximumMessageCount = value; + _maximumMessageCount = maximumMessageCount; + if(maximumMessageCount == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); + } + + + } public long getMaximumQueueDepth() @@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(long value) + public void setMaximumQueueDepth(final long maximumQueueDepth) { - _maximumQueueDepth = value; + _maximumQueueDepth = maximumQueueDepth; + if(maximumQueueDepth == 0L) + { + _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); + } + } public long getOldestMessageArrivalTime() @@ -661,6 +696,10 @@ public class AMQQueue implements Managable, Comparable } _subscribers.addSubscriber(subscription); + if(exclusive) + { + _subscribers.setExclusive(true); + } } private boolean isExclusive() @@ -692,6 +731,7 @@ public class AMQQueue implements Managable, Comparable ps, channel, consumerTag, this)); } + _subscribers.setExclusive(false); Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) @@ -805,7 +845,7 @@ public class AMQQueue implements Managable, Comparable public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException { AMQMessage msg = entry.getMessage(); - _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst); + _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -938,6 +978,14 @@ public class AMQQueue implements Managable, Comparable public void setMaximumMessageAge(long maximumMessageAge) { _maximumMessageAge = maximumMessageAge; + if(maximumMessageAge == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); + } } public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry) @@ -966,4 +1014,9 @@ public class AMQQueue implements Managable, Comparable } } } + + public final Set<NotificationCheck> getNotificationChecks() + { + return _notificationChecks; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 9e32de3f76..348a136f9d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -54,10 +54,7 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.List; +import java.util.*; /** * AMQQueueMBean is the management bean for an {@link AMQQueue}. @@ -97,6 +94,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; private Notification _lastNotification = null; + + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -249,16 +249,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public void checkForNotification(AMQMessage msg) throws AMQException, JMException { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); - for (NotificationCheck check : NotificationCheck.values()) + if(!notificationChecks.isEmpty()) { - if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + + for (NotificationCheck check : notificationChecks) { - if (check.notifyIfNecessary(msg, _queue, this)) + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) { - _lastNotificationTimes[check.ordinal()] = currentTime; + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index e3c8d3f17a..a61d41e33b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -363,8 +363,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(), deliveryTag, _queue.getMessageCount()); - _totalMessageSize.addAndGet(-entry.getSize()); + } + _totalMessageSize.addAndGet(-entry.getSize()); if (!acks) { @@ -918,7 +919,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6b3d65661f..6f9efd3200 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -29,9 +29,9 @@ public enum NotificationCheck {
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
- int msgCount = queue.getMessageCount();
+ int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
return true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 7f0accf052..6e68b5637e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -292,14 +292,17 @@ public class SubscriptionImpl implements Subscription queue.dequeue(storeContext, entry); } +/* + if (_sendLock.get()) + { + _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); + } +*/ + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); - if (_sendLock.get()) - { - _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); - } if (_acks) { @@ -308,10 +311,11 @@ public class SubscriptionImpl implements Subscription protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); - if (!_acks) - { - entry.getMessage().decrementReference(storeContext); - } + + } + if (!_acks) + { + entry.getMessage().decrementReference(storeContext); } } finally @@ -367,59 +371,60 @@ public class SubscriptionImpl implements Subscription // return false; } - final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + //todo - client id should be recoreded and this test removed but handled below - if (_noLocal && publisher != null) + if (_noLocal) { - // We don't want local messages so check to see if message is one we sent - Object localInstance; - Object msgInstance; - if ((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + if(publisher != null) + { + // We don't want local messages so check to see if message is one we sent + Object localInstance; + Object msgInstance; - if ((publisher.getClientProperties() != null) && - (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (localInstance == msgInstance || localInstance.equals(msgInstance)) + + if ((publisher.getClientProperties() != null) && + (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + if (localInstance == msgInstance || localInstance.equals(msgInstance)) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } } - } - else - { + else + { - localInstance = protocolSession.getClientIdentifier(); - //todo - client id should be recoreded and this test removed but handled here + localInstance = protocolSession.getClientIdentifier(); + //todo - client id should be recoreded and this test removed but handled here - msgInstance = publisher.getClientIdentifier(); - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + msgInstance = publisher.getClientIdentifier(); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } - } - + } } - if (_logger.isDebugEnabled()) - { - _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); - } return checkFilters(entry); } @@ -433,23 +438,7 @@ public class SubscriptionImpl implements Subscription private boolean checkFilters(QueueEntry msg) { - if (_filters != null) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has filters."); -// } - return _filters.allAllow(msg.getMessage()); - } - else - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no filters"); -// } - - return true; - } + return (_filters == null) || _filters.allAllow(msg.getMessage()); } public Queue<QueueEntry> getPreDeliveryQueue() @@ -613,7 +602,7 @@ public class SubscriptionImpl implements Subscription public boolean wouldSuspend(QueueEntry msg) { - return channel.wouldSuspend(msg.getMessage()); + return _acks && channel.wouldSuspend(msg.getMessage()); } public Queue<QueueEntry> getResendQueue() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b2f8cae8ff..b7cdaa29ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -39,6 +39,7 @@ class SubscriptionSet implements WeightedSubscriptionManager private int _currentSubscriber; private final Object _changeLock = new Object(); + private volatile boolean _exclusive; /** Accessor for unit tests. */ @@ -116,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager */ public Subscription nextSubscriber(QueueEntry msg) { - if (_subscriptions.isEmpty()) - { - return null; - } + try { @@ -143,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager private Subscription nextSubscriberImpl(QueueEntry msg) { - final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) + if(_exclusive) { - Subscription subscription = iterator.next(); - ++_currentSubscriber; - subscriberScanned(); - - if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + try { - if (subscription.hasInterest(msg)) + Subscription subscription = _subscriptions.get(0); + subscriberScanned(); + + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + if (subscription.hasInterest(msg)) { - return subscription; + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } } } } + catch(IndexOutOfBoundsException e) + { + } + return null; } + else + { + if (_subscriptions.isEmpty()) + { + return null; + } + final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); + while (iterator.hasNext()) + { + Subscription subscription = iterator.next(); + ++_currentSubscriber; + subscriberScanned(); - return null; + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } + } + } + + return null; + } } /** Overridden in test classes. */ @@ -233,5 +265,14 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _changeLock; } - + + public void setExclusive(final boolean exclusive) + { + _exclusive = exclusive; + } + + public boolean getExcBoolean() + { + return _exclusive; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java index 79ee6b93a3..9b91c71a1d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; +import java.util.Collections; import org.apache.qpid.AMQException; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -60,7 +62,7 @@ public class TransientMessageData * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done * by the message handle. */ - private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); + private List<AMQQueue> _destinationQueues; public MessagePublishInfo getMessagePublishInfo() { @@ -74,7 +76,7 @@ public class TransientMessageData public List<AMQQueue> getDestinationQueues() { - return _destinationQueues; + return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues; } public void setDestinationQueues(List<AMQQueue> destinationQueues) @@ -109,6 +111,10 @@ public class TransientMessageData public void addDestinationQueue(AMQQueue queue) { + if(_destinationQueues == null) + { + _destinationQueues = new ArrayList<AMQQueue>(); + } _destinationQueues.add(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 047cef9064..1e4b69c935 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -49,11 +49,11 @@ public class NonTransactionalContext implements TransactionalContext /** Where to put undeliverable messages */ private final List<RequiredDeliveryException> _returnMessages; - private Set<Long> _browsedAcks; + private final Set<Long> _browsedAcks; private final MessageStore _messageStore; - private StoreContext _storeContext; + private final StoreContext _storeContext; /** Whether we are in a transaction */ private boolean _inTran; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 9addf83e01..afe96bcd4f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost; import javax.management.NotCompliantMBeanException; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.security.access.AccessManager; @@ -123,7 +124,7 @@ public class VirtualHost implements Accessable */ public VirtualHost(String name, MessageStore store) throws Exception { - this(name, null, store); + this(name, new PropertiesConfiguration(), store); } /** diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index fbd9e65480..ed79384d42 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -173,7 +173,7 @@ public class AMQQueueAlertTest extends TestCase public void testQueueDepthAlertWithSubscribers() throws Exception { protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore); protocolSession.addChannel(channel); // Create queue diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index e72e1bf1f0..c02b47e9fd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -123,7 +123,7 @@ public class AMQQueueMBeanTest extends TestCase TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore); protocolSession.addChannel(channel); _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); |