summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java235
1 files changed, 200 insertions, 35 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 5b199f2478..18157adc34 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,10 +24,20 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -36,33 +46,15 @@ import org.apache.qpid.framing.ContentHeaderBody;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage
+public abstract class UnprocessedMessage
{
- private long _bytesReceived = 0;
+ private long _bytesReceived = 0L;
- private final BasicDeliverBody _deliverBody;
- private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
- private final int _channelId;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
- {
- _deliverBody = deliverBody;
- _channelId = channelId;
- _bounceBody = null;
- }
-
-
- public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
- {
- _deliverBody = null;
- _channelId = channelId;
- _bounceBody = bounceBody;
- }
-
public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
{
@@ -96,22 +88,11 @@ public class UnprocessedMessage
return _bytesReceived == getContentHeader().bodySize;
}
- public BasicDeliverBody getDeliverBody()
- {
- return _deliverBody;
- }
-
- public BasicReturnBody getBounceBody()
- {
- return _bounceBody;
- }
- public int getChannelId()
- {
- return _channelId;
- }
+ abstract public BasicDeliverBody getDeliverBody();
+ abstract public BasicReturnBody getBounceBody();
public ContentHeaderBody getContentHeader()
{
@@ -128,4 +109,188 @@ public class UnprocessedMessage
return _bodies;
}
+ abstract public boolean isDeliverMessage();
+
+ public static final class UnprocessedDeliverMessage extends UnprocessedMessage
+ {
+ private final BasicDeliverBody _body;
+
+ public UnprocessedDeliverMessage(final BasicDeliverBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return _body;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return true;
+ }
+ }
+
+ public static final class UnprocessedBouncedMessage extends UnprocessedMessage
+ {
+ private final BasicReturnBody _body;
+
+ public UnprocessedBouncedMessage(final BasicReturnBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return null;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return _body;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
+ public static final class CloseConsumerMessage extends UnprocessedMessage
+ {
+ BasicMessageConsumer _consumer;
+
+ public CloseConsumerMessage(BasicMessageConsumer consumer)
+ {
+ _consumer = consumer;
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return new BasicDeliverBody()
+ {
+ // This is the only thing we need to preserve so the correct consumer can be found later.
+ public AMQShortString getConsumerTag()
+ {
+ return _consumer.getConsumerTag();
+ }
+
+ // The Rest of these methods are not used
+ public long getDeliveryTag()
+ {
+ return 0;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean getRedelivered()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ public int getClazz()
+ {
+ return 0;
+ }
+
+ public int getMethod()
+ {
+ return 0;
+ }
+
+ public void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ public byte getFrameType()
+ {
+ return 0;
+ }
+
+ public int getSize()
+ {
+ return 0;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException
+ {
+ }
+
+ public AMQFrame generateFrame(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
+ {
+ return false;
+ }
+ };
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
}