diff options
34 files changed, 583 insertions, 861 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 723e502ff0..5203a27f42 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -63,7 +63,6 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -78,7 +77,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,20 +97,20 @@ import org.slf4j.LoggerFactory; * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, * after looking at worse bottlenecks first. */ -public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession +public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>(); + private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - public BasicMessageConsumer get(int id) + public C get(int id) { if ((id & 0xFFFFFFF0) == 0) { - return _fastAccessConsumers[id]; + return (C) _fastAccessConsumers[id]; } else { @@ -120,12 +118,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) + public C put(int id, C consumer) { - BasicMessageConsumer oldVal; + C oldVal; if ((id & 0xFFFFFFF0) == 0) { - oldVal = _fastAccessConsumers[id]; + oldVal = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = consumer; } else @@ -137,12 +135,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public BasicMessageConsumer remove(int id) + public C remove(int id) { - BasicMessageConsumer consumer; + C consumer; if ((id & 0xFFFFFFF0) == 0) { - consumer = _fastAccessConsumers[id]; + consumer = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = null; } else @@ -154,15 +152,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public Collection<BasicMessageConsumer> values() + public Collection<C> values() { - ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>(); + ArrayList<C> values = new ArrayList<C>(); for (int i = 0; i < 16; i++) { if (_fastAccessConsumers[i] != null) { - values.add(_fastAccessConsumers[i]); + values.add((C) _fastAccessConsumers[i]); } } values.addAll(_slowAccessConsumers.values()); @@ -183,8 +181,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - /** Used for debugging in the dispatcher. */ - private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class); /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; @@ -234,7 +230,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Holds this session unique identifier, used to distinguish it from other sessions. */ protected int _channelId; - /** @todo This does not appear to be set? */ private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ @@ -261,8 +256,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = + new ConcurrentHashMap<C, String>(); /** * Used to hold incoming messages. @@ -299,7 +294,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - protected final IdToConsumerMap _consumers = new IdToConsumerMap(); + protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); //Map<AMQShortString, BasicMessageConsumer> _consumers = //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); @@ -308,7 +303,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes */ - private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>(); + private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = @@ -584,7 +579,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException + public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException { if (consumer.getQueuename() != null) { @@ -755,11 +750,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void sendCommit() throws AMQException, FailoverException; - public void confirmConsumerCancelled(AMQShortString consumerTag) + + public void confirmConsumerCancelled(int consumerTag) { // Remove the consumer from the map - BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); + C consumer = _consumers.get(consumerTag); if (consumer != null) { if (!consumer.isNoConsume()) // Normal Consumer @@ -801,7 +797,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } else { - _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer)); + _queue.add(new CloseConsumerMessage(consumer)); } } } @@ -848,7 +844,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } - public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + public C createExclusiveConsumer(Destination destination) throws JMSException { checkValidDestination(destination); @@ -927,8 +923,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _subscriptions.get(name).close(); } AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + C consumer = (C) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -960,23 +956,23 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return msg; } - public BasicMessageProducer createProducer(Destination destination) throws JMSException + public P createProducer(Destination destination) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException + public P createProducer(Destination destination, boolean immediate) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + public P createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, + public P createProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); @@ -986,7 +982,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic); + return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1074,7 +1070,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + C consumer = (C) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); } @@ -1093,7 +1089,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); + C consumer = (C) createConsumer(destination, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1111,7 +1107,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); } @@ -1130,7 +1126,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); + C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1175,7 +1171,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); } /** @@ -1195,7 +1191,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); } public abstract TemporaryQueue createTemporaryQueue() throws JMSException; @@ -1369,17 +1365,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { _logger.debug("Message[" + message.toString() + "] received in session"); } - - if (message instanceof ReturnMessage) - { - // Return of the bounced message. - returnBouncedMessage((ReturnMessage) message); - } - else - { - _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); - } + _highestDeliveryTag.set(message.getDeliveryTag()); + _queue.add(message); } public void declareAndBind(AMQDestination amqd) @@ -1618,7 +1605,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, + protected C createConsumerImpl(final Destination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { @@ -1642,10 +1629,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector = selector; } - return new FailoverRetrySupport<MessageConsumer, JMSException>( - new FailoverProtectedOperation<MessageConsumer, JMSException>() + return new FailoverRetrySupport<C, JMSException>( + new FailoverProtectedOperation<C, JMSException>() { - public MessageConsumer execute() throws JMSException, FailoverException + public C execute() throws JMSException, FailoverException { checkNotClosed(); @@ -1668,7 +1655,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); } - BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, noLocal, exclusive, messageSelector, ft, noConsume, autoClose); if (_messageListener != null) @@ -1712,7 +1699,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException; @@ -1722,9 +1709,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param consumer the consum */ - void deregisterConsumer(BasicMessageConsumer consumer) + void deregisterConsumer(C consumer) { - if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null) + if (_consumers.remove(consumer.getConsumerTag()) != null) { String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) @@ -2012,12 +1999,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + final Iterator<C> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); if (error != null) { con.notifyError(error); @@ -2048,7 +2035,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess final Iterator it = clonedProducers.iterator(); while (it.hasNext()) { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); + final P prod = (P) it.next(); prod.close(); } // at this point the _producers map is empty @@ -2096,20 +2083,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param queueName */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, + private void consumeFromQueue(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { int tagId = _nextTag++; - // need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(tagId)); - consumer.setConsumerTag(tag); + consumer.setConsumerTag(tagId); // we must register the consumer in the map before we actually start listening _consumers.put(tagId, consumer); try { - sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag); + sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); } catch (AMQException e) { @@ -2119,26 +2104,26 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException; + public abstract void sendConsume(C consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; - private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } - private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent) throws JMSException { - return new FailoverRetrySupport<BasicMessageProducer, JMSException>( - new FailoverProtectedOperation<BasicMessageProducer, JMSException>() + return new FailoverRetrySupport<P, JMSException>( + new FailoverProtectedOperation<P, JMSException>() { - public BasicMessageProducer execute() throws JMSException, FailoverException + public P execute() throws JMSException, FailoverException { checkNotClosed(); long producerId = getNextProducerId(); - BasicMessageProducer producer = createMessageProducer(destination, mandatory, + P producer = createMessageProducer(destination, mandatory, immediate, waitUntilSent, producerId); registerProducer(producerId, producer); @@ -2147,7 +2132,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public abstract P createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId); private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException @@ -2320,12 +2305,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + final Iterator<C> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); con.markClosed(); } // at this point the _consumers map will be empty @@ -2360,7 +2345,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @throws AMQException */ - private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException + private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException { AMQDestination amqd = consumer.getDestination(); @@ -2424,15 +2409,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void rejectAllMessages(boolean requeue) { - rejectMessagesForConsumerTag(null, requeue); + rejectMessagesForConsumerTag(0, requeue, true); } /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + * @param rejectAllConsumers */ - private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) + private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { Iterator messages = _queue.iterator(); if (_logger.isInfoEnabled()) @@ -2453,7 +2439,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString())) + if (rejectAllConsumers || (message.getConsumerTag() == consumerTag)) { if (_logger.isDebugEnabled()) { @@ -2475,12 +2461,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void resubscribeConsumers() throws AMQException { - ArrayList consumers = new ArrayList(_consumers.values()); + ArrayList<C> consumers = new ArrayList<C>(_consumers.values()); _consumers.clear(); - for (Iterator it = consumers.iterator(); it.hasNext();) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + for (C consumer : consumers) + { consumer.failedOver(); registerConsumer(consumer, true); } @@ -2492,53 +2477,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey for (Iterator it = producers.iterator(); it.hasNext();) { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); + P producer = (P) it.next(); producer.resubscribe(); } } - private void returnBouncedMessage(final ReturnMessage msg) - { - _connection.performConnectionTask(new Runnable() - { - public void run() - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = - _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } - else - { - _connection.exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); - } - - } - catch (Exception e) - { - _logger.error( - "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", - e); - } - } - }); - } - /** * Suspends or unsuspends this session. * @@ -2641,6 +2584,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } + /** Used for debugging in the dispatcher. */ + private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread { @@ -2652,6 +2599,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private final AtomicLong _rollbackMark = new AtomicLong(-1); private String dispatcherID = "" + System.identityHashCode(this); + + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -2670,7 +2619,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public void rejectPending(BasicMessageConsumer consumer) + public void rejectPending(C consumer) { synchronized (_lock) { @@ -2685,7 +2634,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); //Let the dispatcher deal with this when it gets to them. // closeConsumer @@ -2712,7 +2661,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); - for (BasicMessageConsumer consumer : _consumers.values()) + for (C consumer : _consumers.values()) { if (!consumer.isNoConsume()) { @@ -2778,7 +2727,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (!(message instanceof UnprocessedMessage.CloseConsumerMessage) + if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get())) { rejectMessage(message, true); @@ -2840,8 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess //This if block is not needed anymore as bounce messages are handled separately //if (message.getDeliverBody() != null) //{ - final BasicMessageConsumer consumer = - _consumers.get(message.getConsumerTag().toIntValue()); + final C consumer = + _consumers.get(message.getConsumerTag()); if ((consumer == null) || consumer.isClosed()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 32a0fdd5f5..aa0ff66545 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -52,7 +52,7 @@ import java.util.Map; /** * This is a 0.10 Session */ -public class AMQSession_0_10 extends AMQSession +public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10> { /** @@ -345,7 +345,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message consumer */ - public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, final boolean noConsume, @@ -406,8 +406,8 @@ public class AMQSession_0_10 extends AMQSession * This method is invoked when a consumer is creted * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector, AMQShortString tag) + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException { boolean preAcquire; @@ -416,10 +416,10 @@ public class AMQSession_0_10 extends AMQSession preAcquire = ( ! consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) || !(consumer.getDestination() instanceof AMQQueue); - getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), + getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, - new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, + (BasicMessageConsumer_0_10) consumer, null, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) @@ -427,21 +427,23 @@ public class AMQSession_0_10 extends AMQSession throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); } + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + if (! prefetch()) { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); } else { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) { // set the flow - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } @@ -452,7 +454,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message producer */ - public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId) { @@ -542,13 +544,14 @@ public class AMQSession_0_10 extends AMQSession { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(consumer.getConsumerTag().toString()); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); } } else { - for (BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer_0_10 consumer : _consumers.values()) { + String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null try { @@ -556,18 +559,18 @@ public class AMQSession_0_10 extends AMQSession { if (consumer.getMessageListener() != null) { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, 1); } } else { getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, + .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); } catch (Exception e) { @@ -715,7 +718,7 @@ public class AMQSession_0_10 extends AMQSession AMQTopic origTopic=checkValidTopic(topic); AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber=_subscriptions.get(name); + TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name); if (subscriber != null) { if (subscriber.getTopic().equals(topic)) @@ -766,7 +769,7 @@ public class AMQSession_0_10 extends AMQSession } } - subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 2c88d6f557..2442b157f1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -25,10 +25,11 @@ import javax.jms.*; import javax.jms.IllegalStateException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; @@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory; import java.util.Map; -public final class AMQSession_0_8 extends AMQSession +public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { /** Used for debugging. */ @@ -218,6 +219,7 @@ public final class AMQSession_0_8 extends AMQSession return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName()); } + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException { @@ -245,10 +247,14 @@ public final class AMQSession_0_8 extends AMQSession { throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } - } - - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, - String messageSelector, AMQShortString tag) throws AMQException, FailoverException + } + + @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, + AMQShortString queueName, + AMQProtocolHandler protocolHandler, + boolean nowait, + String messageSelector, + int tag) throws AMQException, FailoverException { FieldTable arguments = FieldTableFactory.newFieldTable(); if ((messageSelector != null) && !messageSelector.equals("")) @@ -268,7 +274,7 @@ public final class AMQSession_0_8 extends AMQSession BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, - tag, + new AMQShortString(String.valueOf(tag)), consumer.isNoLocal(), consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), @@ -337,7 +343,7 @@ public final class AMQSession_0_8 extends AMQSession } - public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId) { @@ -345,6 +351,66 @@ public final class AMQSession_0_8 extends AMQSession this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); } + + @Override public void messageReceived(UnprocessedMessage message) + { + + if (message instanceof ReturnMessage) + { + // Return of the bounced message. + returnBouncedMessage((ReturnMessage) message); + } + else + { + super.messageReceived(message); + } + } + + private void returnBouncedMessage(final ReturnMessage msg) + { + _connection.performConnectionTask(new Runnable() + { + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); + } + + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } + } + }); + } + + + + public void sendRollback() throws AMQException, FailoverException { TxRollbackBody body = getMethodRegistry().createTxRollbackBody(); @@ -365,7 +431,7 @@ public final class AMQSession_0_8 extends AMQSession checkNotClosed(); AMQTopic origTopic = checkValidTopic(topic); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name); if (subscriber != null) { if (subscriber.getTopic().equals(topic)) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0176f66552..01bb68c23e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -22,10 +22,7 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; @@ -49,7 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer +public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -72,7 +69,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ - protected AMQShortString _consumerTag; + protected int _consumerTag; /** We need to know the channel id when constructing frames */ protected final int _channelId; @@ -517,7 +514,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me throw e; } - else if (o instanceof UnprocessedMessage.CloseConsumerMessage) + else if (o instanceof CloseConsumerMessage) { _closed.set(true); deregisterConsumer(); @@ -635,7 +632,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me * @param closeMessage * this message signals that we should close the browser */ - public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage) + public void notifyCloseMessage(CloseConsumerMessage closeMessage) { if (isMessageListenerSet()) { @@ -667,26 +664,21 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me * * @param messageFrame the raw unprocessed mesage */ - void notifyMessage(UnprocessedMessage messageFrame) + void notifyMessage(U messageFrame) { - if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage) + if (messageFrame instanceof CloseConsumerMessage) { - notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame); + notifyCloseMessage((CloseConsumerMessage) messageFrame); return; } - final boolean debug = _logger.isDebugEnabled(); - if (debug) - { - _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag()); - } try { AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame); - if (debug) + if (_logger.isDebugEnabled()) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } @@ -721,7 +713,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } - public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<H, B> messageFrame) + public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame) throws Exception; /** @param jmsMessage this message has already been processed so can't redo preDeliver */ @@ -936,12 +928,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me _session.deregisterConsumer(this); } - public AMQShortString getConsumerTag() + public int getConsumerTag() { return _consumerTag; } - public void setConsumerTag(AMQShortString consumerTag) + public void setConsumerTag(int consumerTag) { _consumerTag = consumerTag; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 388adfb434..2a37298a43 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -22,11 +22,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.api.Message; import org.apache.qpid.transport.*; import org.apache.qpid.QpidException; import org.apache.qpid.filter.MessageFilter; @@ -35,16 +32,14 @@ import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.MessageListener; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; /** * This is a 0.10 message consumer. */ -public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer> - implements org.apache.qpid.nclient.util.MessageListener +public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> + implements org.apache.qpid.nclient.MessagePartListener { /** @@ -76,6 +71,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By * Specify whether this consumer is performing a sync receive */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); + private String _consumerTagString; //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, @@ -106,6 +102,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _isStarted = connection.started(); } + + @Override public void setConsumerTag(int consumerTag) + { + super.setConsumerTag(consumerTag); + _consumerTagString = String.valueOf(consumerTag); + } + + public String getConsumerTagString() + { + return _consumerTagString; + } + + // ----- Interface org.apache.qpid.client.util.MessageListener /** @@ -142,7 +151,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { if (isMessageListenerSet() && ! getSession().prefetch()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } _logger.debug("messageOk, trying to notify"); @@ -155,52 +164,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By /** * This method is invoked by the transport layer when a message is delivered for this * consumer. The message is transformed and pass to the session. - * @param message an 0.10 message + * @param xfr an 0.10 message transfer */ - public void onMessage(Message message) + public void messageTransfer(MessageTransfer xfr) + + //public void onMessage(Message message) { int channelId = getSession().getChannelId(); - long deliveryId = message.getMessageTransferId(); - AMQShortString consumerTag = getConsumerTag(); - AMQShortString exchange; - AMQShortString routingKey; - boolean redelivered = false; - Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; - if (headers[0] == null) { - headers[0] = new MessageProperties(); - } - if( message.getDeliveryProperties() != null ) - { - exchange = new AMQShortString(message.getDeliveryProperties().getExchange()); - routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey()); - redelivered = message.getDeliveryProperties().getRedelivered(); - } - else - { - exchange = new AMQShortString(""); - routingKey = new AMQShortString(""); - headers[1] = new DeliveryProperties(); - } + int consumerTag = getConsumerTag(); + UnprocessedMessage_0_10 newMessage = - new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered); - try - { - newMessage.receiveBody(message.readData()); - } - catch (IOException e) - { - getSession().getAMQConnection().exceptionReceived(e); - } - // if there is a replyto destination then we need to request the exchange info - ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo(); - if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals("")) - { - // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - // the exchnage class will be set later from within the sesion thread - String replyToUrl = replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey(); - newMessage.setReplyToURL(replyToUrl); - } - newMessage.setContentHeader(headers); + new UnprocessedMessage_0_10(consumerTag, xfr); + + getSession().messageReceived(newMessage); // else ignore this message } @@ -213,47 +189,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ @Override void sendCancel() throws AMQException { - ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString()); + ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString()); ((AMQSession_0_10) getSession()).getQpidSession().sync(); // confirm cancel getSession().confirmConsumerCancelled(getConsumerTag()); ((AMQSession_0_10) getSession()).getCurrentException(); } - @Override void notifyMessage(UnprocessedMessage messageFrame) + @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame) { - // if there is a replyto destination then we need to request the exchange info - String replyToURL = messageFrame.getReplyToURL(); - if (replyToURL != null && !replyToURL.equals("")) - { - AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/'))); - String replyToUrl = "://" + replyToURL; - if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl; - } - else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl; - } - else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl; - } - else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME)) - { - replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl; - } - else - { - Future<ExchangeQueryResult> future = - ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString()); - ExchangeQueryResult res = future.get(); - // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - replyToUrl = res.getType() + replyToUrl; - } - ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl); - } + super.notifyMessage(messageFrame); } @@ -267,11 +212,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( - AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception + AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception { - return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(), - messageFrame.getContentHeader(), messageFrame.getBodies() - ); + AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession()); + return _messageFactory.createMessage(msg.getMessageTransfer()); } // private methods @@ -327,7 +271,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // and messages are not prefetched we then need to request another one if(! getSession().prefetch()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } } @@ -415,7 +359,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.setMessageListener(messageListener); if (messageListener != null && ! getSession().prefetch()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } if (messageListener != null && !_synchronousQueue.isEmpty()) @@ -440,7 +384,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _isStarted = true; if (_syncReceive.get()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } } @@ -463,7 +407,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) { - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); } if (! getSession().prefetch()) @@ -486,4 +430,5 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _session.acknowledgeMessage(msg.getDeliveryTag(), false); } } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index e601f454a4..494a8fb43d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -26,22 +26,14 @@ import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.QpidException; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.filter.JMSSelectorFilter; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody> +public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8> { protected final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -69,7 +61,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader void sendCancel() throws AMQException, FailoverException { - BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false); + BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false); final AMQFrame cancelFrame = body.generateFrame(_channelId); @@ -81,7 +73,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader } } - public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception { return _messageFactory.createMessage(messageFrame.getDeliveryTag(), @@ -90,4 +82,4 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 2f06dc9d39..02c5526e03 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -22,6 +22,7 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.nio.ByteBuffer; import javax.jms.JMSException; import javax.jms.Message; @@ -36,11 +37,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.nclient.util.ByteBufferMessage; import org.apache.qpid.njms.ExceptionHelper; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.*; +import static org.apache.qpid.transport.Option.*; /** * This is a 0_10 message producer. @@ -76,21 +74,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate(); - org.apache.qpid.api.Message underlyingMessage = message.get010Message(); - if (underlyingMessage == null) - { - underlyingMessage = new ByteBufferMessage(delegate.getMessageProperties(), delegate.getDeliveryProperties()); - message.set010Message(underlyingMessage); - - } - // force a rebuild of the 0-10 message if data has changed - if (message.getData() == null) - { - message.dataChanged(); - } - - DeliveryProperties deliveryProp = underlyingMessage.getDeliveryProperties(); - MessageProperties messageProps = underlyingMessage.getMessageProperties(); + DeliveryProperties deliveryProp = delegate.getDeliveryProperties(); + MessageProperties messageProps = delegate.getMessageProperties(); if (messageId != null) { @@ -140,10 +125,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority)); message.setJMSPriority(priority); } - String excahngeName = destination.getExchangeName().toString(); - if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(excahngeName)) + String exchangeName = destination.getExchangeName().toString(); + if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName)) { - deliveryProp.setExchange(excahngeName); + deliveryProp.setExchange(exchangeName); } String routingKey = destination.getRoutingKey().toString(); if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey)) @@ -153,63 +138,27 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.setContentLength(message.getContentLength()); - - /* String replyToURL = contentHeaderProperties.getReplyToAsString(); - if (replyToURL != null) - { - if(_logger.isDebugEnabled()) - { - StringBuffer b = new StringBuffer(); - b.append("\n=========================="); - b.append("\nReplyTo : " + replyToURL); - b.append("\n=========================="); - _logger.debug(b.toString()); - } - AMQBindingURL dest; - try - { - dest = new AMQBindingURL(replyToURL); - } - catch (URISyntaxException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } - messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString())); - } -*/ - - // send the message try { - org.apache.qpid.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession(); + org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session) + ((AMQSession_0_10) getSession()).getQpidSession(); // if true, we need to sync the delivery of this message boolean sync = (deliveryMode == DeliveryMode.PERSISTENT && getSession().getAMQConnection().getSyncPersistence()); + org.apache.mina.common.ByteBuffer data = message.getData(); + ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); + + ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(deliveryProp, messageProps), + buffer, sync ? SYNC : NONE); if (sync) { - ssn.setAutoSync(true); - } - try - { - ssn.messageTransfer(destination.getExchangeName().toString(), - underlyingMessage, - ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + ssn.sync(); } - finally - { - if (sync) - { - ssn.setAutoSync(false); - } - } - } - catch (IOException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); } catch (RuntimeException rte) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index 29fa03fd1d..9bdef22f96 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -32,20 +32,20 @@ import javax.jms.TopicSubscriber; * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract * */ -class TopicSubscriberAdaptor implements TopicSubscriber +class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber { private final Topic _topic; - private final BasicMessageConsumer _consumer; + private final C _consumer; private final boolean _noLocal; - TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal) + TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal) { _topic = topic; _consumer = consumer; _noLocal = noLocal; } - TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer) + TopicSubscriberAdaptor(Topic topic, C consumer) { this(topic, consumer, consumer.isNoLocal()); } @@ -103,7 +103,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber } private void checkPreConditions() throws javax.jms.IllegalStateException{ - BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer; + C msgConsumer = _consumer; if (msgConsumer.isClosed() ){ throw new javax.jms.IllegalStateException("Consumer is closed"); @@ -120,7 +120,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber } } - BasicMessageConsumer getMessageConsumer() + C getMessageConsumer() { return _consumer; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 6029e7c171..6237234c4d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -43,13 +43,12 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic throws AMQException { final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8( - channelId, body.getDeliveryTag(), - body.getConsumerTag(), + body.getConsumerTag().toIntValue(), body.getExchange(), body.getRoutingKey(), body.getRedelivered()); _logger.debug("New JmsDeliver method received:" + session); - session.unprocessedMessageReceived(msg); + session.unprocessedMessageReceived(channelId, msg); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 5731fb7473..3bbc9209c5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -45,14 +45,14 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicR throws AMQException { _logger.debug("New JmsBounce method received"); - final ReturnMessage msg = new ReturnMessage(channelId, + final ReturnMessage msg = new ReturnMessage( body.getExchange(), body.getRoutingKey(), body.getReplyText(), body.getReplyCode() ); - session.unprocessedMessageReceived(msg); + session.unprocessedMessageReceived(channelId, msg); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 97cb2baee4..bfb7b6a9ce 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -22,19 +22,14 @@ package org.apache.qpid.client.message; import org.apache.commons.collections.map.ReferenceMap; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.CustomJMSXProperty; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQUndefinedDestination; -import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.client.*; import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.nclient.*; import org.apache.qpid.jms.Message; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; @@ -47,8 +42,10 @@ import javax.jms.MessageNotWriteableException; import javax.jms.MessageFormatException; import javax.jms.DeliveryMode; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.net.URISyntaxException; import java.nio.charset.Charset; +import org.apache.qpid.exchange.ExchangeDefaults; public class AMQMessageDelegate_0_10 implements AMQMessageDelegate { @@ -68,27 +65,32 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate private AMQSession _session; private final long _deliveryTag; - private static Map<String,Integer> _exchangeTypeMap = new HashMap<String, Integer>(); + private static Map<AMQShortString,Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>(); + private static Map<String,Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>(); + private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();; static { - // TODO - XXX - Need to add to this map when we find an exchange we don't know about + _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE); + _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE); + _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); + _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); - _exchangeTypeMap.put(null, AMQDestination.QUEUE_TYPE); - _exchangeTypeMap.put("amq.direct", AMQDestination.QUEUE_TYPE); - _exchangeTypeMap.put("", AMQDestination.QUEUE_TYPE); - _exchangeTypeMap.put("amq.topic", AMQDestination.TOPIC_TYPE); - _exchangeTypeMap.put("amq.fanout", AMQDestination.TOPIC_TYPE); + _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE); + _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE); + _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); + _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - + + _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE); + _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); + _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); } protected AMQMessageDelegate_0_10() { this(new MessageProperties(), new DeliveryProperties(), -1); _readableProperties = false; - - } protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange, @@ -99,13 +101,68 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate AMQDestination dest; - dest = new AMQUndefinedDestination(exchange, routingKey, null); - - // Destination dest = AMQDestination.createDestination(url); + dest = generateDestination(exchange, routingKey); setJMSDestination(dest); + } + private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey) + { + AMQDestination dest; + switch(getExchangeType(exchange)) + { + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + } + return dest; + } + + private int getExchangeType(AMQShortString exchange) + { + Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange); + + if(type == null) + { + return AMQDestination.UNKNOWN_TYPE; + } + + + return type; + } + + + public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session) + { + DeliveryProperties deliveryProps = header.get(DeliveryProperties.class); + if(deliveryProps != null) + { + String exchange = deliveryProps.getExchange(); + + if(exchange != null && !_exchangeTypeStringMap.containsKey(exchange)) + { + + AMQShortString exchangeShortString = new AMQShortString(exchange); + Future<ExchangeQueryResult> future = + session.exchangeQuery(exchange.toString()); + ExchangeQueryResult res = future.get(); + + Integer type = _exchangeTypeToDestinationType.get(res.getType()); + if(type == null) + { + type = AMQDestination.UNKNOWN_TYPE; + } + _exchangeTypeStringMap.put(exchange, type); + _exchangeTypeMap.put(exchangeShortString, type); + + } + } } protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag) @@ -212,19 +269,10 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate String exchange = replyTo.getExchange(); String routingKey = replyTo.getRoutingKey(); - int type = _exchangeTypeMap.get(exchange); + dest = generateDestination(exchange == null ? null : new AMQShortString(exchange), + routingKey == null ? null : new AMQShortString(routingKey)); + - switch(type) - { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(new AMQShortString(exchange), new AMQShortString(routingKey), new AMQShortString(routingKey)); - break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(new AMQShortString(exchange), new AMQShortString(routingKey), null); - break; - default: - dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null); - } @@ -897,4 +945,5 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate { return _deliveryProps; } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 54da7c4404..0700ce5d23 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -44,17 +44,11 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message protected boolean _readableMessage = false; protected boolean _changedData = true; - - /** - * This is 0_10 specific - */ - private org.apache.qpid.api.Message _010message = null; /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - private AMQMessageDelegate _delegate; private boolean _redelivered; @@ -454,52 +448,10 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message else { _data.flip(); - dataChanged(); _changedData = false; } } - public void set010Message(org.apache.qpid.api.Message m ) - { - _010message = m; - } - - public void dataChanged() - { - if (_010message != null) - { - _010message.clearData(); - try - { - if (_data != null) - { - _010message.appendData(_data.buf().slice()); - } - else - { - _010message.appendData(java.nio.ByteBuffer.allocate(0)); - } - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - - /** - * End 010 specific - */ - - public org.apache.qpid.api.Message get010Message() - { - return _010message; - } - - - - - public int getContentLength() { if(_data != null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 5e16c765af..54a845ceef 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -110,17 +110,17 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader, - List bodies) throws AMQException + java.nio.ByteBuffer body) throws AMQException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); - // we optimise the non-fragmented case to avoid copying - if ((bodies != null)) + + if (body != null) { - data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0)); + data = ByteBuffer.wrap(body); } - else // bodies == null + else // body == null { data = ByteBuffer.allocate(0); } @@ -164,11 +164,11 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader, - List bodies) + java.nio.ByteBuffer body) throws JMSException, AMQException { final AbstractJMSMessage msg = - create010MessageWithBody(messageNbr, contentHeader, bodies); + create010MessageWithBody(messageNbr, contentHeader, body); msg.setJMSRedelivered(redelivered); msg.receivedFromServer(); return msg; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java new file mode 100644 index 0000000000..4af04912e5 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.qpid.client.BasicMessageConsumer; + +public final class CloseConsumerMessage extends UnprocessedMessage +{ + + public CloseConsumerMessage(BasicMessageConsumer consumer) + { + super(consumer.getConsumerTag()); + } + + + public long getDeliveryTag() + { + return 0; + } + + public boolean isRedelivered() + { + return false; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 470439d85c..c290149cef 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -66,17 +66,6 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text } - JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException - { - this(delegateFactory, data, null); - } - - JMSTextMessage(AMQMessageDelegateFactory delegateFactory, String text) throws JMSException - { - super(delegateFactory, (ByteBuffer) null); - setText(text); - } - public void clearBodyImpl() throws JMSException { if (_data != null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 424dccc0c3..e1275c37f7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -40,7 +40,7 @@ public interface MessageFactory AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, Struct[] contentHeader, - List bodies) + java.nio.ByteBuffer body) throws JMSException, AMQException; AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 213d3ebc9e..948d6d0d7d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.message; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.nio.ByteBuffer; import javax.jms.JMSException; @@ -32,6 +33,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.DeliveryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,24 +121,30 @@ public class MessageFactoryRegistry } } - public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, - Struct[] contentHeader, List bodies) throws AMQException, JMSException + public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException { - MessageProperties mprop = (MessageProperties) contentHeader[0]; + + MessageProperties mprop = transfer.getHeader().get(MessageProperties.class); String messageType = mprop.getContentType(); if (messageType == null) { _logger.debug("no message type specified, building a byte message"); messageType = JMSBytesMessage.MIME_TYPE; } - MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType)); + MessageFactory mf = _mimeStringToFactoryMap.get(messageType); if (mf == null) { throw new AMQException(null, "Unsupport MIME type of " + messageType, null); } else { - return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies); + boolean redelivered = false; + DeliveryProperties deliverProps; + if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null) + { + redelivered = deliverProps.getRedelivered(); + } + return mf.createMessage(transfer.getId(), redelivered, transfer.getHeader().getStructs(), transfer.getBody()); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java index c866a5028e..ed590772d9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java @@ -7,9 +7,9 @@ public class ReturnMessage extends UnprocessedMessage_0_8 final private AMQShortString _replyText; final private int _replyCode; - public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode) + public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode) { - super(channelId,-1,null,exchange,routingKey,false); + super(-1,0,exchange,routingKey,false); _replyText = replyText; _replyCode = replyCode; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 3dbef46f6f..713c87260c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -20,23 +20,7 @@ */ package org.apache.qpid.client.message; -import java.util.List; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQChannelException; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.AMQException; import org.apache.qpid.client.BasicMessageConsumer; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** @@ -46,217 +30,24 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public abstract class UnprocessedMessage<H,B> +public abstract class UnprocessedMessage { - private final int _channelId; - private final long _deliveryId; - private final AMQShortString _consumerTag; - protected AMQShortString _exchange; - protected AMQShortString _routingKey; - protected boolean _redelivered; + private final int _consumerTag; + - public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered) + public UnprocessedMessage(int consumerTag) { - _channelId = channelId; - _deliveryId = deliveryId; _consumerTag = consumerTag; - _exchange = exchange; - _routingKey = routingKey; - _redelivered = redelivered; } - public abstract void receiveBody(B nativeMessageBody); - - public abstract void setContentHeader(H nativeMessageHeader); - public int getChannelId() - { - return _channelId; - } + abstract public long getDeliveryTag(); - public long getDeliveryTag() - { - return _deliveryId; - } - public AMQShortString getConsumerTag() + public int getConsumerTag() { return _consumerTag; } - public AMQShortString getExchange() - { - return _exchange; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public boolean isRedelivered() - { - return _redelivered; - } - public abstract List<B> getBodies(); - - public abstract H getContentHeader(); - - // specific to 0_10 - public String getReplyToURL() - { - return ""; - } - - public static final class CloseConsumerMessage extends UnprocessedMessage - { - AMQShortString _consumerTag; - - public CloseConsumerMessage(int channelId, long deliveryId, AMQShortString consumerTag, - AMQShortString exchange, AMQShortString routingKey, boolean redelivered) - { - super(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered); - _consumerTag = consumerTag; - } - - public CloseConsumerMessage(BasicMessageConsumer consumer) - { - this(0, 0, consumer.getConsumerTag(), null, null, false); - } - - public BasicDeliverBody getDeliverBody() - { - return new BasicDeliverBody() - { - - public AMQShortString getConsumerTag() - { - return _consumerTag; - } - - public long getDeliveryTag() - { - return 0; - } - - public AMQShortString getExchange() - { - return null; - } - public boolean getRedelivered() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - - public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException - { - return false; - } - - public AMQFrame generateFrame(int channelId) - { - return null; - } - - public AMQChannelException getChannelException(AMQConstant code, String message) - { - return null; - } - - public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) - { - return null; - } - - public AMQChannelException getChannelNotFoundException(int channelId) - { - return null; - } - - public int getClazz() - { - return 0; - } - - public AMQConnectionException getConnectionException(AMQConstant code, String message) - { - return null; - } - - public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) - { - return null; - } - - public byte getMajor() - { - return 0; - } - - public int getMethod() - { - return 0; - } - - public byte getMinor() - { - return 0; - } - - public int getSize() - { - return 0; - } - - public void writeMethodPayload(ByteBuffer buffer) - { - } - - public void writePayload(ByteBuffer buffer) - { - } - - public byte getFrameType() - { - return 0; - } - - public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) - throws AMQException - { - - } - }; - } - - @Override - public List getBodies() - { - return null; - } - - @Override - public Object getContentHeader() - { - return null; - } - - @Override - public void receiveBody(Object nativeMessageBody) - { - - } - - @Override - public void setContentHeader(Object nativeMessageHeader) - { - - } - } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java index 5093064edf..6b1301a33f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.client.message; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.MessageTransfer; /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and @@ -35,58 +29,25 @@ import org.apache.qpid.transport.Struct; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer> +public class UnprocessedMessage_0_10 extends UnprocessedMessage { - private Struct[] _headers; - private String _replyToURL; - - /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ - private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>(); - - public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered) - { - super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered); - } - - public void receiveBody(ByteBuffer body) - { - - _bodies.add(body); - } - - public void setContentHeader(Struct[] headers) - { - this._headers = headers; - for(Struct s: headers) - { - if (s instanceof DeliveryProperties) - { - DeliveryProperties props = (DeliveryProperties)s; - _exchange = new AMQShortString(props.getExchange()); - _routingKey = new AMQShortString(props.getRoutingKey()); - _redelivered = props.getRedelivered(); - } - } - } + private MessageTransfer _transfer; - public Struct[] getContentHeader() + public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr) { - return _headers; - } - - public List<ByteBuffer> getBodies() - { - return _bodies; + super(consumerTag); + _transfer = xfr; } // additional 0_10 method - public String getReplyToURL() + + public long getDeliveryTag() { - return _replyToURL; + return _transfer.getId(); } - public void setReplyToURL(String url) + public MessageTransfer getMessageTransfer() { - _replyToURL = url; + return _transfer; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java index 78da8cdca2..685e646d85 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java @@ -26,7 +26,6 @@ import java.util.List; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -37,32 +36,54 @@ import org.apache.qpid.framing.ContentHeaderBody; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody> +public class UnprocessedMessage_0_8 extends UnprocessedMessage { private long _bytesReceived = 0; + + private AMQShortString _exchange; + private AMQShortString _routingKey; + private final long _deliveryId; + protected boolean _redelivered; + private BasicDeliverBody _deliverBody; private ContentHeaderBody _contentHeader; /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; - public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered) + public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered) + { + super(consumerTag); + _exchange = exchange; + _routingKey = routingKey; + + _redelivered = redelivered; + _deliveryId = deliveryId; + } + + + public AMQShortString getExchange() + { + return _exchange; + } + + public AMQShortString getRoutingKey() { - super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered); + return _routingKey; } - public UnprocessedMessage_0_8(int channelId, BasicReturnBody body) + public long getDeliveryTag() { - //FIXME: TGM, SRSLY 4RL - super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false); + return _deliveryId; } - public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body) + public boolean isRedelivered() { - super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false); + return _redelivered; } + public void receiveBody(ContentBody body) { @@ -124,7 +145,7 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody public String toString() { StringBuilder buf = new StringBuilder(); - buf.append("Channel Id : " + this.getChannelId()); + if (_contentHeader != null) { buf.append("ContentHeader " + _contentHeader); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6c3ae06ce9..d6c7f01e2d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -224,9 +224,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * * @throws AMQException if this was not expected */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException - { - final int channelId = message.getChannelId(); + public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException + { if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { _channelId2UnprocessedMsgArray[channelId] = message; @@ -239,8 +238,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { - final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] - : _channelId2UnprocessedMsgMap.get(channelId); + final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] + : _channelId2UnprocessedMsgMap.get(channelId)); if (msg == null) { @@ -290,15 +289,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null); } - /*try - {*/ msg.receiveBody(contentBody); - /*} - catch (UnexpectedBodyReceivedException e) - { - _channelId2UnprocessedMsgMap.remove(channelId); - throw e; - }*/ if (msg.isAllBodyDataReceived()) { @@ -478,7 +469,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { final AMQSession session = getSession(channelId); - session.confirmConsumerCancelled(consumerTag); + session.confirmConsumerCancelled(consumerTag.toIntValue()); } public void setProtocolVersion(final ProtocolVersion pv) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 0fc39a9318..bddbc329ab 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -21,8 +21,10 @@ package org.apache.qpid.client.util; import java.util.Iterator; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow @@ -35,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue; public class FlowControllingBlockingQueue { /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final BlockingQueue _queue = new LinkedBlockingQueue(); + private final Queue _queue = new ConcurrentLinkedQueue(); private final int _flowControlHighThreshold; private final int _flowControlLowThreshold; @@ -71,7 +73,17 @@ public class FlowControllingBlockingQueue public Object take() throws InterruptedException { - Object o = _queue.take(); + Object o = _queue.poll(); + if(o == null) + { + synchronized(this) + { + while((o = _queue.poll())==null) + { + wait(); + } + } + } if (_listener != null) { synchronized (_listener) @@ -88,7 +100,12 @@ public class FlowControllingBlockingQueue public void add(Object o) { - _queue.add(o); + synchronized(this) + { + _queue.add(o); + + notifyAll(); + } if (_listener != null) { synchronized (_listener) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java index 2fdc74fc09..6f07dcb469 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java @@ -20,6 +20,7 @@ package org.apache.qpid.nclient; import java.nio.ByteBuffer; import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageTransfer; /** * Assembles message parts. @@ -33,31 +34,13 @@ import org.apache.qpid.transport.Header; * are transferred. */ public interface MessagePartListener -{ - /** - * Indicates the Message transfer has started. - * - * @param transferId The message transfer ID. - */ - public void messageTransfer(int transferId); - - /** - * Add the following a header to the message being received. - * - * @param header Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> - */ - public void messageHeader(Header header); +{ /** - * Add the following byte array to the content of the message being received + * Inform the listener of the message transfer * - * @param src Data to be added or streamed. - */ - public void data(ByteBuffer src); - - /** - * Indicates that the message has been fully received. + * @param xfr the message transfer object */ - public void messageReceived(); + public void messageTransfer(MessageTransfer xfr); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java index adcd49c26d..620cf14c33 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java @@ -26,15 +26,7 @@ public class ClientSessionDelegate extends SessionDelegate { MessagePartListener listener = ((ClientSession)session).getMessageListeners() .get(xfr.getDestination()); - listener.messageTransfer(xfr.getId()); - listener.messageHeader(xfr.getHeader()); - ByteBuffer body = xfr.getBody(); - if (body == null) - { - body = ByteBuffer.allocate(0); - } - listener.data(body); - listener.messageReceived(); + listener.messageTransfer(xfr); } @Override public void messageReject(Session session, MessageReject struct) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java index 3b10c44d5a..64c89c960c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java @@ -2,8 +2,7 @@ package org.apache.qpid.nclient.util; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.Queue; +import java.util.*; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; @@ -22,7 +21,7 @@ import org.apache.qpid.api.Message; */ public class ByteBufferMessage implements Message { - private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>(); + private List<ByteBuffer> _data;// = new ArrayList<ByteBuffer>(); private ByteBuffer _readBuffer; private int _dataSize; private DeliveryProperties _currentDeliveryProps; @@ -76,7 +75,18 @@ public class ByteBufferMessage implements Message */ public void appendData(ByteBuffer src) throws IOException { - _data.offer(src); + if(_data == null) + { + _data = Collections.singletonList(src); + } + else + { + if(_data.size() == 1) + { + _data = new ArrayList<ByteBuffer>(_data); + } + _data.add(src); + } _dataSize += src.remaining(); } @@ -100,12 +110,12 @@ public class ByteBufferMessage implements Message _currentMessageProps = props; } - public void readData(byte[] target) throws IOException + public void readData(byte[] target) { getReadBuffer().get(target); } - public ByteBuffer readData() throws IOException + public ByteBuffer readData() { return getReadBuffer(); } @@ -115,7 +125,7 @@ public class ByteBufferMessage implements Message //optimize for the simple cases if(_data.size() == 1) { - _readBuffer = _data.element().duplicate(); + _readBuffer = _data.get(0).duplicate(); } else { @@ -128,7 +138,7 @@ public class ByteBufferMessage implements Message } } - private ByteBuffer getReadBuffer() throws IOException + private ByteBuffer getReadBuffer() { if (_readBuffer != null ) { @@ -143,7 +153,7 @@ public class ByteBufferMessage implements Message } else { - throw new IOException("No Data to read"); + return ByteBuffer.allocate(0); } } } @@ -151,16 +161,9 @@ public class ByteBufferMessage implements Message //hack for testing @Override public String toString() { - try - { - ByteBuffer temp = getReadBuffer(); - byte[] b = new byte[temp.remaining()]; - temp.get(b); - return new String(b); - } - catch(IOException e) - { - return "No data"; - } + ByteBuffer temp = getReadBuffer(); + byte[] b = new byte[temp.remaining()]; + temp.get(b); + return new String(b); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java index 3f1746f48a..0e54e04a99 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java @@ -3,9 +3,7 @@ package org.apache.qpid.nclient.util; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.*; import org.apache.qpid.nclient.MessagePartListener; /** @@ -26,16 +24,35 @@ public class MessagePartListenerAdapter implements MessagePartListener _adaptee = listener; } - public void messageTransfer(int transferId) + public void messageTransfer(MessageTransfer xfr) { - _currentMsg = new ByteBufferMessage(transferId); - } + _currentMsg = new ByteBufferMessage(xfr.getId()); + + for (Struct st : xfr.getHeader().getStructs()) + { + if(st instanceof DeliveryProperties) + { + _currentMsg.setDeliveryProperties((DeliveryProperties)st); + + } + else if(st instanceof MessageProperties) + { + _currentMsg.setMessageProperties((MessageProperties)st); + } + + } + + + ByteBuffer body = xfr.getBody(); + if (body == null) + { + body = ByteBuffer.allocate(0); + } + - public void data(ByteBuffer src) - { try { - _currentMsg.appendData(src); + _currentMsg.appendData(body); } catch(IOException e) { @@ -43,16 +60,7 @@ public class MessagePartListenerAdapter implements MessagePartListener // doesn't occur as we are using // a ByteBuffer } - } - public void messageHeader(Header header) - { - _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class)); - _currentMsg.setMessageProperties(header.get(MessageProperties.class)); - } - - public void messageReceived() - { _adaptee.onMessage(_currentMsg); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index dfb356432e..a881f6a822 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -31,7 +31,7 @@ import org.apache.qpid.AMQException; import javax.jms.*; import java.util.Map; -public class TestAMQSession extends AMQSession +public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { public TestAMQSession() @@ -94,7 +94,7 @@ public class TestAMQSession extends AMQSession } - public BasicMessageConsumer createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException + public BasicMessageConsumer_0_8 createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException { return null; } @@ -109,12 +109,12 @@ public class TestAMQSession extends AMQSession return false; } - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException { } - public BasicMessageProducer createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId) + public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId) { return null; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 22f66ae556..a8e7f47db0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -111,6 +111,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private final byte[] _data; private final int _offset; private int _hashCode; + private String _asString = null; + private final int _length; private static final char[] EMPTY_CHAR_ARRAY = new char[0]; @@ -137,7 +139,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public AMQShortString(String data) { this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray()); - + _asString = data; } public AMQShortString(char[] data) @@ -418,15 +420,14 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return chars; } - private String str = null; public String asString() { - if (str == null) + if (_asString == null) { - str = new String(asChars()); + _asString = new String(asChars()); } - return str; + return _asString; } public boolean equals(Object o) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java index 9b6ab4951b..9439e5e0de 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java @@ -24,6 +24,8 @@ import org.apache.qpid.transport.network.Frame; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.LinkedHashMap; import java.nio.ByteBuffer; @@ -35,30 +37,31 @@ import java.nio.ByteBuffer; public class Header { - private final List<Struct> structs; + private final Struct[] structs; public Header(List<Struct> structs) { - this.structs = structs; + this(structs.toArray(new Struct[structs.size()])); } public Header(Struct ... structs) { - this(Arrays.asList(structs)); + this.structs = structs; } - public List<Struct> getStructs() + public Struct[] getStructs() { return structs; } + public <T> T get(Class<T> klass) { for (Struct st : structs) { if (klass.isInstance(st)) { - return klass.cast(st); + return (T) st; } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 1400bd2e5b..2a4232c425 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -128,8 +128,14 @@ public class Session extends Invoker { int id = nextCommandId(); cmd.setId(id); - log.debug("ID: [%s] %s", this.channel, id); - if ((id % 65536) == 0) + + if(log.isDebugEnabled()) + { + log.debug("ID: [%s] %s", this.channel, id); + } + + //if ((id % 65536) == 0) + if ((id & 0xff) == 0) { flushProcessed(TIMELY_REPLY); } @@ -232,7 +238,11 @@ public class Session extends Invoker boolean complete(int lower, int upper) { - log.debug("%s complete(%d, %d)", this, lower, upper); + //avoid autoboxing + if(log.isDebugEnabled()) + { + log.debug("%s complete(%d, %d)", this, lower, upper); + } synchronized (commands) { int old = maxComplete; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index b808156dc6..33d552b91e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -200,7 +200,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate break; case HEADER: command = incomplete[channel]; - List<Struct> structs = new ArrayList(); + List<Struct> structs = new ArrayList(2); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 007167115b..bb7d2506e3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -34,7 +34,6 @@ import org.apache.qpid.transport.Struct; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.List; import static org.apache.qpid.transport.network.Frame.*; @@ -209,11 +208,11 @@ public final class Disassembler implements Sender<ProtocolEvent>, if (payload) { final Header hdr = method.getHeader(); - final List<Struct> structs = hdr.getStructs(); - final int nstructs = structs.size(); - for (int i = 0; i < nstructs; i++) + final Struct[] structs = hdr.getStructs(); + + for (Struct st : structs) { - enc.writeStruct32(structs.get(i)); + enc.writeStruct32(st); } headerSeg = enc.segment(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java index 77f592b6c6..8c4818df92 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java @@ -42,6 +42,11 @@ public final class Logger this.log = log; } + public boolean isDebugEnabled() + { + return log.isDebugEnabled(); + } + public void debug(String message, Object ... args) { if (log.isDebugEnabled()) |