diff options
58 files changed, 3067 insertions, 737 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 10184a79e5..3cb50d1d12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -75,7 +75,7 @@ public class AMQChannel * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out */ - private AtomicLong _deliveryTag = new AtomicLong(0); + private long _deliveryTag = 0; /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ private AMQQueue _defaultQueue; @@ -99,8 +99,6 @@ public class AMQChannel private final AtomicBoolean _suspended = new AtomicBoolean(false); - private final MessageRouter _exchanges; - private TransactionalContext _txnContext, _nonTransactedContext; /** @@ -124,7 +122,7 @@ public class AMQChannel public boolean ENABLE_JMSXUserID; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { //Set values from configuration @@ -136,7 +134,7 @@ public class AMQChannel _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; - _exchanges = exchanges; + // by default the session is non-transactional _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } @@ -297,7 +295,7 @@ public class AMQChannel public long getNextDeliveryTag() { - return _deliveryTag.incrementAndGet(); + return ++_deliveryTag; } public int getNextConsumerTag() @@ -969,16 +967,19 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) throws AMQException { - for (RequiredDeliveryException bouncedMessage : _returnMessages) + if(!_returnMessages.isEmpty()) { - AMQMessage message = bouncedMessage.getAMQMessage(); - session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + for (RequiredDeliveryException bouncedMessage : _returnMessages) + { + AMQMessage message = bouncedMessage.getAMQMessage(); + session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); - message.decrementReference(_storeContext); - } + message.decrementReference(_storeContext); + } - _returnMessages.clear(); + _returnMessages.clear(); + } } public boolean wouldSuspend(AMQMessage msg) diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index ab9f40b31d..d8a8cfb6d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -35,6 +35,7 @@ import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.FixedSizeByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQException; @@ -275,7 +276,7 @@ public class Main // once more testing of the performance of the simple allocator has been done if (!connectorConfig.enablePooledAllocator) { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } int port = connectorConfig.port; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index b6b6ee39ce..12347c0278 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -212,10 +212,9 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for (AMQQueue q : queues) - { - payload.enqueue(q); - } + payload.enqueue(queues); + + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 19172b98f3..dbe7a8938a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortStringTokenizer; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -40,11 +41,7 @@ import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -84,12 +81,21 @@ public class DestWildExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); - private static final String TOPIC_SEPARATOR = "."; - private static final String AMQP_STAR = "*"; - private static final String AMQP_HASH = "#"; + private static final byte TOPIC_SEPARATOR = (byte)'.'; + private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); + private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); + private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#"); + private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized = + new ConcurrentHashMap<AMQShortString, AMQShortString[]>(); + private static final byte HASH_BYTE = (byte)'#'; + private static final byte STAR_BYTE = (byte)'*'; /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") @@ -107,7 +113,7 @@ public class DestWildExchange extends AbstractExchange public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet()) + for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet()) { AMQShortString key = entry.getKey(); List<String> queueList = new ArrayList<String>(); @@ -156,27 +162,75 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); + _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey); - _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition - List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); + List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); + + + + + + + // if we got null back, no previous value was associated with the specified routing key hence // we need to read back the new value just put into the map if (queueList == null) { - queueList = _routingKey2queues.get(routingKey); + queueList = _bindingKey2queues.get(rKey); } + + if (!queueList.contains(queue)) { queueList.add(queue); + + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString routingKey = normalize(rKey); + List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); + + if(queueList2 == null) + { + queueList2 = _wildCardBindingKey2queues.get(routingKey); + AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR); + + ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens()); + + while (keyTok.hasMoreTokens()) + { + keyTokList.add(keyTok.nextToken()); + } + + _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()])); + } + queueList2.add(queue); + + } + else + { + List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); + if(queueList2 == null) + { + queueList2 = _simpleBindingKey2queues.get(rKey); + } + queueList2.add(queue); + + } + + + + } else if (_logger.isDebugEnabled()) { - _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); + _logger.debug("Queue " + queue + " is already registered with routing key " + rKey); } + + } private AMQShortString normalize(AMQShortString routingKey) @@ -186,53 +240,58 @@ public class DestWildExchange extends AbstractExchange routingKey = AMQShortString.EMPTY_STRING; } - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - List<String> _subscription = new ArrayList<String>(); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>(); while (routingTokens.hasMoreTokens()) { - _subscription.add(routingTokens.nextToken()); + subscriptionList.add(routingTokens.nextToken()); } - int size = _subscription.size(); + int size = subscriptionList.size(); for (int index = 0; index < size; index++) { // if there are more levels if ((index + 1) < size) { - if (_subscription.get(index).equals(AMQP_HASH)) + if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN)) { - if (_subscription.get(index + 1).equals(AMQP_HASH)) + if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN)) { // we don't need #.# delete this one - _subscription.remove(index); + subscriptionList.remove(index); size--; // redo this normalisation index--; } - if (_subscription.get(index + 1).equals(AMQP_STAR)) + if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN)) { // we don't want #.* swap to *.# // remove it and put it in at index + 1 - _subscription.add(index + 1, _subscription.remove(index)); + subscriptionList.add(index + 1, subscriptionList.remove(index)); } } } // if we have more levels } - StringBuilder sb = new StringBuilder(); - for (String s : _subscription) + + AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); +/* + StringBuilder sb = new StringBuilder(); + for (AMQShortString s : subscriptionList) { sb.append(s); sb.append(TOPIC_SEPARATOR); } sb.deleteCharAt(sb.length() - 1); +*/ - return new AMQShortString(sb.toString()); + return normalizedString; } public void route(AMQMessage payload) throws AMQException @@ -254,19 +313,14 @@ public class DestWildExchange extends AbstractExchange else { _logger.warn("No queues found for routing key " + routingKey); - _logger.warn("Routing map contains: " + _routingKey2queues); + _logger.warn("Routing map contains: " + _bindingKey2queues); return; } } - for (AMQQueue q : queues) - { - // TODO: modify code generator to add clone() method then clone the deliver body - // without this addition we have a race condition - we will be modifying the body - // before the encoder has encoded the body for delivery - payload.enqueue(q); - } + payload.enqueue(queues); + } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) @@ -276,21 +330,21 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); + List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && queues.contains(queue); } public boolean isBound(AMQShortString routingKey) { - List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); + List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) { - for (List<AMQQueue> queues : _routingKey2queues.values()) + for (List<AMQQueue> queues : _bindingKey2queues.values()) { if (queues.contains(queue)) { @@ -303,7 +357,7 @@ public class DestWildExchange extends AbstractExchange public boolean hasBindings() { - return !_routingKey2queues.isEmpty(); + return !_bindingKey2queues.isEmpty(); } public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException @@ -311,13 +365,11 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); - - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + List<AMQQueue> queues = _bindingKey2queues.get(rKey); if (queues == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey + ". No queue was registered with that _routing key"); + + " with routing key " + rKey + ". No queue was registered with that _routing key"); } @@ -325,12 +377,39 @@ public class DestWildExchange extends AbstractExchange if (!removedQ) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey); + + " with routing key " + rKey); + } + + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString bindingKey = normalize(rKey); + List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _wildCardBindingKey2queues.remove(bindingKey); + _bindingKey2Tokenized.remove(bindingKey); + } + } + else + { + List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _simpleBindingKey2queues.remove(rKey); + } + + } + + + if (queues.isEmpty()) { - _routingKey2queues.remove(routingKey); + _bindingKey2queues.remove(rKey); } } @@ -349,117 +428,167 @@ public class DestWildExchange extends AbstractExchange public Map<AMQShortString, List<AMQQueue>> getBindings() { - return _routingKey2queues; + return _bindingKey2queues; } private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) { - List<AMQQueue> list = new LinkedList<AMQQueue>(); - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - ArrayList<String> routingkeyList = new ArrayList<String>(); + List<AMQQueue> list = null; - while (routingTokens.hasMoreTokens()) + if(!_wildCardBindingKey2queues.isEmpty()) { - String next = routingTokens.nextToken(); - if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH)) - { - continue; - } - routingkeyList.add(next); - } - for (AMQShortString queue : _routingKey2queues.keySet()) - { - StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + final int routingTokensCount = routingTokens.countTokens(); - ArrayList<String> queueList = new ArrayList<String>(); - while (queTok.hasMoreTokens()) + AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount]; + + if(routingTokensCount == 1) { - queueList.add(queTok.nextToken()); + routingkeyTokens[0] =routingKey; } + else + { - int depth = 0; - boolean matching = true; - boolean done = false; - int routingskip = 0; - int queueskip = 0; - while (matching && !done) - { - if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) + int token = 0; + while (routingTokens.hasMoreTokens()) { - done = true; - // if it was the routing key that ran out of digits - if (routingkeyList.size() == (depth + routingskip)) - { - if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = - queueList.get(depth + queueskip).equals(AMQP_HASH) - && (queueList.size() == (depth + queueskip + 1)); - } - } - else if (routingkeyList.size() > (depth + routingskip)) + AMQShortString next = routingTokens.nextToken(); + /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH)) { - // There is still more routing key to check - matching = false; + continue; } + */ - continue; + routingkeyTokens[token++] = next; } + } + for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet()) + { + + AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey); + + + boolean matching = true; + boolean done = false; - // if the values on the two topics don't match - if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip))) + int depthPlusRoutingSkip = 0; + int depthPlusQueueSkip = 0; + + final int bindingKeyTokensCount = bindingKeyTokens.length; + + while (matching && !done) { - if (queueList.get(depth + queueskip).equals(AMQP_STAR)) + + if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip)) { - depth++; + done = true; + + // if it was the routing key that ran out of digits + if (routingTokensCount == depthPlusRoutingSkip) + { + if (bindingKeyTokensCount > depthPlusQueueSkip) + { // a hash and it is the last entry + matching = + bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN) + && (bindingKeyTokensCount == (depthPlusQueueSkip + 1)); + } + } + else if (routingTokensCount > depthPlusRoutingSkip) + { + // There is still more routing key to check + matching = false; + } continue; } - else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) + + // if the values on the two topics don't match + if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip])) { - // Is this a # at the end - if (queueList.size() == (depth + queueskip + 1)) + if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN)) { - done = true; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; continue; } - - // otherwise # in the middle - while (routingkeyList.size() > (depth + routingskip)) + else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)) { - if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) + // Is this a # at the end + if (bindingKeyTokensCount == (depthPlusQueueSkip + 1)) + { + done = true; + + continue; + } + + // otherwise # in the middle + while (routingTokensCount > depthPlusRoutingSkip) { - queueskip++; - depth++; + if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1])) + { + depthPlusQueueSkip += 2; + depthPlusRoutingSkip++; + + break; + } - break; + depthPlusRoutingSkip++; } - routingskip++; + continue; } - continue; + matching = false; } - matching = false; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; } - depth++; + if (matching) + { + if(list == null) + { + list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey)); + } + else + { + list.addAll(_wildCardBindingKey2queues.get(bindingKey)); + } + } } - if (matching) + } + if(!_simpleBindingKey2queues.isEmpty()) + { + List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey); + if(list == null) + { + if(queues == null) + { + list = Collections.EMPTY_LIST; + } + else + { + list = new ArrayList<AMQQueue>(queues); + } + } + else if(queues != null) { - list.addAll(_routingKey2queues.get(queue)); + list.addAll(queues); } + } return list; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 57ae2bb6d4..e7c887f306 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -42,6 +42,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -205,10 +206,8 @@ public class FanoutExchange extends AbstractExchange _logger.debug("Publishing message to queue " + _queues);
}
- for (AMQQueue q : _queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(new ArrayList(_queues));
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 3f604480b9..054674aed4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -55,8 +54,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(), - virtualHost.getExchangeRegistry()); + final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore() + ); session.addChannel(channel); ChannelOpenOkBody response; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 98c77d8d32..d7a879180a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -100,8 +100,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -151,8 +151,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -247,14 +247,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index b14f03e617..646ef43826 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -12,11 +12,14 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -47,10 +50,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
+
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -60,8 +62,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if(bodyCount == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -75,9 +77,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+ CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
//
@@ -86,7 +88,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -95,6 +97,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
@@ -106,8 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
if(bodyCount == 0)
@@ -126,9 +135,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -137,7 +146,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -147,7 +156,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter }
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -158,23 +167,53 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQDataBlock returnBlock = new DeferredDataBlock()
+ final AMQBody returnBlock = new AMQBody()
{
- protected AMQDataBlock createAMQDataBlock()
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
{
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
isRedelivered,
exchangeName,
routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
}
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
};
return returnBlock;
}
@@ -225,8 +264,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter {
AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
//
@@ -236,14 +274,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
@@ -272,4 +309,64 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 0fe6d3636e..143ee5fa40 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -208,27 +208,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _logger.debug("Frame Received: " + frame); } + + + body.handle(channelId, this); - if (body instanceof AMQMethodBody) - { - methodFrameReceived(channelId, (AMQMethodBody) body); - } - else if (body instanceof ContentHeaderBody) - { - contentHeaderReceived(channelId, (ContentHeaderBody) body); - } - else if (body instanceof ContentBody) - { - contentBodyReceived(channelId, (ContentBody) body); - } - else if (body instanceof HeartbeatBody) - { - // NO OP - } - else - { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); - } } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -271,7 +254,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) { final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); @@ -365,7 +348,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } catch (Exception e) { - _stateManager.error(e); + for (AMQMethodListener listener : _frameListeners) { listener.error(e); @@ -375,7 +358,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); @@ -384,13 +367,18 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } - private void contentBodyReceived(int channelId, ContentBody body) throws AMQException + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentBody(body, this); } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + // NO - OP + } + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 543e043bed..db5d882f51 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -30,6 +30,7 @@ import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.codec.QpidProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; @@ -82,7 +83,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter createSession(protocolSession, _applicationRegistry, codecFactory); _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory); + final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). getConfiguredObject(ConnectorConfiguration.class); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 80158779b2..5e79ab46b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -83,7 +83,7 @@ public class AMQMessage private long _expiration; - private final int hashcode = System.identityHashCode(this); + private Exchange _exchange; private static final boolean SYNCED_CLOCKS = @@ -92,7 +92,7 @@ public class AMQMessage public String debugIdentity() { - return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; + return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } public void setExpiration() @@ -141,6 +141,11 @@ public class AMQMessage _exchange.route(this); } + public void enqueue(final List<AMQQueue> queues) + { + _transientMessageData.setDestinationQueues(queues); + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -664,7 +669,7 @@ public class AMQMessage } finally { - destinationQueues.clear(); + // Remove refence for routing process . Reference count should now == delivered queue count decrementReference(storeContext); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index e1c1de29bd..4a0121700c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -37,9 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -158,6 +156,8 @@ public class AMQQueue implements Managable, Comparable public AtomicLong _totalMessagesReceived = new AtomicLong(); + private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + + // This ensure that the notification checks for the configured alerts are created. + setMaximumMessageAge(_maximumMessageAge); + setMaximumMessageCount(_maximumMessageCount); + setMaximumMessageSize(_maximumMessageSize); + setMaximumQueueDepth(_maximumQueueDepth); + } private AMQQueueMBean createMBean() throws AMQException @@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable } } - public AMQShortString getName() + public final AMQShortString getName() { return _name; } @@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageSize; } - public void setMaximumMessageSize(long value) + public void setMaximumMessageSize(final long maximumMessageSize) { - _maximumMessageSize = value; + _maximumMessageSize = maximumMessageSize; + if(maximumMessageSize == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); + } } public int getConsumerCount() @@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageCount; } - public void setMaximumMessageCount(long value) + public void setMaximumMessageCount(final long maximumMessageCount) { - _maximumMessageCount = value; + _maximumMessageCount = maximumMessageCount; + if(maximumMessageCount == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); + } + + + } public long getMaximumQueueDepth() @@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(long value) + public void setMaximumQueueDepth(final long maximumQueueDepth) { - _maximumQueueDepth = value; + _maximumQueueDepth = maximumQueueDepth; + if(maximumQueueDepth == 0L) + { + _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); + } + } public long getOldestMessageArrivalTime() @@ -661,6 +696,10 @@ public class AMQQueue implements Managable, Comparable } _subscribers.addSubscriber(subscription); + if(exclusive) + { + _subscribers.setExclusive(true); + } } private boolean isExclusive() @@ -692,6 +731,7 @@ public class AMQQueue implements Managable, Comparable ps, channel, consumerTag, this)); } + _subscribers.setExclusive(false); Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) @@ -805,7 +845,7 @@ public class AMQQueue implements Managable, Comparable public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException { AMQMessage msg = entry.getMessage(); - _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst); + _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -938,6 +978,14 @@ public class AMQQueue implements Managable, Comparable public void setMaximumMessageAge(long maximumMessageAge) { _maximumMessageAge = maximumMessageAge; + if(maximumMessageAge == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); + } } public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry) @@ -966,4 +1014,9 @@ public class AMQQueue implements Managable, Comparable } } } + + public final Set<NotificationCheck> getNotificationChecks() + { + return _notificationChecks; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 9e32de3f76..348a136f9d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -54,10 +54,7 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.List; +import java.util.*; /** * AMQQueueMBean is the management bean for an {@link AMQQueue}. @@ -97,6 +94,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; private Notification _lastNotification = null; + + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -249,16 +249,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public void checkForNotification(AMQMessage msg) throws AMQException, JMException { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); - for (NotificationCheck check : NotificationCheck.values()) + if(!notificationChecks.isEmpty()) { - if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + + for (NotificationCheck check : notificationChecks) { - if (check.notifyIfNecessary(msg, _queue, this)) + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) { - _lastNotificationTimes[check.ordinal()] = currentTime; + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index e3c8d3f17a..a61d41e33b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -363,8 +363,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(), deliveryTag, _queue.getMessageCount()); - _totalMessageSize.addAndGet(-entry.getSize()); + } + _totalMessageSize.addAndGet(-entry.getSize()); if (!acks) { @@ -918,7 +919,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6b3d65661f..6f9efd3200 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -29,9 +29,9 @@ public enum NotificationCheck {
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
- int msgCount = queue.getMessageCount();
+ int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
return true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 7f0accf052..6e68b5637e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -292,14 +292,17 @@ public class SubscriptionImpl implements Subscription queue.dequeue(storeContext, entry); } +/* + if (_sendLock.get()) + { + _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); + } +*/ + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); - if (_sendLock.get()) - { - _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); - } if (_acks) { @@ -308,10 +311,11 @@ public class SubscriptionImpl implements Subscription protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); - if (!_acks) - { - entry.getMessage().decrementReference(storeContext); - } + + } + if (!_acks) + { + entry.getMessage().decrementReference(storeContext); } } finally @@ -367,59 +371,60 @@ public class SubscriptionImpl implements Subscription // return false; } - final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + //todo - client id should be recoreded and this test removed but handled below - if (_noLocal && publisher != null) + if (_noLocal) { - // We don't want local messages so check to see if message is one we sent - Object localInstance; - Object msgInstance; - if ((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + if(publisher != null) + { + // We don't want local messages so check to see if message is one we sent + Object localInstance; + Object msgInstance; - if ((publisher.getClientProperties() != null) && - (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (localInstance == msgInstance || localInstance.equals(msgInstance)) + + if ((publisher.getClientProperties() != null) && + (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + if (localInstance == msgInstance || localInstance.equals(msgInstance)) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } } - } - else - { + else + { - localInstance = protocolSession.getClientIdentifier(); - //todo - client id should be recoreded and this test removed but handled here + localInstance = protocolSession.getClientIdentifier(); + //todo - client id should be recoreded and this test removed but handled here - msgInstance = publisher.getClientIdentifier(); - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + msgInstance = publisher.getClientIdentifier(); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } - } - + } } - if (_logger.isDebugEnabled()) - { - _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); - } return checkFilters(entry); } @@ -433,23 +438,7 @@ public class SubscriptionImpl implements Subscription private boolean checkFilters(QueueEntry msg) { - if (_filters != null) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has filters."); -// } - return _filters.allAllow(msg.getMessage()); - } - else - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no filters"); -// } - - return true; - } + return (_filters == null) || _filters.allAllow(msg.getMessage()); } public Queue<QueueEntry> getPreDeliveryQueue() @@ -613,7 +602,7 @@ public class SubscriptionImpl implements Subscription public boolean wouldSuspend(QueueEntry msg) { - return channel.wouldSuspend(msg.getMessage()); + return _acks && channel.wouldSuspend(msg.getMessage()); } public Queue<QueueEntry> getResendQueue() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b2f8cae8ff..b7cdaa29ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -39,6 +39,7 @@ class SubscriptionSet implements WeightedSubscriptionManager private int _currentSubscriber; private final Object _changeLock = new Object(); + private volatile boolean _exclusive; /** Accessor for unit tests. */ @@ -116,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager */ public Subscription nextSubscriber(QueueEntry msg) { - if (_subscriptions.isEmpty()) - { - return null; - } + try { @@ -143,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager private Subscription nextSubscriberImpl(QueueEntry msg) { - final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) + if(_exclusive) { - Subscription subscription = iterator.next(); - ++_currentSubscriber; - subscriberScanned(); - - if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + try { - if (subscription.hasInterest(msg)) + Subscription subscription = _subscriptions.get(0); + subscriberScanned(); + + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + if (subscription.hasInterest(msg)) { - return subscription; + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } } } } + catch(IndexOutOfBoundsException e) + { + } + return null; } + else + { + if (_subscriptions.isEmpty()) + { + return null; + } + final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); + while (iterator.hasNext()) + { + Subscription subscription = iterator.next(); + ++_currentSubscriber; + subscriberScanned(); - return null; + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } + } + } + + return null; + } } /** Overridden in test classes. */ @@ -233,5 +265,14 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _changeLock; } - + + public void setExclusive(final boolean exclusive) + { + _exclusive = exclusive; + } + + public boolean getExcBoolean() + { + return _exclusive; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java index 79ee6b93a3..9b91c71a1d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; +import java.util.Collections; import org.apache.qpid.AMQException; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -60,7 +62,7 @@ public class TransientMessageData * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done * by the message handle. */ - private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); + private List<AMQQueue> _destinationQueues; public MessagePublishInfo getMessagePublishInfo() { @@ -74,7 +76,7 @@ public class TransientMessageData public List<AMQQueue> getDestinationQueues() { - return _destinationQueues; + return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues; } public void setDestinationQueues(List<AMQQueue> destinationQueues) @@ -109,6 +111,10 @@ public class TransientMessageData public void addDestinationQueue(AMQQueue queue) { + if(_destinationQueues == null) + { + _destinationQueues = new ArrayList<AMQQueue>(); + } _destinationQueues.add(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 047cef9064..1e4b69c935 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -49,11 +49,11 @@ public class NonTransactionalContext implements TransactionalContext /** Where to put undeliverable messages */ private final List<RequiredDeliveryException> _returnMessages; - private Set<Long> _browsedAcks; + private final Set<Long> _browsedAcks; private final MessageStore _messageStore; - private StoreContext _storeContext; + private final StoreContext _storeContext; /** Whether we are in a transaction */ private boolean _inTran; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 9addf83e01..afe96bcd4f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost; import javax.management.NotCompliantMBeanException; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.security.access.AccessManager; @@ -123,7 +124,7 @@ public class VirtualHost implements Accessable */ public VirtualHost(String name, MessageStore store) throws Exception { - this(name, null, store); + this(name, new PropertiesConfiguration(), store); } /** diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index fbd9e65480..ed79384d42 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -173,7 +173,7 @@ public class AMQQueueAlertTest extends TestCase public void testQueueDepthAlertWithSubscribers() throws Exception { protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore); protocolSession.addChannel(channel); // Create queue diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index e72e1bf1f0..c02b47e9fd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -123,7 +123,7 @@ public class AMQQueueMBeanTest extends TestCase TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore); protocolSession.addChannel(channel); _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 79d92f7705..b60a8dfaad 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -73,6 +73,105 @@ import java.util.concurrent.atomic.AtomicInteger; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { + private static final class ChannelToSessionMap + { + private final AMQSession[] _fastAccessSessions = new AMQSession[16]; + private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); + private int _size = 0; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + + public AMQSession get(int channelId) + { + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + return _fastAccessSessions[channelId]; + } + else + { + return _slowAccessSessions.get(channelId); + } + } + + public AMQSession put(int channelId, AMQSession session) + { + AMQSession oldVal; + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + oldVal = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = session; + } + else + { + oldVal = _slowAccessSessions.put(channelId, session); + } + if((oldVal != null) && (session == null)) + { + _size--; + } + else if((oldVal == null) && (session != null)) + { + _size++; + } + + return session; + + } + + + public AMQSession remove(int channelId) + { + AMQSession session; + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + session = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = null; + } + else + { + session = _slowAccessSessions.remove(channelId); + } + + if(session != null) + { + _size--; + } + return session; + + } + + public Collection<AMQSession> values() + { + ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); + + for(int i = 0; i < 16; i++) + { + if(_fastAccessSessions[i] != null) + { + values.add(_fastAccessSessions[i]); + } + } + values.addAll(_slowAccessSessions.values()); + + return values; + } + + public int size() + { + return _size; + } + + public void clear() + { + _size = 0; + _slowAccessSessions.clear(); + for(int i = 0; i<16; i++) + { + _fastAccessSessions[i] = null; + } + } + } + + private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private AtomicInteger _idFactory = new AtomicInteger(0); @@ -102,7 +201,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>(); + private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); private String _clientName; @@ -757,10 +856,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect checkNotClosed(); if (!_started) { - final Iterator it = _sessions.entrySet().iterator(); + final Iterator it = _sessions.values().iterator(); while (it.hasNext()) { - final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); + final AMQSession s = (AMQSession) (it.next()); try { s.start(); @@ -1014,11 +1113,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _maximumFrameSize; } - public Map getSessions() - { - return _sessions; - } - public String getUsername() { return _username; @@ -1239,6 +1333,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); + s.setFlowControl(true); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0c94216597..87c813982e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -78,10 +78,7 @@ import javax.jms.TopicSubscriber; import javax.jms.TransactionRolledBackException; import java.io.Serializable; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -107,6 +104,89 @@ import java.util.concurrent.atomic.AtomicLong; */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + private static final class IdToConsumerMap + { + private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; + private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>(); + + + public BasicMessageConsumer get(int id) + { + if((id & 0xFFFFFFF0) == 0) + { + return _fastAccessConsumers[id]; + } + else + { + return _slowAccessConsumers.get(id); + } + } + + public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) + { + BasicMessageConsumer oldVal; + if((id & 0xFFFFFFF0) == 0) + { + oldVal = _fastAccessConsumers[id]; + _fastAccessConsumers[id] = consumer; + } + else + { + oldVal = _slowAccessConsumers.put(id, consumer); + } + + return consumer; + + } + + + public BasicMessageConsumer remove(int id) + { + BasicMessageConsumer consumer; + if((id & 0xFFFFFFF0) == 0) + { + consumer = _fastAccessConsumers[id]; + _fastAccessConsumers[id] = null; + } + else + { + consumer = _slowAccessConsumers.remove(id); + } + + return consumer; + + } + + public Collection<BasicMessageConsumer> values() + { + ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>(); + + for(int i = 0; i < 16; i++) + { + if(_fastAccessConsumers[i] != null) + { + values.add(_fastAccessConsumers[i]); + } + } + values.addAll(_slowAccessConsumers.values()); + + return values; + } + + + public void clear() + { + _slowAccessConsumers.clear(); + for(int i = 0; i<16; i++) + { + _fastAccessConsumers[i] = null; + } + } + } + + + + /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -156,7 +236,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _transacted; /** Holds the sessions acknowledgement mode. */ - private int _acknowledgeMode; + private final int _acknowledgeMode; /** Holds this session unique identifier, used to distinguish it from other sessions. */ private int _channelId; @@ -217,8 +297,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - private Map<AMQShortString, BasicMessageConsumer> _consumers = - new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + private final IdToConsumerMap _consumers = new IdToConsumerMap(); + + //Map<AMQShortString, BasicMessageConsumer> _consumers = + //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** * Contains a list of consumers which have been removed but which might still have @@ -281,6 +363,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Has failover occured on this session */ private boolean _failedOver; + + + private static final class FlowControlIndicator + { + private volatile boolean _flowControl = true; + + public synchronized void setFlowControl(boolean flowControl) + { + _flowControl= flowControl; + notify(); + } + + public boolean getFlowControl() + { + return _flowControl; + } + } + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + /** * Creates a new session on a connection. * @@ -327,24 +430,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { public void aboveThreshold(int currentValue) { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { _logger.debug( "Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); new Thread(new SuspenderRunner(true)).start(); - } + } public void underThreshold(int currentValue) { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { _logger.debug( "Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); new Thread(new SuspenderRunner(false)).start(); - } + } }); } @@ -662,7 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // Remove the consumer from the map - BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); if (consumer != null) { // fixme this isn't right.. needs to check if _queue contains data for this consumer @@ -744,6 +843,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false); } + + public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + false, false); + } + + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -761,6 +870,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messageSelector, null, false, false); } + + public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true, + messageSelector, null, false, false); + } + + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException { @@ -925,7 +1045,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1089,9 +1209,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); } + /** * Creates a non-durable subscriber with a message selector * @@ -1109,7 +1230,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal)); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1276,15 +1397,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + "] received in session with channel id " + _channelId); } - if (message.getDeliverBody() == null) + if (message.isDeliverMessage()) { - // Return of the bounced message. - returnBouncedMessage(message); + _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); + _queue.add(message); } else { - _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); - _queue.add(message); + // Return of the bounced message. + returnBouncedMessage(message); } } @@ -1666,7 +1787,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void deregisterConsumer(BasicMessageConsumer consumer) { - if (_consumers.remove(consumer.getConsumerTag()) != null) + if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null) { String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) @@ -2063,8 +2184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { + int tagId = _nextTag++; // need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); + AMQShortString tag = new AMQShortString(Integer.toString(tagId)); FieldTable arguments = FieldTableFactory.newFieldTable(); if ((messageSelector != null) && !messageSelector.equals("")) @@ -2084,7 +2206,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening - _consumers.put(tag, consumer); + _consumers.put(tagId, consumer); try { @@ -2112,7 +2234,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQException e) { // clean-up the map in the event of an error - _consumers.remove(tag); + _consumers.remove(tagId); throw e; } } @@ -2659,6 +2781,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _ticket = ticket; } + public void setFlowControl(final boolean active) + { + _flowControl.setFlowControl(active); + } + + + public void checkFlowControl() throws InterruptedException + { + synchronized(_flowControl) + { + while(!_flowControl.getFlowControl()) + { + _flowControl.wait(); + } + } + + } + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { @@ -2850,10 +2991,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void dispatchMessage(UnprocessedMessage message) { - if (message.getDeliverBody() != null) + final BasicDeliverBody deliverBody = message.getDeliverBody(); + if (deliverBody != null) { final BasicMessageConsumer consumer = - (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag()); + _consumers.get(deliverBody.getConsumerTag().toIntValue()); if ((consumer == null) || consumer.isClosed()) { @@ -2862,13 +3004,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().getDeliveryTag() + "] from queue " - + message.getDeliverBody().getConsumerTag() + " )without a handler - rejecting(requeue)..."); + + deliverBody.getDeliveryTag() + "] from queue " + + deliverBody.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().getDeliveryTag() + "] from queue " + " consumer(" + + deliverBody.getDeliveryTag() + "] from queue " + " consumer(" + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } @@ -2880,7 +3022,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - consumer.notifyMessage(message, _channelId); + consumer.notifyMessage(message); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 488d22c4bd..bf11572163 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.slf4j.Logger; @@ -53,13 +49,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); /** The connection being used by this consumer */ - private AMQConnection _connection; + private final AMQConnection _connection; - private String _messageSelector; + private final String _messageSelector; - private boolean _noLocal; + private final boolean _noLocal; - private AMQDestination _destination; + private final AMQDestination _destination; /** When true indicates that a blocking receive call is in progress */ private final AtomicBoolean _receiving = new AtomicBoolean(false); @@ -70,7 +66,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQShortString _consumerTag; /** We need to know the channel id when constructing frames */ - private int _channelId; + private final int _channelId; /** * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors @@ -78,36 +74,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private final ArrayBlockingQueue _synchronousQueue; - private MessageFactoryRegistry _messageFactory; + private final MessageFactoryRegistry _messageFactory; private final AMQSession _session; - private AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ - private FieldTable _rawSelectorFieldTable; + private final FieldTable _rawSelectorFieldTable; /** * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchHigh; + private final int _prefetchHigh; /** * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchLow; + private final int _prefetchLow; /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private boolean _exclusive; + private final boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ - private int _acknowledgeMode; + private final int _acknowledgeMode; /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; @@ -133,10 +129,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive * on the queue. This is used for queue browsing. */ - private boolean _autoClose; + private final boolean _autoClose; private boolean _closeWhenNoMessages; - private boolean _noConsume; + private final boolean _noConsume; private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, @@ -156,7 +152,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - _acknowledgeMode = acknowledgeMode; + _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); _autoClose = autoClose; _noConsume = noConsume; @@ -166,6 +162,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } + else + { + _acknowledgeMode = acknowledgeMode; + } } public AMQDestination getDestination() @@ -254,7 +254,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { case Session.DUPS_OK_ACKNOWLEDGE: - _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag()); _receivedDeliveryTags.add(msg.getDeliveryTag()); break; @@ -269,7 +268,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _logger.info("Recording tag for commit:" + msg.getDeliveryTag()); _receivedDeliveryTags.add(msg.getDeliveryTag()); } @@ -645,9 +643,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * message listener or a synchronous receive() caller. * * @param messageFrame the raw unprocessed mesage - * @param channelId channel on which this message was sent */ - void notifyMessage(UnprocessedMessage messageFrame, int channelId) + void notifyMessage(UnprocessedMessage messageFrame) { final boolean debug = _logger.isDebugEnabled(); @@ -658,10 +655,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + final BasicDeliverBody deliverBody = messageFrame.getDeliverBody(); + AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(), - messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(), - messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(deliverBody.getDeliveryTag(), + deliverBody.getRedelivered(), + deliverBody.getExchange(), + deliverBody.getRoutingKey(), + messageFrame.getContentHeader(), + messageFrame.getBodies()); if (debug) { @@ -673,11 +675,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // if (!_closed.get()) { - jmsMessage.setConsumer(this); - preDeliver(jmsMessage); - notifyMessage(jmsMessage, channelId); + notifyMessage(jmsMessage); } // else // { @@ -702,9 +702,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * @param jmsMessage this message has already been processed so can't redo preDeliver - * @param channelId */ - public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) + public void notifyMessage(AbstractJMSMessage jmsMessage) { try { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 7e96fb537c..ae71846870 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -538,6 +538,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j frames[0] = publishFrame; frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + + try + { + _session.checkFlowControl(); + } + catch (InterruptedException e) + { + JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed"); + jmsEx.setLinkedException(e); + throw jmsEx; + } + _protocolHandler.writeFrame(compositeFrame, wait); if (message != origMessage) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 49c8a83833..d05e99d210 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +45,8 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic throws AMQException { final AMQProtocolSession session = stateManager.getProtocolSession(); - final UnprocessedMessage msg = new UnprocessedMessage(channelId, body); + final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedDeliverMessage(body); _logger.debug("New JmsDeliver method received"); - session.unprocessedMessageReceived(msg); + session.unprocessedMessageReceived(channelId, msg); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 428d366f07..2ebc9288c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,9 +46,9 @@ public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, i { _logger.debug("New JmsBounce method received"); final AMQProtocolSession session = stateManager.getProtocolSession(); - final UnprocessedMessage msg = new UnprocessedMessage(channelId, body); + final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedBouncedMessage(body); - session.unprocessedMessageReceived(msg); + session.unprocessedMessageReceived(channelId, msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java new file mode 100644 index 0000000000..b47fe751d6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java @@ -0,0 +1,54 @@ +package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody> +{ + private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowMethodHandler.class); + private static final ChannelFlowMethodHandler _instance = new ChannelFlowMethodHandler(); + + public static ChannelFlowMethodHandler getInstance() + { + return _instance; + } + + private ChannelFlowMethodHandler() + { } + + public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) + throws AMQException + { + + final AMQProtocolSession session = stateManager.getProtocolSession(); + session.setFlowControl(channelId, body.getActive()); + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index b029770946..1947a18653 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -55,8 +55,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected boolean _changedData; private Destination _destination; private JMSHeaderAdapter _headerAdapter; - private BasicMessageConsumer _consumer; - private boolean _strictAMQP; + + private static final boolean STRICT_AMQP_COMPLIANCE = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); protected AbstractJMSMessage(ByteBuffer data) { @@ -72,8 +73,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _changedData = (data == null); _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, @@ -121,7 +120,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { if (getContentHeaderProperties().getMessageIdAsString() == null) { - getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID()); + StringBuilder b = new StringBuilder(39); + b.append("ID"); + b.append(UUID.randomUUID()); + getContentHeaderProperties().setMessageId(b.toString()); } return getContentHeaderProperties().getMessageIdAsString(); @@ -301,7 +303,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -311,7 +313,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean getBooleanProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -321,7 +323,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte getByteProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -331,7 +333,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -341,7 +343,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public short getShortProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -351,7 +353,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public int getIntProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -361,7 +363,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public long getLongProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -371,7 +373,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public float getFloatProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -381,7 +383,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public double getDoubleProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -404,7 +406,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } else { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -425,7 +427,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -436,7 +438,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBooleanProperty(String propertyName, boolean b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -447,7 +449,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setByteProperty(String propertyName, byte b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -458,7 +460,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -469,7 +471,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setShortProperty(String propertyName, short i) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -487,7 +489,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setLongProperty(String propertyName, long l) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -498,7 +500,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setFloatProperty(String propertyName, float f) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -509,7 +511,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setDoubleProperty(String propertyName, double v) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -691,9 +693,4 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - public void setConsumer(BasicMessageConsumer basicMessageConsumer) - { - _consumer = basicMessageConsumer; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index a70acbabbe..d8fe964b85 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -55,7 +55,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm JMSMapMessage(ByteBuffer data) throws JMSException { super(data); // this instantiates a content header - populateMapFromData(); + if(data != null) + { + populateMapFromData(); + } + } JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, @@ -76,7 +80,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm public String toBodyString() throws JMSException { - return _map.toString(); + return _map == null ? "" : _map.toString(); } public AMQShortString getMimeTypeAsShortString() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 5b199f2478..bc1ba155cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -36,33 +36,15 @@ import org.apache.qpid.framing.ContentHeaderBody; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public class UnprocessedMessage +public abstract class UnprocessedMessage { - private long _bytesReceived = 0; + private long _bytesReceived = 0L; - private final BasicDeliverBody _deliverBody; - private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) - private final int _channelId; private ContentHeaderBody _contentHeader; /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; - public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) - { - _deliverBody = deliverBody; - _channelId = channelId; - _bounceBody = null; - } - - - public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) - { - _deliverBody = null; - _channelId = channelId; - _bounceBody = bounceBody; - } - public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException { @@ -96,22 +78,11 @@ public class UnprocessedMessage return _bytesReceived == getContentHeader().bodySize; } - public BasicDeliverBody getDeliverBody() - { - return _deliverBody; - } - public BasicReturnBody getBounceBody() - { - return _bounceBody; - } + abstract public BasicDeliverBody getDeliverBody(); - public int getChannelId() - { - return _channelId; - } - + abstract public BasicReturnBody getBounceBody(); public ContentHeaderBody getContentHeader() { @@ -128,4 +99,60 @@ public class UnprocessedMessage return _bodies; } + abstract public boolean isDeliverMessage(); + + public static final class UnprocessedDeliverMessage extends UnprocessedMessage + { + private final BasicDeliverBody _body; + + public UnprocessedDeliverMessage(final BasicDeliverBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return _body; + } + + public BasicReturnBody getBounceBody() + { + return null; + } + + public boolean isDeliverMessage() + { + return true; + } + } + + public static final class UnprocessedBouncedMessage extends UnprocessedMessage + { + private final BasicReturnBody _body; + + public UnprocessedBouncedMessage(final BasicReturnBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return null; + } + + public BasicReturnBody getBounceBody() + { + return _body; + } + + public boolean isDeliverMessage() + { + return false; + } + } + + + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index f70c1faa84..3dee0b0142 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -407,7 +407,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToWaiters(Exception e) { - getStateManager().error(e); + if (!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -439,78 +439,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: - - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } - - final AMQMethodEvent<AMQMethodBody> evt = - new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - - try - { - - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners); - } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } - - exceptionCaught(session, e); - } - - break; - - case ContentHeaderBody.TYPE: - - _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); - break; + bodyFrame.handle(frame.getChannel(),_protocolSession); - case ContentBody.TYPE: - - _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); - break; - - case HeartbeatBody.TYPE: - - if (debug) - { - _logger.debug("Received heartbeat"); - } - - break; - - default: - - } _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -528,6 +458,55 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session) + throws AMQException + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame); + } + + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame); + + try + { + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } + } + catch (AMQException e) + { + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } + } + + exceptionCaught(session, e); + } + + } + private static int _messagesOut; public void messageSent(IoSession session, Object message) throws Exception diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index b48adbdb08..6a5cc62bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ protected final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); @@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives * first) with the subsequent content header and content bodies. */ - protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>(); + private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; /** Counter to ensure unique queue names */ protected int _queueId = 1; @@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; + private final AMQConnection _connection; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { @@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * * @throws AMQException if this was not expected */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException + public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException { - _channelId2UnprocessedMsgMap.put(message.getChannelId(), message); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = message; + } + else + { + _channelId2UnprocessedMsgMap.put(channelId, message); + } } - public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] + : _channelId2UnprocessedMsgMap.get(channelId); + + if (msg == null) { throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); @@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException + public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + UnprocessedMessage msg; + final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; + if(fastAccess) + { + msg = _channelId2UnprocessedMsgArray[channelId]; + } + else + { + msg = _channelId2UnprocessedMsgMap.get(channelId); + } + if (msg == null) { throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); @@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession if (msg.getContentHeader() == null) { - _channelId2UnprocessedMsgMap.remove(channelId); + if(fastAccess) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } throw new AMQException("Error: received content body without having received a ContentHeader frame first"); } @@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + + } + /** * Deliver a message to the appropriate session, removing the unprocessed message from our map * @@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { AMQSession session = getSession(channelId); session.messageReceived(msg); - _channelId2UnprocessedMsgMap.remove(channelId); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } } protected AMQSession getSession(int channelId) @@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _methodDispatcher = methodDispatcher; } + + public void setFlowControl(final int channelId, final boolean active) + { + final AMQSession session = getSession(channelId); + session.setFlowControl(active); + } + + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index e62fce0f60..2e6a4beb83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -37,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler * there is a separate state manager. */ -public class AMQStateManager implements AMQMethodListener +public class AMQStateManager { private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class); @@ -52,7 +52,7 @@ public class AMQStateManager implements AMQMethodListener * AMQFrame. */ - private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); + private final Object _stateLock = new Object(); private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); @@ -91,19 +91,6 @@ public class AMQStateManager implements AMQMethodListener } } - public void error(Exception e) - { - _logger.debug("State manager receive error notification: " + e); - synchronized (_stateListeners) - { - final Iterator it = _stateListeners.iterator(); - while (it.hasNext()) - { - final StateListener l = (StateListener) it.next(); - l.error(e); - } - } - } public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 0ea04e5bc3..adbec6e35f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -85,7 +85,7 @@ public class TransportConnection throw new AMQNoTransportForProtocolException(details); } - if (transport == _currentInstance) + /* if (transport == _currentInstance) { if (transport == VM) { @@ -100,21 +100,23 @@ public class TransportConnection } } - _currentInstance = transport; + _currentInstance = transport;*/ + ITransportConnection instance; switch (transport) { case SOCKET: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - return new ExistingSocketConnector(); - } - }); + instance = + new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() + { + return new ExistingSocketConnector(); + } + }); break; case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { public IoConnector newSocketConnector() { @@ -142,12 +144,14 @@ public class TransportConnection break; case VM: { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); break; } + default: + throw new AMQNoTransportForProtocolException(details); } - return _instance; + return instance; } private static int getTransport(String transport) diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java new file mode 100644 index 0000000000..bed80d5954 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -0,0 +1,528 @@ +package org.apache.mina.common; + +import org.apache.mina.common.ByteBuffer; + +import java.nio.*; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class FixedSizeByteBufferAllocator implements ByteBufferAllocator +{ + + + private static final int MINIMUM_CAPACITY = 1; + + public FixedSizeByteBufferAllocator () + { + } + + public ByteBuffer allocate( int capacity, boolean direct ) + { + java.nio.ByteBuffer nioBuffer; + if( direct ) + { + nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity ); + } + else + { + nioBuffer = java.nio.ByteBuffer.allocate( capacity ); + } + return new FixedSizeByteBuffer( nioBuffer ); + } + + public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer ) + { + return new FixedSizeByteBuffer( nioBuffer ); + } + + public void dispose() + { + } + + + + private static final class FixedSizeByteBuffer extends ByteBuffer + { + private java.nio.ByteBuffer buf; + private int refCount = 1; + private int mark = -1; + + + protected FixedSizeByteBuffer( java.nio.ByteBuffer buf ) + { + this.buf = buf; + buf.order( ByteOrder.BIG_ENDIAN ); + refCount = 1; + } + + public synchronized void acquire() + { + if( refCount <= 0 ) + { + throw new IllegalStateException( "Already released buffer." ); + } + + refCount ++; + } + + public void release() + { + synchronized( this ) + { + if( refCount <= 0 ) + { + refCount = 0; + throw new IllegalStateException( + "Already released buffer. You released the buffer too many times." ); + } + + refCount --; + if( refCount > 0) + { + return; + } + } + } + + public java.nio.ByteBuffer buf() + { + return buf; + } + + public boolean isPooled() + { + return false; + } + + public void setPooled( boolean pooled ) + { + } + + public ByteBuffer duplicate() { + return new FixedSizeByteBuffer( this.buf.duplicate() ); + } + + public ByteBuffer slice() { + return new FixedSizeByteBuffer( this.buf.slice() ); + } + + public ByteBuffer asReadOnlyBuffer() { + return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() ); + } + + public byte[] array() + { + return buf.array(); + } + + public int arrayOffset() + { + return buf.arrayOffset(); + } + + public boolean isDirect() + { + return buf.isDirect(); + } + + public boolean isReadOnly() + { + return buf.isReadOnly(); + } + + public int capacity() + { + return buf.capacity(); + } + + public ByteBuffer capacity( int newCapacity ) + { + if( newCapacity > capacity() ) + { + // Allocate a new buffer and transfer all settings to it. + int pos = position(); + int limit = limit(); + ByteOrder bo = order(); + + capacity0( newCapacity ); + buf.limit( limit ); + if( mark >= 0 ) + { + buf.position( mark ); + buf.mark(); + } + buf.position( pos ); + buf.order( bo ); + } + + return this; + } + + protected void capacity0( int requestedCapacity ) + { + int newCapacity = MINIMUM_CAPACITY; + while( newCapacity < requestedCapacity ) + { + newCapacity <<= 1; + } + + java.nio.ByteBuffer oldBuf = this.buf; + java.nio.ByteBuffer newBuf; + if( isDirect() ) + { + newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity ); + } + else + { + newBuf = java.nio.ByteBuffer.allocate( newCapacity ); + } + + newBuf.clear(); + oldBuf.clear(); + newBuf.put( oldBuf ); + this.buf = newBuf; + } + + + + public boolean isAutoExpand() + { + return false; + } + + public ByteBuffer setAutoExpand( boolean autoExpand ) + { + if(autoExpand) throw new IllegalArgumentException(); + else return this; + } + + public ByteBuffer expand( int pos, int expectedRemaining ) + { + int end = pos + expectedRemaining; + if( end > capacity() ) + { + // The buffer needs expansion. + capacity( end ); + } + + if( end > limit() ) + { + // We call limit() directly to prevent StackOverflowError + buf.limit( end ); + } + return this; + } + + public int position() + { + return buf.position(); + } + + public ByteBuffer position( int newPosition ) + { + + buf.position( newPosition ); + if( mark > newPosition ) + { + mark = -1; + } + return this; + } + + public int limit() + { + return buf.limit(); + } + + public ByteBuffer limit( int newLimit ) + { + buf.limit( newLimit ); + if( mark > newLimit ) + { + mark = -1; + } + return this; + } + + public ByteBuffer mark() + { + buf.mark(); + mark = position(); + return this; + } + + public int markValue() + { + return mark; + } + + public ByteBuffer reset() + { + buf.reset(); + return this; + } + + public ByteBuffer clear() + { + buf.clear(); + mark = -1; + return this; + } + + public ByteBuffer flip() + { + buf.flip(); + mark = -1; + return this; + } + + public ByteBuffer rewind() + { + buf.rewind(); + mark = -1; + return this; + } + + public byte get() + { + return buf.get(); + } + + public ByteBuffer put( byte b ) + { + buf.put( b ); + return this; + } + + public byte get( int index ) + { + return buf.get( index ); + } + + public ByteBuffer put( int index, byte b ) + { + buf.put( index, b ); + return this; + } + + public ByteBuffer get( byte[] dst, int offset, int length ) + { + buf.get( dst, offset, length ); + return this; + } + + public ByteBuffer put( java.nio.ByteBuffer src ) + { + buf.put( src ); + return this; + } + + public ByteBuffer put( byte[] src, int offset, int length ) + { + buf.put( src, offset, length ); + return this; + } + + public ByteBuffer compact() + { + buf.compact(); + mark = -1; + return this; + } + + public ByteOrder order() + { + return buf.order(); + } + + public ByteBuffer order( ByteOrder bo ) + { + buf.order( bo ); + return this; + } + + public char getChar() + { + return buf.getChar(); + } + + public ByteBuffer putChar( char value ) + { + buf.putChar( value ); + return this; + } + + public char getChar( int index ) + { + return buf.getChar( index ); + } + + public ByteBuffer putChar( int index, char value ) + { + buf.putChar( index, value ); + return this; + } + + public CharBuffer asCharBuffer() + { + return buf.asCharBuffer(); + } + + public short getShort() + { + return buf.getShort(); + } + + public ByteBuffer putShort( short value ) + { + buf.putShort( value ); + return this; + } + + public short getShort( int index ) + { + return buf.getShort( index ); + } + + public ByteBuffer putShort( int index, short value ) + { + buf.putShort( index, value ); + return this; + } + + public ShortBuffer asShortBuffer() + { + return buf.asShortBuffer(); + } + + public int getInt() + { + return buf.getInt(); + } + + public ByteBuffer putInt( int value ) + { + buf.putInt( value ); + return this; + } + + public int getInt( int index ) + { + return buf.getInt( index ); + } + + public ByteBuffer putInt( int index, int value ) + { + buf.putInt( index, value ); + return this; + } + + public IntBuffer asIntBuffer() + { + return buf.asIntBuffer(); + } + + public long getLong() + { + return buf.getLong(); + } + + public ByteBuffer putLong( long value ) + { + buf.putLong( value ); + return this; + } + + public long getLong( int index ) + { + return buf.getLong( index ); + } + + public ByteBuffer putLong( int index, long value ) + { + buf.putLong( index, value ); + return this; + } + + public LongBuffer asLongBuffer() + { + return buf.asLongBuffer(); + } + + public float getFloat() + { + return buf.getFloat(); + } + + public ByteBuffer putFloat( float value ) + { + buf.putFloat( value ); + return this; + } + + public float getFloat( int index ) + { + return buf.getFloat( index ); + } + + public ByteBuffer putFloat( int index, float value ) + { + buf.putFloat( index, value ); + return this; + } + + public FloatBuffer asFloatBuffer() + { + return buf.asFloatBuffer(); + } + + public double getDouble() + { + return buf.getDouble(); + } + + public ByteBuffer putDouble( double value ) + { + buf.putDouble( value ); + return this; + } + + public double getDouble( int index ) + { + return buf.getDouble( index ); + } + + public ByteBuffer putDouble( int index, double value ) + { + buf.putDouble( index, value ); + return this; + } + + public DoubleBuffer asDoubleBuffer() + { + return buf.asDoubleBuffer(); + } + + + } + + +} diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java new file mode 100644 index 0000000000..09cf785108 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java @@ -0,0 +1,250 @@ +package org.apache.mina.common.support; + +import org.apache.mina.common.IoFuture; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFutureListener; + +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * A default implementation of {@link org.apache.mina.common.IoFuture}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (í™”, 05 9ì›” 2006) $ + */ +public class DefaultIoFuture implements IoFuture +{ + private final IoSession session; + private final Object lock; + private List listeners; + private Object result; + private boolean ready; + + + /** + * Creates a new instance. + * + * @param session an {@link IoSession} which is associated with this future + */ + public DefaultIoFuture( IoSession session ) + { + this.session = session; + this.lock = this; + } + + /** + * Creates a new instance which uses the specified object as a lock. + */ + public DefaultIoFuture( IoSession session, Object lock ) + { + if( lock == null ) + { + throw new NullPointerException( "lock" ); + } + this.session = session; + this.lock = lock; + } + + public IoSession getSession() + { + return session; + } + + public Object getLock() + { + return lock; + } + + public void join() + { + synchronized( lock ) + { + while( !ready ) + { + try + { + lock.wait(); + } + catch( InterruptedException e ) + { + } + } + } + } + + public boolean join( long timeoutInMillis ) + { + long startTime = ( timeoutInMillis <= 0 ) ? 0 : System + .currentTimeMillis(); + long waitTime = timeoutInMillis; + + synchronized( lock ) + { + if( ready ) + { + return ready; + } + else if( waitTime <= 0 ) + { + return ready; + } + + for( ;; ) + { + try + { + lock.wait( waitTime ); + } + catch( InterruptedException e ) + { + } + + if( ready ) + return true; + else + { + waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime ); + if( waitTime <= 0 ) + { + return ready; + } + } + } + } + } + + public boolean isReady() + { + synchronized( lock ) + { + return ready; + } + } + + /** + * Sets the result of the asynchronous operation, and mark it as finished. + */ + protected void setValue( Object newValue ) + { + synchronized( lock ) + { + // Allow only once. + if( ready ) + { + return; + } + + result = newValue; + ready = true; + lock.notifyAll(); + + notifyListeners(); + } + } + + /** + * Returns the result of the asynchronous operation. + */ + protected Object getValue() + { + synchronized( lock ) + { + return result; + } + } + + public void addListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + if(listeners == null) + { + listeners = new ArrayList(); + } + listeners.add( listener ); + if( ready ) + { + listener.operationComplete( this ); + } + } + } + + public void removeListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + listeners.remove( listener ); + } + } + + private void notifyListeners() + { + synchronized( lock ) + { + + if(listeners != null) + { + + for( Iterator i = listeners.iterator(); i.hasNext(); ) { + ( ( IoFutureListener ) i.next() ).operationComplete( this ); + } + } + } + } +} + + + diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java new file mode 100644 index 0000000000..b8c6f29720 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java @@ -0,0 +1,440 @@ +package org.apache.mina.filter.codec; + + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +import org.apache.mina.common.*; +import org.apache.mina.common.support.DefaultWriteFuture; +import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; +import org.apache.mina.util.SessionLog; +import org.apache.mina.util.Queue; + + +public class QpidProtocolCodecFilter extends IoFilterAdapter +{ + public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder"; + public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder"; + + private static final Class[] EMPTY_PARAMS = new Class[0]; + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] ); + + private final ProtocolCodecFactory factory; + + public QpidProtocolCodecFilter( ProtocolCodecFactory factory ) + { + if( factory == null ) + { + throw new NullPointerException( "factory" ); + } + this.factory = factory; + } + + public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder ) + { + if( encoder == null ) + { + throw new NullPointerException( "encoder" ); + } + if( decoder == null ) + { + throw new NullPointerException( "decoder" ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() + { + return encoder; + } + + public ProtocolDecoder getDecoder() + { + return decoder; + } + }; + } + + public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass ) + { + if( encoderClass == null ) + { + throw new NullPointerException( "encoderClass" ); + } + if( decoderClass == null ) + { + throw new NullPointerException( "decoderClass" ); + } + if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) ) + { + throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() ); + } + if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) ) + { + throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() ); + } + try + { + encoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." ); + } + try + { + decoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() throws Exception + { + return ( ProtocolEncoder ) encoderClass.newInstance(); + } + + public ProtocolDecoder getDecoder() throws Exception + { + return ( ProtocolDecoder ) decoderClass.newInstance(); + } + }; + } + + public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception + { + if( parent.contains( ProtocolCodecFilter.class ) ) + { + throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." ); + } + } + + public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( !( message instanceof ByteBuffer ) ) + { + nextFilter.messageReceived( session, message ); + return; + } + + ByteBuffer in = ( ByteBuffer ) message; + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + + try + { + decoder.decode( session, in, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + pde.setHexdump( in.getHexDump() ); + throw pde; + } + finally + { + // Dispose the decoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeDecoder( session ); + } + + // Release the read buffer. + in.release(); + + decoderOut.flush(); + } + } + + public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( message instanceof HiddenByteBuffer ) + { + return; + } + + if( !( message instanceof MessageByteBuffer ) ) + { + nextFilter.messageSent( session, message ); + return; + } + + nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); + } + + public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception + { + Object message = writeRequest.getMessage(); + if( message instanceof ByteBuffer ) + { + nextFilter.filterWrite( session, writeRequest ); + return; + } + + ProtocolEncoder encoder = getEncoder( session ); + ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest ); + + try + { + encoder.encode( session, message, encoderOut ); + encoderOut.flush(); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + new MessageByteBuffer( writeRequest.getMessage() ), + writeRequest.getFuture(), writeRequest.getDestination() ) ); + } + catch( Throwable t ) + { + ProtocolEncoderException pee; + if( t instanceof ProtocolEncoderException ) + { + pee = ( ProtocolEncoderException ) t; + } + else + { + pee = new ProtocolEncoderException( t ); + } + throw pee; + } + finally + { + // Dispose the encoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeEncoder( session ); + } + } + } + + public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception + { + // Call finishDecode() first when a connection is closed. + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + try + { + decoder.finishDecode( session, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + throw pde; + } + finally + { + // Dispose all. + disposeEncoder( session ); + disposeDecoder( session ); + + decoderOut.flush(); + } + + nextFilter.sessionClosed( session ); + } + + private ProtocolEncoder getEncoder( IoSession session ) throws Exception + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER ); + if( encoder == null ) + { + encoder = factory.getEncoder(); + session.setAttribute( ENCODER, encoder ); + } + return encoder; + } + + private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest ); + } + + private ProtocolDecoder getDecoder( IoSession session ) throws Exception + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER ); + if( decoder == null ) + { + decoder = factory.getDecoder(); + session.setAttribute( DECODER, decoder ); + } + return decoder; + } + + private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter ) + { + return new SimpleProtocolDecoderOutput( session, nextFilter ); + } + + private void disposeEncoder( IoSession session ) + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER ); + if( encoder == null ) + { + return; + } + + try + { + encoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Failed to dispose: " + encoder.getClass().getName() + + " (" + encoder + ')' ); + } + } + + private void disposeDecoder( IoSession session ) + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER ); + if( decoder == null ) + { + return; + } + + try + { + decoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Falied to dispose: " + decoder.getClass().getName() + + " (" + decoder + ')' ); + } + } + + private static class HiddenByteBuffer extends ByteBufferProxy + { + private HiddenByteBuffer( ByteBuffer buf ) + { + super( buf ); + } + } + + private static class MessageByteBuffer extends ByteBufferProxy + { + private final Object message; + + private MessageByteBuffer( Object message ) + { + super( EMPTY_BUFFER ); + this.message = message; + } + + public void acquire() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + + public void release() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + } + + private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput + { + private ByteBuffer buffer; + + private final IoSession session; + private final IoFilter.NextFilter nextFilter; + private final IoFilter.WriteRequest writeRequest; + + public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + this.session = session; + this.nextFilter = nextFilter; + this.writeRequest = writeRequest; + } + + + + public void write( ByteBuffer buf ) + { + if(buffer != null) + { + flush(); + } + buffer = buf; + } + + public void mergeAll() + { + } + + public WriteFuture flush() + { + WriteFuture future = null; + if( buffer == null ) + { + return null; + } + else + { + ByteBuffer buf = buffer; + // Flush only when the buffer has remaining. + if( buf.hasRemaining() ) + { + future = doFlush( buf ); + } + + } + + return future; + } + + + protected WriteFuture doFlush( ByteBuffer buf ) + { + WriteFuture future = new DefaultWriteFuture( session ); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + buf, + future, writeRequest.getDestination() ) ); + return future; + } + } +} + diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index ff0bc798da..7eef73f337 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -22,6 +22,7 @@ package org.apache.qpid.codec; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; @@ -48,6 +49,9 @@ import org.apache.qpid.framing.ProtocolInitiation; */ public class AMQDecoder extends CumulativeProtocolDecoder { + + private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer"; + /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -171,4 +175,97 @@ public class AMQDecoder extends CumulativeProtocolDecoder { _expectProtocolInitiation = expectProtocolInitiation; } + + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode( IoSession session, ByteBuffer in, + ProtocolDecoderOutput out ) throws Exception + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( buf != null ) + { + buf.put( in ); + buf.flip(); + } + else + { + buf = in; + } + + for( ;; ) + { + int oldPos = buf.position(); + boolean decoded = doDecode( session, buf, out ); + if( decoded ) + { + if( buf.position() == oldPos ) + { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed." ); + } + + if( !buf.hasRemaining() ) + { + break; + } + } + else + { + break; + } + } + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if ( buf.hasRemaining() ) + { + storeRemainingInSession( buf, session ); + } + else + { + removeSessionBuffer( session ); + } + } + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose( IoSession session ) throws Exception + { + removeSessionBuffer( session ); + } + + private void removeSessionBuffer(IoSession session) + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + if( buf != null ) + { + buf.release(); + session.removeAttribute( BUFFER ); + } + } + + private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) + { + ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false ); + remainingBuf.setAutoExpand( true ); + remainingBuf.put( buf ); + session.setAttribute( BUFFER, remainingBuf ); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 3abd97ddb7..fcd336b180 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public interface AMQBody { @@ -36,4 +38,6 @@ public interface AMQBody //public void populateFromBuffer(ByteBuffer buffer, long size) // throws AMQFrameDecodingException, AMQProtocolVersionException; + + void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 11f505fd4b..02a46f3748 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -27,7 +27,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock private final int _channel; private final AMQBody _bodyFrame; - + public static final byte FRAME_END_BYTE = (byte) 0xCE; public AMQFrame(final int channel, final AMQBody bodyFrame) @@ -47,13 +47,19 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return 1 + 2 + 4 + _bodyFrame.getSize() + 1; } + public static final int getFrameOverhead() + { + return 1 + 2 + 4 + 1; + } + + public void writePayload(ByteBuffer buffer) { buffer.put(_bodyFrame.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, _channel); EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); _bodyFrame.writePayload(buffer); - buffer.put((byte) 0xCE); + buffer.put(FRAME_END_BYTE); } public final int getChannel() @@ -66,10 +72,54 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return _bodyFrame; } - - public String toString() { return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } + + public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body) + { + buffer.put(body.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body.getSize()); + body.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body3.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body3.getSize()); + body3.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index 5215bcbd66..64af717342 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -24,7 +24,9 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -86,4 +88,9 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+ {
+ session.methodFrameReceived(channelId, this);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index cf64b0475a..07e2faaf7e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -26,8 +26,7 @@ import org.apache.mina.common.ByteBuffer; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
import java.lang.ref.WeakReference;
/**
@@ -38,6 +37,62 @@ import java.lang.ref.WeakReference; */
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+ private static final byte MINUS = (byte)'-';
+ private static final byte ZERO = (byte) '0';
+
+
+
+ private final class TokenizerImpl implements AMQShortStringTokenizer
+ {
+ private final byte _delim;
+ private int _count = -1;
+ private int _pos = 0;
+
+ public TokenizerImpl(final byte delim)
+ {
+ _delim = delim;
+ }
+
+ public int countTokens()
+ {
+ if(_count == -1)
+ {
+ _count = 1 + AMQShortString.this.occurences(_delim);
+ }
+ return _count;
+ }
+
+ public AMQShortString nextToken()
+ {
+ if(_pos <= AMQShortString.this.length())
+ {
+ int nextDelim = AMQShortString.this.indexOf(_delim, _pos);
+ if(nextDelim == -1)
+ {
+ nextDelim = AMQShortString.this.length();
+ }
+
+ AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++);
+ _pos = nextDelim;
+ return nextToken;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean hasMoreTokens()
+ {
+ return _pos <= AMQShortString.this.length();
+ }
+ }
+
+ private AMQShortString substring(final int from, final int to)
+ {
+ return new AMQShortString(_data, from, to);
+ }
+
private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
@@ -53,7 +108,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
- private final ByteBuffer _data;
+ private final byte[] _data;
+ private final int _offset;
private int _hashCode;
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -63,17 +119,25 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public AMQShortString(byte[] data)
{
- _data = ByteBuffer.wrap(data);
+ _data = data.clone();
_length = data.length;
+ _offset = 0;
+ }
+
+ public AMQShortString(byte[] data, int pos)
+ {
+ final int size = data[pos++];
+ final byte[] dataCopy = new byte[size];
+ System.arraycopy(data,pos,dataCopy,0,size);
+ _length = size;
+ _data = dataCopy;
+ _offset = 0;
}
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
- if (data != null)
- {
- _hashCode = data.hashCode();
- }
+
}
public AMQShortString(char[] data)
@@ -85,14 +149,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt final int length = data.length;
final byte[] stringBytes = new byte[length];
+ int hash = 0;
for (int i = 0; i < length; i++)
{
stringBytes[i] = (byte) (0xFF & data[i]);
+ hash = (31 * hash) + stringBytes[i];
}
+ _hashCode = hash;
+ _data = stringBytes;
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
_length = length;
+ _offset = 0;
}
@@ -108,20 +175,31 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt }
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
+ _data = stringBytes;
_hashCode = hash;
_length = length;
+ _offset = 0;
}
- private AMQShortString(ByteBuffer data)
+ private AMQShortString(ByteBuffer data, final int length)
{
- _data = data;
- _length = data.limit();
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _length = length;
+ _offset = 0;
+
+ }
+ private AMQShortString(final byte[] data, final int from, final int to)
+ {
+ _offset = from;
+ _length = to - from;
+ _data = data;
}
+
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -134,7 +212,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public char charAt(int index)
{
- return (char) _data.get(index);
+ return (char) _data[_offset + index];
}
@@ -146,27 +224,24 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public int writeToByteArray(byte[] encoding, int pos)
{
final int size = length();
- encoding[pos++] = (byte) length();
- for (int i = 0; i < size; i++)
- {
- encoding[pos++] = _data.get(i);
- }
-
- return pos;
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
}
public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
{
- final byte len = byteEncodedDestination[pos];
- if (len == 0)
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
{
return null;
}
-
- ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
- return new AMQShortString(data);
+ else
+ {
+ return shortString;
+ }
}
public static AMQShortString readFromBuffer(ByteBuffer buffer)
@@ -178,90 +253,59 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt }
else
{
- ByteBuffer data = buffer.slice();
- data.limit(length);
- data.rewind();
- buffer.skip(length);
- return new AMQShortString(data);
+ return new AMQShortString(buffer, length);
}
}
public byte[] getBytes()
{
-
- if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
+ if(_offset == 0 && _length == _data.length)
{
- return _data.array();
+ return _data.clone();
}
else
{
- final int size = length();
- byte[] b = new byte[size];
- ByteBuffer buf = _data.duplicate();
- buf.rewind();
- buf.get(b);
-
- return b;
+ byte[] data = new byte[_length];
+ System.arraycopy(_data,_offset,data,0,_length);
+ return data;
}
-
}
public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
- if (size != 0)
- {
-
- buffer.setAutoExpand(true);
- buffer.put((byte) size);
- if (_data.buf().hasArray())
- {
- buffer.put(_data.array(), _data.arrayOffset(), length());
- }
- else
- {
-
- for (int i = 0; i < size; i++)
- {
-
- buffer.put(_data.get(i));
- }
- }
- }
- else
- {
- // really writing out unsigned byte
- buffer.put((byte) 0);
- }
+ //buffer.setAutoExpand(true);
+ buffer.put((byte) size);
+ buffer.put(_data, _offset, size);
}
private final class CharSubSequence implements CharSequence
{
- private final int _offset;
+ private final int _sequenceOffset;
private final int _end;
public CharSubSequence(final int offset, final int end)
{
- _offset = offset;
+ _sequenceOffset = offset;
_end = end;
}
public int length()
{
- return _end - _offset;
+ return _end - _sequenceOffset;
}
public char charAt(int index)
{
- return AMQShortString.this.charAt(index + _offset);
+ return AMQShortString.this.charAt(index + _sequenceOffset);
}
public CharSequence subSequence(int start, int end)
{
- return new CharSubSequence(start + _offset, end + _offset);
+ return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
}
}
@@ -272,7 +316,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt for (int i = 0; i < size; i++)
{
- chars[i] = (char) _data.get(i);
+ chars[i] = (char) _data[i + _offset];
}
return chars;
@@ -285,6 +329,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public boolean equals(Object o)
{
+
+
+ if(o instanceof AMQShortString)
+ {
+ return equals((AMQShortString)o);
+ }
+ if(o instanceof CharSequence)
+ {
+ return equals((CharSequence)o);
+ }
+
if (o == null)
{
return false;
@@ -295,26 +350,40 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return true;
}
- if (o instanceof AMQShortString)
- {
- final AMQShortString otherString = (AMQShortString) o;
+ return false;
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
- {
- return false;
- }
+ }
+
+ public boolean equals(final AMQShortString otherString)
+ {
+ if (otherString == this)
+ {
+ return true;
+ }
- return _data.equals(otherString._data);
+ if (otherString == null)
+ {
+ return false;
+ }
+ if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ {
+ return false;
}
- return (o instanceof CharSequence) && equals((CharSequence) o);
+ return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
+ || Arrays.equals(getBytes(),otherString.getBytes());
}
public boolean equals(CharSequence s)
{
+ if(s instanceof AMQShortString)
+ {
+ return equals((AMQShortString)s);
+ }
+
if (s == null)
{
return false;
@@ -345,7 +414,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt for (int i = 0; i < size; i++)
{
- hash = (31 * hash) + _data.get(i);
+ hash = (31 * hash) + _data[i+_offset];
}
_hashCode = hash;
@@ -380,8 +449,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt for (int i = 0; i < length(); i++)
{
- final byte d = _data.get(i);
- final byte n = name._data.get(i);
+ final byte d = _data[i+_offset];
+ final byte n = name._data[i+name._offset];
if (d < n)
{
return -1;
@@ -398,6 +467,12 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt }
+ public AMQShortStringTokenizer tokenize(byte delim)
+ {
+ return new TokenizerImpl(delim);
+ }
+
+
public AMQShortString intern()
{
@@ -435,4 +510,111 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return internString;
}
+
+ private int occurences(final byte delim)
+ {
+ int count = 0;
+ final int end = _offset + _length;
+ for(int i = _offset ; i < end ; i++ )
+ {
+ if(_data[i] == delim)
+ {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private int indexOf(final byte val, final int pos)
+ {
+
+ for(int i = pos; i < length(); i++)
+ {
+ if(_data[_offset+i] == val)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+
+ public static AMQShortString join(final Collection<AMQShortString> terms,
+ final AMQShortString delim)
+ {
+ if(terms.size() == 0)
+ {
+ return EMPTY_STRING;
+ }
+
+ int size = delim.length() * (terms.size() - 1);
+ for(AMQShortString term : terms)
+ {
+ size += term.length();
+ }
+
+ byte[] data = new byte[size];
+ int pos = 0;
+ final byte[] delimData = delim._data;
+ final int delimOffset = delim._offset;
+ final int delimLength = delim._length;
+
+
+ for(AMQShortString term : terms)
+ {
+
+ if(pos!=0)
+ {
+ System.arraycopy(delimData, delimOffset,data,pos, delimLength);
+ pos+=delimLength;
+ }
+ System.arraycopy(term._data,term._offset,data,pos,term._length);
+ pos+=term._length;
+ }
+
+
+
+ return new AMQShortString(data,0,size);
+ }
+
+ public int toIntValue()
+ {
+ int pos = 0;
+ int val = 0;
+
+
+ boolean isNegative = (_data[pos] == MINUS);
+ if(isNegative)
+ {
+ pos++;
+ }
+ while(pos < _length)
+ {
+ int digit = (int) (_data[pos++] - ZERO);
+ if((digit < 0) || (digit > 9))
+ {
+ throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+ }
+ val = val * 10;
+ val += digit;
+ }
+ if(isNegative)
+ {
+ val = val * -1;
+ }
+ return val;
+ }
+
+ public boolean contains(final byte b)
+ {
+ for(int i = 0; i < _length; i++)
+ {
+ if(_data[i] == b)
+ {
+ return true;
+ }
+ }
+ return false; //To change body of created methods use File | Settings | File Templates.
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java new file mode 100644 index 0000000000..e2db8906a1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java @@ -0,0 +1,31 @@ +package org.apache.qpid.framing; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public interface AMQShortStringTokenizer +{ + + public int countTokens(); + + public AMQShortString nextToken(); + + boolean hasMoreTokens(); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 7b6699b783..94030f383e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer; public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { - private AMQDataBlock _firstFrame; private AMQDataBlock[] _blocks; @@ -33,27 +32,12 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD _blocks = blocks; } - /** - * The encoded block will be logically first before the AMQDataBlocks which are encoded - * into the buffer afterwards. - * @param encodedBlock already-encoded data - * @param blocks some blocks to be encoded. - */ - public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks) - { - this(blocks); - _firstFrame = encodedBlock; - } public AMQDataBlock[] getBlocks() { return _blocks; } - public AMQDataBlock getFirstFrame() - { - return _firstFrame; - } public long getSize() { @@ -62,19 +46,11 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD { frameSize += _blocks[i].getSize(); } - if (_firstFrame != null) - { - frameSize += _firstFrame.getSize(); - } return frameSize; } public void writePayload(ByteBuffer buffer) { - if (_firstFrame != null) - { - _firstFrame.writePayload(buffer); - } for (int i = 0; i < _blocks.length; i++) { _blocks[i].writePayload(buffer); @@ -90,7 +66,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD else { StringBuilder buf = new StringBuilder(this.getClass().getName()); - buf.append("{encodedBlock=").append(_firstFrame); + buf.append("{"); for (int i = 0 ; i < _blocks.length; i++) { buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index cbee1680f7..969df954ce 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public class ContentBody implements AMQBody { @@ -68,6 +70,12 @@ public class ContentBody implements AMQBody } } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentBodyReceived(channelId, this); + } + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException { if (size > 0) diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 80a61544b3..83e5a7e341 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public class ContentHeaderBody implements AMQBody { @@ -110,6 +112,12 @@ public class ContentHeaderBody implements AMQBody properties.writePropertyListPayload(buffer); } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentHeaderReceived(channelId, this); + } + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, long bodySize) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index ef7163bd40..15a43345b5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public class HeartbeatBody implements AMQBody { @@ -55,6 +57,12 @@ public class HeartbeatBody implements AMQBody { } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.heartbeatBodyReceived(channelId, this); + } + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException { if(size > 0) diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index d8b6b25b92..d7194640d4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -70,7 +70,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
publishBody.getImmediate(),
publishBody.getMandatory(),
- routingKey == null ? null : routingKey.intern());
+ routingKey);
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index ba3c5d03fa..b2a09ac592 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -94,21 +94,23 @@ public class Job implements Runnable /** * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job. */ - void processAll() + boolean processAll() { // limit the number of events processed in one run - for (int i = 0; i < _maxEvents; i++) + int i = _maxEvents; + while( --i != 0 ) { Event e = _eventQueue.poll(); if (e == null) { - break; + return true; } else { e.process(_session); } } + return false; } /** @@ -144,9 +146,15 @@ public class Job implements Runnable */ public void run() { - processAll(); - deactivate(); - _completionHandler.completed(_session, this); + if(processAll()) + { + deactivate(); + _completionHandler.completed(_session, this); + } + else + { + _completionHandler.notCompleted(_session, this); + } } /** @@ -158,5 +166,7 @@ public class Job implements Runnable static interface JobCompletionHandler { public void completed(IoSession session, Job job); + + public void notCompleted(final IoSession session, final Job job); } } diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index cbe08a192e..8352b5af77 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -29,8 +29,8 @@ import org.apache.qpid.pool.Event.CloseEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ExecutorService; /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it @@ -84,9 +84,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo /** Used for debugging purposes. */ private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class); - /** Holds a mapping from Mina sessions to batched jobs for execution. */ - private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); - /** Holds the managed reference to obtain the executor for the batched jobs. */ private final ReferenceCountingExecutorService _poolReference; @@ -94,7 +91,9 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + + private final int _maxEvents; /** * Creates a named pooling filter, on the specified shared thread pool. @@ -102,10 +101,11 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * @param refCountingPool The thread pool reference. * @param name The identifying name of the filter type. */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents) { _poolReference = refCountingPool; _name = name; + _maxEvents = maxEvents; } /** @@ -160,20 +160,34 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo /** * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. * - * @param session The Mina session to work in. + * @param job The job. * @param event The event to hand off asynchronously. */ - void fireAsynchEvent(IoSession session, Event event) + void fireAsynchEvent(Job job, Event event) { - Job job = getJobForSession(session); + // job.acquire(); //prevents this job being removed from _jobs job.add(event); - // Additional checks on pool to check that it hasn't shutdown. - // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) { - _poolReference.getPool().execute(job); + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } } } @@ -186,7 +200,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void createNewJobForSession(IoSession session) { - Job job = new Job(session, this, _maxEvents); + Job job = new Job(session, this, MAX_JOB_EVENTS); session.setAttribute(_name, job); } @@ -197,7 +211,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * * @return The Job for this filter to place asynchronous events into. */ - private Job getJobForSession(IoSession session) + public Job getJobForSession(IoSession session) { return (Job) session.getAttribute(_name); } @@ -233,17 +247,57 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo // } // } // else + + if (!job.isComplete()) { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? // Can the pool be shutdown at this point? - if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) + if (job.activate()) { - _poolReference.getPool().execute(job); + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } } } + public void notCompleted(IoSession session, Job job) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + + + /** * No-op pass through filter to the next filter in the chain. * @@ -400,7 +454,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS)); } /** @@ -412,8 +466,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { - - fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); } /** @@ -424,7 +478,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { - fireAsynchEvent(session, new CloseEvent(nextFilter)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new CloseEvent(nextFilter)); } } @@ -442,7 +497,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS)); } /** @@ -454,7 +509,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { - fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); } /** @@ -465,7 +521,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { - fireAsynchEvent(session, new CloseEvent(nextFilter)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new CloseEvent(nextFilter)); } } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 2fbeeda1d4..5a7679a972 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.AMQException; /** * AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in @@ -57,7 +58,7 @@ public interface AMQMethodListener * * @todo Consider narrowing the exception. */ - <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception; + <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException; /** * Notifies the listener of an error on the event context to which it is listening. The listener should perform diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 035645aad2..b56a05f725 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -20,8 +20,8 @@ */
package org.apache.qpid.protocol;
-import org.apache.qpid.framing.VersionSpecificRegistry;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
@@ -46,4 +46,12 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto // public VersionSpecificRegistry getRegistry();
MethodRegistry getMethodRegistry();
+
+
+ public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException;
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException;
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException;
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index bb9b201e5d..5fbea5e14f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -24,20 +24,15 @@ import junit.framework.TestCase; import org.apache.log4j.Logger; -import org.apache.mina.common.IoSession; - import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; @@ -62,7 +57,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), false, new AMQShortString("test"), true, _protocolSession.getVirtualHost()); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore, null); + AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); channelCount = _mbean.channels().size(); @@ -73,7 +68,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L); // check APIs - AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore, null); + AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore); channel3.setLocalTransactional(); _protocolSession.addChannel(channel3); _mbean.rollbackTransactions(2); @@ -93,14 +88,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase } // check if closing of session works - _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore, null)); + _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore)); _mbean.closeConnection(); try { channelCount = _mbean.channels().size(); assertTrue(channelCount == 0); // session is now closed so adding another channel should throw an exception - _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore, null)); + _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore)); fail(); } catch (AMQException ex) @@ -119,7 +114,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase new AMQMinaProtocolSession(new MockIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true), null); _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test")); - _channel = new AMQChannel(_protocolSession, 1, _messageStore, null); + _channel = new AMQChannel(_protocolSession, 1, _messageStore); _protocolSession.addChannel(_channel); _mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject(); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java index ae02c1c28c..5adfebbffb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java @@ -21,22 +21,12 @@ package org.apache.qpid.server.protocol; import junit.framework.TestCase; -import org.apache.mina.common.IoSession; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.framing.AMQShortString; - -import javax.management.JMException; /** Test class to test MBean operations for AMQMinaProtocolSession. */ public class MaxChannelsTest extends TestCase @@ -65,7 +55,7 @@ public class MaxChannelsTest extends TestCase { for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) { - _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null)); + _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null)); } } catch (AMQException e) diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 96be579d2a..95ffb505fb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -24,7 +24,6 @@ import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -37,7 +36,6 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; import java.util.LinkedList; @@ -77,7 +75,7 @@ public class AckTest extends TestCase super.setUp(); _messageStore = new TestableMemoryMessageStore(); _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/); + _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); _subscriptionManager = new SubscriptionSet(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index fcee3c7de4..cf986e7803 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -190,6 +190,26 @@ public class MockProtocolSession implements AMQProtocolSession return null; //To change body of implemented methods use File | Settings | File Templates. } + public void methodFrameReceived(int channelId, AMQMethodBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void contentHeaderReceived(int channelId, ContentHeaderBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void contentBodyReceived(int channelId, ContentBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public MethodDispatcher getMethodDispatcher() { return null; //To change body of implemented methods use File | Settings | File Templates. |