diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-09-06 14:29:03 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-09-06 14:29:03 +0000 |
commit | 8a188f811fbfcda1a650b4241b2bcfd85edd5700 (patch) | |
tree | c437ec1c41c8d682e1b15814128c60cb9a511a1f | |
parent | e73ff21672422217cf4ae8247f355e05cf9fa274 (diff) | |
download | qpid-python-8a188f811fbfcda1a650b4241b2bcfd85edd5700.tar.gz |
Unprocessed message was made abstract and a 0-8 and 0-10 implementation is provided.
The return message extends unprocessed_msg_0_8 as return message is only a 0-8 feature.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@573282 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 336 insertions, 173 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index bc4036ac18..56c553bfc5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -71,79 +71,22 @@ import org.apache.qpid.client.message.JMSObjectMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ExchangeBoundBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import java.io.Serializable; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -1269,19 +1212,17 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { if (_logger.isDebugEnabled()) { - _logger.debug("Message[" - + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) - + "] received in session with channel id " + _channelId); + _logger.debug("Message[" + message.toString() + "] received in session"); } - if (message.getDeliverBody() == null) + if (message instanceof ReturnMessage) { // Return of the bounced message. - returnBouncedMessage(message); + returnBouncedMessage((ReturnMessage)message); } else { - _highestDeliveryTag.set(message.getDeliverBody().deliveryTag); + _highestDeliveryTag.set(message.getDeliveryTag()); _queue.add(message); } } @@ -1374,10 +1315,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (_logger.isTraceEnabled()) { - _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + _logger.trace("Rejecting Unacked message:" + message.getDeliveryTag()); } - rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message.getDeliveryTag(), requeue); } public void rejectMessage(AbstractJMSMessage message, boolean requeue) @@ -2320,12 +2261,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag)) + if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag)) { if (_logger.isDebugEnabled()) { _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliverBody().deliveryTag); + + message.getDeliveryTag()); } messages.remove(); @@ -2334,7 +2275,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (_logger.isDebugEnabled()) { - _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag); } } } @@ -2363,7 +2304,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - private void returnBouncedMessage(final UnprocessedMessage message) + private void returnBouncedMessage(final ReturnMessage msg) { _connection.performConnectionTask(new Runnable() { @@ -2373,11 +2314,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = - _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange, - message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); + _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), + msg.getExchange(), msg.getContentHeader(), msg.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. @@ -2565,7 +2506,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (message.getDeliverBody().deliveryTag <= _rollbackMark.get()) + if (message.getDeliveryTag() <= _rollbackMark.get()) { rejectMessage(message, true); } @@ -2619,10 +2560,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void dispatchMessage(UnprocessedMessage message) { - if (message.getDeliverBody() != null) - { + //This if block is not needed anymore as bounce messages are handled separately + //if (message.getDeliverBody() != null) + //{ final BasicMessageConsumer consumer = - (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + (BasicMessageConsumer) _consumers.get(new AMQShortString(message.getConsumerTag())); if ((consumer == null) || consumer.isClosed()) { @@ -2631,13 +2573,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (consumer == null) { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " - + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); + + message.getDeliveryTag() + "] from queue " + + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + + message.getDeliveryTag() + "] from queue " + " consumer(" + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } @@ -2652,7 +2594,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.notifyMessage(message, _channelId); } } - } + //} } /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 96e0b30d23..5da0de128b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -44,7 +44,7 @@ import org.apache.qpid.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer +public abstract class BasicMessageConsumer<H,B> extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -74,7 +74,7 @@ public abstract class BasicMessageConsumer extends Closeable implements MessageC */ private final ArrayBlockingQueue _synchronousQueue; - private MessageFactoryRegistry _messageFactory; + protected MessageFactoryRegistry _messageFactory; private final AMQSession _session; @@ -543,16 +543,12 @@ public abstract class BasicMessageConsumer extends Closeable implements MessageC if (debug) { - _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag); + _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag()); } try { - AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); - + AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame); if (debug) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); @@ -590,6 +586,8 @@ public abstract class BasicMessageConsumer extends Closeable implements MessageC } } + public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H,B> messageFrame)throws Exception; + /** * @param jmsMessage this message has already been processed so can't redo preDeliver * @param channelId diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index b4f9c5de37..20d581147b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,16 +22,20 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicCancelBody; import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BasicMessageConsumer_0_8 extends BasicMessageConsumer +public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody> { protected final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -71,4 +75,13 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } + + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody,ContentBody> messageFrame)throws Exception + { + + return _messageFactory.createMessage(messageFrame.getDeliveryTag(), + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + + } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 51120da55c..92ba6fd136 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -21,13 +21,12 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.protocol.AMQMethodEvent; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +44,14 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod()); + BasicDeliverBody deliveryBody = (BasicDeliverBody) evt.getMethod(); + final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8( + evt.getChannelId(), + deliveryBody.deliveryTag, + deliveryBody.consumerTag.asString(), + deliveryBody.getExchange(), + deliveryBody.getRoutingKey(), + deliveryBody.getRedelivered()); _logger.debug("New JmsDeliver method received"); protocolSession.unprocessedMessageReceived(msg); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 0f00c6a26e..bb5a56e5bd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -21,13 +21,12 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.protocol.AMQMethodEvent; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +44,14 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { + BasicReturnBody returnBody = (BasicReturnBody)evt.getMethod(); _logger.debug("New JmsBounce method received"); - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicReturnBody) evt.getMethod()); + final ReturnMessage msg = new ReturnMessage(evt.getChannelId(), + returnBody.getExchange(), + returnBody.getRoutingKey(), + returnBody.getReplyText(), + returnBody.getReplyCode() + ); protocolSession.unprocessedMessageReceived(msg); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java new file mode 100644 index 0000000000..593c7795b0 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java @@ -0,0 +1,26 @@ +package org.apache.qpid.client.message; + +import org.apache.qpid.framing.AMQShortString; + +public class ReturnMessage extends UnprocessedMessage_0_8 +{ + final private AMQShortString _replyText; + final private int _replyCode; + + public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode) + { + super(channelId,-1,"",exchange,routingKey,false); + _replyText = replyText; + _replyCode = replyCode; + } + + public int getReplyCode() + { + return _replyCode; + } + + public AMQShortString getReplyText() + { + return _replyText; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 5b199f2478..9aab5d094b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,14 +20,10 @@ */ package org.apache.qpid.client.message; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; + /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and @@ -36,96 +32,60 @@ import org.apache.qpid.framing.ContentHeaderBody; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public class UnprocessedMessage +public abstract class UnprocessedMessage<H,B> { - private long _bytesReceived = 0; - - private final BasicDeliverBody _deliverBody; - private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) private final int _channelId; - private ContentHeaderBody _contentHeader; - - /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ - private List<ContentBody> _bodies; + private final long _deliveryId; + private final String _consumerTag; + protected AMQShortString _exchange; + protected AMQShortString _routingKey; + protected boolean _redelivered; - public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) + public UnprocessedMessage(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered) { - _deliverBody = deliverBody; _channelId = channelId; - _bounceBody = null; + _deliveryId = deliveryId; + _consumerTag = consumerTag; + _exchange = exchange; + _routingKey = routingKey; + _redelivered = redelivered; } + public abstract void receiveBody(B nativeMessageBody); - public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) - { - _deliverBody = null; - _channelId = channelId; - _bounceBody = bounceBody; - } - - public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException - { - - if (body.payload != null) - { - final long payloadSize = body.payload.remaining(); - - if (_bodies == null) - { - if (payloadSize == getContentHeader().bodySize) - { - _bodies = Collections.singletonList(body); - } - else - { - _bodies = new ArrayList<ContentBody>(); - _bodies.add(body); - } - - } - else - { - _bodies.add(body); - } - _bytesReceived += payloadSize; - } - } + public abstract void setContentHeader(H nativeMessageHeader); - public boolean isAllBodyDataReceived() + public int getChannelId() { - return _bytesReceived == getContentHeader().bodySize; + return _channelId; } - public BasicDeliverBody getDeliverBody() + public long getDeliveryTag() { - return _deliverBody; + return _deliveryId; } - public BasicReturnBody getBounceBody() + public String getConsumerTag() { - return _bounceBody; + return _consumerTag; } - - public int getChannelId() + public AMQShortString getExchange() { - return _channelId; + return _exchange; } - - public ContentHeaderBody getContentHeader() + public AMQShortString getRoutingKey() { - return _contentHeader; + return _routingKey; } - public void setContentHeader(ContentHeaderBody contentHeader) + public boolean isRedelivered() { - this._contentHeader = contentHeader; + return _redelivered; } - public List<ContentBody> getBodies() - { - return _bodies; - } + public abstract List<B> getBodies(); + public abstract H getContentHeader(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java new file mode 100644 index 0000000000..8d78f9f7fd --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.Struct; + +/** + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. + * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. + */ +public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer> +{ + private Struct[] _headers; + + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ + private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>(); + + public UnprocessedMessage_0_10(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered) + { + super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered); + } + + public void receiveBody(ByteBuffer body) + { + + _bodies.add(body); + } + + public void setContentHeader(Struct[] headers) + { + this._headers = headers; + for(Struct s: headers) + { + if (s instanceof DeliveryProperties) + { + DeliveryProperties props = (DeliveryProperties)s; + _exchange = new AMQShortString(props.getExchange()); + _routingKey = new AMQShortString(props.getRoutingKey()); + _redelivered = props.getRedelivered(); + } + } + } + + public Struct[] getContentHeader() + { + return _headers; + } + + public List<ByteBuffer> getBodies() + { + return _bodies; + } + +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java new file mode 100644 index 0000000000..8e32de382b --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. + * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. + */ +public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody> +{ + private long _bytesReceived = 0; + + private BasicDeliverBody _deliverBody; + private ContentHeaderBody _contentHeader; + + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ + private List<ContentBody> _bodies; + + public UnprocessedMessage_0_8(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered) + { + super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered); + } + + public void receiveBody(ContentBody body) + { + + if (body.payload != null) + { + final long payloadSize = body.payload.remaining(); + + if (_bodies == null) + { + if (payloadSize == getContentHeader().bodySize) + { + _bodies = Collections.singletonList(body); + } + else + { + _bodies = new ArrayList<ContentBody>(); + _bodies.add(body); + } + + } + else + { + _bodies.add(body); + } + _bytesReceived += payloadSize; + } + } + + public void setMethodBody(BasicDeliverBody deliverBody) + { + _deliverBody = deliverBody; + } + + public void setContentHeader(ContentHeaderBody contentHeader) + { + this._contentHeader = contentHeader; + } + + public boolean isAllBodyDataReceived() + { + return _bytesReceived == getContentHeader().bodySize; + } + + public BasicDeliverBody getDeliverBody() + { + return _deliverBody; + } + + public ContentHeaderBody getContentHeader() + { + return _contentHeader; + } + + public List<ContentBody> getBodies() + { + return _bodies; + } + + public String toString() + { + StringBuilder buf = new StringBuilder(); + buf.append("Channel Id : " + this.getChannelId()); + if (_contentHeader != null) + { + buf.append("ContentHeader " + _contentHeader); + } + if(_deliverBody != null) + { + buf.append("Delivery tag " + _deliverBody.deliveryTag); + buf.append("Consumer tag " + _deliverBody.consumerTag); + buf.append("Deliver Body " + _deliverBody); + } + + return buf.toString(); + } + +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index dcab89e9bb..f5008496d9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -32,7 +32,9 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; // import org.apache.qpid.client.message.UnexpectedBodyReceivedException; +import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString; @@ -93,7 +95,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives * first) with the subsequent content header and content bodies. */ - protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + protected ConcurrentMap<Integer,UnprocessedMessage_0_8> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage_0_8>(); /** Counter to ensure unique queue names */ protected int _queueId = 1; @@ -228,14 +230,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * * @throws AMQException if this was not expected */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException + public void unprocessedMessageReceived(UnprocessedMessage_0_8 message) throws AMQException { _channelId2UnprocessedMsgMap.put(message.getChannelId(), message); } public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId); if (msg == null) { throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null); @@ -255,7 +257,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + UnprocessedMessage_0_8 msg = _channelId2UnprocessedMsgMap.get(channelId); if (msg == null) { throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null); |