diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java | 235 |
1 files changed, 200 insertions, 35 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 5b199f2478..18157adc34 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,10 +24,20 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and @@ -36,33 +46,15 @@ import org.apache.qpid.framing.ContentHeaderBody; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public class UnprocessedMessage +public abstract class UnprocessedMessage { - private long _bytesReceived = 0; + private long _bytesReceived = 0L; - private final BasicDeliverBody _deliverBody; - private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) - private final int _channelId; private ContentHeaderBody _contentHeader; /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; - public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) - { - _deliverBody = deliverBody; - _channelId = channelId; - _bounceBody = null; - } - - - public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) - { - _deliverBody = null; - _channelId = channelId; - _bounceBody = bounceBody; - } - public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException { @@ -96,22 +88,11 @@ public class UnprocessedMessage return _bytesReceived == getContentHeader().bodySize; } - public BasicDeliverBody getDeliverBody() - { - return _deliverBody; - } - - public BasicReturnBody getBounceBody() - { - return _bounceBody; - } - public int getChannelId() - { - return _channelId; - } + abstract public BasicDeliverBody getDeliverBody(); + abstract public BasicReturnBody getBounceBody(); public ContentHeaderBody getContentHeader() { @@ -128,4 +109,188 @@ public class UnprocessedMessage return _bodies; } + abstract public boolean isDeliverMessage(); + + public static final class UnprocessedDeliverMessage extends UnprocessedMessage + { + private final BasicDeliverBody _body; + + public UnprocessedDeliverMessage(final BasicDeliverBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return _body; + } + + public BasicReturnBody getBounceBody() + { + return null; + } + + public boolean isDeliverMessage() + { + return true; + } + } + + public static final class UnprocessedBouncedMessage extends UnprocessedMessage + { + private final BasicReturnBody _body; + + public UnprocessedBouncedMessage(final BasicReturnBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return null; + } + + public BasicReturnBody getBounceBody() + { + return _body; + } + + public boolean isDeliverMessage() + { + return false; + } + } + + public static final class CloseConsumerMessage extends UnprocessedMessage + { + BasicMessageConsumer _consumer; + + public CloseConsumerMessage(BasicMessageConsumer consumer) + { + _consumer = consumer; + } + + public BasicDeliverBody getDeliverBody() + { + return new BasicDeliverBody() + { + // This is the only thing we need to preserve so the correct consumer can be found later. + public AMQShortString getConsumerTag() + { + return _consumer.getConsumerTag(); + } + + // The Rest of these methods are not used + public long getDeliveryTag() + { + return 0; + } + + public AMQShortString getExchange() + { + return null; + } + + public boolean getRedelivered() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + + public byte getMajor() + { + return 0; + } + + public byte getMinor() + { + return 0; + } + + public int getClazz() + { + return 0; + } + + public int getMethod() + { + return 0; + } + + public void writeMethodPayload(ByteBuffer buffer) + { + } + + public byte getFrameType() + { + return 0; + } + + public int getSize() + { + return 0; + } + + public void writePayload(ByteBuffer buffer) + { + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException + { + } + + public AMQFrame generateFrame(int channelId) + { + return null; + } + + public AMQChannelException getChannelNotFoundException(int channelId) + { + return null; + } + + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return null; + } + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return null; + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return null; + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) + { + return null; + } + + public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException + { + return false; + } + }; + } + + public BasicReturnBody getBounceBody() + { + return null; + } + + public boolean isDeliverMessage() + { + return false; + } + } + } |