summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-29 11:04:43 +0000
committerRobert Greig <rgreig@apache.org>2007-01-29 11:04:43 +0000
commita6808589c1ce06773f599bc28fe90938e2dcd60f (patch)
tree7e07a64a520fd85a98daf0036abab3d4a808ddf8 /java
parente88772d1b09594300643bbf1768c825c154e2723 (diff)
downloadqpid-python-a6808589c1ce06773f599bc28fe90938e2dcd60f.tar.gz
QPID-324 : Patch supplied by Rob Godfrey - Only send byte indicating topic / queue / other in properties field table, not whole destination
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501005 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueue.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java84
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java107
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java25
27 files changed, 339 insertions, 115 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 39a5ffc0b8..5b4f4e015c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -48,6 +48,12 @@ public class AMQQueue extends AMQDestination implements Queue
this(name, false);
}
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+ false, queueName, false); }
+
+
/**
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 9b8a6686d3..f50b0390c5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -44,6 +44,12 @@ public class AMQTopic extends AMQDestination implements Topic
this(new AMQShortString(name));
}
+ public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
+ }
+
+
public AMQTopic(AMQShortString name)
{
this(name, true, null, false);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
new file mode 100644
index 0000000000..0f3723c58b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQUndefinedDestination extends AMQDestination
+{
+
+ private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown");
+
+
+ public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return getDestinationName();
+ }
+
+ public boolean isNameRequired()
+ {
+ return getAMQQueueName() == null;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 04df584a8e..4426a7deee 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -441,8 +441,24 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- message.getJmsHeaders().setBytes(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.
- getShortStringName(), destination.toByteEncoding());
+
+ byte type;
+ if(destination instanceof Topic)
+ {
+ type = AMQDestination.TOPIC_TYPE;
+ }
+ else if(destination instanceof Queue)
+ {
+ type = AMQDestination.QUEUE_TYPE;
+ }
+ else
+ {
+ type = AMQDestination.UNKNOWN_TYPE;
+ }
+
+ message.getJmsHeaders().setByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName(),
+ type);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
index 26e26781c0..319077c45e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
+++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
@@ -26,7 +26,7 @@ import java.util.*;
public enum CustomJMSXProperty
{
- JMSX_QPID_JMSDESTINATIONURL,
+ JMSZ_QPID_DESTTYPE,
JMSXGroupID,
JMSXGroupSeq;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index f036f88d50..d19968235d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -42,9 +42,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener
public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = (BasicDeliverBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
_logger.debug("New JmsDeliver method received");
protocolSession.unprocessedMessageReceived(msg);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
index 72923c5cae..187473ec11 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
@@ -43,10 +43,7 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener
public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.debug("New JmsBounce method received");
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = null;
- msg.bounceBody = (BasicReturnBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod());
protocolSession.unprocessedMessageReceived(msg);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 805bdc6186..f7ce294880 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
@@ -69,11 +70,11 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
_data.setAutoExpand(true);
}
- AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
index b941b9aee8..d0bae35dfb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
@@ -2,6 +2,7 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import javax.jms.*;
@@ -59,10 +60,10 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
}
- AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 9dc4339895..113cce4ef6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -26,8 +26,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
@@ -66,9 +66,33 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
}
- protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
+ protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
this(contentHeader, deliveryTag);
+
+
+ byte type = contentHeader.getHeaders().getByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName());
+
+ AMQDestination dest;
+
+ switch(type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ break;
+ }
+ //Destination dest = AMQDestination.createDestination(url);
+ setJMSDestination(dest);
+
+
+
_data = data;
if (_data != null)
{
@@ -181,7 +205,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
return _destination;
}
- public void setJMSDestination(Destination destination) throws JMSException
+ public void setJMSDestination(Destination destination)
{
_destination = destination;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 4b28a43c64..e6985080c0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -36,10 +37,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data,
- ContentHeaderBody contentHeader) throws AMQException;
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException;
protected AbstractJMSMessage createMessageWithBody(long messageNbr,
ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
List bodies) throws AMQException
{
ByteBuffer data;
@@ -54,7 +57,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
data = ((ContentBody)bodies.get(0)).payload;
}
- else
+ else if (bodies != null)
{
if(debug)
{
@@ -70,19 +73,24 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
data.flip();
}
+ else // bodies == null
+ {
+ data = ByteBuffer.allocate(0);
+ }
if(debug)
{
_logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
}
- return createMessage(messageNbr, data, contentHeader);
+ return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
List bodies) throws JMSException, AMQException
{
- final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, bodies);
+ final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
return msg;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index ec7ef453eb..f6971ca677 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -56,10 +56,10 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
super(data); // this instanties a content header
}
- JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
public void reset()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
index 78f392a83f..bd7ac4ab24 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
@@ -23,14 +23,17 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSBytesMessage(deliveryTag, contentHeader, data);
+ return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
public AbstractJMSMessage createMessage() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 35f8c54d90..3086e5b90a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -32,7 +32,7 @@ import java.util.Enumeration;
public final class JMSHeaderAdapter
{
- FieldTable _headers;
+ private final FieldTable _headers;
public JMSHeaderAdapter(FieldTable headers)
{
@@ -319,6 +319,13 @@ public final class JMSHeaderAdapter
getHeaders().setByte(string, b);
}
+ public void setByte(AMQShortString string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setByte(string, b);
+ }
+
+
public void setShort(String string, short i) throws JMSException
{
checkPropertyName(string);
@@ -331,6 +338,12 @@ public final class JMSHeaderAdapter
getHeaders().setInteger(string, i);
}
+ public void setInteger(AMQShortString string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setInteger(string, i);
+ }
+
public void setLong(String string, long l) throws JMSException
{
checkPropertyName(string);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index fcbb6500d4..d562cdc102 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -54,10 +54,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
- JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
try
{
populateMapFromData();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
index b110f04460..9cd7dd9149 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -33,8 +34,10 @@ public class JMSMapMessageFactory extends AbstractJMSMessageFactory
return new JMSMapMessage();
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSMapMessage(deliveryTag, contentHeader, data);
+ return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 10ed530923..7f0c89a3be 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -62,9 +62,10 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException
+ JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
}
public void clearBodyImpl() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
index b2228a6805..5f222712be 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
@@ -23,14 +23,17 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSObjectMessage(deliveryTag, contentHeader, data);
+ return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
public AbstractJMSMessage createMessage() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index 747b97b11c..aca9d4796e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -61,10 +61,10 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
}
- JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
public void reset()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
index aae9f0cdb2..337afdac15 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
@@ -22,16 +22,18 @@ package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import javax.jms.JMSException;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws
- AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSStreamMessage(deliveryTag, contentHeader, data);
+ return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
public AbstractJMSMessage createMessage() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 6c2c9a99d0..da75fe2351 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -42,7 +42,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
* This constant represents the name of a property that is set when the message payload is null.
*/
private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
- private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
+ private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
public JMSTextMessage() throws JMSException
{
@@ -56,10 +56,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
getContentHeaderProperties().setEncoding(encoding);
}
- JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data)
throws AMQException
{
- super(deliveryTag, contentHeader, data);
+ super(deliveryTag, contentHeader, exchange, routingKey, data);
contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
_data = data;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
index e7ddde2790..3218f3a8ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
@@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
@@ -35,8 +36,11 @@ public class JMSTextMessageFactory extends AbstractJMSMessageFactory
return new JMSTextMessage();
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, data);
+ return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties,
+ exchange, routingKey, data);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 1fb178a1a4..d61e6571aa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import java.util.List;
@@ -31,6 +32,7 @@ public interface MessageFactory
{
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
List bodies)
throws JMSException, AMQException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index df7537f1e8..83dcc57b80 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -63,6 +63,8 @@ public class MessageFactoryRegistry
* @throws JMSException
*/
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey,
ContentHeaderBody contentHeader,
List bodies) throws AMQException, JMSException
{
@@ -74,7 +76,7 @@ public class MessageFactoryRegistry
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 7d20c32b66..4f9c62df5c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -24,6 +24,8 @@ import org.apache.qpid.framing.*;
import java.util.List;
import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Collections;
/**
* This class contains everything needed to process a JMS message. It assembles the
@@ -38,27 +40,93 @@ public class UnprocessedMessage
{
private long _bytesReceived = 0;
- public BasicDeliverBody deliverBody;
- public BasicReturnBody bounceBody; // TODO: check change (gustavo)
- public int channelId;
- public ContentHeaderBody contentHeader;
+ private final BasicDeliverBody _deliverBody;
+ private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
+ private final int _channelId;
+ private ContentHeaderBody _contentHeader;
/**
* List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
*/
- public List bodies = new LinkedList();
+ private List<ContentBody> _bodies;
+
+ public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
+ {
+ _deliverBody = deliverBody;
+ _channelId = channelId;
+ _bounceBody = null;
+ }
+
+
+ public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
+ {
+ _deliverBody = null;
+ _channelId = channelId;
+ _bounceBody = bounceBody;
+ }
public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
{
- bodies.add(body);
+
if (body.payload != null)
{
- _bytesReceived += body.payload.remaining();
+ final long payloadSize = body.payload.remaining();
+
+ if(_bodies == null)
+ {
+ if(payloadSize == getContentHeader().bodySize)
+ {
+ _bodies = Collections.singletonList(body);
+ }
+ else
+ {
+ _bodies = new ArrayList<ContentBody>();
+ }
+
+ }
+ else
+ {
+ _bodies.add(body);
+ }
+ _bytesReceived += payloadSize;
}
}
public boolean isAllBodyDataReceived()
{
- return _bytesReceived == contentHeader.bodySize;
+ return _bytesReceived == getContentHeader().bodySize;
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return _deliverBody;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return _bounceBody;
}
+
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+
+ public ContentHeaderBody getContentHeader()
+ {
+ return _contentHeader;
+ }
+
+ public void setContentHeader(ContentHeaderBody contentHeader)
+ {
+ this._contentHeader = contentHeader;
+ }
+
+ public List<ContentBody> getBodies()
+ {
+ return _bodies;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 4e7f8a3032..ed1b5ae6f9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -283,72 +283,87 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageReceived(IoSession session, Object message) throws Exception
{
+ final boolean debug = _logger.isDebugEnabled();
final long msgNumber = ++_messageReceivedCount;
- if (_logger.isDebugEnabled() && (msgNumber % 1000 == 0))
+ if (debug && (msgNumber % 1000 == 0))
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
AMQFrame frame = (AMQFrame) message;
- HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- if (frame.bodyFrame instanceof AMQMethodBody)
+ switch(bodyFrame.getFrameType())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Method frame received: " + frame);
- }
+ case AMQMethodBody.TYPE:
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
+ if (debug)
+ {
+ _logger.debug("Method frame received: " + frame);
+ }
- try
- {
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ try
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
}
}
- if (!wasAnyoneInterested)
+ catch (AMQException e)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
- }
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
+ exceptionCaught(session, e);
}
- exceptionCaught(session, e);
- }
- }
- else if (frame.bodyFrame instanceof ContentHeaderBody)
- {
- _protocolSession.messageContentHeaderReceived(frame.channel,
- (ContentHeaderBody) frame.bodyFrame);
- }
- else if (frame.bodyFrame instanceof ContentBody)
- {
- _protocolSession.messageContentBodyReceived(frame.channel,
- (ContentBody) frame.bodyFrame);
- }
- else if (frame.bodyFrame instanceof HeartbeatBody)
- {
- _logger.debug("Received heartbeat");
+ break;
+
+ case ContentHeaderBody.TYPE:
+
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(),
+ (ContentHeaderBody) bodyFrame);
+ break;
+
+ case ContentBody.TYPE:
+
+ _protocolSession.messageContentBodyReceived(frame.getChannel(),
+ (ContentBody) bodyFrame);
+ break;
+
+ case HeartbeatBody.TYPE:
+
+ if(debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
+ break;
+
+ default:
+
}
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -467,7 +482,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
{
return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.channel, responseClass), timeout);
+ new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 2399819a07..66c90d181c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -32,7 +32,7 @@ import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.commons.lang.StringUtils;
@@ -47,7 +47,7 @@ import java.util.concurrent.ConcurrentMap;
* The underlying protocol session is still available but clients should not
* use it to obtain session attributes.
*/
-public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionList
+public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession
{
protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
@@ -95,8 +95,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
private byte _protocolMinorVersion;
private byte _protocolMajorVersion;
-
-
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
/**
@@ -125,6 +124,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
+ _minaProtocolSession.setAttachment(this);
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
//fixme - real value needed
@@ -239,7 +239,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
*/
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.channelId, message);
+ _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
}
public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
@@ -250,11 +250,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
}
- if (msg.contentHeader != null)
+ if (msg.getContentHeader() != null)
{
throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
}
- msg.contentHeader = contentHeader;
+ msg.setContentHeader(contentHeader);
if (contentHeader.bodySize == 0)
{
deliverMessageToAMQSession(channelId, msg);
@@ -268,7 +268,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
}
- if (msg.contentHeader == null)
+ if (msg.getContentHeader() == null)
{
_channelId2UnprocessedMsgMap.remove(channelId);
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
@@ -465,11 +465,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
session.confirmConsumerCancelled(consumerTag);
}
- public void setProtocolVersion(byte versionMajor, byte versionMinor)
+ public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
{
_protocolMajorVersion = versionMajor;
_protocolMinorVersion = versionMinor;
-
+ _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
@@ -482,4 +482,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
return _protocolMajorVersion;
}
+ public VersionSpecificRegistry getRegistry()
+ {
+ return _registry;
+ }
+
}