diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 11:04:43 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 11:04:43 +0000 |
commit | a6808589c1ce06773f599bc28fe90938e2dcd60f (patch) | |
tree | 7e07a64a520fd85a98daf0036abab3d4a808ddf8 /java | |
parent | e88772d1b09594300643bbf1768c825c154e2723 (diff) | |
download | qpid-python-a6808589c1ce06773f599bc28fe90938e2dcd60f.tar.gz |
QPID-324 : Patch supplied by Rob Godfrey - Only send byte indicating topic / queue / other in properties field table, not whole destination
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501005 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
27 files changed, 339 insertions, 115 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 39a5ffc0b8..5b4f4e015c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -48,6 +48,12 @@ public class AMQQueue extends AMQDestination implements Queue this(name, false); } + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) + { + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, + false, queueName, false); } + + /** * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. * @param name the name of the queue diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 9b8a6686d3..f50b0390c5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -44,6 +44,12 @@ public class AMQTopic extends AMQDestination implements Topic this(new AMQShortString(name)); } + public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) + { + super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); + } + + public AMQTopic(AMQShortString name) { this(name, true, null, false); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java new file mode 100644 index 0000000000..0f3723c58b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java @@ -0,0 +1,45 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQUndefinedDestination extends AMQDestination
+{
+
+ private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown");
+
+
+ public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return getDestinationName();
+ }
+
+ public boolean isNameRequired()
+ {
+ return getAMQQueueName() == null;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 04df584a8e..4426a7deee 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -441,8 +441,24 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j AbstractJMSMessage message = convertToNativeMessage(origMessage); - message.getJmsHeaders().setBytes(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL. - getShortStringName(), destination.toByteEncoding()); + + byte type; + if(destination instanceof Topic) + { + type = AMQDestination.TOPIC_TYPE; + } + else if(destination instanceof Queue) + { + type = AMQDestination.QUEUE_TYPE; + } + else + { + type = AMQDestination.UNKNOWN_TYPE; + } + + message.getJmsHeaders().setByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName(), + type); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index 26e26781c0..319077c45e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java +++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -26,7 +26,7 @@ import java.util.*; public enum CustomJMSXProperty
{
- JMSX_QPID_JMSDESTINATIONURL,
+ JMSZ_QPID_DESTTYPE,
JMSXGroupID,
JMSXGroupSeq;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index f036f88d50..d19968235d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -42,9 +42,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { - final UnprocessedMessage msg = new UnprocessedMessage(); - msg.deliverBody = (BasicDeliverBody) evt.getMethod(); - msg.channelId = evt.getChannelId(); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod()); _logger.debug("New JmsDeliver method received"); protocolSession.unprocessedMessageReceived(msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 72923c5cae..187473ec11 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -43,10 +43,7 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { _logger.debug("New JmsBounce method received"); - final UnprocessedMessage msg = new UnprocessedMessage(); - msg.deliverBody = null; - msg.bounceBody = (BasicReturnBody) evt.getMethod(); - msg.channelId = evt.getChannelId(); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod()); protocolSession.unprocessedMessageReceived(msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 805bdc6186..f7ce294880 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; import javax.jms.MessageEOFException; @@ -69,11 +70,11 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage _data.setAutoExpand(true); } - AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) - throws AMQException + AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index b941b9aee8..d0bae35dfb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -2,6 +2,7 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import javax.jms.*;
@@ -59,10 +60,10 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage }
- AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 9dc4339895..113cce4ef6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -26,8 +26,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.client.*; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; @@ -66,9 +66,33 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders()); } - protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException + protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException { this(contentHeader, deliveryTag); + + + byte type = contentHeader.getHeaders().getByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName()); + + AMQDestination dest; + + switch(type) + { + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + break; + } + //Destination dest = AMQDestination.createDestination(url); + setJMSDestination(dest); + + + _data = data; if (_data != null) { @@ -181,7 +205,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach return _destination; } - public void setJMSDestination(Destination destination) throws JMSException + public void setJMSDestination(Destination destination) { _destination = destination; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 4b28a43c64..e6985080c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.message; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; @@ -36,10 +37,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, - ContentHeaderBody contentHeader) throws AMQException; + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException; protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException { ByteBuffer data; @@ -54,7 +57,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } data = ((ContentBody)bodies.get(0)).payload; } - else + else if (bodies != null) { if(debug) { @@ -70,19 +73,24 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } data.flip(); } + else // bodies == null + { + data = ByteBuffer.allocate(0); + } if(debug) { _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining()); } - return createMessage(messageNbr, data, contentHeader); + return createMessage(messageNbr, data, exchange, routingKey, contentHeader); } public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException { - final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, bodies); + final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); msg.setJMSRedelivered(redelivered); return msg; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index ec7ef453eb..f6971ca677 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -56,10 +56,10 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag super(data); // this instanties a content header } - JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) - throws AMQException + JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException { - super(messageNbr, contentHeader, data); + super(messageNbr, contentHeader, exchange, routingKey, data); } public void reset() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java index 78f392a83f..bd7ac4ab24 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java @@ -23,14 +23,17 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; public class JMSBytesMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException { - return new JMSBytesMessage(deliveryTag, contentHeader, data); + return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data); } public AbstractJMSMessage createMessage() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 35f8c54d90..3086e5b90a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -32,7 +32,7 @@ import java.util.Enumeration; public final class JMSHeaderAdapter
{
- FieldTable _headers;
+ private final FieldTable _headers;
public JMSHeaderAdapter(FieldTable headers)
{
@@ -319,6 +319,13 @@ public final class JMSHeaderAdapter getHeaders().setByte(string, b);
}
+ public void setByte(AMQShortString string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setByte(string, b);
+ }
+
+
public void setShort(String string, short i) throws JMSException
{
checkPropertyName(string);
@@ -331,6 +338,12 @@ public final class JMSHeaderAdapter getHeaders().setInteger(string, i);
}
+ public void setInteger(AMQShortString string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setInteger(string, i);
+ }
+
public void setLong(String string, long l) throws JMSException
{
checkPropertyName(string);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index fcbb6500d4..d562cdc102 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -54,10 +54,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } - JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) - throws AMQException + JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException { - super(messageNbr, contentHeader, data); + super(messageNbr, contentHeader, exchange, routingKey, data); try { populateMapFromData(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java index b110f04460..9cd7dd9149 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import javax.jms.JMSException; @@ -33,8 +34,10 @@ public class JMSMapMessageFactory extends AbstractJMSMessageFactory return new JMSMapMessage(); } - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException { - return new JMSMapMessage(deliveryTag, contentHeader, data); + return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 10ed530923..7f0c89a3be 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -62,9 +62,10 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException + JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException { - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); } public void clearBodyImpl() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java index b2228a6805..5f222712be 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java @@ -23,14 +23,17 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; public class JMSObjectMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException { - return new JMSObjectMessage(deliveryTag, contentHeader, data); + return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data); } public AbstractJMSMessage createMessage() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 747b97b11c..aca9d4796e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -61,10 +61,10 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } - JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) - throws AMQException + JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException { - super(messageNbr, contentHeader, data); + super(messageNbr, contentHeader, exchange, routingKey, data); } public void reset() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java index aae9f0cdb2..337afdac15 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -22,16 +22,18 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import javax.jms.JMSException; public class JMSStreamMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws - AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException { - return new JMSStreamMessage(deliveryTag, contentHeader, data); + return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data); } public AbstractJMSMessage createMessage() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 6c2c9a99d0..da75fe2351 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -42,7 +42,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text * This constant represents the name of a property that is set when the message payload is null. */ private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL"); - private static final Charset DEFAULT_CHARSET = Charset.defaultCharset(); + private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); public JMSTextMessage() throws JMSException { @@ -56,10 +56,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text getContentHeaderProperties().setEncoding(encoding); } - JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException { - super(deliveryTag, contentHeader, data); + super(deliveryTag, contentHeader, exchange, routingKey, data); contentHeader.setContentType(MIME_TYPE_SHORT_STRING); _data = data; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java index e7ddde2790..3218f3a8ba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java @@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; @@ -35,8 +36,11 @@ public class JMSTextMessageFactory extends AbstractJMSMessageFactory return new JMSTextMessage(); } - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException { - return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, data); + return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, + exchange, routingKey, data); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 1fb178a1a4..d61e6571aa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.message; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import javax.jms.JMSException; import java.util.List; @@ -31,6 +32,7 @@ public interface MessageFactory { AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index df7537f1e8..83dcc57b80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -63,6 +63,8 @@ public class MessageFactoryRegistry * @throws JMSException */ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) throws AMQException, JMSException { @@ -74,7 +76,7 @@ public class MessageFactoryRegistry } else { - return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies); + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 7d20c32b66..4f9c62df5c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -24,6 +24,8 @@ import org.apache.qpid.framing.*; import java.util.List; import java.util.LinkedList; +import java.util.ArrayList; +import java.util.Collections; /** * This class contains everything needed to process a JMS message. It assembles the @@ -38,27 +40,93 @@ public class UnprocessedMessage { private long _bytesReceived = 0; - public BasicDeliverBody deliverBody; - public BasicReturnBody bounceBody; // TODO: check change (gustavo) - public int channelId; - public ContentHeaderBody contentHeader; + private final BasicDeliverBody _deliverBody; + private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) + private final int _channelId; + private ContentHeaderBody _contentHeader; /** * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ - public List bodies = new LinkedList(); + private List<ContentBody> _bodies; + + public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) + { + _deliverBody = deliverBody; + _channelId = channelId; + _bounceBody = null; + } + + + public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) + { + _deliverBody = null; + _channelId = channelId; + _bounceBody = bounceBody; + } public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException { - bodies.add(body); + if (body.payload != null) { - _bytesReceived += body.payload.remaining(); + final long payloadSize = body.payload.remaining(); + + if(_bodies == null) + { + if(payloadSize == getContentHeader().bodySize) + { + _bodies = Collections.singletonList(body); + } + else + { + _bodies = new ArrayList<ContentBody>(); + } + + } + else + { + _bodies.add(body); + } + _bytesReceived += payloadSize; } } public boolean isAllBodyDataReceived() { - return _bytesReceived == contentHeader.bodySize; + return _bytesReceived == getContentHeader().bodySize; + } + + public BasicDeliverBody getDeliverBody() + { + return _deliverBody; + } + + public BasicReturnBody getBounceBody() + { + return _bounceBody; } + + + public int getChannelId() + { + return _channelId; + } + + + public ContentHeaderBody getContentHeader() + { + return _contentHeader; + } + + public void setContentHeader(ContentHeaderBody contentHeader) + { + this._contentHeader = contentHeader; + } + + public List<ContentBody> getBodies() + { + return _bodies; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 4e7f8a3032..ed1b5ae6f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -283,72 +283,87 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageReceived(IoSession session, Object message) throws Exception { + final boolean debug = _logger.isDebugEnabled(); final long msgNumber = ++_messageReceivedCount; - if (_logger.isDebugEnabled() && (msgNumber % 1000 == 0)) + if (debug && (msgNumber % 1000 == 0)) { _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } AMQFrame frame = (AMQFrame) message; - HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody); + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - if (frame.bodyFrame instanceof AMQMethodBody) + switch(bodyFrame.getFrameType()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Method frame received: " + frame); - } + case AMQMethodBody.TYPE: - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame); + if (debug) + { + _logger.debug("Method frame received: " + frame); + } - try - { + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) + try { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); } } - if (!wasAnyoneInterested) + catch (AMQException e) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); - } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + getStateManager().error(e); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } } + exceptionCaught(session, e); } - exceptionCaught(session, e); - } - } - else if (frame.bodyFrame instanceof ContentHeaderBody) - { - _protocolSession.messageContentHeaderReceived(frame.channel, - (ContentHeaderBody) frame.bodyFrame); - } - else if (frame.bodyFrame instanceof ContentBody) - { - _protocolSession.messageContentBodyReceived(frame.channel, - (ContentBody) frame.bodyFrame); - } - else if (frame.bodyFrame instanceof HeartbeatBody) - { - _logger.debug("Received heartbeat"); + break; + + case ContentHeaderBody.TYPE: + + _protocolSession.messageContentHeaderReceived(frame.getChannel(), + (ContentHeaderBody) bodyFrame); + break; + + case ContentBody.TYPE: + + _protocolSession.messageContentBodyReceived(frame.getChannel(), + (ContentBody) bodyFrame); + break; + + case HeartbeatBody.TYPE: + + if(debug) + { + _logger.debug("Received heartbeat"); + } + break; + + default: + } _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -467,7 +482,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException { return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.channel, responseClass), timeout); + new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 2399819a07..66c90d181c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -32,7 +32,7 @@ import org.apache.qpid.client.ConnectionTuneParameters; import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQProtocolWriter; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.commons.lang.StringUtils; @@ -47,7 +47,7 @@ import java.util.concurrent.ConcurrentMap; * The underlying protocol session is still available but clients should not * use it to obtain session attributes. */ -public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionList +public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession { protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; @@ -95,8 +95,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis private byte _protocolMinorVersion; private byte _protocolMajorVersion; - - + private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]); /** @@ -125,6 +124,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { _protocolHandler = protocolHandler; _minaProtocolSession = protocolSession; + _minaProtocolSession.setAttachment(this); // properties of the connection are made available to the event handlers _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); //fixme - real value needed @@ -239,7 +239,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis */ public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException { - _channelId2UnprocessedMsgMap.put(message.channelId, message); + _channelId2UnprocessedMsgMap.put(message.getChannelId(), message); } public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) @@ -250,11 +250,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); } - if (msg.contentHeader != null) + if (msg.getContentHeader() != null) { throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); } - msg.contentHeader = contentHeader; + msg.setContentHeader(contentHeader); if (contentHeader.bodySize == 0) { deliverMessageToAMQSession(channelId, msg); @@ -268,7 +268,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); } - if (msg.contentHeader == null) + if (msg.getContentHeader() == null) { _channelId2UnprocessedMsgMap.remove(channelId); throw new AMQException("Error: received content body without having received a ContentHeader frame first"); @@ -465,11 +465,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis session.confirmConsumerCancelled(consumerTag); } - public void setProtocolVersion(byte versionMajor, byte versionMinor) + public void setProtocolVersion(final byte versionMajor, final byte versionMinor) { _protocolMajorVersion = versionMajor; _protocolMinorVersion = versionMinor; - + _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); } public byte getProtocolMinorVersion() @@ -482,4 +482,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis return _protocolMajorVersion; } + public VersionSpecificRegistry getRegistry() + { + return _registry; + } + } |