summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-08-08 18:31:18 +0000
committerRafael H. Schloming <rhs@apache.org>2008-08-08 18:31:18 +0000
commit4d142a2beccdbabb242f5faa6dd210b10803b1af (patch)
treebac94b1fdcf5f251839004d8f46a9f762186a0d0
parent8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7 (diff)
downloadqpid-python-4d142a2beccdbabb242f5faa6dd210b10803b1af.tar.gz
QPID-1213: simplified unprocessed message and moved version specific code into the _0_8 and _0_10 variants
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@684036 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java225
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java39
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java84
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java129
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java87
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java113
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java48
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java43
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java221
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java61
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java41
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java43
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java44
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java5
34 files changed, 583 insertions, 861 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 723e502ff0..5203a27f42 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -63,7 +63,6 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -78,7 +77,6 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,20 +97,20 @@ import org.slf4j.LoggerFactory;
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
* after looking at worse bottlenecks first.
*/
-public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
- public static final class IdToConsumerMap
+ public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+ private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
- public BasicMessageConsumer get(int id)
+ public C get(int id)
{
if ((id & 0xFFFFFFF0) == 0)
{
- return _fastAccessConsumers[id];
+ return (C) _fastAccessConsumers[id];
}
else
{
@@ -120,12 +118,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ public C put(int id, C consumer)
{
- BasicMessageConsumer oldVal;
+ C oldVal;
if ((id & 0xFFFFFFF0) == 0)
{
- oldVal = _fastAccessConsumers[id];
+ oldVal = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = consumer;
}
else
@@ -137,12 +135,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public BasicMessageConsumer remove(int id)
+ public C remove(int id)
{
- BasicMessageConsumer consumer;
+ C consumer;
if ((id & 0xFFFFFFF0) == 0)
{
- consumer = _fastAccessConsumers[id];
+ consumer = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = null;
}
else
@@ -154,15 +152,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public Collection<BasicMessageConsumer> values()
+ public Collection<C> values()
{
- ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+ ArrayList<C> values = new ArrayList<C>();
for (int i = 0; i < 16; i++)
{
if (_fastAccessConsumers[i] != null)
{
- values.add(_fastAccessConsumers[i]);
+ values.add((C) _fastAccessConsumers[i]);
}
}
values.addAll(_slowAccessConsumers.values());
@@ -183,8 +181,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
- /** Used for debugging in the dispatcher. */
- private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
/** The default maximum number of prefetched message at which to suspend the channel. */
public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -234,7 +230,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Holds this session unique identifier, used to distinguish it from other sessions. */
protected int _channelId;
- /** @todo This does not appear to be set? */
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
@@ -261,8 +256,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
+ new ConcurrentHashMap<C, String>();
/**
* Used to hold incoming messages.
@@ -299,7 +294,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- protected final IdToConsumerMap _consumers = new IdToConsumerMap();
+ protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
//Map<AMQShortString, BasicMessageConsumer> _consumers =
//new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
@@ -308,7 +303,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
*/
- private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+ private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -584,7 +579,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+ public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException
{
if (consumer.getQueuename() != null)
{
@@ -755,11 +750,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract void sendCommit() throws AMQException, FailoverException;
- public void confirmConsumerCancelled(AMQShortString consumerTag)
+
+ public void confirmConsumerCancelled(int consumerTag)
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
+ C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
if (!consumer.isNoConsume()) // Normal Consumer
@@ -801,7 +797,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
else
{
- _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+ _queue.add(new CloseConsumerMessage(consumer));
}
}
}
@@ -848,7 +844,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false, false);
}
- public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ public C createExclusiveConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
@@ -927,8 +923,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_subscriptions.get(name).close();
}
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -960,23 +956,23 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return msg;
}
- public BasicMessageProducer createProducer(Destination destination) throws JMSException
+ public P createProducer(Destination destination) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
- public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+ public P createProducer(Destination destination, boolean immediate) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate,
boolean waitUntilSent) throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -986,7 +982,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic);
+ return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1074,7 +1070,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+ C consumer = (C) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1093,7 +1089,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+ C consumer = (C) createConsumer(destination, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1111,7 +1107,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+ C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1130,7 +1126,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
+ C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1175,7 +1171,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
}
/**
@@ -1195,7 +1191,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1369,17 +1365,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
_logger.debug("Message[" + message.toString() + "] received in session");
}
-
- if (message instanceof ReturnMessage)
- {
- // Return of the bounced message.
- returnBouncedMessage((ReturnMessage) message);
- }
- else
- {
- _highestDeliveryTag.set(message.getDeliveryTag());
- _queue.add(message);
- }
+ _highestDeliveryTag.set(message.getDeliveryTag());
+ _queue.add(message);
}
public void declareAndBind(AMQDestination amqd)
@@ -1618,7 +1605,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
+ protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
final boolean noConsume, final boolean autoClose) throws JMSException
{
@@ -1642,10 +1629,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
messageSelector = selector;
}
- return new FailoverRetrySupport<MessageConsumer, JMSException>(
- new FailoverProtectedOperation<MessageConsumer, JMSException>()
+ return new FailoverRetrySupport<C, JMSException>(
+ new FailoverProtectedOperation<C, JMSException>()
{
- public MessageConsumer execute() throws JMSException, FailoverException
+ public C execute() throws JMSException, FailoverException
{
checkNotClosed();
@@ -1668,7 +1655,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
}
- BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
if (_messageListener != null)
@@ -1712,7 +1699,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException;
@@ -1722,9 +1709,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @param consumer the consum
*/
- void deregisterConsumer(BasicMessageConsumer consumer)
+ void deregisterConsumer(C consumer)
{
- if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -2012,12 +1999,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
if (error != null)
{
con.notifyError(error);
@@ -2048,7 +2035,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
final Iterator it = clonedProducers.iterator();
while (it.hasNext())
{
- final BasicMessageProducer prod = (BasicMessageProducer) it.next();
+ final P prod = (P) it.next();
prod.close();
}
// at this point the _producers map is empty
@@ -2096,20 +2083,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @param queueName
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
+ private void consumeFromQueue(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
- // need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(tagId));
- consumer.setConsumerTag(tag);
+ consumer.setConsumerTag(tagId);
// we must register the consumer in the map before we actually start listening
_consumers.put(tagId, consumer);
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag);
+ sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
}
catch (AMQException e)
{
@@ -2119,26 +2104,26 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException;
+ public abstract void sendConsume(C consumer, AMQShortString queueName,
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
- private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+ private P createProducerImpl(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent) throws JMSException
{
- return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
- new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
+ return new FailoverRetrySupport<P, JMSException>(
+ new FailoverProtectedOperation<P, JMSException>()
{
- public BasicMessageProducer execute() throws JMSException, FailoverException
+ public P execute() throws JMSException, FailoverException
{
checkNotClosed();
long producerId = getNextProducerId();
- BasicMessageProducer producer = createMessageProducer(destination, mandatory,
+ P producer = createMessageProducer(destination, mandatory,
immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
@@ -2147,7 +2132,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent, long producerId);
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
@@ -2320,12 +2305,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
con.markClosed();
}
// at this point the _consumers map will be empty
@@ -2360,7 +2345,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @throws AMQException
*/
- private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException
+ private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException
{
AMQDestination amqd = consumer.getDestination();
@@ -2424,15 +2409,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private void rejectAllMessages(boolean requeue)
{
- rejectMessagesForConsumerTag(null, requeue);
+ rejectMessagesForConsumerTag(0, requeue, true);
}
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ * @param rejectAllConsumers
*/
- private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
+ private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
Iterator messages = _queue.iterator();
if (_logger.isInfoEnabled())
@@ -2453,7 +2439,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString()))
+ if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
{
if (_logger.isDebugEnabled())
{
@@ -2475,12 +2461,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private void resubscribeConsumers() throws AMQException
{
- ArrayList consumers = new ArrayList(_consumers.values());
+ ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
_consumers.clear();
- for (Iterator it = consumers.iterator(); it.hasNext();)
- {
- BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ for (C consumer : consumers)
+ {
consumer.failedOver();
registerConsumer(consumer, true);
}
@@ -2492,53 +2477,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
- BasicMessageProducer producer = (BasicMessageProducer) it.next();
+ P producer = (P) it.next();
producer.resubscribe();
}
}
- private void returnBouncedMessage(final ReturnMessage msg)
- {
- _connection.performConnectionTask(new Runnable()
- {
- public void run()
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
- AMQShortString reason = msg.getReplyText();
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- }
- else
- {
- _connection.exceptionReceived(
- new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
- }
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
- e);
- }
- }
- });
- }
-
/**
* Suspends or unsuspends this session.
*
@@ -2641,6 +2584,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
+ /** Used for debugging in the dispatcher. */
+ private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
class Dispatcher extends Thread
{
@@ -2652,6 +2599,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private final AtomicLong _rollbackMark = new AtomicLong(-1);
private String dispatcherID = "" + System.identityHashCode(this);
+
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -2670,7 +2619,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public void rejectPending(BasicMessageConsumer consumer)
+ public void rejectPending(C consumer)
{
synchronized (_lock)
{
@@ -2685,7 +2634,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
consumer.rollbackPendingMessages();
// Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
//Let the dispatcher deal with this when it gets to them.
// closeConsumer
@@ -2712,7 +2661,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (C consumer : _consumers.values())
{
if (!consumer.isNoConsume())
{
@@ -2778,7 +2727,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_lock.wait();
}
- if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+ if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
@@ -2840,8 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
//This if block is not needed anymore as bounce messages are handled separately
//if (message.getDeliverBody() != null)
//{
- final BasicMessageConsumer consumer =
- _consumers.get(message.getConsumerTag().toIntValue());
+ final C consumer =
+ _consumers.get(message.getConsumerTag());
if ((consumer == null) || consumer.isClosed())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 32a0fdd5f5..aa0ff66545 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -52,7 +52,7 @@ import java.util.Map;
/**
* This is a 0.10 Session
*/
-public class AMQSession_0_10 extends AMQSession
+public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
{
/**
@@ -345,7 +345,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* Create an 0_10 message consumer
*/
- public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
final FieldTable ft, final boolean noConsume,
@@ -406,8 +406,8 @@ public class AMQSession_0_10 extends AMQSession
* This method is invoked when a consumer is creted
* Registers the consumer with the broker
*/
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, AMQShortString tag)
+ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait, String messageSelector, int tag)
throws AMQException, FailoverException
{
boolean preAcquire;
@@ -416,10 +416,10 @@ public class AMQSession_0_10 extends AMQSession
preAcquire = ( ! consumer.isNoConsume() &&
(consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
|| !(consumer.getDestination() instanceof AMQQueue);
- getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
+ getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
- new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
+ (BasicMessageConsumer_0_10) consumer, null,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
@@ -427,21 +427,23 @@ public class AMQSession_0_10 extends AMQSession
throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
}
+ String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+
if (! prefetch())
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
}
else
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
{
// set the flow
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
@@ -452,7 +454,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* Create an 0_10 message producer
*/
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent,
long producerId)
{
@@ -542,13 +544,14 @@ public class AMQSession_0_10 extends AMQSession
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(consumer.getConsumerTag().toString());
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
}
}
else
{
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer_0_10 consumer : _consumers.values())
{
+ String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
try
{
@@ -556,18 +559,18 @@ public class AMQSession_0_10 extends AMQSession
{
if (consumer.getMessageListener() != null)
{
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE, 1);
}
}
else
{
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE,
+ .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
}
catch (Exception e)
{
@@ -715,7 +718,7 @@ public class AMQSession_0_10 extends AMQSession
AMQTopic origTopic=checkValidTopic(topic);
AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))
@@ -766,7 +769,7 @@ public class AMQSession_0_10 extends AMQSession
}
}
- subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 2c88d6f557..2442b157f1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -25,10 +25,11 @@ import javax.jms.*;
import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-public final class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
@@ -218,6 +219,7 @@ public final class AMQSession_0_8 extends AMQSession
return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
}
+
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException
{
@@ -245,10 +247,14 @@ public final class AMQSession_0_8 extends AMQSession
{
throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
}
- }
-
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
- String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ }
+
+ @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ AMQShortString queueName,
+ AMQProtocolHandler protocolHandler,
+ boolean nowait,
+ String messageSelector,
+ int tag) throws AMQException, FailoverException
{
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -268,7 +274,7 @@ public final class AMQSession_0_8 extends AMQSession
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
- tag,
+ new AMQShortString(String.valueOf(tag)),
consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
@@ -337,7 +343,7 @@ public final class AMQSession_0_8 extends AMQSession
}
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent, long producerId)
{
@@ -345,6 +351,66 @@ public final class AMQSession_0_8 extends AMQSession
this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
+
+ @Override public void messageReceived(UnprocessedMessage message)
+ {
+
+ if (message instanceof ReturnMessage)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage((ReturnMessage) message);
+ }
+ else
+ {
+ super.messageReceived(message);
+ }
+ }
+
+ private void returnBouncedMessage(final ReturnMessage msg)
+ {
+ _connection.performConnectionTask(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ }
+ else
+ {
+ _connection.exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
+ }
+ });
+ }
+
+
+
+
public void sendRollback() throws AMQException, FailoverException
{
TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
@@ -365,7 +431,7 @@ public final class AMQSession_0_8 extends AMQSession
checkNotClosed();
AMQTopic origTopic = checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0176f66552..01bb68c23e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -22,10 +22,7 @@ package org.apache.qpid.client;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
@@ -49,7 +46,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -72,7 +69,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
/** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
- protected AMQShortString _consumerTag;
+ protected int _consumerTag;
/** We need to know the channel id when constructing frames */
protected final int _channelId;
@@ -517,7 +514,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
throw e;
}
- else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ else if (o instanceof CloseConsumerMessage)
{
_closed.set(true);
deregisterConsumer();
@@ -635,7 +632,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
* @param closeMessage
* this message signals that we should close the browser
*/
- public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+ public void notifyCloseMessage(CloseConsumerMessage closeMessage)
{
if (isMessageListenerSet())
{
@@ -667,26 +664,21 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
*
* @param messageFrame the raw unprocessed mesage
*/
- void notifyMessage(UnprocessedMessage messageFrame)
+ void notifyMessage(U messageFrame)
{
- if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ if (messageFrame instanceof CloseConsumerMessage)
{
- notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+ notifyCloseMessage((CloseConsumerMessage) messageFrame);
return;
}
- final boolean debug = _logger.isDebugEnabled();
- if (debug)
- {
- _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag());
- }
try
{
AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
- if (debug)
+ if (_logger.isDebugEnabled())
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
@@ -721,7 +713,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
- public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<H, B> messageFrame)
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
throws Exception;
/** @param jmsMessage this message has already been processed so can't redo preDeliver */
@@ -936,12 +928,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
_session.deregisterConsumer(this);
}
- public AMQShortString getConsumerTag()
+ public int getConsumerTag()
{
return _consumerTag;
}
- public void setConsumerTag(AMQShortString consumerTag)
+ public void setConsumerTag(int consumerTag)
{
_consumerTag = consumerTag;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 388adfb434..2a37298a43 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -22,11 +22,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.api.Message;
import org.apache.qpid.transport.*;
import org.apache.qpid.QpidException;
import org.apache.qpid.filter.MessageFilter;
@@ -35,16 +32,14 @@ import org.apache.qpid.filter.JMSSelectorFilter;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a 0.10 message consumer.
*/
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
- implements org.apache.qpid.nclient.util.MessageListener
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
+ implements org.apache.qpid.nclient.MessagePartListener
{
/**
@@ -76,6 +71,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+ private String _consumerTagString;
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
@@ -106,6 +102,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_isStarted = connection.started();
}
+
+ @Override public void setConsumerTag(int consumerTag)
+ {
+ super.setConsumerTag(consumerTag);
+ _consumerTagString = String.valueOf(consumerTag);
+ }
+
+ public String getConsumerTagString()
+ {
+ return _consumerTagString;
+ }
+
+
// ----- Interface org.apache.qpid.client.util.MessageListener
/**
@@ -142,7 +151,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
if (isMessageListenerSet() && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
_logger.debug("messageOk, trying to notify");
@@ -155,52 +164,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
/**
* This method is invoked by the transport layer when a message is delivered for this
* consumer. The message is transformed and pass to the session.
- * @param message an 0.10 message
+ * @param xfr an 0.10 message transfer
*/
- public void onMessage(Message message)
+ public void messageTransfer(MessageTransfer xfr)
+
+ //public void onMessage(Message message)
{
int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- AMQShortString consumerTag = getConsumerTag();
- AMQShortString exchange;
- AMQShortString routingKey;
- boolean redelivered = false;
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- if (headers[0] == null) {
- headers[0] = new MessageProperties();
- }
- if( message.getDeliveryProperties() != null )
- {
- exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- redelivered = message.getDeliveryProperties().getRedelivered();
- }
- else
- {
- exchange = new AMQShortString("");
- routingKey = new AMQShortString("");
- headers[1] = new DeliveryProperties();
- }
+ int consumerTag = getConsumerTag();
+
UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- try
- {
- newMessage.receiveBody(message.readData());
- }
- catch (IOException e)
- {
- getSession().getAMQConnection().exceptionReceived(e);
- }
- // if there is a replyto destination then we need to request the exchange info
- ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo();
- if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals(""))
- {
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- // the exchnage class will be set later from within the sesion thread
- String replyToUrl = replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
- }
- newMessage.setContentHeader(headers);
+ new UnprocessedMessage_0_10(consumerTag, xfr);
+
+
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -213,47 +189,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
+ ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
((AMQSession_0_10) getSession()).getQpidSession().sync();
// confirm cancel
getSession().confirmConsumerCancelled(getConsumerTag());
((AMQSession_0_10) getSession()).getCurrentException();
}
- @Override void notifyMessage(UnprocessedMessage messageFrame)
+ @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
- // if there is a replyto destination then we need to request the exchange info
- String replyToURL = messageFrame.getReplyToURL();
- if (replyToURL != null && !replyToURL.equals(""))
- {
- AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
- String replyToUrl = "://" + replyToURL;
- if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
- }
- else
- {
- Future<ExchangeQueryResult> future =
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- replyToUrl = res.getType() + replyToUrl;
- }
- ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
- }
+
super.notifyMessage(messageFrame);
}
@@ -267,11 +212,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
@Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
- AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
- messageFrame.getContentHeader(), messageFrame.getBodies()
- );
+ AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
+ return _messageFactory.createMessage(msg.getMessageTransfer());
}
// private methods
@@ -327,7 +271,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// and messages are not prefetched we then need to request another one
if(! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -415,7 +359,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super.setMessageListener(messageListener);
if (messageListener != null && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
@@ -440,7 +384,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_isStarted = true;
if (_syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -463,7 +407,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (! getSession().prefetch())
@@ -486,4 +430,5 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
}
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index e601f454a4..494a8fb43d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -26,22 +26,14 @@ import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.filter.JMSSelectorFilter;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody>
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -69,7 +61,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
void sendCancel() throws AMQException, FailoverException
{
- BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
final AMQFrame cancelFrame = body.generateFrame(_channelId);
@@ -81,7 +73,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
@@ -90,4 +82,4 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 2f06dc9d39..02c5526e03 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -36,11 +37,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.nclient.util.ByteBufferMessage;
import org.apache.qpid.njms.ExceptionHelper;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.*;
+import static org.apache.qpid.transport.Option.*;
/**
* This is a 0_10 message producer.
@@ -76,21 +74,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
- org.apache.qpid.api.Message underlyingMessage = message.get010Message();
- if (underlyingMessage == null)
- {
- underlyingMessage = new ByteBufferMessage(delegate.getMessageProperties(), delegate.getDeliveryProperties());
- message.set010Message(underlyingMessage);
-
- }
- // force a rebuild of the 0-10 message if data has changed
- if (message.getData() == null)
- {
- message.dataChanged();
- }
-
- DeliveryProperties deliveryProp = underlyingMessage.getDeliveryProperties();
- MessageProperties messageProps = underlyingMessage.getMessageProperties();
+ DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
+ MessageProperties messageProps = delegate.getMessageProperties();
if (messageId != null)
{
@@ -140,10 +125,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
message.setJMSPriority(priority);
}
- String excahngeName = destination.getExchangeName().toString();
- if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(excahngeName))
+ String exchangeName = destination.getExchangeName().toString();
+ if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
{
- deliveryProp.setExchange(excahngeName);
+ deliveryProp.setExchange(exchangeName);
}
String routingKey = destination.getRoutingKey().toString();
if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
@@ -153,63 +138,27 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
messageProps.setContentLength(message.getContentLength());
-
- /* String replyToURL = contentHeaderProperties.getReplyToAsString();
- if (replyToURL != null)
- {
- if(_logger.isDebugEnabled())
- {
- StringBuffer b = new StringBuffer();
- b.append("\n==========================");
- b.append("\nReplyTo : " + replyToURL);
- b.append("\n==========================");
- _logger.debug(b.toString());
- }
- AMQBindingURL dest;
- try
- {
- dest = new AMQBindingURL(replyToURL);
- }
- catch (URISyntaxException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
- }
-*/
-
-
// send the message
try
{
- org.apache.qpid.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession();
+ org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session)
+ ((AMQSession_0_10) getSession()).getQpidSession();
// if true, we need to sync the delivery of this message
boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
getSession().getAMQConnection().getSyncPersistence());
+ org.apache.mina.common.ByteBuffer data = message.getData();
+ ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
+
+ ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProp, messageProps),
+ buffer, sync ? SYNC : NONE);
if (sync)
{
- ssn.setAutoSync(true);
- }
- try
- {
- ssn.messageTransfer(destination.getExchangeName().toString(),
- underlyingMessage,
- ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ ssn.sync();
}
- finally
- {
- if (sync)
- {
- ssn.setAutoSync(false);
- }
- }
- }
- catch (IOException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
catch (RuntimeException rte)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index 29fa03fd1d..9bdef22f96 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -32,20 +32,20 @@ import javax.jms.TopicSubscriber;
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
*/
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber
{
private final Topic _topic;
- private final BasicMessageConsumer _consumer;
+ private final C _consumer;
private final boolean _noLocal;
- TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
+ TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal)
{
_topic = topic;
_consumer = consumer;
_noLocal = noLocal;
}
- TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
+ TopicSubscriberAdaptor(Topic topic, C consumer)
{
this(topic, consumer, consumer.isNoLocal());
}
@@ -103,7 +103,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber
}
private void checkPreConditions() throws javax.jms.IllegalStateException{
- BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+ C msgConsumer = _consumer;
if (msgConsumer.isClosed() ){
throw new javax.jms.IllegalStateException("Consumer is closed");
@@ -120,7 +120,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber
}
}
- BasicMessageConsumer getMessageConsumer()
+ C getMessageConsumer()
{
return _consumer;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index 6029e7c171..6237234c4d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -43,13 +43,12 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
throws AMQException
{
final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
- channelId,
body.getDeliveryTag(),
- body.getConsumerTag(),
+ body.getConsumerTag().toIntValue(),
body.getExchange(),
body.getRoutingKey(),
body.getRedelivered());
_logger.debug("New JmsDeliver method received:" + session);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
index 5731fb7473..3bbc9209c5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
@@ -45,14 +45,14 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicR
throws AMQException
{
_logger.debug("New JmsBounce method received");
- final ReturnMessage msg = new ReturnMessage(channelId,
+ final ReturnMessage msg = new ReturnMessage(
body.getExchange(),
body.getRoutingKey(),
body.getReplyText(),
body.getReplyCode()
);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 97cb2baee4..bfb7b6a9ce 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -22,19 +22,14 @@
package org.apache.qpid.client.message;
import org.apache.commons.collections.map.ReferenceMap;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.nclient.*;
import org.apache.qpid.jms.Message;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
@@ -47,8 +42,10 @@ import javax.jms.MessageNotWriteableException;
import javax.jms.MessageFormatException;
import javax.jms.DeliveryMode;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
{
@@ -68,27 +65,32 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
private AMQSession _session;
private final long _deliveryTag;
- private static Map<String,Integer> _exchangeTypeMap = new HashMap<String, Integer>();
+ private static Map<AMQShortString,Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>();
+ private static Map<String,Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>();
+ private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();;
static
{
- // TODO - XXX - Need to add to this map when we find an exchange we don't know about
+ _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
+ _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
- _exchangeTypeMap.put(null, AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put("amq.direct", AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put("", AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put("amq.topic", AMQDestination.TOPIC_TYPE);
- _exchangeTypeMap.put("amq.fanout", AMQDestination.TOPIC_TYPE);
+ _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
+ _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
+ _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
+ _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
-
+
+ _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE);
+ _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
+ _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
}
protected AMQMessageDelegate_0_10()
{
this(new MessageProperties(), new DeliveryProperties(), -1);
_readableProperties = false;
-
-
}
protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange,
@@ -99,13 +101,68 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
AMQDestination dest;
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
-
- // Destination dest = AMQDestination.createDestination(url);
+ dest = generateDestination(exchange, routingKey);
setJMSDestination(dest);
+ }
+ private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey)
+ {
+ AMQDestination dest;
+ switch(getExchangeType(exchange))
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ }
+ return dest;
+ }
+
+ private int getExchangeType(AMQShortString exchange)
+ {
+ Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange);
+
+ if(type == null)
+ {
+ return AMQDestination.UNKNOWN_TYPE;
+ }
+
+
+ return type;
+ }
+
+
+ public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session)
+ {
+ DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
+ if(deliveryProps != null)
+ {
+ String exchange = deliveryProps.getExchange();
+
+ if(exchange != null && !_exchangeTypeStringMap.containsKey(exchange))
+ {
+
+ AMQShortString exchangeShortString = new AMQShortString(exchange);
+ Future<ExchangeQueryResult> future =
+ session.exchangeQuery(exchange.toString());
+ ExchangeQueryResult res = future.get();
+
+ Integer type = _exchangeTypeToDestinationType.get(res.getType());
+ if(type == null)
+ {
+ type = AMQDestination.UNKNOWN_TYPE;
+ }
+ _exchangeTypeStringMap.put(exchange, type);
+ _exchangeTypeMap.put(exchangeShortString, type);
+
+ }
+ }
}
protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
@@ -212,19 +269,10 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- int type = _exchangeTypeMap.get(exchange);
+ dest = generateDestination(exchange == null ? null : new AMQShortString(exchange),
+ routingKey == null ? null : new AMQShortString(routingKey));
+
- switch(type)
- {
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(new AMQShortString(exchange), new AMQShortString(routingKey), new AMQShortString(routingKey));
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(new AMQShortString(exchange), new AMQShortString(routingKey), null);
- break;
- default:
- dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null);
- }
@@ -897,4 +945,5 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
{
return _deliveryProps;
}
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 54da7c4404..0700ce5d23 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -44,17 +44,11 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
protected boolean _readableMessage = false;
protected boolean _changedData = true;
-
- /**
- * This is 0_10 specific
- */
- private org.apache.qpid.api.Message _010message = null;
/** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-
private AMQMessageDelegate _delegate;
private boolean _redelivered;
@@ -454,52 +448,10 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
else
{
_data.flip();
- dataChanged();
_changedData = false;
}
}
- public void set010Message(org.apache.qpid.api.Message m )
- {
- _010message = m;
- }
-
- public void dataChanged()
- {
- if (_010message != null)
- {
- _010message.clearData();
- try
- {
- if (_data != null)
- {
- _010message.appendData(_data.buf().slice());
- }
- else
- {
- _010message.appendData(java.nio.ByteBuffer.allocate(0));
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * End 010 specific
- */
-
- public org.apache.qpid.api.Message get010Message()
- {
- return _010message;
- }
-
-
-
-
-
public int getContentLength()
{
if(_data != null)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 5e16c765af..54a845ceef 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -110,17 +110,17 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
- List bodies) throws AMQException
+ java.nio.ByteBuffer body) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- // we optimise the non-fragmented case to avoid copying
- if ((bodies != null))
+
+ if (body != null)
{
- data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+ data = ByteBuffer.wrap(body);
}
- else // bodies == null
+ else // body == null
{
data = ByteBuffer.allocate(0);
}
@@ -164,11 +164,11 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
- List bodies)
+ java.nio.ByteBuffer body)
throws JMSException, AMQException
{
final AbstractJMSMessage msg =
- create010MessageWithBody(messageNbr, contentHeader, bodies);
+ create010MessageWithBody(messageNbr, contentHeader, body);
msg.setJMSRedelivered(redelivered);
msg.receivedFromServer();
return msg;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java
new file mode 100644
index 0000000000..4af04912e5
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.client.BasicMessageConsumer;
+
+public final class CloseConsumerMessage extends UnprocessedMessage
+{
+
+ public CloseConsumerMessage(BasicMessageConsumer consumer)
+ {
+ super(consumer.getConsumerTag());
+ }
+
+
+ public long getDeliveryTag()
+ {
+ return 0;
+ }
+
+ public boolean isRedelivered()
+ {
+ return false;
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 470439d85c..c290149cef 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -66,17 +66,6 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
}
- JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
- {
- this(delegateFactory, data, null);
- }
-
- JMSTextMessage(AMQMessageDelegateFactory delegateFactory, String text) throws JMSException
- {
- super(delegateFactory, (ByteBuffer) null);
- setText(text);
- }
-
public void clearBodyImpl() throws JMSException
{
if (_data != null)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 424dccc0c3..e1275c37f7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -40,7 +40,7 @@ public interface MessageFactory
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
Struct[] contentHeader,
- List bodies)
+ java.nio.ByteBuffer body)
throws JMSException, AMQException;
AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 213d3ebc9e..948d6d0d7d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.message;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
@@ -32,6 +33,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,24 +121,30 @@ public class MessageFactoryRegistry
}
}
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
- Struct[] contentHeader, List bodies) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
{
- MessageProperties mprop = (MessageProperties) contentHeader[0];
+
+ MessageProperties mprop = transfer.getHeader().get(MessageProperties.class);
String messageType = mprop.getContentType();
if (messageType == null)
{
_logger.debug("no message type specified, building a byte message");
messageType = JMSBytesMessage.MIME_TYPE;
}
- MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+ MessageFactory mf = _mimeStringToFactoryMap.get(messageType);
if (mf == null)
{
throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+ boolean redelivered = false;
+ DeliveryProperties deliverProps;
+ if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null)
+ {
+ redelivered = deliverProps.getRedelivered();
+ }
+ return mf.createMessage(transfer.getId(), redelivered, transfer.getHeader().getStructs(), transfer.getBody());
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
index c866a5028e..ed590772d9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
@@ -7,9 +7,9 @@ public class ReturnMessage extends UnprocessedMessage_0_8
final private AMQShortString _replyText;
final private int _replyCode;
- public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
+ public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode)
{
- super(channelId,-1,null,exchange,routingKey,false);
+ super(-1,0,exchange,routingKey,false);
_replyText = replyText;
_replyCode = replyCode;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 3dbef46f6f..713c87260c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -20,23 +20,7 @@
*/
package org.apache.qpid.client.message;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -46,217 +30,24 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public abstract class UnprocessedMessage<H,B>
+public abstract class UnprocessedMessage
{
- private final int _channelId;
- private final long _deliveryId;
- private final AMQShortString _consumerTag;
- protected AMQShortString _exchange;
- protected AMQShortString _routingKey;
- protected boolean _redelivered;
+ private final int _consumerTag;
+
- public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage(int consumerTag)
{
- _channelId = channelId;
- _deliveryId = deliveryId;
_consumerTag = consumerTag;
- _exchange = exchange;
- _routingKey = routingKey;
- _redelivered = redelivered;
}
- public abstract void receiveBody(B nativeMessageBody);
-
- public abstract void setContentHeader(H nativeMessageHeader);
- public int getChannelId()
- {
- return _channelId;
- }
+ abstract public long getDeliveryTag();
- public long getDeliveryTag()
- {
- return _deliveryId;
- }
- public AMQShortString getConsumerTag()
+ public int getConsumerTag()
{
return _consumerTag;
}
- public AMQShortString getExchange()
- {
- return _exchange;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public boolean isRedelivered()
- {
- return _redelivered;
- }
- public abstract List<B> getBodies();
-
- public abstract H getContentHeader();
-
- // specific to 0_10
- public String getReplyToURL()
- {
- return "";
- }
-
- public static final class CloseConsumerMessage extends UnprocessedMessage
- {
- AMQShortString _consumerTag;
-
- public CloseConsumerMessage(int channelId, long deliveryId, AMQShortString consumerTag,
- AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
- {
- super(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- _consumerTag = consumerTag;
- }
-
- public CloseConsumerMessage(BasicMessageConsumer consumer)
- {
- this(0, 0, consumer.getConsumerTag(), null, null, false);
- }
-
- public BasicDeliverBody getDeliverBody()
- {
- return new BasicDeliverBody()
- {
-
- public AMQShortString getConsumerTag()
- {
- return _consumerTag;
- }
-
- public long getDeliveryTag()
- {
- return 0;
- }
-
- public AMQShortString getExchange()
- {
- return null;
- }
- public boolean getRedelivered()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
-
- public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
- {
- return false;
- }
-
- public AMQFrame generateFrame(int channelId)
- {
- return null;
- }
-
- public AMQChannelException getChannelException(AMQConstant code, String message)
- {
- return null;
- }
-
- public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
- {
- return null;
- }
-
- public AMQChannelException getChannelNotFoundException(int channelId)
- {
- return null;
- }
-
- public int getClazz()
- {
- return 0;
- }
-
- public AMQConnectionException getConnectionException(AMQConstant code, String message)
- {
- return null;
- }
-
- public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
- {
- return null;
- }
-
- public byte getMajor()
- {
- return 0;
- }
-
- public int getMethod()
- {
- return 0;
- }
-
- public byte getMinor()
- {
- return 0;
- }
-
- public int getSize()
- {
- return 0;
- }
-
- public void writeMethodPayload(ByteBuffer buffer)
- {
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- }
-
- public byte getFrameType()
- {
- return 0;
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
-
- }
- };
- }
-
- @Override
- public List getBodies()
- {
- return null;
- }
-
- @Override
- public Object getContentHeader()
- {
- return null;
- }
-
- @Override
- public void receiveBody(Object nativeMessageBody)
- {
-
- }
-
- @Override
- public void setContentHeader(Object nativeMessageHeader)
- {
-
- }
- }
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
index 5093064edf..6b1301a33f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.client.message;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.MessageTransfer;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -35,58 +29,25 @@ import org.apache.qpid.transport.Struct;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
+public class UnprocessedMessage_0_10 extends UnprocessedMessage
{
- private Struct[] _headers;
- private String _replyToURL;
-
- /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
- private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
-
- public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
- {
- super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
- }
-
- public void receiveBody(ByteBuffer body)
- {
-
- _bodies.add(body);
- }
-
- public void setContentHeader(Struct[] headers)
- {
- this._headers = headers;
- for(Struct s: headers)
- {
- if (s instanceof DeliveryProperties)
- {
- DeliveryProperties props = (DeliveryProperties)s;
- _exchange = new AMQShortString(props.getExchange());
- _routingKey = new AMQShortString(props.getRoutingKey());
- _redelivered = props.getRedelivered();
- }
- }
- }
+ private MessageTransfer _transfer;
- public Struct[] getContentHeader()
+ public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr)
{
- return _headers;
- }
-
- public List<ByteBuffer> getBodies()
- {
- return _bodies;
+ super(consumerTag);
+ _transfer = xfr;
}
// additional 0_10 method
- public String getReplyToURL()
+
+ public long getDeliveryTag()
{
- return _replyToURL;
+ return _transfer.getId();
}
- public void setReplyToURL(String url)
+ public MessageTransfer getMessageTransfer()
{
- _replyToURL = url;
+ return _transfer;
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
index 78da8cdca2..685e646d85 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
@@ -26,7 +26,6 @@ import java.util.List;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -37,32 +36,54 @@ import org.apache.qpid.framing.ContentHeaderBody;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody>
+public class UnprocessedMessage_0_8 extends UnprocessedMessage
{
private long _bytesReceived = 0;
+
+ private AMQShortString _exchange;
+ private AMQShortString _routingKey;
+ private final long _deliveryId;
+ protected boolean _redelivered;
+
private BasicDeliverBody _deliverBody;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+ {
+ super(consumerTag);
+ _exchange = exchange;
+ _routingKey = routingKey;
+
+ _redelivered = redelivered;
+ _deliveryId = deliveryId;
+ }
+
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public AMQShortString getRoutingKey()
{
- super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+ return _routingKey;
}
- public UnprocessedMessage_0_8(int channelId, BasicReturnBody body)
+ public long getDeliveryTag()
{
- //FIXME: TGM, SRSLY 4RL
- super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+ return _deliveryId;
}
- public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
+ public boolean isRedelivered()
{
- super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false);
+ return _redelivered;
}
+
public void receiveBody(ContentBody body)
{
@@ -124,7 +145,7 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody
public String toString()
{
StringBuilder buf = new StringBuilder();
- buf.append("Channel Id : " + this.getChannelId());
+
if (_contentHeader != null)
{
buf.append("ContentHeader " + _contentHeader);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 6c3ae06ce9..d6c7f01e2d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -224,9 +224,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
- {
- final int channelId = message.getChannelId();
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
+ {
if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
_channelId2UnprocessedMsgArray[channelId] = message;
@@ -239,8 +238,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
- : _channelId2UnprocessedMsgMap.get(channelId);
+ final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId));
if (msg == null)
{
@@ -290,15 +289,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
}
- /*try
- {*/
msg.receiveBody(contentBody);
- /*}
- catch (UnexpectedBodyReceivedException e)
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- throw e;
- }*/
if (msg.isAllBodyDataReceived())
{
@@ -478,7 +469,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
final AMQSession session = getSession(channelId);
- session.confirmConsumerCancelled(consumerTag);
+ session.confirmConsumerCancelled(consumerTag.toIntValue());
}
public void setProtocolVersion(final ProtocolVersion pv)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 0fc39a9318..bddbc329ab 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -21,8 +21,10 @@
package org.apache.qpid.client.util;
import java.util.Iterator;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -35,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
public class FlowControllingBlockingQueue
{
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final BlockingQueue _queue = new LinkedBlockingQueue();
+ private final Queue _queue = new ConcurrentLinkedQueue();
private final int _flowControlHighThreshold;
private final int _flowControlLowThreshold;
@@ -71,7 +73,17 @@ public class FlowControllingBlockingQueue
public Object take() throws InterruptedException
{
- Object o = _queue.take();
+ Object o = _queue.poll();
+ if(o == null)
+ {
+ synchronized(this)
+ {
+ while((o = _queue.poll())==null)
+ {
+ wait();
+ }
+ }
+ }
if (_listener != null)
{
synchronized (_listener)
@@ -88,7 +100,12 @@ public class FlowControllingBlockingQueue
public void add(Object o)
{
- _queue.add(o);
+ synchronized(this)
+ {
+ _queue.add(o);
+
+ notifyAll();
+ }
if (_listener != null)
{
synchronized (_listener)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
index 2fdc74fc09..6f07dcb469 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
@@ -20,6 +20,7 @@ package org.apache.qpid.nclient;
import java.nio.ByteBuffer;
import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageTransfer;
/**
* Assembles message parts.
@@ -33,31 +34,13 @@ import org.apache.qpid.transport.Header;
* are transferred.
*/
public interface MessagePartListener
-{
- /**
- * Indicates the Message transfer has started.
- *
- * @param transferId The message transfer ID.
- */
- public void messageTransfer(int transferId);
-
- /**
- * Add the following a header to the message being received.
- *
- * @param header Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
- */
- public void messageHeader(Header header);
+{
/**
- * Add the following byte array to the content of the message being received
+ * Inform the listener of the message transfer
*
- * @param src Data to be added or streamed.
- */
- public void data(ByteBuffer src);
-
- /**
- * Indicates that the message has been fully received.
+ * @param xfr the message transfer object
*/
- public void messageReceived();
+ public void messageTransfer(MessageTransfer xfr);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
index adcd49c26d..620cf14c33 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
@@ -26,15 +26,7 @@ public class ClientSessionDelegate extends SessionDelegate
{
MessagePartListener listener = ((ClientSession)session).getMessageListeners()
.get(xfr.getDestination());
- listener.messageTransfer(xfr.getId());
- listener.messageHeader(xfr.getHeader());
- ByteBuffer body = xfr.getBody();
- if (body == null)
- {
- body = ByteBuffer.allocate(0);
- }
- listener.data(body);
- listener.messageReceived();
+ listener.messageTransfer(xfr);
}
@Override public void messageReject(Session session, MessageReject struct)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
index 3b10c44d5a..64c89c960c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
@@ -2,8 +2,7 @@ package org.apache.qpid.nclient.util;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.*;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
@@ -22,7 +21,7 @@ import org.apache.qpid.api.Message;
*/
public class ByteBufferMessage implements Message
{
- private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+ private List<ByteBuffer> _data;// = new ArrayList<ByteBuffer>();
private ByteBuffer _readBuffer;
private int _dataSize;
private DeliveryProperties _currentDeliveryProps;
@@ -76,7 +75,18 @@ public class ByteBufferMessage implements Message
*/
public void appendData(ByteBuffer src) throws IOException
{
- _data.offer(src);
+ if(_data == null)
+ {
+ _data = Collections.singletonList(src);
+ }
+ else
+ {
+ if(_data.size() == 1)
+ {
+ _data = new ArrayList<ByteBuffer>(_data);
+ }
+ _data.add(src);
+ }
_dataSize += src.remaining();
}
@@ -100,12 +110,12 @@ public class ByteBufferMessage implements Message
_currentMessageProps = props;
}
- public void readData(byte[] target) throws IOException
+ public void readData(byte[] target)
{
getReadBuffer().get(target);
}
- public ByteBuffer readData() throws IOException
+ public ByteBuffer readData()
{
return getReadBuffer();
}
@@ -115,7 +125,7 @@ public class ByteBufferMessage implements Message
//optimize for the simple cases
if(_data.size() == 1)
{
- _readBuffer = _data.element().duplicate();
+ _readBuffer = _data.get(0).duplicate();
}
else
{
@@ -128,7 +138,7 @@ public class ByteBufferMessage implements Message
}
}
- private ByteBuffer getReadBuffer() throws IOException
+ private ByteBuffer getReadBuffer()
{
if (_readBuffer != null )
{
@@ -143,7 +153,7 @@ public class ByteBufferMessage implements Message
}
else
{
- throw new IOException("No Data to read");
+ return ByteBuffer.allocate(0);
}
}
}
@@ -151,16 +161,9 @@ public class ByteBufferMessage implements Message
//hack for testing
@Override public String toString()
{
- try
- {
- ByteBuffer temp = getReadBuffer();
- byte[] b = new byte[temp.remaining()];
- temp.get(b);
- return new String(b);
- }
- catch(IOException e)
- {
- return "No data";
- }
+ ByteBuffer temp = getReadBuffer();
+ byte[] b = new byte[temp.remaining()];
+ temp.get(b);
+ return new String(b);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
index 3f1746f48a..0e54e04a99 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
@@ -3,9 +3,7 @@ package org.apache.qpid.nclient.util;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.*;
import org.apache.qpid.nclient.MessagePartListener;
/**
@@ -26,16 +24,35 @@ public class MessagePartListenerAdapter implements MessagePartListener
_adaptee = listener;
}
- public void messageTransfer(int transferId)
+ public void messageTransfer(MessageTransfer xfr)
{
- _currentMsg = new ByteBufferMessage(transferId);
- }
+ _currentMsg = new ByteBufferMessage(xfr.getId());
+
+ for (Struct st : xfr.getHeader().getStructs())
+ {
+ if(st instanceof DeliveryProperties)
+ {
+ _currentMsg.setDeliveryProperties((DeliveryProperties)st);
+
+ }
+ else if(st instanceof MessageProperties)
+ {
+ _currentMsg.setMessageProperties((MessageProperties)st);
+ }
+
+ }
+
+
+ ByteBuffer body = xfr.getBody();
+ if (body == null)
+ {
+ body = ByteBuffer.allocate(0);
+ }
+
- public void data(ByteBuffer src)
- {
try
{
- _currentMsg.appendData(src);
+ _currentMsg.appendData(body);
}
catch(IOException e)
{
@@ -43,16 +60,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
// doesn't occur as we are using
// a ByteBuffer
}
- }
- public void messageHeader(Header header)
- {
- _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
- _currentMsg.setMessageProperties(header.get(MessageProperties.class));
- }
-
- public void messageReceived()
- {
_adaptee.onMessage(_currentMsg);
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index dfb356432e..a881f6a822 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -31,7 +31,7 @@ import org.apache.qpid.AMQException;
import javax.jms.*;
import java.util.Map;
-public class TestAMQSession extends AMQSession
+public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
public TestAMQSession()
@@ -94,7 +94,7 @@ public class TestAMQSession extends AMQSession
}
- public BasicMessageConsumer createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException
+ public BasicMessageConsumer_0_8 createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException
{
return null;
}
@@ -109,12 +109,12 @@ public class TestAMQSession extends AMQSession
return false;
}
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
{
}
- public BasicMessageProducer createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
+ public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
{
return null;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index 22f66ae556..a8e7f47db0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -111,6 +111,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private final byte[] _data;
private final int _offset;
private int _hashCode;
+ private String _asString = null;
+
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -137,7 +139,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
-
+ _asString = data;
}
public AMQShortString(char[] data)
@@ -418,15 +420,14 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return chars;
}
- private String str = null;
public String asString()
{
- if (str == null)
+ if (_asString == null)
{
- str = new String(asChars());
+ _asString = new String(asChars());
}
- return str;
+ return _asString;
}
public boolean equals(Object o)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
index 9b6ab4951b..9439e5e0de 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
@@ -24,6 +24,8 @@ import org.apache.qpid.transport.network.Frame;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
import java.nio.ByteBuffer;
@@ -35,30 +37,31 @@ import java.nio.ByteBuffer;
public class Header {
- private final List<Struct> structs;
+ private final Struct[] structs;
public Header(List<Struct> structs)
{
- this.structs = structs;
+ this(structs.toArray(new Struct[structs.size()]));
}
public Header(Struct ... structs)
{
- this(Arrays.asList(structs));
+ this.structs = structs;
}
- public List<Struct> getStructs()
+ public Struct[] getStructs()
{
return structs;
}
+
public <T> T get(Class<T> klass)
{
for (Struct st : structs)
{
if (klass.isInstance(st))
{
- return klass.cast(st);
+ return (T) st;
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 1400bd2e5b..2a4232c425 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -128,8 +128,14 @@ public class Session extends Invoker
{
int id = nextCommandId();
cmd.setId(id);
- log.debug("ID: [%s] %s", this.channel, id);
- if ((id % 65536) == 0)
+
+ if(log.isDebugEnabled())
+ {
+ log.debug("ID: [%s] %s", this.channel, id);
+ }
+
+ //if ((id % 65536) == 0)
+ if ((id & 0xff) == 0)
{
flushProcessed(TIMELY_REPLY);
}
@@ -232,7 +238,11 @@ public class Session extends Invoker
boolean complete(int lower, int upper)
{
- log.debug("%s complete(%d, %d)", this, lower, upper);
+ //avoid autoboxing
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s complete(%d, %d)", this, lower, upper);
+ }
synchronized (commands)
{
int old = maxComplete;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index b808156dc6..33d552b91e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -200,7 +200,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
break;
case HEADER:
command = incomplete[channel];
- List<Struct> structs = new ArrayList();
+ List<Struct> structs = new ArrayList(2);
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 007167115b..bb7d2506e3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -34,7 +34,6 @@ import org.apache.qpid.transport.Struct;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.List;
import static org.apache.qpid.transport.network.Frame.*;
@@ -209,11 +208,11 @@ public final class Disassembler implements Sender<ProtocolEvent>,
if (payload)
{
final Header hdr = method.getHeader();
- final List<Struct> structs = hdr.getStructs();
- final int nstructs = structs.size();
- for (int i = 0; i < nstructs; i++)
+ final Struct[] structs = hdr.getStructs();
+
+ for (Struct st : structs)
{
- enc.writeStruct32(structs.get(i));
+ enc.writeStruct32(st);
}
headerSeg = enc.segment();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
index 77f592b6c6..8c4818df92 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
@@ -42,6 +42,11 @@ public final class Logger
this.log = log;
}
+ public boolean isDebugEnabled()
+ {
+ return log.isDebugEnabled();
+ }
+
public void debug(String message, Object ... args)
{
if (log.isDebugEnabled())