summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java343
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java153
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java75
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java113
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java81
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java111
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java206
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java93
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java123
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java74
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java28
-rw-r--r--java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java528
-rw-r--r--java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java250
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java440
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java97
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java58
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java356
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java107
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java12
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java15
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java12
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java20
58 files changed, 3067 insertions, 737 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 10184a79e5..3cb50d1d12 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -75,7 +75,7 @@ public class AMQChannel
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
*/
- private AtomicLong _deliveryTag = new AtomicLong(0);
+ private long _deliveryTag = 0;
/** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
private AMQQueue _defaultQueue;
@@ -99,8 +99,6 @@ public class AMQChannel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private final MessageRouter _exchanges;
-
private TransactionalContext _txnContext, _nonTransactedContext;
/**
@@ -124,7 +122,7 @@ public class AMQChannel
public boolean ENABLE_JMSXUserID;
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
+ public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
//Set values from configuration
@@ -136,7 +134,7 @@ public class AMQChannel
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
- _exchanges = exchanges;
+
// by default the session is non-transactional
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
@@ -297,7 +295,7 @@ public class AMQChannel
public long getNextDeliveryTag()
{
- return _deliveryTag.incrementAndGet();
+ return ++_deliveryTag;
}
public int getNextConsumerTag()
@@ -969,16 +967,19 @@ public class AMQChannel
public void processReturns(AMQProtocolSession session) throws AMQException
{
- for (RequiredDeliveryException bouncedMessage : _returnMessages)
+ if(!_returnMessages.isEmpty())
{
- AMQMessage message = bouncedMessage.getAMQMessage();
- session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ for (RequiredDeliveryException bouncedMessage : _returnMessages)
+ {
+ AMQMessage message = bouncedMessage.getAMQMessage();
+ session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
- message.decrementReference(_storeContext);
- }
+ message.decrementReference(_storeContext);
+ }
- _returnMessages.clear();
+ _returnMessages.clear();
+ }
}
public boolean wouldSuspend(AMQMessage msg)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index ab9f40b31d..d8a8cfb6d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -35,6 +35,7 @@ import org.apache.log4j.xml.DOMConfigurator;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.FixedSizeByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQException;
@@ -275,7 +276,7 @@ public class Main
// once more testing of the performance of the simple allocator has been done
if (!connectorConfig.enablePooledAllocator)
{
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
int port = connectorConfig.port;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index b6b6ee39ce..12347c0278 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -212,10 +212,9 @@ public class DestNameExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + queues);
}
- for (AMQQueue q : queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(queues);
+
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 19172b98f3..dbe7a8938a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -26,6 +26,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortStringTokenizer;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
@@ -40,11 +41,7 @@ import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -84,12 +81,21 @@ public class DestWildExchange extends AbstractExchange
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
- private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
- private static final String TOPIC_SEPARATOR = ".";
- private static final String AMQP_STAR = "*";
- private static final String AMQP_HASH = "#";
+ private static final byte TOPIC_SEPARATOR = (byte)'.';
+ private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
+ private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
+ private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
+ private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized =
+ new ConcurrentHashMap<AMQShortString, AMQShortString[]>();
+ private static final byte HASH_BYTE = (byte)'#';
+ private static final byte STAR_BYTE = (byte)'*';
/** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
@@ -107,7 +113,7 @@ public class DestWildExchange extends AbstractExchange
public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
- for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet())
+ for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet())
{
AMQShortString key = entry.getKey();
List<String> queueList = new ArrayList<String>();
@@ -156,27 +162,75 @@ public class DestWildExchange extends AbstractExchange
assert queue != null;
assert rKey != null;
- AMQShortString routingKey = normalize(rKey);
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
- _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
- List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+ List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+
+
+
+
+
+
+
// if we got null back, no previous value was associated with the specified routing key hence
// we need to read back the new value just put into the map
if (queueList == null)
{
- queueList = _routingKey2queues.get(routingKey);
+ queueList = _bindingKey2queues.get(rKey);
}
+
+
if (!queueList.contains(queue))
{
queueList.add(queue);
+
+
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ {
+ AMQShortString routingKey = normalize(rKey);
+ List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+
+ if(queueList2 == null)
+ {
+ queueList2 = _wildCardBindingKey2queues.get(routingKey);
+ AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens());
+
+ while (keyTok.hasMoreTokens())
+ {
+ keyTokList.add(keyTok.nextToken());
+ }
+
+ _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()]));
+ }
+ queueList2.add(queue);
+
+ }
+ else
+ {
+ List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
+ if(queueList2 == null)
+ {
+ queueList2 = _simpleBindingKey2queues.get(rKey);
+ }
+ queueList2.add(queue);
+
+ }
+
+
+
+
}
else if (_logger.isDebugEnabled())
{
- _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+ _logger.debug("Queue " + queue + " is already registered with routing key " + rKey);
}
+
+
}
private AMQShortString normalize(AMQShortString routingKey)
@@ -186,53 +240,58 @@ public class DestWildExchange extends AbstractExchange
routingKey = AMQShortString.EMPTY_STRING;
}
- StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
- List<String> _subscription = new ArrayList<String>();
+ AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
while (routingTokens.hasMoreTokens())
{
- _subscription.add(routingTokens.nextToken());
+ subscriptionList.add(routingTokens.nextToken());
}
- int size = _subscription.size();
+ int size = subscriptionList.size();
for (int index = 0; index < size; index++)
{
// if there are more levels
if ((index + 1) < size)
{
- if (_subscription.get(index).equals(AMQP_HASH))
+ if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
{
- if (_subscription.get(index + 1).equals(AMQP_HASH))
+ if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
{
// we don't need #.# delete this one
- _subscription.remove(index);
+ subscriptionList.remove(index);
size--;
// redo this normalisation
index--;
}
- if (_subscription.get(index + 1).equals(AMQP_STAR))
+ if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
{
// we don't want #.* swap to *.#
// remove it and put it in at index + 1
- _subscription.add(index + 1, _subscription.remove(index));
+ subscriptionList.add(index + 1, subscriptionList.remove(index));
}
}
} // if we have more levels
}
- StringBuilder sb = new StringBuilder();
- for (String s : _subscription)
+
+ AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
+/*
+ StringBuilder sb = new StringBuilder();
+ for (AMQShortString s : subscriptionList)
{
sb.append(s);
sb.append(TOPIC_SEPARATOR);
}
sb.deleteCharAt(sb.length() - 1);
+*/
- return new AMQShortString(sb.toString());
+ return normalizedString;
}
public void route(AMQMessage payload) throws AMQException
@@ -254,19 +313,14 @@ public class DestWildExchange extends AbstractExchange
else
{
_logger.warn("No queues found for routing key " + routingKey);
- _logger.warn("Routing map contains: " + _routingKey2queues);
+ _logger.warn("Routing map contains: " + _bindingKey2queues);
return;
}
}
- for (AMQQueue q : queues)
- {
- // TODO: modify code generator to add clone() method then clone the deliver body
- // without this addition we have a race condition - we will be modifying the body
- // before the encoder has encoded the body for delivery
- payload.enqueue(q);
- }
+ payload.enqueue(queues);
+
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
@@ -276,21 +330,21 @@ public class DestWildExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+ List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
return (queues != null) && queues.contains(queue);
}
public boolean isBound(AMQShortString routingKey)
{
- List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
+ List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
return (queues != null) && !queues.isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (List<AMQQueue> queues : _routingKey2queues.values())
+ for (List<AMQQueue> queues : _bindingKey2queues.values())
{
if (queues.contains(queue))
{
@@ -303,7 +357,7 @@ public class DestWildExchange extends AbstractExchange
public boolean hasBindings()
{
- return !_routingKey2queues.isEmpty();
+ return !_bindingKey2queues.isEmpty();
}
public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
@@ -311,13 +365,11 @@ public class DestWildExchange extends AbstractExchange
assert queue != null;
assert rKey != null;
- AMQShortString routingKey = normalize(rKey);
-
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ List<AMQQueue> queues = _bindingKey2queues.get(rKey);
if (queues == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey + ". No queue was registered with that _routing key");
+ + " with routing key " + rKey + ". No queue was registered with that _routing key");
}
@@ -325,12 +377,39 @@ public class DestWildExchange extends AbstractExchange
if (!removedQ)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey);
+ + " with routing key " + rKey);
+ }
+
+
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ {
+ AMQShortString bindingKey = normalize(rKey);
+ List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey);
+ queues2.remove(queue);
+ if(queues2.isEmpty())
+ {
+ _wildCardBindingKey2queues.remove(bindingKey);
+ _bindingKey2Tokenized.remove(bindingKey);
+ }
+
}
+ else
+ {
+ List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey);
+ queues2.remove(queue);
+ if(queues2.isEmpty())
+ {
+ _simpleBindingKey2queues.remove(rKey);
+ }
+
+ }
+
+
+
if (queues.isEmpty())
{
- _routingKey2queues.remove(routingKey);
+ _bindingKey2queues.remove(rKey);
}
}
@@ -349,117 +428,167 @@ public class DestWildExchange extends AbstractExchange
public Map<AMQShortString, List<AMQQueue>> getBindings()
{
- return _routingKey2queues;
+ return _bindingKey2queues;
}
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
- List<AMQQueue> list = new LinkedList<AMQQueue>();
- StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
- ArrayList<String> routingkeyList = new ArrayList<String>();
+ List<AMQQueue> list = null;
- while (routingTokens.hasMoreTokens())
+ if(!_wildCardBindingKey2queues.isEmpty())
{
- String next = routingTokens.nextToken();
- if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH))
- {
- continue;
- }
- routingkeyList.add(next);
- }
- for (AMQShortString queue : _routingKey2queues.keySet())
- {
- StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR);
+ AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
+
+ final int routingTokensCount = routingTokens.countTokens();
- ArrayList<String> queueList = new ArrayList<String>();
- while (queTok.hasMoreTokens())
+ AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount];
+
+ if(routingTokensCount == 1)
{
- queueList.add(queTok.nextToken());
+ routingkeyTokens[0] =routingKey;
}
+ else
+ {
- int depth = 0;
- boolean matching = true;
- boolean done = false;
- int routingskip = 0;
- int queueskip = 0;
- while (matching && !done)
- {
- if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
+ int token = 0;
+ while (routingTokens.hasMoreTokens())
{
- done = true;
- // if it was the routing key that ran out of digits
- if (routingkeyList.size() == (depth + routingskip))
- {
- if (queueList.size() > (depth + queueskip))
- { // a hash and it is the last entry
- matching =
- queueList.get(depth + queueskip).equals(AMQP_HASH)
- && (queueList.size() == (depth + queueskip + 1));
- }
- }
- else if (routingkeyList.size() > (depth + routingskip))
+ AMQShortString next = routingTokens.nextToken();
+ /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH))
{
- // There is still more routing key to check
- matching = false;
+ continue;
}
+ */
- continue;
+ routingkeyTokens[token++] = next;
}
+ }
+ for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet())
+ {
+
+ AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey);
+
+
+ boolean matching = true;
+ boolean done = false;
- // if the values on the two topics don't match
- if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip)))
+ int depthPlusRoutingSkip = 0;
+ int depthPlusQueueSkip = 0;
+
+ final int bindingKeyTokensCount = bindingKeyTokens.length;
+
+ while (matching && !done)
{
- if (queueList.get(depth + queueskip).equals(AMQP_STAR))
+
+ if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip))
{
- depth++;
+ done = true;
+
+ // if it was the routing key that ran out of digits
+ if (routingTokensCount == depthPlusRoutingSkip)
+ {
+ if (bindingKeyTokensCount > depthPlusQueueSkip)
+ { // a hash and it is the last entry
+ matching =
+ bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)
+ && (bindingKeyTokensCount == (depthPlusQueueSkip + 1));
+ }
+ }
+ else if (routingTokensCount > depthPlusRoutingSkip)
+ {
+ // There is still more routing key to check
+ matching = false;
+ }
continue;
}
- else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
+
+ // if the values on the two topics don't match
+ if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip]))
{
- // Is this a # at the end
- if (queueList.size() == (depth + queueskip + 1))
+ if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN))
{
- done = true;
+ depthPlusQueueSkip++;
+ depthPlusRoutingSkip++;
continue;
}
-
- // otherwise # in the middle
- while (routingkeyList.size() > (depth + routingskip))
+ else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN))
{
- if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
+ // Is this a # at the end
+ if (bindingKeyTokensCount == (depthPlusQueueSkip + 1))
+ {
+ done = true;
+
+ continue;
+ }
+
+ // otherwise # in the middle
+ while (routingTokensCount > depthPlusRoutingSkip)
{
- queueskip++;
- depth++;
+ if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1]))
+ {
+ depthPlusQueueSkip += 2;
+ depthPlusRoutingSkip++;
+
+ break;
+ }
- break;
+ depthPlusRoutingSkip++;
}
- routingskip++;
+ continue;
}
- continue;
+ matching = false;
}
- matching = false;
+ depthPlusQueueSkip++;
+ depthPlusRoutingSkip++;
}
- depth++;
+ if (matching)
+ {
+ if(list == null)
+ {
+ list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey));
+ }
+ else
+ {
+ list.addAll(_wildCardBindingKey2queues.get(bindingKey));
+ }
+ }
}
- if (matching)
+ }
+ if(!_simpleBindingKey2queues.isEmpty())
+ {
+ List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey);
+ if(list == null)
+ {
+ if(queues == null)
+ {
+ list = Collections.EMPTY_LIST;
+ }
+ else
+ {
+ list = new ArrayList<AMQQueue>(queues);
+ }
+ }
+ else if(queues != null)
{
- list.addAll(_routingKey2queues.get(queue));
+ list.addAll(queues);
}
+
}
return list;
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 57ae2bb6d4..e7c887f306 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -42,6 +42,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -205,10 +206,8 @@ public class FanoutExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + _queues);
}
- for (AMQQueue q : _queues)
- {
- payload.enqueue(q);
- }
+ payload.enqueue(new ArrayList(_queues));
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 3f604480b9..054674aed4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -29,7 +29,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,8 +54,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(),
- virtualHost.getExchangeRegistry());
+ final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
+ );
session.addChannel(channel);
ChannelOpenOkBody response;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 98c77d8d32..d7a879180a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -100,8 +100,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -151,8 +151,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -247,14 +247,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index b14f03e617..646ef43826 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -12,11 +12,14 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -47,10 +50,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
+
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -60,8 +62,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
if(bodyCount == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -75,9 +77,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+ CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
//
@@ -86,7 +88,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext,messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -95,6 +97,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
@@ -106,8 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
if(bodyCount == 0)
@@ -126,9 +135,9 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
//
@@ -137,7 +146,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
for(int i = 1; i < bodyCount; i++)
{
cb = messageHandle.getContentChunk(storeContext, messageId, i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -147,7 +156,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -158,23 +167,53 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQDataBlock returnBlock = new DeferredDataBlock()
+ final AMQBody returnBlock = new AMQBody()
{
- protected AMQDataBlock createAMQDataBlock()
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
{
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
isRedelivered,
exchangeName,
routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
}
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
};
return returnBlock;
}
@@ -225,8 +264,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
//
@@ -236,14 +274,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
if (bodyFrameIterator.hasNext())
{
AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
writeFrame(compositeBlock);
}
@@ -272,4 +309,64 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 0fe6d3636e..143ee5fa40 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -208,27 +208,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_logger.debug("Frame Received: " + frame);
}
+
+
+ body.handle(channelId, this);
- if (body instanceof AMQMethodBody)
- {
- methodFrameReceived(channelId, (AMQMethodBody) body);
- }
- else if (body instanceof ContentHeaderBody)
- {
- contentHeaderReceived(channelId, (ContentHeaderBody) body);
- }
- else if (body instanceof ContentBody)
- {
- contentBodyReceived(channelId, (ContentBody) body);
- }
- else if (body instanceof HeartbeatBody)
- {
- // NO OP
- }
- else
- {
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
- }
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -271,7 +254,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+ public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
@@ -365,7 +348,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
catch (Exception e)
{
- _stateManager.error(e);
+
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
@@ -375,7 +358,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
@@ -384,13 +367,18 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
- private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentBody(body, this);
}
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ // NO - OP
+ }
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 543e043bed..db5d882f51 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -30,6 +30,7 @@ import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
@@ -82,7 +83,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
+ final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
getConfiguredObject(ConnectorConfiguration.class);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 80158779b2..5e79ab46b0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -83,7 +83,7 @@ public class AMQMessage
private long _expiration;
- private final int hashcode = System.identityHashCode(this);
+
private Exchange _exchange;
private static final boolean SYNCED_CLOCKS =
@@ -92,7 +92,7 @@ public class AMQMessage
public String debugIdentity()
{
- return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
+ return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
public void setExpiration()
@@ -141,6 +141,11 @@ public class AMQMessage
_exchange.route(this);
}
+ public void enqueue(final List<AMQQueue> queues)
+ {
+ _transientMessageData.setDestinationQueues(queues);
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -664,7 +669,7 @@ public class AMQMessage
}
finally
{
- destinationQueues.clear();
+
// Remove refence for routing process . Reference count should now == delivered queue count
decrementReference(storeContext);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index e1c1de29bd..4a0121700c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -37,9 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -158,6 +156,8 @@ public class AMQQueue implements Managable, Comparable
public AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+
+ // This ensure that the notification checks for the configured alerts are created.
+ setMaximumMessageAge(_maximumMessageAge);
+ setMaximumMessageCount(_maximumMessageCount);
+ setMaximumMessageSize(_maximumMessageSize);
+ setMaximumQueueDepth(_maximumQueueDepth);
+
}
private AMQQueueMBean createMBean() throws AMQException
@@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable
}
}
- public AMQShortString getName()
+ public final AMQShortString getName()
{
return _name;
}
@@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable
return _maximumMessageSize;
}
- public void setMaximumMessageSize(long value)
+ public void setMaximumMessageSize(final long maximumMessageSize)
{
- _maximumMessageSize = value;
+ _maximumMessageSize = maximumMessageSize;
+ if(maximumMessageSize == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
}
public int getConsumerCount()
@@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable
return _maximumMessageCount;
}
- public void setMaximumMessageCount(long value)
+ public void setMaximumMessageCount(final long maximumMessageCount)
{
- _maximumMessageCount = value;
+ _maximumMessageCount = maximumMessageCount;
+ if(maximumMessageCount == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+
+
+
}
public long getMaximumQueueDepth()
@@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(long value)
+ public void setMaximumQueueDepth(final long maximumQueueDepth)
{
- _maximumQueueDepth = value;
+ _maximumQueueDepth = maximumQueueDepth;
+ if(maximumQueueDepth == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+
}
public long getOldestMessageArrivalTime()
@@ -661,6 +696,10 @@ public class AMQQueue implements Managable, Comparable
}
_subscribers.addSubscriber(subscription);
+ if(exclusive)
+ {
+ _subscribers.setExclusive(true);
+ }
}
private boolean isExclusive()
@@ -692,6 +731,7 @@ public class AMQQueue implements Managable, Comparable
ps, channel, consumerTag, this));
}
+ _subscribers.setExclusive(false);
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
consumerTag)))
@@ -805,7 +845,7 @@ public class AMQQueue implements Managable, Comparable
public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
{
AMQMessage msg = entry.getMessage();
- _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
+ _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -938,6 +978,14 @@ public class AMQQueue implements Managable, Comparable
public void setMaximumMessageAge(long maximumMessageAge)
{
_maximumMessageAge = maximumMessageAge;
+ if(maximumMessageAge == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
}
public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
@@ -966,4 +1014,9 @@ public class AMQQueue implements Managable, Comparable
}
}
}
+
+ public final Set<NotificationCheck> getNotificationChecks()
+ {
+ return _notificationChecks;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 9e32de3f76..348a136f9d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -54,10 +54,7 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -97,6 +94,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
private Notification _lastNotification = null;
+
+
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -249,16 +249,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
public void checkForNotification(AMQMessage msg) throws AMQException, JMException
{
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+ final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
- for (NotificationCheck check : NotificationCheck.values())
+ if(!notificationChecks.isEmpty())
{
- if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
{
- if (check.notifyIfNecessary(msg, _queue, this))
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- _lastNotificationTimes[check.ordinal()] = currentTime;
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index e3c8d3f17a..a61d41e33b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -363,8 +363,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
deliveryTag, _queue.getMessageCount());
- _totalMessageSize.addAndGet(-entry.getSize());
+
}
+ _totalMessageSize.addAndGet(-entry.getSize());
if (!acks)
{
@@ -918,7 +919,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!s.isSuspended())
{
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index 6b3d65661f..6f9efd3200 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -29,9 +29,9 @@ public enum NotificationCheck
{
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
- int msgCount = queue.getMessageCount();
+ int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
return true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 7f0accf052..6e68b5637e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -292,14 +292,17 @@ public class SubscriptionImpl implements Subscription
queue.dequeue(storeContext, entry);
}
+/*
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
+ }
+*/
+
synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
- if (_sendLock.get())
- {
- _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
- }
if (_acks)
{
@@ -308,10 +311,11 @@ public class SubscriptionImpl implements Subscription
protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
- if (!_acks)
- {
- entry.getMessage().decrementReference(storeContext);
- }
+
+ }
+ if (!_acks)
+ {
+ entry.getMessage().decrementReference(storeContext);
}
}
finally
@@ -367,59 +371,60 @@ public class SubscriptionImpl implements Subscription
// return false;
}
- final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+
//todo - client id should be recoreded and this test removed but handled below
- if (_noLocal && publisher != null)
+ if (_noLocal)
{
- // We don't want local messages so check to see if message is one we sent
- Object localInstance;
- Object msgInstance;
- if ((protocolSession.getClientProperties() != null) &&
- (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+ if(publisher != null)
+
{
+ // We don't want local messages so check to see if message is one we sent
+ Object localInstance;
+ Object msgInstance;
- if ((publisher.getClientProperties() != null) &&
- (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (localInstance == msgInstance || localInstance.equals(msgInstance))
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ if (localInstance == msgInstance || localInstance.equals(msgInstance))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
}
- }
- else
- {
+ else
+ {
- localInstance = protocolSession.getClientIdentifier();
- //todo - client id should be recoreded and this test removed but handled here
+ localInstance = protocolSession.getClientIdentifier();
+ //todo - client id should be recoreded and this test removed but handled here
- msgInstance = publisher.getClientIdentifier();
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ msgInstance = publisher.getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
- }
-
+ }
}
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
- }
return checkFilters(entry);
}
@@ -433,23 +438,7 @@ public class SubscriptionImpl implements Subscription
private boolean checkFilters(QueueEntry msg)
{
- if (_filters != null)
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has filters.");
-// }
- return _filters.allAllow(msg.getMessage());
- }
- else
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no filters");
-// }
-
- return true;
- }
+ return (_filters == null) || _filters.allAllow(msg.getMessage());
}
public Queue<QueueEntry> getPreDeliveryQueue()
@@ -613,7 +602,7 @@ public class SubscriptionImpl implements Subscription
public boolean wouldSuspend(QueueEntry msg)
{
- return channel.wouldSuspend(msg.getMessage());
+ return _acks && channel.wouldSuspend(msg.getMessage());
}
public Queue<QueueEntry> getResendQueue()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index b2f8cae8ff..b7cdaa29ab 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -39,6 +39,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
private int _currentSubscriber;
private final Object _changeLock = new Object();
+ private volatile boolean _exclusive;
/** Accessor for unit tests. */
@@ -116,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
*/
public Subscription nextSubscriber(QueueEntry msg)
{
- if (_subscriptions.isEmpty())
- {
- return null;
- }
+
try
{
@@ -143,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager
private Subscription nextSubscriberImpl(QueueEntry msg)
{
- final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext())
+ if(_exclusive)
{
- Subscription subscription = iterator.next();
- ++_currentSubscriber;
- subscriberScanned();
-
- if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ try
{
- if (subscription.hasInterest(msg))
+ Subscription subscription = _subscriptions.get(0);
+ subscriberScanned();
+
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ if (subscription.hasInterest(msg))
{
- return subscription;
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
}
}
}
+ catch(IndexOutOfBoundsException e)
+ {
+ }
+ return null;
}
+ else
+ {
+ if (_subscriptions.isEmpty())
+ {
+ return null;
+ }
+ final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+ while (iterator.hasNext())
+ {
+ Subscription subscription = iterator.next();
+ ++_currentSubscriber;
+ subscriberScanned();
- return null;
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
}
/** Overridden in test classes. */
@@ -233,5 +265,14 @@ class SubscriptionSet implements WeightedSubscriptionManager
{
return _changeLock;
}
-
+
+ public void setExclusive(final boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
+ public boolean getExcBoolean()
+ {
+ return _exclusive;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
index 79ee6b93a3..9b91c71a1d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -60,7 +62,7 @@ public class TransientMessageData
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+ private List<AMQQueue> _destinationQueues;
public MessagePublishInfo getMessagePublishInfo()
{
@@ -74,7 +76,7 @@ public class TransientMessageData
public List<AMQQueue> getDestinationQueues()
{
- return _destinationQueues;
+ return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues;
}
public void setDestinationQueues(List<AMQQueue> destinationQueues)
@@ -109,6 +111,10 @@ public class TransientMessageData
public void addDestinationQueue(AMQQueue queue)
{
+ if(_destinationQueues == null)
+ {
+ _destinationQueues = new ArrayList<AMQQueue>();
+ }
_destinationQueues.add(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 047cef9064..1e4b69c935 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -49,11 +49,11 @@ public class NonTransactionalContext implements TransactionalContext
/** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
- private Set<Long> _browsedAcks;
+ private final Set<Long> _browsedAcks;
private final MessageStore _messageStore;
- private StoreContext _storeContext;
+ private final StoreContext _storeContext;
/** Whether we are in a transaction */
private boolean _inTran;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 9addf83e01..afe96bcd4f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.security.access.AccessManager;
@@ -123,7 +124,7 @@ public class VirtualHost implements Accessable
*/
public VirtualHost(String name, MessageStore store) throws Exception
{
- this(name, null, store);
+ this(name, new PropertiesConfiguration(), store);
}
/**
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index fbd9e65480..ed79384d42 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -173,7 +173,7 @@ public class AMQQueueAlertTest extends TestCase
public void testQueueDepthAlertWithSubscribers() throws Exception
{
protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
protocolSession.addChannel(channel);
// Create queue
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index e72e1bf1f0..c02b47e9fd 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -123,7 +123,7 @@ public class AMQQueueMBeanTest extends TestCase
TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
_queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 79d92f7705..b60a8dfaad 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -73,6 +73,105 @@ import java.util.concurrent.atomic.AtomicInteger;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
+ private static final class ChannelToSessionMap
+ {
+ private final AMQSession[] _fastAccessSessions = new AMQSession[16];
+ private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+ private int _size = 0;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
+ public AMQSession get(int channelId)
+ {
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ return _fastAccessSessions[channelId];
+ }
+ else
+ {
+ return _slowAccessSessions.get(channelId);
+ }
+ }
+
+ public AMQSession put(int channelId, AMQSession session)
+ {
+ AMQSession oldVal;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ oldVal = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = session;
+ }
+ else
+ {
+ oldVal = _slowAccessSessions.put(channelId, session);
+ }
+ if((oldVal != null) && (session == null))
+ {
+ _size--;
+ }
+ else if((oldVal == null) && (session != null))
+ {
+ _size++;
+ }
+
+ return session;
+
+ }
+
+
+ public AMQSession remove(int channelId)
+ {
+ AMQSession session;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ session = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = null;
+ }
+ else
+ {
+ session = _slowAccessSessions.remove(channelId);
+ }
+
+ if(session != null)
+ {
+ _size--;
+ }
+ return session;
+
+ }
+
+ public Collection<AMQSession> values()
+ {
+ ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessSessions[i] != null)
+ {
+ values.add(_fastAccessSessions[i]);
+ }
+ }
+ values.addAll(_slowAccessSessions.values());
+
+ return values;
+ }
+
+ public int size()
+ {
+ return _size;
+ }
+
+ public void clear()
+ {
+ _size = 0;
+ _slowAccessSessions.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessSessions[i] = null;
+ }
+ }
+ }
+
+
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private AtomicInteger _idFactory = new AtomicInteger(0);
@@ -102,7 +201,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+ private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
private String _clientName;
@@ -757,10 +856,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
checkNotClosed();
if (!_started)
{
- final Iterator it = _sessions.entrySet().iterator();
+ final Iterator it = _sessions.values().iterator();
while (it.hasNext())
{
- final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
+ final AMQSession s = (AMQSession) (it.next());
try
{
s.start();
@@ -1014,11 +1113,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _maximumFrameSize;
}
- public Map getSessions()
- {
- return _sessions;
- }
-
public String getUsername()
{
return _username;
@@ -1239,6 +1333,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// _protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
+ s.setFlowControl(true);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0c94216597..87c813982e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -78,10 +78,7 @@ import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import java.io.Serializable;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@@ -107,6 +104,89 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+ private static final class IdToConsumerMap
+ {
+ private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
+ private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+
+
+ public BasicMessageConsumer get(int id)
+ {
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ return _fastAccessConsumers[id];
+ }
+ else
+ {
+ return _slowAccessConsumers.get(id);
+ }
+ }
+
+ public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ {
+ BasicMessageConsumer oldVal;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ oldVal = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = consumer;
+ }
+ else
+ {
+ oldVal = _slowAccessConsumers.put(id, consumer);
+ }
+
+ return consumer;
+
+ }
+
+
+ public BasicMessageConsumer remove(int id)
+ {
+ BasicMessageConsumer consumer;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ consumer = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = null;
+ }
+ else
+ {
+ consumer = _slowAccessConsumers.remove(id);
+ }
+
+ return consumer;
+
+ }
+
+ public Collection<BasicMessageConsumer> values()
+ {
+ ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessConsumers[i] != null)
+ {
+ values.add(_fastAccessConsumers[i]);
+ }
+ }
+ values.addAll(_slowAccessConsumers.values());
+
+ return values;
+ }
+
+
+ public void clear()
+ {
+ _slowAccessConsumers.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessConsumers[i] = null;
+ }
+ }
+ }
+
+
+
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -156,7 +236,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _transacted;
/** Holds the sessions acknowledgement mode. */
- private int _acknowledgeMode;
+ private final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
private int _channelId;
@@ -217,8 +297,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- private Map<AMQShortString, BasicMessageConsumer> _consumers =
- new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ private final IdToConsumerMap _consumers = new IdToConsumerMap();
+
+ //Map<AMQShortString, BasicMessageConsumer> _consumers =
+ //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -281,6 +363,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Has failover occured on this session */
private boolean _failedOver;
+
+
+ private static final class FlowControlIndicator
+ {
+ private volatile boolean _flowControl = true;
+
+ public synchronized void setFlowControl(boolean flowControl)
+ {
+ _flowControl= flowControl;
+ notify();
+ }
+
+ public boolean getFlowControl()
+ {
+ return _flowControl;
+ }
+ }
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
/**
* Creates a new session on a connection.
*
@@ -327,24 +430,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
public void aboveThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Above threshold(" + _defaultPrefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(true)).start();
- }
+
}
public void underThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(false)).start();
- }
+
}
});
}
@@ -662,7 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
if (consumer != null)
{
// fixme this isn't right.. needs to check if _queue contains data for this consumer
@@ -744,6 +843,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, false);
}
+
+ public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ false, false);
+ }
+
+
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -761,6 +870,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
messageSelector, null, false, false);
}
+
+ public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ messageSelector, null, false, false);
+ }
+
+
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
String selector) throws JMSException
{
@@ -925,7 +1045,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1089,9 +1209,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
}
+
/**
* Creates a non-durable subscriber with a message selector
*
@@ -1109,7 +1230,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1276,15 +1397,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ "] received in session with channel id " + _channelId);
}
- if (message.getDeliverBody() == null)
+ if (message.isDeliverMessage())
{
- // Return of the bounced message.
- returnBouncedMessage(message);
+ _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
+ _queue.add(message);
}
else
{
- _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
- _queue.add(message);
+ // Return of the bounced message.
+ returnBouncedMessage(message);
}
}
@@ -1666,7 +1787,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
- if (_consumers.remove(consumer.getConsumerTag()) != null)
+ if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -2063,8 +2184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
+ int tagId = _nextTag++;
// need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
+ AMQShortString tag = new AMQShortString(Integer.toString(tagId));
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -2084,7 +2206,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
- _consumers.put(tag, consumer);
+ _consumers.put(tagId, consumer);
try
{
@@ -2112,7 +2234,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (AMQException e)
{
// clean-up the map in the event of an error
- _consumers.remove(tag);
+ _consumers.remove(tagId);
throw e;
}
}
@@ -2659,6 +2781,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_ticket = ticket;
}
+ public void setFlowControl(final boolean active)
+ {
+ _flowControl.setFlowControl(active);
+ }
+
+
+ public void checkFlowControl() throws InterruptedException
+ {
+ synchronized(_flowControl)
+ {
+ while(!_flowControl.getFlowControl())
+ {
+ _flowControl.wait();
+ }
+ }
+
+ }
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -2850,10 +2991,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.getDeliverBody() != null)
+ final BasicDeliverBody deliverBody = message.getDeliverBody();
+ if (deliverBody != null)
{
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag());
+ _consumers.get(deliverBody.getConsumerTag().toIntValue());
if ((consumer == null) || consumer.isClosed())
{
@@ -2862,13 +3004,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumer == null)
{
_dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().getDeliveryTag() + "] from queue "
- + message.getDeliverBody().getConsumerTag() + " )without a handler - rejecting(requeue)...");
+ + deliverBody.getDeliveryTag() + "] from queue "
+ + deliverBody.getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
else
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().getDeliveryTag() + "] from queue " + " consumer("
+ + deliverBody.getDeliveryTag() + "] from queue " + " consumer("
+ consumer.debugIdentity() + ") is closed rejecting(requeue)...");
}
}
@@ -2880,7 +3022,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- consumer.notifyMessage(message, _channelId);
+ consumer.notifyMessage(message);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 488d22c4bd..bf11572163 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.slf4j.Logger;
@@ -53,13 +49,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
/** The connection being used by this consumer */
- private AMQConnection _connection;
+ private final AMQConnection _connection;
- private String _messageSelector;
+ private final String _messageSelector;
- private boolean _noLocal;
+ private final boolean _noLocal;
- private AMQDestination _destination;
+ private final AMQDestination _destination;
/** When true indicates that a blocking receive call is in progress */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
@@ -70,7 +66,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private AMQShortString _consumerTag;
/** We need to know the channel id when constructing frames */
- private int _channelId;
+ private final int _channelId;
/**
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -78,36 +74,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private final ArrayBlockingQueue _synchronousQueue;
- private MessageFactoryRegistry _messageFactory;
+ private final MessageFactoryRegistry _messageFactory;
private final AMQSession _session;
- private AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
/** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
- private FieldTable _rawSelectorFieldTable;
+ private final FieldTable _rawSelectorFieldTable;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchHigh;
+ private final int _prefetchHigh;
/**
* We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchLow;
+ private final int _prefetchLow;
/** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
- private boolean _exclusive;
+ private final boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
* consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
* implementation.
*/
- private int _acknowledgeMode;
+ private final int _acknowledgeMode;
/** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
private int _outstanding;
@@ -133,10 +129,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
*/
- private boolean _autoClose;
+ private final boolean _autoClose;
private boolean _closeWhenNoMessages;
- private boolean _noConsume;
+ private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
@@ -156,7 +152,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
- _acknowledgeMode = acknowledgeMode;
+
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
_autoClose = autoClose;
_noConsume = noConsume;
@@ -166,6 +162,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
}
public AMQDestination getDestination()
@@ -254,7 +254,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
case Session.DUPS_OK_ACKNOWLEDGE:
- _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag());
_receivedDeliveryTags.add(msg.getDeliveryTag());
break;
@@ -269,7 +268,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- _logger.info("Recording tag for commit:" + msg.getDeliveryTag());
_receivedDeliveryTags.add(msg.getDeliveryTag());
}
@@ -645,9 +643,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
- * @param channelId channel on which this message was sent
*/
- void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ void notifyMessage(UnprocessedMessage messageFrame)
{
final boolean debug = _logger.isDebugEnabled();
@@ -658,10 +655,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ final BasicDeliverBody deliverBody = messageFrame.getDeliverBody();
+
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
- messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(),
- messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(deliverBody.getDeliveryTag(),
+ deliverBody.getRedelivered(),
+ deliverBody.getExchange(),
+ deliverBody.getRoutingKey(),
+ messageFrame.getContentHeader(),
+ messageFrame.getBodies());
if (debug)
{
@@ -673,11 +675,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
// if (!_closed.get())
{
- jmsMessage.setConsumer(this);
-
preDeliver(jmsMessage);
- notifyMessage(jmsMessage, channelId);
+ notifyMessage(jmsMessage);
}
// else
// {
@@ -702,9 +702,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* @param jmsMessage this message has already been processed so can't redo preDeliver
- * @param channelId
*/
- public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+ public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 7e96fb537c..ae71846870 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -538,6 +538,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ try
+ {
+ _session.checkFlowControl();
+ }
+ catch (InterruptedException e)
+ {
+ JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+
_protocolHandler.writeFrame(compositeFrame, wait);
if (message != origMessage)
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index 49c8a83833..d05e99d210 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +45,8 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
throws AMQException
{
final AMQProtocolSession session = stateManager.getProtocolSession();
- final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
+ final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedDeliverMessage(body);
_logger.debug("New JmsDeliver method received");
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
index 428d366f07..2ebc9288c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,9 +46,9 @@ public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, i
{
_logger.debug("New JmsBounce method received");
final AMQProtocolSession session = stateManager.getProtocolSession();
- final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
+ final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedBouncedMessage(body);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
new file mode 100644
index 0000000000..b47fe751d6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
@@ -0,0 +1,54 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowMethodHandler.class);
+ private static final ChannelFlowMethodHandler _instance = new ChannelFlowMethodHandler();
+
+ public static ChannelFlowMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelFlowMethodHandler()
+ { }
+
+ public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId)
+ throws AMQException
+ {
+
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ session.setFlowControl(channelId, body.getActive());
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index b029770946..1947a18653 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -55,8 +55,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
protected boolean _changedData;
private Destination _destination;
private JMSHeaderAdapter _headerAdapter;
- private BasicMessageConsumer _consumer;
- private boolean _strictAMQP;
+
+ private static final boolean STRICT_AMQP_COMPLIANCE =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
protected AbstractJMSMessage(ByteBuffer data)
{
@@ -72,8 +73,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_changedData = (data == null);
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
- _strictAMQP =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -121,7 +120,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
if (getContentHeaderProperties().getMessageIdAsString() == null)
{
- getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID());
+ StringBuilder b = new StringBuilder(39);
+ b.append("ID");
+ b.append(UUID.randomUUID());
+ getContentHeaderProperties().setMessageId(b.toString());
}
return getContentHeaderProperties().getMessageIdAsString();
@@ -301,7 +303,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -311,7 +313,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public boolean getBooleanProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -321,7 +323,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte getByteProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -331,7 +333,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -341,7 +343,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public short getShortProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -351,7 +353,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public int getIntProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -361,7 +363,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public long getLongProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -371,7 +373,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public float getFloatProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -381,7 +383,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public double getDoubleProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -404,7 +406,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
else
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -425,7 +427,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -436,7 +438,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -447,7 +449,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setByteProperty(String propertyName, byte b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -458,7 +460,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -469,7 +471,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setShortProperty(String propertyName, short i) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -487,7 +489,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setLongProperty(String propertyName, long l) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -498,7 +500,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setFloatProperty(String propertyName, float f) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -509,7 +511,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -691,9 +693,4 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
- public void setConsumer(BasicMessageConsumer basicMessageConsumer)
- {
- _consumer = basicMessageConsumer;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index a70acbabbe..d8fe964b85 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -55,7 +55,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
JMSMapMessage(ByteBuffer data) throws JMSException
{
super(data); // this instantiates a content header
- populateMapFromData();
+ if(data != null)
+ {
+ populateMapFromData();
+ }
+
}
JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
@@ -76,7 +80,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
public String toBodyString() throws JMSException
{
- return _map.toString();
+ return _map == null ? "" : _map.toString();
}
public AMQShortString getMimeTypeAsShortString()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 5b199f2478..bc1ba155cb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -36,33 +36,15 @@ import org.apache.qpid.framing.ContentHeaderBody;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage
+public abstract class UnprocessedMessage
{
- private long _bytesReceived = 0;
+ private long _bytesReceived = 0L;
- private final BasicDeliverBody _deliverBody;
- private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
- private final int _channelId;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
- {
- _deliverBody = deliverBody;
- _channelId = channelId;
- _bounceBody = null;
- }
-
-
- public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
- {
- _deliverBody = null;
- _channelId = channelId;
- _bounceBody = bounceBody;
- }
-
public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
{
@@ -96,22 +78,11 @@ public class UnprocessedMessage
return _bytesReceived == getContentHeader().bodySize;
}
- public BasicDeliverBody getDeliverBody()
- {
- return _deliverBody;
- }
- public BasicReturnBody getBounceBody()
- {
- return _bounceBody;
- }
+ abstract public BasicDeliverBody getDeliverBody();
- public int getChannelId()
- {
- return _channelId;
- }
-
+ abstract public BasicReturnBody getBounceBody();
public ContentHeaderBody getContentHeader()
{
@@ -128,4 +99,60 @@ public class UnprocessedMessage
return _bodies;
}
+ abstract public boolean isDeliverMessage();
+
+ public static final class UnprocessedDeliverMessage extends UnprocessedMessage
+ {
+ private final BasicDeliverBody _body;
+
+ public UnprocessedDeliverMessage(final BasicDeliverBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return _body;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return true;
+ }
+ }
+
+ public static final class UnprocessedBouncedMessage extends UnprocessedMessage
+ {
+ private final BasicReturnBody _body;
+
+ public UnprocessedBouncedMessage(final BasicReturnBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return null;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return _body;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index f70c1faa84..3dee0b0142 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -407,7 +407,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToWaiters(Exception e)
{
- getStateManager().error(e);
+
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -439,78 +439,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
-
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
-
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
-
- try
- {
-
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
-
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners);
- }
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
-
- exceptionCaught(session, e);
- }
-
- break;
-
- case ContentHeaderBody.TYPE:
-
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
+ bodyFrame.handle(frame.getChannel(),_protocolSession);
- case ContentBody.TYPE:
-
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
-
- case HeartbeatBody.TYPE:
-
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
-
- break;
-
- default:
-
- }
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -528,6 +458,55 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ throws AMQException
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
+ }
+
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
+
+ try
+ {
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
+ }
+ catch (AMQException e)
+ {
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
+ }
+
+ exceptionCaught(session, e);
+ }
+
+ }
+
private static int _messagesOut;
public void messageSent(IoSession session, Object message) throws Exception
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b48adbdb08..6a5cc62bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
protected final AMQProtocolHandler _protocolHandler;
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+ private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
protected int _queueId = 1;
@@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ private final AMQConnection _connection;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
@@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = message;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.put(channelId, message);
+ }
}
- public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId);
+
+
if (msg == null)
{
throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
@@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
- public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+ public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ UnprocessedMessage msg;
+ final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
+ if(fastAccess)
+ {
+ msg = _channelId2UnprocessedMsgArray[channelId];
+ }
+ else
+ {
+ msg = _channelId2UnprocessedMsgMap.get(channelId);
+ }
+
if (msg == null)
{
throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
@@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
if (msg.getContentHeader() == null)
{
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if(fastAccess)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
}
@@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+ {
+
+ }
+
/**
* Deliver a message to the appropriate session, removing the unprocessed message from our map
*
@@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
AMQSession session = getSession(channelId);
session.messageReceived(msg);
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
}
protected AMQSession getSession(int channelId)
@@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_methodDispatcher = methodDispatcher;
}
+
+ public void setFlowControl(final int channelId, final boolean active)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setFlowControl(active);
+ }
+
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index e62fce0f60..2e6a4beb83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -37,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
* there is a separate state manager.
*/
-public class AMQStateManager implements AMQMethodListener
+public class AMQStateManager
{
private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
@@ -52,7 +52,7 @@ public class AMQStateManager implements AMQMethodListener
* AMQFrame.
*/
- private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+
private final Object _stateLock = new Object();
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
@@ -91,19 +91,6 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public void error(Exception e)
- {
- _logger.debug("State manager receive error notification: " + e);
- synchronized (_stateListeners)
- {
- final Iterator it = _stateListeners.iterator();
- while (it.hasNext())
- {
- final StateListener l = (StateListener) it.next();
- l.error(e);
- }
- }
- }
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 0ea04e5bc3..adbec6e35f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -85,7 +85,7 @@ public class TransportConnection
throw new AMQNoTransportForProtocolException(details);
}
- if (transport == _currentInstance)
+ /* if (transport == _currentInstance)
{
if (transport == VM)
{
@@ -100,21 +100,23 @@ public class TransportConnection
}
}
- _currentInstance = transport;
+ _currentInstance = transport;*/
+ ITransportConnection instance;
switch (transport)
{
case SOCKET:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- return new ExistingSocketConnector();
- }
- });
+ instance =
+ new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
break;
case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
public IoConnector newSocketConnector()
{
@@ -142,12 +144,14 @@ public class TransportConnection
break;
case VM:
{
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
break;
}
+ default:
+ throw new AMQNoTransportForProtocolException(details);
}
- return _instance;
+ return instance;
}
private static int getTransport(String transport)
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
new file mode 100644
index 0000000000..bed80d5954
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
@@ -0,0 +1,528 @@
+package org.apache.mina.common;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.nio.*;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
+{
+
+
+ private static final int MINIMUM_CAPACITY = 1;
+
+ public FixedSizeByteBufferAllocator ()
+ {
+ }
+
+ public ByteBuffer allocate( int capacity, boolean direct )
+ {
+ java.nio.ByteBuffer nioBuffer;
+ if( direct )
+ {
+ nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity );
+ }
+ else
+ {
+ nioBuffer = java.nio.ByteBuffer.allocate( capacity );
+ }
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer )
+ {
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public void dispose()
+ {
+ }
+
+
+
+ private static final class FixedSizeByteBuffer extends ByteBuffer
+ {
+ private java.nio.ByteBuffer buf;
+ private int refCount = 1;
+ private int mark = -1;
+
+
+ protected FixedSizeByteBuffer( java.nio.ByteBuffer buf )
+ {
+ this.buf = buf;
+ buf.order( ByteOrder.BIG_ENDIAN );
+ refCount = 1;
+ }
+
+ public synchronized void acquire()
+ {
+ if( refCount <= 0 )
+ {
+ throw new IllegalStateException( "Already released buffer." );
+ }
+
+ refCount ++;
+ }
+
+ public void release()
+ {
+ synchronized( this )
+ {
+ if( refCount <= 0 )
+ {
+ refCount = 0;
+ throw new IllegalStateException(
+ "Already released buffer. You released the buffer too many times." );
+ }
+
+ refCount --;
+ if( refCount > 0)
+ {
+ return;
+ }
+ }
+ }
+
+ public java.nio.ByteBuffer buf()
+ {
+ return buf;
+ }
+
+ public boolean isPooled()
+ {
+ return false;
+ }
+
+ public void setPooled( boolean pooled )
+ {
+ }
+
+ public ByteBuffer duplicate() {
+ return new FixedSizeByteBuffer( this.buf.duplicate() );
+ }
+
+ public ByteBuffer slice() {
+ return new FixedSizeByteBuffer( this.buf.slice() );
+ }
+
+ public ByteBuffer asReadOnlyBuffer() {
+ return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() );
+ }
+
+ public byte[] array()
+ {
+ return buf.array();
+ }
+
+ public int arrayOffset()
+ {
+ return buf.arrayOffset();
+ }
+
+ public boolean isDirect()
+ {
+ return buf.isDirect();
+ }
+
+ public boolean isReadOnly()
+ {
+ return buf.isReadOnly();
+ }
+
+ public int capacity()
+ {
+ return buf.capacity();
+ }
+
+ public ByteBuffer capacity( int newCapacity )
+ {
+ if( newCapacity > capacity() )
+ {
+ // Allocate a new buffer and transfer all settings to it.
+ int pos = position();
+ int limit = limit();
+ ByteOrder bo = order();
+
+ capacity0( newCapacity );
+ buf.limit( limit );
+ if( mark >= 0 )
+ {
+ buf.position( mark );
+ buf.mark();
+ }
+ buf.position( pos );
+ buf.order( bo );
+ }
+
+ return this;
+ }
+
+ protected void capacity0( int requestedCapacity )
+ {
+ int newCapacity = MINIMUM_CAPACITY;
+ while( newCapacity < requestedCapacity )
+ {
+ newCapacity <<= 1;
+ }
+
+ java.nio.ByteBuffer oldBuf = this.buf;
+ java.nio.ByteBuffer newBuf;
+ if( isDirect() )
+ {
+ newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
+ }
+ else
+ {
+ newBuf = java.nio.ByteBuffer.allocate( newCapacity );
+ }
+
+ newBuf.clear();
+ oldBuf.clear();
+ newBuf.put( oldBuf );
+ this.buf = newBuf;
+ }
+
+
+
+ public boolean isAutoExpand()
+ {
+ return false;
+ }
+
+ public ByteBuffer setAutoExpand( boolean autoExpand )
+ {
+ if(autoExpand) throw new IllegalArgumentException();
+ else return this;
+ }
+
+ public ByteBuffer expand( int pos, int expectedRemaining )
+ {
+ int end = pos + expectedRemaining;
+ if( end > capacity() )
+ {
+ // The buffer needs expansion.
+ capacity( end );
+ }
+
+ if( end > limit() )
+ {
+ // We call limit() directly to prevent StackOverflowError
+ buf.limit( end );
+ }
+ return this;
+ }
+
+ public int position()
+ {
+ return buf.position();
+ }
+
+ public ByteBuffer position( int newPosition )
+ {
+
+ buf.position( newPosition );
+ if( mark > newPosition )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public int limit()
+ {
+ return buf.limit();
+ }
+
+ public ByteBuffer limit( int newLimit )
+ {
+ buf.limit( newLimit );
+ if( mark > newLimit )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public ByteBuffer mark()
+ {
+ buf.mark();
+ mark = position();
+ return this;
+ }
+
+ public int markValue()
+ {
+ return mark;
+ }
+
+ public ByteBuffer reset()
+ {
+ buf.reset();
+ return this;
+ }
+
+ public ByteBuffer clear()
+ {
+ buf.clear();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer flip()
+ {
+ buf.flip();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer rewind()
+ {
+ buf.rewind();
+ mark = -1;
+ return this;
+ }
+
+ public byte get()
+ {
+ return buf.get();
+ }
+
+ public ByteBuffer put( byte b )
+ {
+ buf.put( b );
+ return this;
+ }
+
+ public byte get( int index )
+ {
+ return buf.get( index );
+ }
+
+ public ByteBuffer put( int index, byte b )
+ {
+ buf.put( index, b );
+ return this;
+ }
+
+ public ByteBuffer get( byte[] dst, int offset, int length )
+ {
+ buf.get( dst, offset, length );
+ return this;
+ }
+
+ public ByteBuffer put( java.nio.ByteBuffer src )
+ {
+ buf.put( src );
+ return this;
+ }
+
+ public ByteBuffer put( byte[] src, int offset, int length )
+ {
+ buf.put( src, offset, length );
+ return this;
+ }
+
+ public ByteBuffer compact()
+ {
+ buf.compact();
+ mark = -1;
+ return this;
+ }
+
+ public ByteOrder order()
+ {
+ return buf.order();
+ }
+
+ public ByteBuffer order( ByteOrder bo )
+ {
+ buf.order( bo );
+ return this;
+ }
+
+ public char getChar()
+ {
+ return buf.getChar();
+ }
+
+ public ByteBuffer putChar( char value )
+ {
+ buf.putChar( value );
+ return this;
+ }
+
+ public char getChar( int index )
+ {
+ return buf.getChar( index );
+ }
+
+ public ByteBuffer putChar( int index, char value )
+ {
+ buf.putChar( index, value );
+ return this;
+ }
+
+ public CharBuffer asCharBuffer()
+ {
+ return buf.asCharBuffer();
+ }
+
+ public short getShort()
+ {
+ return buf.getShort();
+ }
+
+ public ByteBuffer putShort( short value )
+ {
+ buf.putShort( value );
+ return this;
+ }
+
+ public short getShort( int index )
+ {
+ return buf.getShort( index );
+ }
+
+ public ByteBuffer putShort( int index, short value )
+ {
+ buf.putShort( index, value );
+ return this;
+ }
+
+ public ShortBuffer asShortBuffer()
+ {
+ return buf.asShortBuffer();
+ }
+
+ public int getInt()
+ {
+ return buf.getInt();
+ }
+
+ public ByteBuffer putInt( int value )
+ {
+ buf.putInt( value );
+ return this;
+ }
+
+ public int getInt( int index )
+ {
+ return buf.getInt( index );
+ }
+
+ public ByteBuffer putInt( int index, int value )
+ {
+ buf.putInt( index, value );
+ return this;
+ }
+
+ public IntBuffer asIntBuffer()
+ {
+ return buf.asIntBuffer();
+ }
+
+ public long getLong()
+ {
+ return buf.getLong();
+ }
+
+ public ByteBuffer putLong( long value )
+ {
+ buf.putLong( value );
+ return this;
+ }
+
+ public long getLong( int index )
+ {
+ return buf.getLong( index );
+ }
+
+ public ByteBuffer putLong( int index, long value )
+ {
+ buf.putLong( index, value );
+ return this;
+ }
+
+ public LongBuffer asLongBuffer()
+ {
+ return buf.asLongBuffer();
+ }
+
+ public float getFloat()
+ {
+ return buf.getFloat();
+ }
+
+ public ByteBuffer putFloat( float value )
+ {
+ buf.putFloat( value );
+ return this;
+ }
+
+ public float getFloat( int index )
+ {
+ return buf.getFloat( index );
+ }
+
+ public ByteBuffer putFloat( int index, float value )
+ {
+ buf.putFloat( index, value );
+ return this;
+ }
+
+ public FloatBuffer asFloatBuffer()
+ {
+ return buf.asFloatBuffer();
+ }
+
+ public double getDouble()
+ {
+ return buf.getDouble();
+ }
+
+ public ByteBuffer putDouble( double value )
+ {
+ buf.putDouble( value );
+ return this;
+ }
+
+ public double getDouble( int index )
+ {
+ return buf.getDouble( index );
+ }
+
+ public ByteBuffer putDouble( int index, double value )
+ {
+ buf.putDouble( index, value );
+ return this;
+ }
+
+ public DoubleBuffer asDoubleBuffer()
+ {
+ return buf.asDoubleBuffer();
+ }
+
+
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
new file mode 100644
index 0000000000..09cf785108
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
@@ -0,0 +1,250 @@
+package org.apache.mina.common.support;
+
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFutureListener;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * A default implementation of {@link org.apache.mina.common.IoFuture}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (í™”, 05 9ì›” 2006) $
+ */
+public class DefaultIoFuture implements IoFuture
+{
+ private final IoSession session;
+ private final Object lock;
+ private List listeners;
+ private Object result;
+ private boolean ready;
+
+
+ /**
+ * Creates a new instance.
+ *
+ * @param session an {@link IoSession} which is associated with this future
+ */
+ public DefaultIoFuture( IoSession session )
+ {
+ this.session = session;
+ this.lock = this;
+ }
+
+ /**
+ * Creates a new instance which uses the specified object as a lock.
+ */
+ public DefaultIoFuture( IoSession session, Object lock )
+ {
+ if( lock == null )
+ {
+ throw new NullPointerException( "lock" );
+ }
+ this.session = session;
+ this.lock = lock;
+ }
+
+ public IoSession getSession()
+ {
+ return session;
+ }
+
+ public Object getLock()
+ {
+ return lock;
+ }
+
+ public void join()
+ {
+ synchronized( lock )
+ {
+ while( !ready )
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+ }
+
+ public boolean join( long timeoutInMillis )
+ {
+ long startTime = ( timeoutInMillis <= 0 ) ? 0 : System
+ .currentTimeMillis();
+ long waitTime = timeoutInMillis;
+
+ synchronized( lock )
+ {
+ if( ready )
+ {
+ return ready;
+ }
+ else if( waitTime <= 0 )
+ {
+ return ready;
+ }
+
+ for( ;; )
+ {
+ try
+ {
+ lock.wait( waitTime );
+ }
+ catch( InterruptedException e )
+ {
+ }
+
+ if( ready )
+ return true;
+ else
+ {
+ waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime );
+ if( waitTime <= 0 )
+ {
+ return ready;
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isReady()
+ {
+ synchronized( lock )
+ {
+ return ready;
+ }
+ }
+
+ /**
+ * Sets the result of the asynchronous operation, and mark it as finished.
+ */
+ protected void setValue( Object newValue )
+ {
+ synchronized( lock )
+ {
+ // Allow only once.
+ if( ready )
+ {
+ return;
+ }
+
+ result = newValue;
+ ready = true;
+ lock.notifyAll();
+
+ notifyListeners();
+ }
+ }
+
+ /**
+ * Returns the result of the asynchronous operation.
+ */
+ protected Object getValue()
+ {
+ synchronized( lock )
+ {
+ return result;
+ }
+ }
+
+ public void addListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ if(listeners == null)
+ {
+ listeners = new ArrayList();
+ }
+ listeners.add( listener );
+ if( ready )
+ {
+ listener.operationComplete( this );
+ }
+ }
+ }
+
+ public void removeListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ listeners.remove( listener );
+ }
+ }
+
+ private void notifyListeners()
+ {
+ synchronized( lock )
+ {
+
+ if(listeners != null)
+ {
+
+ for( Iterator i = listeners.iterator(); i.hasNext(); ) {
+ ( ( IoFutureListener ) i.next() ).operationComplete( this );
+ }
+ }
+ }
+ }
+}
+
+
+
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
new file mode 100644
index 0000000000..b8c6f29720
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
@@ -0,0 +1,440 @@
+package org.apache.mina.filter.codec;
+
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultWriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.util.SessionLog;
+import org.apache.mina.util.Queue;
+
+
+public class QpidProtocolCodecFilter extends IoFilterAdapter
+{
+ public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";
+ public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";
+
+ private static final Class[] EMPTY_PARAMS = new Class[0];
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
+
+ private final ProtocolCodecFactory factory;
+
+ public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
+ {
+ if( factory == null )
+ {
+ throw new NullPointerException( "factory" );
+ }
+ this.factory = factory;
+ }
+
+ public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+ {
+ if( encoder == null )
+ {
+ throw new NullPointerException( "encoder" );
+ }
+ if( decoder == null )
+ {
+ throw new NullPointerException( "decoder" );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder()
+ {
+ return encoder;
+ }
+
+ public ProtocolDecoder getDecoder()
+ {
+ return decoder;
+ }
+ };
+ }
+
+ public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+ {
+ if( encoderClass == null )
+ {
+ throw new NullPointerException( "encoderClass" );
+ }
+ if( decoderClass == null )
+ {
+ throw new NullPointerException( "decoderClass" );
+ }
+ if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
+ {
+ throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+ }
+ if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
+ {
+ throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+ }
+ try
+ {
+ encoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+ }
+ try
+ {
+ decoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder() throws Exception
+ {
+ return ( ProtocolEncoder ) encoderClass.newInstance();
+ }
+
+ public ProtocolDecoder getDecoder() throws Exception
+ {
+ return ( ProtocolDecoder ) decoderClass.newInstance();
+ }
+ };
+ }
+
+ public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+ {
+ if( parent.contains( ProtocolCodecFilter.class ) )
+ {
+ throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
+ }
+ }
+
+ public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( !( message instanceof ByteBuffer ) )
+ {
+ nextFilter.messageReceived( session, message );
+ return;
+ }
+
+ ByteBuffer in = ( ByteBuffer ) message;
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+
+ try
+ {
+ decoder.decode( session, in, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ pde.setHexdump( in.getHexDump() );
+ throw pde;
+ }
+ finally
+ {
+ // Dispose the decoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeDecoder( session );
+ }
+
+ // Release the read buffer.
+ in.release();
+
+ decoderOut.flush();
+ }
+ }
+
+ public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( message instanceof HiddenByteBuffer )
+ {
+ return;
+ }
+
+ if( !( message instanceof MessageByteBuffer ) )
+ {
+ nextFilter.messageSent( session, message );
+ return;
+ }
+
+ nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
+ }
+
+ public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
+ {
+ Object message = writeRequest.getMessage();
+ if( message instanceof ByteBuffer )
+ {
+ nextFilter.filterWrite( session, writeRequest );
+ return;
+ }
+
+ ProtocolEncoder encoder = getEncoder( session );
+ ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
+
+ try
+ {
+ encoder.encode( session, message, encoderOut );
+ encoderOut.flush();
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ new MessageByteBuffer( writeRequest.getMessage() ),
+ writeRequest.getFuture(), writeRequest.getDestination() ) );
+ }
+ catch( Throwable t )
+ {
+ ProtocolEncoderException pee;
+ if( t instanceof ProtocolEncoderException )
+ {
+ pee = ( ProtocolEncoderException ) t;
+ }
+ else
+ {
+ pee = new ProtocolEncoderException( t );
+ }
+ throw pee;
+ }
+ finally
+ {
+ // Dispose the encoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeEncoder( session );
+ }
+ }
+ }
+
+ public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ // Call finishDecode() first when a connection is closed.
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+ try
+ {
+ decoder.finishDecode( session, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ throw pde;
+ }
+ finally
+ {
+ // Dispose all.
+ disposeEncoder( session );
+ disposeDecoder( session );
+
+ decoderOut.flush();
+ }
+
+ nextFilter.sessionClosed( session );
+ }
+
+ private ProtocolEncoder getEncoder( IoSession session ) throws Exception
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+ if( encoder == null )
+ {
+ encoder = factory.getEncoder();
+ session.setAttribute( ENCODER, encoder );
+ }
+ return encoder;
+ }
+
+ private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
+ }
+
+ private ProtocolDecoder getDecoder( IoSession session ) throws Exception
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+ if( decoder == null )
+ {
+ decoder = factory.getDecoder();
+ session.setAttribute( DECODER, decoder );
+ }
+ return decoder;
+ }
+
+ private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
+ {
+ return new SimpleProtocolDecoderOutput( session, nextFilter );
+ }
+
+ private void disposeEncoder( IoSession session )
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+ if( encoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ encoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Failed to dispose: " + encoder.getClass().getName() +
+ " (" + encoder + ')' );
+ }
+ }
+
+ private void disposeDecoder( IoSession session )
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+ if( decoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ decoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Falied to dispose: " + decoder.getClass().getName() +
+ " (" + decoder + ')' );
+ }
+ }
+
+ private static class HiddenByteBuffer extends ByteBufferProxy
+ {
+ private HiddenByteBuffer( ByteBuffer buf )
+ {
+ super( buf );
+ }
+ }
+
+ private static class MessageByteBuffer extends ByteBufferProxy
+ {
+ private final Object message;
+
+ private MessageByteBuffer( Object message )
+ {
+ super( EMPTY_BUFFER );
+ this.message = message;
+ }
+
+ public void acquire()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+
+ public void release()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+ }
+
+ private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
+ {
+ private ByteBuffer buffer;
+
+ private final IoSession session;
+ private final IoFilter.NextFilter nextFilter;
+ private final IoFilter.WriteRequest writeRequest;
+
+ public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ this.session = session;
+ this.nextFilter = nextFilter;
+ this.writeRequest = writeRequest;
+ }
+
+
+
+ public void write( ByteBuffer buf )
+ {
+ if(buffer != null)
+ {
+ flush();
+ }
+ buffer = buf;
+ }
+
+ public void mergeAll()
+ {
+ }
+
+ public WriteFuture flush()
+ {
+ WriteFuture future = null;
+ if( buffer == null )
+ {
+ return null;
+ }
+ else
+ {
+ ByteBuffer buf = buffer;
+ // Flush only when the buffer has remaining.
+ if( buf.hasRemaining() )
+ {
+ future = doFlush( buf );
+ }
+
+ }
+
+ return future;
+ }
+
+
+ protected WriteFuture doFlush( ByteBuffer buf )
+ {
+ WriteFuture future = new DefaultWriteFuture( session );
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ buf,
+ future, writeRequest.getDestination() ) );
+ return future;
+ }
+ }
+}
+
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index ff0bc798da..7eef73f337 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -22,6 +22,7 @@ package org.apache.qpid.codec;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
@@ -48,6 +49,9 @@ import org.apache.qpid.framing.ProtocolInitiation;
*/
public class AMQDecoder extends CumulativeProtocolDecoder
{
+
+ private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -171,4 +175,97 @@ public class AMQDecoder extends CumulativeProtocolDecoder
{
_expectProtocolInitiation = expectProtocolInitiation;
}
+
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( buf != null )
+ {
+ buf.put( in );
+ buf.flip();
+ }
+ else
+ {
+ buf = in;
+ }
+
+ for( ;; )
+ {
+ int oldPos = buf.position();
+ boolean decoded = doDecode( session, buf, out );
+ if( decoded )
+ {
+ if( buf.position() == oldPos )
+ {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed." );
+ }
+
+ if( !buf.hasRemaining() )
+ {
+ break;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if ( buf.hasRemaining() )
+ {
+ storeRemainingInSession( buf, session );
+ }
+ else
+ {
+ removeSessionBuffer( session );
+ }
+ }
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose( IoSession session ) throws Exception
+ {
+ removeSessionBuffer( session );
+ }
+
+ private void removeSessionBuffer(IoSession session)
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf != null )
+ {
+ buf.release();
+ session.removeAttribute( BUFFER );
+ }
+ }
+
+ private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+ {
+ ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+ remainingBuf.setAutoExpand( true );
+ remainingBuf.put( buf );
+ session.setAttribute( BUFFER, remainingBuf );
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index 3abd97ddb7..fcd336b180 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public interface AMQBody
{
@@ -36,4 +38,6 @@ public interface AMQBody
//public void populateFromBuffer(ByteBuffer buffer, long size)
// throws AMQFrameDecodingException, AMQProtocolVersionException;
+
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 11f505fd4b..02a46f3748 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -27,7 +27,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
private final int _channel;
private final AMQBody _bodyFrame;
-
+ public static final byte FRAME_END_BYTE = (byte) 0xCE;
public AMQFrame(final int channel, final AMQBody bodyFrame)
@@ -47,13 +47,19 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
}
+ public static final int getFrameOverhead()
+ {
+ return 1 + 2 + 4 + 1;
+ }
+
+
public void writePayload(ByteBuffer buffer)
{
buffer.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
_bodyFrame.writePayload(buffer);
- buffer.put((byte) 0xCE);
+ buffer.put(FRAME_END_BYTE);
}
public final int getChannel()
@@ -66,10 +72,54 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
return _bodyFrame;
}
-
-
public String toString()
{
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
+
+ public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+ {
+ buffer.put(body.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+ body.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body3.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
+ body3.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index 5215bcbd66..64af717342 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -24,7 +24,9 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -86,4 +88,9 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+ {
+ session.methodFrameReceived(channelId, this);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index cf64b0475a..07e2faaf7e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -26,8 +26,7 @@ import org.apache.mina.common.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
import java.lang.ref.WeakReference;
/**
@@ -38,6 +37,62 @@ import java.lang.ref.WeakReference;
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+ private static final byte MINUS = (byte)'-';
+ private static final byte ZERO = (byte) '0';
+
+
+
+ private final class TokenizerImpl implements AMQShortStringTokenizer
+ {
+ private final byte _delim;
+ private int _count = -1;
+ private int _pos = 0;
+
+ public TokenizerImpl(final byte delim)
+ {
+ _delim = delim;
+ }
+
+ public int countTokens()
+ {
+ if(_count == -1)
+ {
+ _count = 1 + AMQShortString.this.occurences(_delim);
+ }
+ return _count;
+ }
+
+ public AMQShortString nextToken()
+ {
+ if(_pos <= AMQShortString.this.length())
+ {
+ int nextDelim = AMQShortString.this.indexOf(_delim, _pos);
+ if(nextDelim == -1)
+ {
+ nextDelim = AMQShortString.this.length();
+ }
+
+ AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++);
+ _pos = nextDelim;
+ return nextToken;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean hasMoreTokens()
+ {
+ return _pos <= AMQShortString.this.length();
+ }
+ }
+
+ private AMQShortString substring(final int from, final int to)
+ {
+ return new AMQShortString(_data, from, to);
+ }
+
private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
@@ -53,7 +108,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
- private final ByteBuffer _data;
+ private final byte[] _data;
+ private final int _offset;
private int _hashCode;
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -63,17 +119,25 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public AMQShortString(byte[] data)
{
- _data = ByteBuffer.wrap(data);
+ _data = data.clone();
_length = data.length;
+ _offset = 0;
+ }
+
+ public AMQShortString(byte[] data, int pos)
+ {
+ final int size = data[pos++];
+ final byte[] dataCopy = new byte[size];
+ System.arraycopy(data,pos,dataCopy,0,size);
+ _length = size;
+ _data = dataCopy;
+ _offset = 0;
}
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
- if (data != null)
- {
- _hashCode = data.hashCode();
- }
+
}
public AMQShortString(char[] data)
@@ -85,14 +149,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
final int length = data.length;
final byte[] stringBytes = new byte[length];
+ int hash = 0;
for (int i = 0; i < length; i++)
{
stringBytes[i] = (byte) (0xFF & data[i]);
+ hash = (31 * hash) + stringBytes[i];
}
+ _hashCode = hash;
+ _data = stringBytes;
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
_length = length;
+ _offset = 0;
}
@@ -108,20 +175,31 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
+ _data = stringBytes;
_hashCode = hash;
_length = length;
+ _offset = 0;
}
- private AMQShortString(ByteBuffer data)
+ private AMQShortString(ByteBuffer data, final int length)
{
- _data = data;
- _length = data.limit();
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _length = length;
+ _offset = 0;
+
+ }
+ private AMQShortString(final byte[] data, final int from, final int to)
+ {
+ _offset = from;
+ _length = to - from;
+ _data = data;
}
+
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -134,7 +212,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public char charAt(int index)
{
- return (char) _data.get(index);
+ return (char) _data[_offset + index];
}
@@ -146,27 +224,24 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public int writeToByteArray(byte[] encoding, int pos)
{
final int size = length();
- encoding[pos++] = (byte) length();
- for (int i = 0; i < size; i++)
- {
- encoding[pos++] = _data.get(i);
- }
-
- return pos;
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
}
public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
{
- final byte len = byteEncodedDestination[pos];
- if (len == 0)
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
{
return null;
}
-
- ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
- return new AMQShortString(data);
+ else
+ {
+ return shortString;
+ }
}
public static AMQShortString readFromBuffer(ByteBuffer buffer)
@@ -178,90 +253,59 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
else
{
- ByteBuffer data = buffer.slice();
- data.limit(length);
- data.rewind();
- buffer.skip(length);
- return new AMQShortString(data);
+ return new AMQShortString(buffer, length);
}
}
public byte[] getBytes()
{
-
- if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
+ if(_offset == 0 && _length == _data.length)
{
- return _data.array();
+ return _data.clone();
}
else
{
- final int size = length();
- byte[] b = new byte[size];
- ByteBuffer buf = _data.duplicate();
- buf.rewind();
- buf.get(b);
-
- return b;
+ byte[] data = new byte[_length];
+ System.arraycopy(_data,_offset,data,0,_length);
+ return data;
}
-
}
public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
- if (size != 0)
- {
-
- buffer.setAutoExpand(true);
- buffer.put((byte) size);
- if (_data.buf().hasArray())
- {
- buffer.put(_data.array(), _data.arrayOffset(), length());
- }
- else
- {
-
- for (int i = 0; i < size; i++)
- {
-
- buffer.put(_data.get(i));
- }
- }
- }
- else
- {
- // really writing out unsigned byte
- buffer.put((byte) 0);
- }
+ //buffer.setAutoExpand(true);
+ buffer.put((byte) size);
+ buffer.put(_data, _offset, size);
}
private final class CharSubSequence implements CharSequence
{
- private final int _offset;
+ private final int _sequenceOffset;
private final int _end;
public CharSubSequence(final int offset, final int end)
{
- _offset = offset;
+ _sequenceOffset = offset;
_end = end;
}
public int length()
{
- return _end - _offset;
+ return _end - _sequenceOffset;
}
public char charAt(int index)
{
- return AMQShortString.this.charAt(index + _offset);
+ return AMQShortString.this.charAt(index + _sequenceOffset);
}
public CharSequence subSequence(int start, int end)
{
- return new CharSubSequence(start + _offset, end + _offset);
+ return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
}
}
@@ -272,7 +316,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
for (int i = 0; i < size; i++)
{
- chars[i] = (char) _data.get(i);
+ chars[i] = (char) _data[i + _offset];
}
return chars;
@@ -285,6 +329,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public boolean equals(Object o)
{
+
+
+ if(o instanceof AMQShortString)
+ {
+ return equals((AMQShortString)o);
+ }
+ if(o instanceof CharSequence)
+ {
+ return equals((CharSequence)o);
+ }
+
if (o == null)
{
return false;
@@ -295,26 +350,40 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return true;
}
- if (o instanceof AMQShortString)
- {
- final AMQShortString otherString = (AMQShortString) o;
+ return false;
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
- {
- return false;
- }
+ }
+
+ public boolean equals(final AMQShortString otherString)
+ {
+ if (otherString == this)
+ {
+ return true;
+ }
- return _data.equals(otherString._data);
+ if (otherString == null)
+ {
+ return false;
+ }
+ if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ {
+ return false;
}
- return (o instanceof CharSequence) && equals((CharSequence) o);
+ return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
+ || Arrays.equals(getBytes(),otherString.getBytes());
}
public boolean equals(CharSequence s)
{
+ if(s instanceof AMQShortString)
+ {
+ return equals((AMQShortString)s);
+ }
+
if (s == null)
{
return false;
@@ -345,7 +414,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
for (int i = 0; i < size; i++)
{
- hash = (31 * hash) + _data.get(i);
+ hash = (31 * hash) + _data[i+_offset];
}
_hashCode = hash;
@@ -380,8 +449,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
for (int i = 0; i < length(); i++)
{
- final byte d = _data.get(i);
- final byte n = name._data.get(i);
+ final byte d = _data[i+_offset];
+ final byte n = name._data[i+name._offset];
if (d < n)
{
return -1;
@@ -398,6 +467,12 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
+ public AMQShortStringTokenizer tokenize(byte delim)
+ {
+ return new TokenizerImpl(delim);
+ }
+
+
public AMQShortString intern()
{
@@ -435,4 +510,111 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return internString;
}
+
+ private int occurences(final byte delim)
+ {
+ int count = 0;
+ final int end = _offset + _length;
+ for(int i = _offset ; i < end ; i++ )
+ {
+ if(_data[i] == delim)
+ {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private int indexOf(final byte val, final int pos)
+ {
+
+ for(int i = pos; i < length(); i++)
+ {
+ if(_data[_offset+i] == val)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+
+ public static AMQShortString join(final Collection<AMQShortString> terms,
+ final AMQShortString delim)
+ {
+ if(terms.size() == 0)
+ {
+ return EMPTY_STRING;
+ }
+
+ int size = delim.length() * (terms.size() - 1);
+ for(AMQShortString term : terms)
+ {
+ size += term.length();
+ }
+
+ byte[] data = new byte[size];
+ int pos = 0;
+ final byte[] delimData = delim._data;
+ final int delimOffset = delim._offset;
+ final int delimLength = delim._length;
+
+
+ for(AMQShortString term : terms)
+ {
+
+ if(pos!=0)
+ {
+ System.arraycopy(delimData, delimOffset,data,pos, delimLength);
+ pos+=delimLength;
+ }
+ System.arraycopy(term._data,term._offset,data,pos,term._length);
+ pos+=term._length;
+ }
+
+
+
+ return new AMQShortString(data,0,size);
+ }
+
+ public int toIntValue()
+ {
+ int pos = 0;
+ int val = 0;
+
+
+ boolean isNegative = (_data[pos] == MINUS);
+ if(isNegative)
+ {
+ pos++;
+ }
+ while(pos < _length)
+ {
+ int digit = (int) (_data[pos++] - ZERO);
+ if((digit < 0) || (digit > 9))
+ {
+ throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+ }
+ val = val * 10;
+ val += digit;
+ }
+ if(isNegative)
+ {
+ val = val * -1;
+ }
+ return val;
+ }
+
+ public boolean contains(final byte b)
+ {
+ for(int i = 0; i < _length; i++)
+ {
+ if(_data[i] == b)
+ {
+ return true;
+ }
+ }
+ return false; //To change body of created methods use File | Settings | File Templates.
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
new file mode 100644
index 0000000000..e2db8906a1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
@@ -0,0 +1,31 @@
+package org.apache.qpid.framing;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface AMQShortStringTokenizer
+{
+
+ public int countTokens();
+
+ public AMQShortString nextToken();
+
+ boolean hasMoreTokens();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
index 7b6699b783..94030f383e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
@@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer;
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
- private AMQDataBlock _firstFrame;
private AMQDataBlock[] _blocks;
@@ -33,27 +32,12 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
_blocks = blocks;
}
- /**
- * The encoded block will be logically first before the AMQDataBlocks which are encoded
- * into the buffer afterwards.
- * @param encodedBlock already-encoded data
- * @param blocks some blocks to be encoded.
- */
- public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
- {
- this(blocks);
- _firstFrame = encodedBlock;
- }
public AMQDataBlock[] getBlocks()
{
return _blocks;
}
- public AMQDataBlock getFirstFrame()
- {
- return _firstFrame;
- }
public long getSize()
{
@@ -62,19 +46,11 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
{
frameSize += _blocks[i].getSize();
}
- if (_firstFrame != null)
- {
- frameSize += _firstFrame.getSize();
- }
return frameSize;
}
public void writePayload(ByteBuffer buffer)
{
- if (_firstFrame != null)
- {
- _firstFrame.writePayload(buffer);
- }
for (int i = 0; i < _blocks.length; i++)
{
_blocks[i].writePayload(buffer);
@@ -90,7 +66,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
else
{
StringBuilder buf = new StringBuilder(this.getClass().getName());
- buf.append("{encodedBlock=").append(_firstFrame);
+ buf.append("{");
for (int i = 0 ; i < _blocks.length; i++)
{
buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index cbee1680f7..969df954ce 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentBody implements AMQBody
{
@@ -68,6 +70,12 @@ public class ContentBody implements AMQBody
}
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentBodyReceived(channelId, this);
+ }
+
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
{
if (size > 0)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 80a61544b3..83e5a7e341 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentHeaderBody implements AMQBody
{
@@ -110,6 +112,12 @@ public class ContentHeaderBody implements AMQBody
properties.writePropertyListPayload(buffer);
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentHeaderReceived(channelId, this);
+ }
+
public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
long bodySize)
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index ef7163bd40..15a43345b5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class HeartbeatBody implements AMQBody
{
@@ -55,6 +57,12 @@ public class HeartbeatBody implements AMQBody
{
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.heartbeatBodyReceived(channelId, this);
+ }
+
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
{
if(size > 0)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
index d8b6b25b92..d7194640d4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
@@ -70,7 +70,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot
return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
publishBody.getImmediate(),
publishBody.getMandatory(),
- routingKey == null ? null : routingKey.intern());
+ routingKey);
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
index ba3c5d03fa..b2a09ac592 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -94,21 +94,23 @@ public class Job implements Runnable
/**
* Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job.
*/
- void processAll()
+ boolean processAll()
{
// limit the number of events processed in one run
- for (int i = 0; i < _maxEvents; i++)
+ int i = _maxEvents;
+ while( --i != 0 )
{
Event e = _eventQueue.poll();
if (e == null)
{
- break;
+ return true;
}
else
{
e.process(_session);
}
}
+ return false;
}
/**
@@ -144,9 +146,15 @@ public class Job implements Runnable
*/
public void run()
{
- processAll();
- deactivate();
- _completionHandler.completed(_session, this);
+ if(processAll())
+ {
+ deactivate();
+ _completionHandler.completed(_session, this);
+ }
+ else
+ {
+ _completionHandler.notCompleted(_session, this);
+ }
}
/**
@@ -158,5 +166,7 @@ public class Job implements Runnable
static interface JobCompletionHandler
{
public void completed(IoSession session, Job job);
+
+ public void notCompleted(final IoSession session, final Job job);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index cbe08a192e..8352b5af77 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -29,8 +29,8 @@ import org.apache.qpid.pool.Event.CloseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
/**
* PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
@@ -84,9 +84,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
/** Used for debugging purposes. */
private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class);
- /** Holds a mapping from Mina sessions to batched jobs for execution. */
- private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
-
/** Holds the managed reference to obtain the executor for the batched jobs. */
private final ReferenceCountingExecutorService _poolReference;
@@ -94,7 +91,9 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
private final String _name;
/** Defines the maximum number of events that will be batched into a single job. */
- private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+ static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
+ private final int _maxEvents;
/**
* Creates a named pooling filter, on the specified shared thread pool.
@@ -102,10 +101,11 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
* @param refCountingPool The thread pool reference.
* @param name The identifying name of the filter type.
*/
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
{
_poolReference = refCountingPool;
_name = name;
+ _maxEvents = maxEvents;
}
/**
@@ -160,20 +160,34 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
/**
* Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
*
- * @param session The Mina session to work in.
+ * @param job The job.
* @param event The event to hand off asynchronously.
*/
- void fireAsynchEvent(IoSession session, Event event)
+ void fireAsynchEvent(Job job, Event event)
{
- Job job = getJobForSession(session);
+
// job.acquire(); //prevents this job being removed from _jobs
job.add(event);
- // Additional checks on pool to check that it hasn't shutdown.
- // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
{
- _poolReference.getPool().execute(job);
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
}
}
@@ -186,7 +200,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void createNewJobForSession(IoSession session)
{
- Job job = new Job(session, this, _maxEvents);
+ Job job = new Job(session, this, MAX_JOB_EVENTS);
session.setAttribute(_name, job);
}
@@ -197,7 +211,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*
* @return The Job for this filter to place asynchronous events into.
*/
- private Job getJobForSession(IoSession session)
+ public Job getJobForSession(IoSession session)
{
return (Job) session.getAttribute(_name);
}
@@ -233,17 +247,57 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
// }
// }
// else
+
+
if (!job.isComplete())
{
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
- if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+ if (job.activate())
{
- _poolReference.getPool().execute(job);
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
}
}
}
+ public void notCompleted(IoSession session, Job job)
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
+ }
+
+
+
/**
* No-op pass through filter to the next filter in the chain.
*
@@ -400,7 +454,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name);
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
}
/**
@@ -412,8 +466,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
{
-
- fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
}
/**
@@ -424,7 +478,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
- fireAsynchEvent(session, new CloseEvent(nextFilter));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new CloseEvent(nextFilter));
}
}
@@ -442,7 +497,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name);
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
}
/**
@@ -454,7 +509,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
{
- fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
}
/**
@@ -465,7 +521,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
- fireAsynchEvent(session, new CloseEvent(nextFilter));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new CloseEvent(nextFilter));
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
index 2fbeeda1d4..5a7679a972 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
/**
* AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in
@@ -57,7 +58,7 @@ public interface AMQMethodListener
*
* @todo Consider narrowing the exception.
*/
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
+ <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException;
/**
* Notifies the listener of an error on the event context to which it is listening. The listener should perform
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 035645aad2..b56a05f725 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.framing.VersionSpecificRegistry;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
@@ -46,4 +46,12 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
// public VersionSpecificRegistry getRegistry();
MethodRegistry getMethodRegistry();
+
+
+ public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException;
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException;
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException;
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index bb9b201e5d..5fbea5e14f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -24,20 +24,15 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
-import org.apache.mina.common.IoSession;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
@@ -62,7 +57,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
AMQQueue queue =
new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), false,
new AMQShortString("test"), true, _protocolSession.getVirtualHost());
- AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -73,7 +68,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
- AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore, null);
+ AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore);
channel3.setLocalTransactional();
_protocolSession.addChannel(channel3);
_mbean.rollbackTransactions(2);
@@ -93,14 +88,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase
}
// check if closing of session works
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore));
_mbean.closeConnection();
try
{
channelCount = _mbean.channels().size();
assertTrue(channelCount == 0);
// session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore));
fail();
}
catch (AMQException ex)
@@ -119,7 +114,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
new AMQMinaProtocolSession(new MockIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
null);
_protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
- _channel = new AMQChannel(_protocolSession, 1, _messageStore, null);
+ _channel = new AMQChannel(_protocolSession, 1, _messageStore);
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index ae02c1c28c..5adfebbffb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -21,22 +21,12 @@
package org.apache.qpid.server.protocol;
import junit.framework.TestCase;
-import org.apache.mina.common.IoSession;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.management.JMException;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
@@ -65,7 +55,7 @@ public class MaxChannelsTest extends TestCase
{
for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
{
- _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null));
}
}
catch (AMQException e)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index 96be579d2a..95ffb505fb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -24,7 +24,6 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -37,7 +36,6 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.LinkedList;
@@ -77,7 +75,7 @@ public class AckTest extends TestCase
super.setUp();
_messageStore = new TestableMemoryMessageStore();
_protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/);
+ _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
_subscriptionManager = new SubscriptionSet();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index fcee3c7de4..cf986e7803 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -190,6 +190,26 @@ public class MockProtocolSession implements AMQProtocolSession
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void methodFrameReceived(int channelId, AMQMethodBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void contentBodyReceived(int channelId, ContentBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public MethodDispatcher getMethodDispatcher()
{
return null; //To change body of implemented methods use File | Settings | File Templates.