summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-08-07 19:25:12 +0000
committerRafael H. Schloming <rhs@apache.org>2008-08-07 19:25:12 +0000
commitf4dc59ea3028b87c1f8640df02c2a73b5cafcf1a (patch)
tree3e903ac9fb763cf6d1e055dad8d7cb403ef675ae
parent06e4488afee377347660ea481959dfbfb720ab47 (diff)
downloadqpid-python-f4dc59ea3028b87c1f8640df02c2a73b5cafcf1a.tar.gz
QPID-1213: Patch from rgodfrey to refactor AbstractJMSMessage and descendants to move AMQP version specific code into delegates and remove unnecessary conversion between 0-8 and 0-10 objects
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683683 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java56
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java135
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java138
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java900
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java567
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java605
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java59
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java52
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java74
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/Message.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java32
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java171
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java (renamed from java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java)4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java131
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java52
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java3
53 files changed, 2384 insertions, 1116 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index c6cd5da01d..67eb180dbf 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -24,16 +24,15 @@ import junit.framework.TestCase;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 471912c85a..15449dc613 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -20,20 +20,15 @@
*/
package org.apache.qpid.server.util;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.plugins.AllowAll;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 52080112c9..6c78b754bb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -65,9 +65,9 @@ public abstract class AMQDestination implements Destination, Referenceable
private static final int IS_EXCLUSIVE_MASK = 0x2;
private static final int IS_AUTODELETE_MASK = 0x4;
- public static final Integer QUEUE_TYPE = Integer.valueOf(1);
- public static final Integer TOPIC_TYPE = Integer.valueOf(2);
- public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
+ public static final int QUEUE_TYPE = 1;
+ public static final int TOPIC_TYPE = 2;
+ public static final int UNKNOWN_TYPE = 3;
protected AMQDestination(String url) throws URISyntaxException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d2ef457c94..723e502ff0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -68,15 +68,7 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.client.state.AMQStateManager;
@@ -109,6 +101,8 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+
+
public static final class IdToConsumerMap
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -401,7 +395,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
@@ -515,7 +509,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public BytesMessage createBytesMessage() throws JMSException
{
checkNotClosed();
- return new JMSBytesMessage();
+ return new JMSBytesMessage(getMessageDelegateFactory());
}
/**
@@ -629,7 +623,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
// Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
@@ -691,7 +685,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (!_closed.getAndSet(true))
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
synchronized (_messageDeliveryLock)
{
@@ -944,7 +938,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public MapMessage createMapMessage() throws JMSException
{
checkNotClosed();
- return new JMSMapMessage();
+ return new JMSMapMessage(getMessageDelegateFactory());
}
public javax.jms.Message createMessage() throws JMSException
@@ -955,7 +949,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public ObjectMessage createObjectMessage() throws JMSException
{
checkNotClosed();
- return (ObjectMessage) new JMSObjectMessage();
+ return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory());
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -1158,11 +1152,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
// with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
// We need to determin here if the connection should be
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
checkNotClosed();
- return new JMSStreamMessage();
+ return new JMSStreamMessage(getMessageDelegateFactory());
}
}
@@ -1215,14 +1209,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
checkNotClosed();
- return new JMSTextMessage();
+ return new JMSTextMessage(getMessageDelegateFactory());
}
}
+ protected Object getFailoverMutex()
+ {
+ return _connection.getFailoverMutex();
+ }
+
public TextMessage createTextMessage(String text) throws JMSException
{
@@ -1459,7 +1458,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- abstract void sendRecover() throws AMQException, FailoverException;
+ protected abstract void sendRecover() throws AMQException, FailoverException;
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -2181,7 +2180,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+ protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
/**
* Declares the named exchange and type of exchange.
@@ -2887,9 +2886,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- abstract boolean tagLE(long tag1, long tag2);
+ protected abstract boolean tagLE(long tag1, long tag2);
+
+ protected abstract boolean updateRollbackMark(long current, long deliveryTag);
- abstract boolean updateRollbackMark(long current, long deliveryTag);
+ public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
/*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
boolean read) throws AMQException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index cf73bc5089..32a0fdd5f5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.util.Serial;
import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
@@ -773,7 +774,7 @@ public class AMQSession_0_10 extends AMQSession
return subscriber;
}
- Long requestQueueDepth(AMQDestination amqd)
+ protected Long requestQueueDepth(AMQDestination amqd)
{
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
@@ -821,14 +822,19 @@ public class AMQSession_0_10 extends AMQSession
}
}
- final boolean tagLE(long tag1, long tag2)
+ protected final boolean tagLE(long tag1, long tag2)
{
return Serial.le((int) tag1, (int) tag2);
}
- final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_10;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 3ccff410eb..2c88d6f557 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -29,6 +29,7 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -470,7 +471,7 @@ public final class AMQSession_0_8 extends AMQSession
}
- Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
{
AMQFrame queueDeclare =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -486,14 +487,19 @@ public final class AMQSession_0_8 extends AMQSession
return okHandler._messageCount;
}
- final boolean tagLE(long tag1, long tag2)
+ protected final boolean tagLE(long tag1, long tag2)
{
return tag1 <= tag2;
}
- final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return false;
}
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_8;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index aaf56e8493..0176f66552 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -25,6 +25,7 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
@@ -683,7 +684,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
try
{
- AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
+ AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
if (debug)
{
@@ -720,7 +721,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
- public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame)
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<H, B> messageFrame)
throws Exception;
/** @param jmsMessage this message has already been processed so can't redo preDeliver */
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ab9e1d285f..388adfb434 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -24,12 +24,10 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.jms.*;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.api.Message;
import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.Session;
import org.apache.qpid.QpidException;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.filter.JMSSelectorFilter;
@@ -269,12 +267,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
@Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
- UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
- messageFrame.getExchange(), messageFrame.getRoutingKey(),
- messageFrame.getContentHeader(), messageFrame.getBodies(),
- messageFrame.getReplyToURL());
+ messageFrame.getContentHeader(), messageFrame.getBodies()
+ );
}
// private methods
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 816bc430b2..e601f454a4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client;
-import java.util.concurrent.TimeUnit;
-
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -31,6 +29,7 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQFrame;
@@ -82,7 +81,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 514b0f1213..b32cc86edb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -31,23 +31,14 @@ import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
-import javax.jms.Queue;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-import javax.jms.Topic;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -369,27 +360,27 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
if (message instanceof BytesMessage)
{
- newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
- newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
- newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
- newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
- newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
}
else
{
- newMessage = new MessageConverter(message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, message).getConvertedMessage();
}
if (newMessage != null)
@@ -460,7 +451,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
UUID messageId = null;
if (_disableMessageId)
{
- message.setJMSMessageID(null);
+ message.setJMSMessageID((UUID)null);
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 4e906b387d..2f06dc9d39 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -29,6 +29,7 @@ import javax.jms.DeliveryMode;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -72,9 +73,15 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
boolean immediate, boolean wait) throws JMSException
{
message.prepareForSending();
- if (message.get010Message() == null)
+
+ AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
+
+ org.apache.qpid.api.Message underlyingMessage = message.get010Message();
+ if (underlyingMessage == null)
{
- message.set010Message(new ByteBufferMessage());
+ underlyingMessage = new ByteBufferMessage(delegate.getMessageProperties(), delegate.getDeliveryProperties());
+ message.set010Message(underlyingMessage);
+
}
// force a rebuild of the 0-10 message if data has changed
if (message.getData() == null)
@@ -82,8 +89,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
message.dataChanged();
}
- DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
- MessageProperties messageProps = message.get010Message().getMessageProperties();
+ DeliveryProperties deliveryProp = underlyingMessage.getDeliveryProperties();
+ MessageProperties messageProps = underlyingMessage.getMessageProperties();
if (messageId != null)
{
@@ -144,20 +151,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
deliveryProp.setRoutingKey(routingKey);
}
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
- if (contentHeaderProperties.reset())
- {
- // set the application properties
- messageProps.setContentType(contentHeaderProperties.getContentType().toString());
- messageProps.setContentLength(message.getContentLength());
+ messageProps.setContentLength(message.getContentLength());
- AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
- if (correlationID != null)
- {
- messageProps.setCorrelationId(correlationID.getBytes());
- }
- String replyToURL = contentHeaderProperties.getReplyToAsString();
+ /* String replyToURL = contentHeaderProperties.getReplyToAsString();
if (replyToURL != null)
{
if(_logger.isDebugEnabled())
@@ -179,31 +176,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
}
+*/
- Map<String,Object> map = null;
-
- if (contentHeaderProperties.getHeaders() != null)
- {
- //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix
- contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
- map = FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders());
- }
-
- AMQShortString type = contentHeaderProperties.getType();
- if (type != null)
- {
- if (map == null)
- {
- map = new HashMap<String,Object>();
- }
- map.put(AbstractJMSMessage.JMS_TYPE, type.toString());
- }
-
- if (map != null)
- {
- messageProps.setApplicationHeaders(map);
- }
- }
// send the message
try
@@ -221,7 +195,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
try
{
ssn.messageTransfer(destination.getExchangeName().toString(),
- message.get010Message(),
+ underlyingMessage,
ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 8ca68850eb..c547fcb488 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -27,6 +27,8 @@ import javax.jms.Message;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.AMQMessageDelegate;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicConsumeBody;
@@ -81,7 +83,8 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
message.prepareForSending();
ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+ AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+ BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
if (!_disableTimestamps)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
index 8741a1cbb6..e69de29bb2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
@@ -1,135 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import javax.jms.JMSException;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.ContentHeaderProperties;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-import java.math.BigDecimal;
-
-public class AMQMessage
-{
- protected ContentHeaderProperties _contentHeaderProperties;
-
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- protected AMQSession _session;
-
- protected final long _deliveryTag;
-
- public AMQMessage(ContentHeaderProperties properties, long deliveryTag)
- {
- _contentHeaderProperties = properties;
- _deliveryTag = deliveryTag;
- }
-
- public AMQMessage(ContentHeaderProperties properties)
- {
- this(properties, -1);
- }
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
- /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
- public void prepareForSending() throws JMSException
- {
- }
-
- public FieldTable getPropertyHeaders()
- {
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders();
- }
-
- public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException
- {
- getPropertyHeaders().setDecimal(propertyName, bd);
- }
-
- public void setIntProperty(AMQShortString propertyName, int i) throws JMSException
- {
- getPropertyHeaders().setInteger(propertyName, new Integer(i));
- }
-
- public void setLongStringProperty(AMQShortString propertyName, String value)
- {
- getPropertyHeaders().setString(propertyName, value);
- }
-
- public void setTimestampProperty(AMQShortString propertyName, long value)
- {
- getPropertyHeaders().setTimestamp(propertyName, value);
- }
-
- public void setVoidProperty(AMQShortString propertyName)
- {
- getPropertyHeaders().setVoid(propertyName);
- }
-
- //** Getters
-
- public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException
- {
- return getPropertyHeaders().getDecimal(propertyName);
- }
-
- public int getIntegerProperty(AMQShortString propertyName) throws JMSException
- {
- return getPropertyHeaders().getInteger(propertyName);
- }
-
- public String getLongStringProperty(AMQShortString propertyName)
- {
- return getPropertyHeaders().getString(propertyName);
- }
-
- public Long getTimestampProperty(AMQShortString propertyName)
- {
- return getPropertyHeaders().getTimestamp(propertyName);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
new file mode 100644
index 0000000000..7f43e4b4b2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.Enumeration;
+import java.util.UUID;
+
+public interface AMQMessageDelegate
+{
+ void acknowledgeThis() throws JMSException;
+
+ String getJMSMessageID() throws JMSException;
+
+ void setJMSMessageID(String string) throws JMSException;
+
+ long getJMSTimestamp() throws JMSException;
+
+ void setJMSTimestamp(long l) throws JMSException;
+
+ byte[] getJMSCorrelationIDAsBytes() throws JMSException;
+
+ void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException;
+
+ void setJMSCorrelationID(String string) throws JMSException;
+
+ String getJMSCorrelationID() throws JMSException;
+
+ Destination getJMSReplyTo() throws JMSException;
+
+ void setJMSReplyTo(Destination destination) throws JMSException;
+
+ Destination getJMSDestination() throws JMSException;
+
+ int getJMSDeliveryMode() throws JMSException;
+
+ void setJMSDeliveryMode(int i) throws JMSException;
+
+ String getJMSType() throws JMSException;
+
+ void setJMSType(String string) throws JMSException;
+
+ long getJMSExpiration() throws JMSException;
+
+ void setJMSExpiration(long l) throws JMSException;
+
+ int getJMSPriority() throws JMSException;
+
+ void setJMSPriority(int i) throws JMSException;
+
+ void clearProperties() throws JMSException;
+
+ boolean propertyExists(String string) throws JMSException;
+
+ boolean getBooleanProperty(String string) throws JMSException;
+
+ byte getByteProperty(String string) throws JMSException;
+
+ short getShortProperty(String string) throws JMSException;
+
+ int getIntProperty(String string) throws JMSException;
+
+ long getLongProperty(String string) throws JMSException;
+
+ float getFloatProperty(String string) throws JMSException;
+
+ double getDoubleProperty(String string) throws JMSException;
+
+ String getStringProperty(String string) throws JMSException;
+
+ Object getObjectProperty(String string) throws JMSException;
+
+ Enumeration getPropertyNames() throws JMSException;
+
+ void setBooleanProperty(String string, boolean b) throws JMSException;
+
+ void setByteProperty(String string, byte b) throws JMSException;
+
+ void setShortProperty(String string, short i) throws JMSException;
+
+ void setIntProperty(String string, int i) throws JMSException;
+
+ void setLongProperty(String string, long l) throws JMSException;
+
+ void setFloatProperty(String string, float v) throws JMSException;
+
+ void setDoubleProperty(String string, double v) throws JMSException;
+
+ void setStringProperty(String string, String string1) throws JMSException;
+
+ void setObjectProperty(String string, Object object) throws JMSException;
+
+ void acknowledge() throws JMSException;
+
+ public void setJMSDestination(Destination destination);
+
+ public void setContentType(String contentType);
+ public String getContentType();
+
+ public void setEncoding(String encoding);
+ public String getEncoding();
+
+
+ String getReplyToString();
+
+ void removeProperty(final String propertyName) throws JMSException;
+
+ void setAMQSession(final AMQSession s);
+
+ AMQSession getAMQSession();
+
+ long getDeliveryTag();
+
+ void setJMSMessageID(final UUID messageId) throws JMSException;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
new file mode 100644
index 0000000000..8c3f2fd08f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate>
+{
+ public static AMQMessageDelegateFactory DEFAULT_FACTORY = null;
+
+ public static AMQMessageDelegateFactory<AMQMessageDelegate_0_8> FACTORY_0_8 =
+ new AMQMessageDelegateFactory<AMQMessageDelegate_0_8>()
+ {
+ public AMQMessageDelegate_0_8 createDelegate()
+ {
+ return new AMQMessageDelegate_0_8();
+ }
+ };
+
+ public static AMQMessageDelegateFactory<AMQMessageDelegate_0_10> FACTORY_0_10 =
+ new AMQMessageDelegateFactory<AMQMessageDelegate_0_10>()
+ {
+ public AMQMessageDelegate_0_10 createDelegate()
+ {
+ return new AMQMessageDelegate_0_10();
+ }
+ };
+
+
+ public D createDelegate();
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
new file mode 100644
index 0000000000..97cb2baee4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -0,0 +1,900 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.jms.Message;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.transport.*;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageFormatException;
+import javax.jms.DeliveryMode;
+import java.util.*;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+
+public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
+{
+ private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
+ public static final String JMS_TYPE = "x-jms-type";
+
+
+ private boolean _readableProperties = false;
+
+ private Destination _destination;
+
+
+ private MessageProperties _messageProps;
+ private DeliveryProperties _deliveryProps;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession _session;
+ private final long _deliveryTag;
+
+ private static Map<String,Integer> _exchangeTypeMap = new HashMap<String, Integer>();
+
+ static
+ {
+ // TODO - XXX - Need to add to this map when we find an exchange we don't know about
+
+ _exchangeTypeMap.put(null, AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put("amq.direct", AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put("", AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put("amq.topic", AMQDestination.TOPIC_TYPE);
+ _exchangeTypeMap.put("amq.fanout", AMQDestination.TOPIC_TYPE);
+
+
+ }
+
+ protected AMQMessageDelegate_0_10()
+ {
+ this(new MessageProperties(), new DeliveryProperties(), -1);
+ _readableProperties = false;
+
+
+ }
+
+ protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange,
+ AMQShortString routingKey) throws AMQException
+ {
+ this(messageProps, deliveryProps, deliveryTag);
+
+
+ AMQDestination dest;
+
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+
+ // Destination dest = AMQDestination.createDestination(url);
+ setJMSDestination(dest);
+
+
+
+ }
+
+ protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
+ {
+ _messageProps = messageProps;
+ _deliveryProps = deliveryProps;
+ _deliveryTag = deliveryTag;
+ _readableProperties = (_messageProps != null);
+
+ }
+
+
+ public String getJMSMessageID() throws JMSException
+ {
+ UUID id = _messageProps.getMessageId();
+ return id == null ? null : "ID:" + id;
+ }
+
+ public void setJMSMessageID(String messageId) throws JMSException
+ {
+ if(messageId == null)
+ {
+ _messageProps.clearMessageId();
+ }
+ else
+ {
+ if(messageId.startsWith("ID:"))
+ {
+ try
+ {
+ _messageProps.setMessageId(UUID.fromString(messageId.substring(3)));
+ }
+ catch(IllegalArgumentException ex)
+ {
+ throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID");
+ }
+ }
+ else
+ {
+ throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID");
+ }
+ }
+ }
+
+ public void setJMSMessageID(UUID messageId) throws JMSException
+ {
+ if(messageId == null)
+ {
+ _messageProps.clearMessageId();
+ }
+ else
+ {
+ _messageProps.setMessageId(messageId);
+ }
+ }
+
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ return _deliveryProps.getTimestamp();
+ }
+
+ public void setJMSTimestamp(long timestamp) throws JMSException
+ {
+ _deliveryProps.setTimestamp(timestamp);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return _messageProps.getCorrelationId();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ _messageProps.setCorrelationId(bytes);
+ }
+
+ public void setJMSCorrelationID(String correlationId) throws JMSException
+ {
+
+ setJMSCorrelationIDAsBytes(correlationId == null ? null : correlationId.getBytes());
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+
+ byte[] correlationIDAsBytes = getJMSCorrelationIDAsBytes();
+ return correlationIDAsBytes == null ? null : new String(correlationIDAsBytes);
+ }
+
+ public Destination getJMSReplyTo()
+ {
+ ReplyTo replyTo = _messageProps.getReplyTo();
+
+ if (replyTo == null)
+ {
+ return null;
+ }
+ else
+ {
+ Destination dest = _destinationCache.get(replyTo);
+ if (dest == null)
+ {
+ String exchange = replyTo.getExchange();
+ String routingKey = replyTo.getRoutingKey();
+
+ int type = _exchangeTypeMap.get(exchange);
+
+ switch(type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(new AMQShortString(exchange), new AMQShortString(routingKey), new AMQShortString(routingKey));
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(new AMQShortString(exchange), new AMQShortString(routingKey), null);
+ break;
+ default:
+ dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null);
+ }
+
+
+
+ _destinationCache.put(replyTo, dest);
+ }
+
+ return dest;
+ }
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException
+ {
+ if (destination == null)
+ {
+ throw new IllegalArgumentException("Null destination not allowed");
+ }
+
+ if (!(destination instanceof AMQDestination))
+ {
+ throw new IllegalArgumentException(
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ }
+
+ final AMQDestination amqd = (AMQDestination) destination;
+
+ final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
+ _destinationCache.put(replyTo, destination);
+ _messageProps.setReplyTo(replyTo);
+
+ }
+
+ public Destination getJMSDestination() throws JMSException
+ {
+ return _destination;
+ }
+
+ public void setJMSDestination(Destination destination)
+ {
+ _destination = destination;
+ }
+
+ public void setContentType(String contentType)
+ {
+ _messageProps.setContentType(contentType);
+ }
+
+ public String getContentType()
+ {
+ return _messageProps.getContentType();
+ }
+
+ public void setEncoding(String encoding)
+ {
+ if(encoding == null || encoding.length() == 0)
+ {
+ _messageProps.clearContentEncoding();
+ }
+ else
+ {
+ _messageProps.setContentEncoding(encoding);
+ }
+ }
+
+ public String getEncoding()
+ {
+ return _messageProps.getContentEncoding();
+ }
+
+ public String getReplyToString()
+ {
+ Destination replyTo = getJMSReplyTo();
+ if(replyTo != null)
+ {
+ return ((AMQDestination)replyTo).toURL();
+ }
+ else
+ {
+ return null;
+ }
+
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+
+ MessageDeliveryMode deliveryMode = _deliveryProps.getDeliveryMode();
+ if(deliveryMode != null)
+ {
+ switch(deliveryMode)
+ {
+ case PERSISTENT :
+ return DeliveryMode.PERSISTENT;
+ case NON_PERSISTENT:
+ return DeliveryMode.NON_PERSISTENT;
+ default:
+ throw new JMSException("Unknown Message Delivery Mode: " + _deliveryProps.getDeliveryMode());
+ }
+ }
+ else
+ {
+ return Message.DEFAULT_DELIVERY_MODE;
+ }
+
+ }
+
+ public void setJMSDeliveryMode(int deliveryMode) throws JMSException
+ {
+ switch(deliveryMode)
+ {
+ case DeliveryMode.PERSISTENT:
+ _deliveryProps.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+ break;
+ case DeliveryMode.NON_PERSISTENT:
+ _deliveryProps.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
+ break;
+ default:
+ throw new JMSException("Unknown JMS Delivery Mode: " + deliveryMode);
+ }
+
+ }
+
+
+ public String getJMSType() throws JMSException
+ {
+ if(getApplicationHeaders().containsKey(JMS_TYPE))
+ {
+ return getStringProperty(JMS_TYPE);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private Map<String, Object> getApplicationHeaders()
+ {
+ Map<String, Object> map = _messageProps.getApplicationHeaders();
+ return map == null ? Collections.EMPTY_MAP : map;
+ }
+
+ public void setJMSType(String type) throws JMSException
+ {
+ Map<String, Object> headers = _messageProps.getApplicationHeaders();
+ if(type == null)
+ {
+ if(headers != null)
+ {
+ headers.remove(JMS_TYPE);
+ }
+ }
+ else
+ {
+ if(headers == null)
+ {
+ headers = new HashMap<String,Object>();
+ _messageProps.setApplicationHeaders(headers);
+
+ }
+ headers.put(JMS_TYPE, type);
+ }
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ return _deliveryProps.getExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ _deliveryProps.setExpiration(l);
+ }
+
+
+
+ public boolean propertyExists(String propertyName) throws JMSException
+ {
+ return getApplicationHeaders().containsKey(propertyName);
+ }
+
+ public boolean getBooleanProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+
+ Object o = getApplicationHeaders().get(propertyName);
+
+ if(o instanceof Boolean)
+ {
+ return ((Boolean)o).booleanValue();
+ }
+ else if(o instanceof String)
+ {
+ return Boolean.valueOf((String) o).booleanValue();
+ }
+ else if(getApplicationHeaders().containsKey(propertyName))
+ {
+ throw new MessageFormatException("getBooleanProperty(\""+propertyName+"\") failed as value is not boolean: " + o);
+ }
+ else
+ {
+ return Boolean.valueOf(null);
+ }
+ }
+
+ public byte getByteProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ Object o = propertyMap.get(propertyName);
+
+ if(o instanceof Byte)
+ {
+ return ((Byte)o).byteValue();
+ }
+ else if(o instanceof String)
+ {
+ return Byte.valueOf((String) o).byteValue();
+ }
+ else if(getApplicationHeaders().containsKey(propertyName))
+ {
+ throw new MessageFormatException("getByteProperty(\""+propertyName+"\") failed as value is not a byte: " + o);
+ }
+ else
+ {
+ return Byte.valueOf(null);
+ }
+ }
+
+ public short getShortProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ Object o = propertyMap.get(propertyName);
+
+ if(o instanceof Short)
+ {
+ return ((Short)o).shortValue();
+ }
+ else
+ {
+ try
+ {
+ return Short.valueOf(getByteProperty(propertyName));
+ }
+ catch(MessageFormatException e)
+ {
+ throw new MessageFormatException("getShortProperty(\""+propertyName+"\") failed as value is not a short: " + o);
+ }
+ }
+
+
+ }
+
+ public int getIntProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ Object o = propertyMap.get(propertyName);
+
+ if(o instanceof Integer)
+ {
+ return ((Integer)o).intValue();
+ }
+ else
+ {
+ try
+ {
+ return Integer.valueOf(getShortProperty(propertyName));
+ }
+ catch(MessageFormatException e)
+ {
+ throw new MessageFormatException("getIntProperty(\""+propertyName+"\") failed as value is not an int: " + o);
+ }
+
+ }
+ }
+
+ public long getLongProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ Object o = propertyMap.get(propertyName);
+
+ if(o instanceof Long)
+ {
+ return ((Long)o).longValue();
+ }
+ else
+ {
+ try
+ {
+ return Long.valueOf(getIntProperty(propertyName));
+ }
+ catch(MessageFormatException e)
+ {
+ throw new MessageFormatException("getLongProperty(\""+propertyName+"\") failed as value is not a long: " + o);
+ }
+
+ }
+ }
+
+ public float getFloatProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ Object o = propertyMap.get(propertyName);
+
+ if(o instanceof Float)
+ {
+ return ((Float)o).floatValue();
+ }
+ else if(o instanceof String)
+ {
+ return Float.valueOf((String) o).floatValue();
+ }
+ else if(getApplicationHeaders().containsKey(propertyName))
+ {
+ throw new MessageFormatException("getFloatProperty(\""+propertyName+"\") failed as value is not a float: " + o);
+ }
+ else
+ {
+ return Float.valueOf(null);
+ }
+
+ }
+
+ public double getDoubleProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ Object o = propertyMap.get(propertyName);
+
+ if(o instanceof Double)
+ {
+ return ((Double)o).doubleValue();
+ }
+ else
+ {
+ try
+ {
+ return Double.valueOf(getFloatProperty(propertyName));
+ }
+ catch(MessageFormatException e)
+ {
+ throw new MessageFormatException("getDoubleProperty(\""+propertyName+"\") failed as value is not a double: " + o);
+ }
+
+ }
+ }
+
+ public String getStringProperty(String propertyName) throws JMSException
+ {
+ if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
+ {
+ return new String(_messageProps.getUserId());
+ }
+ else
+ {
+ checkPropertyName(propertyName);
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ Object o = propertyMap.get(propertyName);
+
+ if(o instanceof String)
+ {
+ return (String) o;
+ }
+ else if(o == null)
+ {
+ if(propertyMap.containsKey(propertyName))
+ {
+ return null;
+ }
+ else
+ {
+ return String.valueOf(null);
+ }
+ }
+ else if(o.getClass().isArray())
+ {
+ throw new MessageFormatException("getString(\""+propertyName+"\") failed as value of type " + o.getClass()+ " is an array.");
+ }
+ else
+ {
+ return String.valueOf(o);
+ }
+
+ }
+ }
+
+ public Object getObjectProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ Map<String, Object> propertyMap = getApplicationHeaders();
+
+ return propertyMap.get(propertyName);
+
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return java.util.Collections.enumeration(getApplicationHeaders().keySet());
+ }
+
+ public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, b);
+ }
+
+ public void setByteProperty(String propertyName, byte b) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, b);
+ }
+
+ public void setShortProperty(String propertyName, short i) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, i);
+ }
+
+ public void setIntProperty(String propertyName, int i) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, i);
+ }
+
+ public void setLongProperty(String propertyName, long l) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, l);
+ }
+
+ public void setFloatProperty(String propertyName, float f) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, f);
+ }
+
+ public void setDoubleProperty(String propertyName, double v) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, v);
+ }
+
+ public void setStringProperty(String propertyName, String value) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, value);
+ }
+
+ public void setObjectProperty(String propertyName, Object object) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, object);
+ }
+
+ private void setApplicationHeader(String propertyName, Object object)
+ {
+ Map<String, Object> headers = _messageProps.getApplicationHeaders();
+ if(headers == null)
+ {
+ headers = new HashMap<String,Object>();
+ _messageProps.setApplicationHeaders(headers);
+ }
+ headers.put(propertyName, object);
+ }
+
+ public void removeProperty(String propertyName) throws JMSException
+ {
+ Map<String, Object> headers = _messageProps.getApplicationHeaders();
+ if(headers != null)
+ {
+ headers.remove(propertyName);
+ }
+ }
+
+
+
+ protected void checkWritableProperties() throws MessageNotWriteableException
+ {
+ if (_readableProperties)
+ {
+ throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
+ }
+ }
+
+
+ public int getJMSPriority() throws JMSException
+ {
+ MessageDeliveryPriority messageDeliveryPriority = _deliveryProps.getPriority();
+ return messageDeliveryPriority == null ? Message.DEFAULT_PRIORITY : messageDeliveryPriority.getValue();
+ }
+
+ public void setJMSPriority(int i) throws JMSException
+ {
+ _deliveryProps.setPriority(MessageDeliveryPriority.get((short)i));
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ if(!getApplicationHeaders().isEmpty())
+ {
+ getApplicationHeaders().clear();
+ }
+
+ _readableProperties = false;
+ }
+
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null)
+ {
+ _session.acknowledge();
+ }
+ }
+
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _session = s;
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+
+
+
+
+
+ protected void checkPropertyName(CharSequence propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new IllegalArgumentException("Property name must not be null");
+ }
+ else if (propertyName.length() == 0)
+ {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ checkIdentiferFormat(propertyName);
+ }
+
+ protected void checkIdentiferFormat(CharSequence propertyName)
+ {
+// JMS requirements 3.5.1 Property Names
+// Identifiers:
+// - An identifier is an unlimited-length character sequence that must begin
+// with a Java identifier start character; all following characters must be Java
+// identifier part characters. An identifier start character is any character for
+// which the method Character.isJavaIdentifierStart returns true. This includes
+// '_' and '$'. An identifier part character is any character for which the
+// method Character.isJavaIdentifierPart returns true.
+// - Identifiers cannot be the names NULL, TRUE, or FALSE.
+// Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// ESCAPE.
+// Identifiers are either header field references or property references. The
+// type of a property value in a message selector corresponds to the type
+// used to set the property. If a property that does not exist in a message is
+// referenced, its value is NULL. The semantics of evaluating NULL values
+// in a selector are described in Section 3.8.1.2, Null Values.
+// The conversions that apply to the get methods for properties do not
+// apply when a property is used in a message selector expression. For
+// example, suppose you set a property as a string value, as in the
+// following:
+// myMessage.setStringProperty("NumberOfOrders", "2");
+// The following expression in a message selector would evaluate to false,
+// because a string cannot be used in an arithmetic expression:
+// "NumberOfOrders > 1"
+// Identifiers are case sensitive.
+// Message header field references are restricted to JMSDeliveryMode,
+// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+// null and if so are treated as a NULL value.
+
+ if (Boolean.getBoolean("strict-jms"))
+ {
+ // JMS start character
+ if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+ }
+
+ // JMS part character
+ int length = propertyName.length();
+ for (int c = 1; c < length; c++)
+ {
+ if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+ }
+ }
+
+ // JMS invalid names
+ if ((propertyName.equals("NULL")
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+ }
+ }
+
+ }
+
+
+ public MessageProperties getMessageProperties()
+ {
+ return _messageProps;
+ }
+
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProps;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
new file mode 100644
index 0000000000..4c20a44849
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -0,0 +1,567 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import java.util.Map;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.UUID;
+import java.net.URISyntaxException;
+
+public class AMQMessageDelegate_0_8 implements AMQMessageDelegate
+{
+ private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
+ public static final String JMS_TYPE = "x-jms-type";
+
+
+ private boolean _readableProperties = false;
+
+ private Destination _destination;
+ private JMSHeaderAdapter _headerAdapter;
+ private static final boolean STRICT_AMQP_COMPLIANCE =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+
+ private ContentHeaderProperties _contentHeaderProperties;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession _session;
+ private final long _deliveryTag;
+
+ protected AMQMessageDelegate_0_8()
+ {
+ this(new BasicContentHeaderProperties(), -1);
+ _readableProperties = false;
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+
+ }
+
+ protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey)
+ {
+ this(contentHeader, deliveryTag);
+
+ Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+
+ if(type == null)
+ {
+ type = AMQDestination.UNKNOWN_TYPE;
+ }
+
+ AMQDestination dest;
+
+ switch(type.intValue())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ }
+
+
+
+ // Destination dest = AMQDestination.createDestination(url);
+ setJMSDestination(dest);
+
+
+
+ }
+
+ protected AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
+ {
+ _contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
+ _readableProperties = (_contentHeaderProperties != null);
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+ }
+
+
+ public String getJMSMessageID() throws JMSException
+ {
+ return getContentHeaderProperties().getMessageIdAsString();
+ }
+
+ public void setJMSMessageID(String messageId) throws JMSException
+ {
+ getContentHeaderProperties().setMessageId(messageId);
+ }
+
+ public void setJMSMessageID(UUID messageId) throws JMSException
+ {
+ getContentHeaderProperties().setMessageId("ID:" + messageId);
+ }
+
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ return getContentHeaderProperties().getTimestamp();
+ }
+
+ public void setJMSTimestamp(long timestamp) throws JMSException
+ {
+ getContentHeaderProperties().setTimestamp(timestamp);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ getContentHeaderProperties().setCorrelationId(new String(bytes));
+ }
+
+ public void setJMSCorrelationID(String correlationId) throws JMSException
+ {
+ getContentHeaderProperties().setCorrelationId(correlationId);
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+ return getContentHeaderProperties().getCorrelationIdAsString();
+ }
+
+ public Destination getJMSReplyTo() throws JMSException
+ {
+ String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
+ if (replyToEncoding == null)
+ {
+ return null;
+ }
+ else
+ {
+ Destination dest = (Destination) _destinationCache.get(replyToEncoding);
+ if (dest == null)
+ {
+ try
+ {
+ BindingURL binding = new AMQBindingURL(replyToEncoding);
+ dest = AMQDestination.createDestination(binding);
+ }
+ catch (URISyntaxException e)
+ {
+ throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
+ }
+
+ _destinationCache.put(replyToEncoding, dest);
+ }
+
+ return dest;
+ }
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException
+ {
+ if (destination == null)
+ {
+ throw new IllegalArgumentException("Null destination not allowed");
+ }
+
+ if (!(destination instanceof AMQDestination))
+ {
+ throw new IllegalArgumentException(
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ }
+
+ final AMQDestination amqd = (AMQDestination) destination;
+
+ final AMQShortString encodedDestination = amqd.getEncodedName();
+ _destinationCache.put(encodedDestination, destination);
+ getContentHeaderProperties().setReplyTo(encodedDestination);
+ }
+
+ public Destination getJMSDestination() throws JMSException
+ {
+ return _destination;
+ }
+
+ public void setJMSDestination(Destination destination)
+ {
+ _destination = destination;
+ }
+
+ public void setContentType(String contentType)
+ {
+ getContentHeaderProperties().setContentType(contentType);
+ }
+
+ public String getContentType()
+ {
+ return getContentHeaderProperties().getContentTypeAsString();
+ }
+
+ public void setEncoding(String encoding)
+ {
+ getContentHeaderProperties().setEncoding(encoding);
+ }
+
+ public String getEncoding()
+ {
+ return getContentHeaderProperties().getEncodingAsString();
+ }
+
+ public String getReplyToString()
+ {
+ return getContentHeaderProperties().getReplyToAsString();
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ return getContentHeaderProperties().getDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException
+ {
+ getContentHeaderProperties().setDeliveryMode((byte) i);
+ }
+
+ public BasicContentHeaderProperties getContentHeaderProperties()
+ {
+ return (BasicContentHeaderProperties) _contentHeaderProperties;
+ }
+
+
+ public String getJMSType() throws JMSException
+ {
+ return getContentHeaderProperties().getTypeAsString();
+ }
+
+ public void setJMSType(String string) throws JMSException
+ {
+ getContentHeaderProperties().setType(string);
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ return getContentHeaderProperties().getExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ getContentHeaderProperties().setExpiration(l);
+ }
+
+
+
+ public boolean propertyExists(String propertyName) throws JMSException
+ {
+ return getJmsHeaders().propertyExists(propertyName);
+ }
+
+ public boolean getBooleanProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getBoolean(propertyName);
+ }
+
+ public byte getByteProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getByte(propertyName);
+ }
+
+ public short getShortProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getShort(propertyName);
+ }
+
+ public int getIntProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getInteger(propertyName);
+ }
+
+ public long getLongProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getLong(propertyName);
+ }
+
+ public float getFloatProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getFloat(propertyName);
+ }
+
+ public double getDoubleProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getDouble(propertyName);
+ }
+
+ public String getStringProperty(String propertyName) throws JMSException
+ {
+ //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
+ if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
+ {
+ return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+ }
+ else
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getString(propertyName);
+ }
+ }
+
+ public Object getObjectProperty(String propertyName) throws JMSException
+ {
+ return getJmsHeaders().getObject(propertyName);
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return getJmsHeaders().getPropertyNames();
+ }
+
+ public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setBoolean(propertyName, b);
+ }
+
+ public void setByteProperty(String propertyName, byte b) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setByte(propertyName, new Byte(b));
+ }
+
+ public void setShortProperty(String propertyName, short i) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setShort(propertyName, new Short(i));
+ }
+
+ public void setIntProperty(String propertyName, int i) throws JMSException
+ {
+ checkWritableProperties();
+ getJmsHeaders().setInteger(propertyName, new Integer(i));
+ }
+
+ public void setLongProperty(String propertyName, long l) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setLong(propertyName, new Long(l));
+ }
+
+ public void setFloatProperty(String propertyName, float f) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setFloat(propertyName, new Float(f));
+ }
+
+ public void setDoubleProperty(String propertyName, double v) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setDouble(propertyName, new Double(v));
+ }
+
+ public void setStringProperty(String propertyName, String value) throws JMSException
+ {
+ checkWritableProperties();
+ getJmsHeaders().setString(propertyName, value);
+ }
+
+ public void setObjectProperty(String propertyName, Object object) throws JMSException
+ {
+ checkWritableProperties();
+ getJmsHeaders().setObject(propertyName, object);
+ }
+
+ public void removeProperty(String propertyName) throws JMSException
+ {
+ getJmsHeaders().remove(propertyName);
+ }
+
+
+ private JMSHeaderAdapter getJmsHeaders()
+ {
+ return _headerAdapter;
+ }
+
+ protected void checkWritableProperties() throws MessageNotWriteableException
+ {
+ if (_readableProperties)
+ {
+ throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
+ }
+ _contentHeaderProperties.updated();
+ }
+
+
+ public int getJMSPriority() throws JMSException
+ {
+ return getContentHeaderProperties().getPriority();
+ }
+
+ public void setJMSPriority(int i) throws JMSException
+ {
+ getContentHeaderProperties().setPriority((byte) i);
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ getJmsHeaders().clear();
+
+ _readableProperties = false;
+ }
+
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null)
+ {
+ _session.acknowledge();
+ }
+ }
+
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _session = s;
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 94be090cf2..c0d51fa726 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -31,7 +31,6 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
/**
* @author Apache Software Foundation
@@ -44,21 +43,21 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
*/
private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
- AbstractBytesMessage()
+ AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory, null);
}
/**
* Construct a bytes message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- AbstractBytesMessage(ByteBuffer data)
+ AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
- getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
+ super(delegateFactory, data); // this instanties a content header
+ setContentType(getMimeType());
if (_data == null)
{
@@ -72,13 +71,12 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
_data.setAutoExpand(true);
}
- AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
- {
- // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- super(messageNbr, contentHeader, exchange, routingKey, data);
- getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
- }
+ AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ {
+ super(delegate, data);
+ setContentType(getMimeType());
+ }
+
public void clearBodyImpl() throws JMSException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
index 9bdde01bf3..f0fc394b0b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
@@ -33,7 +33,6 @@ import javax.jms.MessageNotWriteableException;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
@@ -70,27 +69,28 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
*/
private int _byteArrayRemaining = -1;
- AbstractBytesTypedMessage()
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+
+ this(delegateFactory, null);
}
/**
* Construct a stream message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- AbstractBytesTypedMessage(ByteBuffer data)
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
- }
+ super(delegateFactory, data); // this instanties a content header
+ }
- AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+
+ super(delegate, data);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index dc3a483538..54da7c4404 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -21,10 +21,7 @@
package org.apache.qpid.client.message;
import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Collections;
import java.util.Enumeration;
-import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
@@ -32,122 +29,54 @@ import javax.jms.JMSException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import org.apache.commons.collections.map.ReferenceMap;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.BindingURL;
-public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
+public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
{
- private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
- public static final String JMS_TYPE = "x-jms-type";
- protected boolean _redelivered;
protected ByteBuffer _data;
- private boolean _readableProperties = false;
protected boolean _readableMessage = false;
protected boolean _changedData = true;
- private Destination _destination;
- private JMSHeaderAdapter _headerAdapter;
- private static final boolean STRICT_AMQP_COMPLIANCE =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+
/**
* This is 0_10 specific
*/
private org.apache.qpid.api.Message _010message = null;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- public void set010Message(org.apache.qpid.api.Message m )
- {
- _010message = m;
- }
- public void dataChanged()
- {
- if (_010message != null)
- {
- _010message.clearData();
- try
- {
- if (_data != null)
- {
- _010message.appendData(_data.buf().slice());
- }
- else
- {
- _010message.appendData(java.nio.ByteBuffer.allocate(0));
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
- /**
- * End 010 specific
- */
- public org.apache.qpid.api.Message get010Message()
- {
- return _010message;
- }
+ private AMQMessageDelegate _delegate;
+ private boolean _redelivered;
- protected AbstractJMSMessage(ByteBuffer data)
+ protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(new BasicContentHeaderProperties());
+ _delegate = delegateFactory.createDelegate();
_data = data;
if (_data != null)
{
_data.acquire();
}
- _readableProperties = false;
+
_readableMessage = (data != null);
_changedData = (data == null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
}
- protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- this(contentHeader, deliveryTag);
-
- Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
-
- AMQDestination dest;
- if (AMQDestination.QUEUE_TYPE.equals(type))
- {
- dest = new AMQQueue(exchange, routingKey, routingKey);
- }
- else if (AMQDestination.TOPIC_TYPE.equals(type))
- {
- dest = new AMQTopic(exchange, routingKey, null);
- }
- else
- {
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- }
- // Destination dest = AMQDestination.createDestination(url);
- setJMSDestination(dest);
+ _delegate = delegate;
_data = data;
if (_data != null)
@@ -159,126 +88,82 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
- protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
+ public String getJMSMessageID() throws JMSException
{
- super(contentHeader, deliveryTag);
- _readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+ return _delegate.getJMSMessageID();
}
- public String getJMSMessageID() throws JMSException
+ public void setJMSMessageID(String messageId) throws JMSException
{
- return getContentHeaderProperties().getMessageIdAsString();
+ _delegate.setJMSMessageID(messageId);
}
- public void setJMSMessageID(String messageId) throws JMSException
+ public void setJMSMessageID(UUID messageId) throws JMSException
{
- getContentHeaderProperties().setMessageId(messageId);
+ _delegate.setJMSMessageID(messageId);
}
+
public long getJMSTimestamp() throws JMSException
{
- return getContentHeaderProperties().getTimestamp();
+ return _delegate.getJMSTimestamp();
}
public void setJMSTimestamp(long timestamp) throws JMSException
{
- getContentHeaderProperties().setTimestamp(timestamp);
+ _delegate.setJMSTimestamp(timestamp);
}
public byte[] getJMSCorrelationIDAsBytes() throws JMSException
{
- return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
+ return _delegate.getJMSCorrelationIDAsBytes();
}
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
{
- getContentHeaderProperties().setCorrelationId(new String(bytes));
+ _delegate.setJMSCorrelationIDAsBytes(bytes);
}
public void setJMSCorrelationID(String correlationId) throws JMSException
{
- getContentHeaderProperties().setCorrelationId(correlationId);
+ _delegate.setJMSCorrelationID(correlationId);
}
public String getJMSCorrelationID() throws JMSException
{
- return getContentHeaderProperties().getCorrelationIdAsString();
+ return _delegate.getJMSCorrelationID();
}
public Destination getJMSReplyTo() throws JMSException
{
- String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
- if (replyToEncoding == null)
- {
- return null;
- }
- else
- {
- Destination dest = (Destination) _destinationCache.get(replyToEncoding);
- if (dest == null)
- {
- try
- {
- BindingURL binding = new AMQBindingURL(replyToEncoding);
- dest = AMQDestination.createDestination(binding);
- }
- catch (URISyntaxException e)
- {
- throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
- }
-
- _destinationCache.put(replyToEncoding, dest);
- }
-
- return dest;
- }
+ return _delegate.getJMSReplyTo();
}
public void setJMSReplyTo(Destination destination) throws JMSException
{
- if (destination == null)
- {
- throw new IllegalArgumentException("Null destination not allowed");
- }
-
- if (!(destination instanceof AMQDestination))
- {
- throw new IllegalArgumentException(
- "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
- }
-
- final AMQDestination amqd = (AMQDestination) destination;
-
- final AMQShortString encodedDestination = amqd.getEncodedName();
- _destinationCache.put(encodedDestination, destination);
- getContentHeaderProperties().setReplyTo(encodedDestination);
+ _delegate.setJMSReplyTo(destination);
}
public Destination getJMSDestination() throws JMSException
{
- return _destination;
+ return _delegate.getJMSDestination();
}
public void setJMSDestination(Destination destination)
{
- _destination = destination;
+ _delegate.setJMSDestination(destination);
}
public int getJMSDeliveryMode() throws JMSException
{
- return getContentHeaderProperties().getDeliveryMode();
+ return _delegate.getJMSDeliveryMode();
}
public void setJMSDeliveryMode(int i) throws JMSException
{
- getContentHeaderProperties().setDeliveryMode((byte) i);
+ _delegate.setJMSDeliveryMode(i);
}
- public BasicContentHeaderProperties getContentHeaderProperties()
- {
- return (BasicContentHeaderProperties) _contentHeaderProperties;
- }
public boolean getJMSRedelivered() throws JMSException
{
@@ -290,318 +175,180 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_redelivered = b;
}
+
public String getJMSType() throws JMSException
{
- return getContentHeaderProperties().getTypeAsString();
+ return _delegate.getJMSType();
}
public void setJMSType(String string) throws JMSException
{
- getContentHeaderProperties().setType(string);
+ _delegate.setJMSType(string);
}
public long getJMSExpiration() throws JMSException
{
- return getContentHeaderProperties().getExpiration();
+ return _delegate.getJMSExpiration();
}
public void setJMSExpiration(long l) throws JMSException
{
- getContentHeaderProperties().setExpiration(l);
+ _delegate.setJMSExpiration(l);
}
public int getJMSPriority() throws JMSException
{
- return getContentHeaderProperties().getPriority();
+ return _delegate.getJMSPriority();
}
public void setJMSPriority(int i) throws JMSException
{
- getContentHeaderProperties().setPriority((byte) i);
+ _delegate.setJMSPriority(i);
}
- public void clearProperties() throws JMSException
- {
- getJmsHeaders().clear();
-
- _readableProperties = false;
- }
-
- public void clearBody() throws JMSException
- {
- clearBodyImpl();
- _readableMessage = false;
- }
-
- public boolean propertyExists(AMQShortString propertyName) throws JMSException
- {
- return getJmsHeaders().propertyExists(propertyName);
- }
public boolean propertyExists(String propertyName) throws JMSException
{
- return getJmsHeaders().propertyExists(propertyName);
+ return _delegate.propertyExists(propertyName);
}
- public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
+ public boolean getBooleanProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getBoolean(propertyName);
+ return _delegate.getBooleanProperty(s);
}
- public boolean getBooleanProperty(String propertyName) throws JMSException
+ public byte getByteProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getBoolean(propertyName);
+ return _delegate.getByteProperty(s);
}
- public byte getByteProperty(String propertyName) throws JMSException
+ public short getShortProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getByte(propertyName);
+ return _delegate.getShortProperty(s);
}
- public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+ public int getIntProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getBytes(propertyName);
+ return _delegate.getIntProperty(s);
}
- public short getShortProperty(String propertyName) throws JMSException
+ public long getLongProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getShort(propertyName);
- }
-
- public int getIntProperty(String propertyName) throws JMSException
- {
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getInteger(propertyName);
+ return _delegate.getLongProperty(s);
}
- public long getLongProperty(String propertyName) throws JMSException
+ public float getFloatProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getLong(propertyName);
+ return _delegate.getFloatProperty(s);
}
- public float getFloatProperty(String propertyName) throws JMSException
+ public double getDoubleProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getFloat(propertyName);
+ return _delegate.getDoubleProperty(s);
}
- public double getDoubleProperty(String propertyName) throws JMSException
+ public String getStringProperty(final String s)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getDouble(propertyName);
+ return _delegate.getStringProperty(s);
}
- public String getStringProperty(String propertyName) throws JMSException
+ public Object getObjectProperty(final String s)
+ throws JMSException
{
- //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
- if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
- {
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
- }
- else
- {
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- return getJmsHeaders().getString(propertyName);
- }
+ return _delegate.getObjectProperty(s);
}
- public Object getObjectProperty(String propertyName) throws JMSException
+ public Enumeration getPropertyNames()
+ throws JMSException
{
- return getJmsHeaders().getObject(propertyName);
+ return _delegate.getPropertyNames();
}
- public Enumeration getPropertyNames() throws JMSException
+ public void setBooleanProperty(final String s, final boolean b)
+ throws JMSException
{
- return getJmsHeaders().getPropertyNames();
+ _delegate.setBooleanProperty(s, b);
}
- public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
+ public void setByteProperty(final String s, final byte b)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setBoolean(propertyName, b);
+ _delegate.setByteProperty(s, b);
}
- public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ public void setShortProperty(final String s, final short i)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setBoolean(propertyName, b);
+ _delegate.setShortProperty(s, i);
}
- public void setByteProperty(String propertyName, byte b) throws JMSException
+ public void setIntProperty(final String s, final int i)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setByte(propertyName, new Byte(b));
+ _delegate.setIntProperty(s, i);
}
- public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
+ public void setLongProperty(final String s, final long l)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setBytes(propertyName, bytes);
+ _delegate.setLongProperty(s, l);
}
- public void setShortProperty(String propertyName, short i) throws JMSException
+ public void setFloatProperty(final String s, final float v)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setShort(propertyName, new Short(i));
+ _delegate.setFloatProperty(s, v);
}
- public void setIntProperty(String propertyName, int i) throws JMSException
+ public void setDoubleProperty(final String s, final double v)
+ throws JMSException
{
- checkWritableProperties();
- JMSHeaderAdapter.checkPropertyName(propertyName);
- super.setIntProperty(new AMQShortString(propertyName), new Integer(i));
+ _delegate.setDoubleProperty(s, v);
}
- public void setLongProperty(String propertyName, long l) throws JMSException
+ public void setStringProperty(final String s, final String s1)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setLong(propertyName, new Long(l));
+ _delegate.setStringProperty(s, s1);
}
- public void setFloatProperty(String propertyName, float f) throws JMSException
+ public void setObjectProperty(final String s, final Object o)
+ throws JMSException
{
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
- checkWritableProperties();
- getJmsHeaders().setFloat(propertyName, new Float(f));
+ _delegate.setObjectProperty(s, o);
}
- public void setDoubleProperty(String propertyName, double v) throws JMSException
- {
- if (STRICT_AMQP_COMPLIANCE)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
- checkWritableProperties();
- getJmsHeaders().setDouble(propertyName, new Double(v));
- }
- public void setStringProperty(String propertyName, String value) throws JMSException
+ public void clearProperties() throws JMSException
{
- checkWritableProperties();
- JMSHeaderAdapter.checkPropertyName(propertyName);
- super.setLongStringProperty(new AMQShortString(propertyName), value);
+ _delegate.clearProperties();
}
- public void setObjectProperty(String propertyName, Object object) throws JMSException
+ public void clearBody() throws JMSException
{
- checkWritableProperties();
- getJmsHeaders().setObject(propertyName, object);
- }
+ clearBodyImpl();
+ _readableMessage = false;
- protected void removeProperty(AMQShortString propertyName) throws JMSException
- {
- getJmsHeaders().remove(propertyName);
}
- protected void removeProperty(String propertyName) throws JMSException
- {
- getJmsHeaders().remove(propertyName);
- }
public void acknowledgeThis() throws JMSException
{
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
+ _delegate.acknowledgeThis();
}
public void acknowledge() throws JMSException
{
- if (_session != null)
- {
- _session.acknowledge();
- }
+ _delegate.acknowledge();
}
/**
@@ -617,12 +364,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
*/
public abstract String toBodyString() throws JMSException;
- public String getMimeType()
- {
- return getMimeTypeAsShortString().toString();
- }
+ protected abstract String getMimeType();
+
- public abstract AMQShortString getMimeTypeAsShortString();
public String toString()
{
@@ -640,16 +384,23 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
buf.append("\nJMS Destination: ").append(getJMSDestination());
buf.append("\nJMS Type: ").append(getJMSType());
buf.append("\nJMS MessageID: ").append(getJMSMessageID());
- buf.append("\nAMQ message number: ").append(_deliveryTag);
+ buf.append("\nAMQ message number: ").append(getDeliveryTag());
buf.append("\nProperties:");
- if (getJmsHeaders().isEmpty())
+ final Enumeration propertyNames = getPropertyNames();
+ if (!propertyNames.hasMoreElements())
{
buf.append("<NONE>");
}
else
{
- buf.append('\n').append(getJmsHeaders().getHeaders());
+ buf.append('\n');
+ while(propertyNames.hasMoreElements())
+ {
+ String propertyName = (String) propertyNames.nextElement();
+ buf.append(propertyName).append(":\t").append(getObjectProperty(propertyName));
+ }
+
}
return buf.toString();
@@ -660,14 +411,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
- public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
- {
- getContentHeaderProperties().setHeaders(messageProperties);
- }
- public JMSHeaderAdapter getJmsHeaders()
+ public AMQMessageDelegate getDelegate()
{
- return _headerAdapter;
+ return _delegate;
}
public ByteBuffer getData()
@@ -698,25 +445,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
- protected void checkWritableProperties() throws MessageNotWriteableException
- {
- if (_readableProperties)
- {
- throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
- }
- _contentHeaderProperties.updated();
- }
-
- public boolean isReadable()
- {
- return _readableMessage;
- }
-
- public boolean isWritable()
- {
- return !_readableMessage;
- }
-
public void reset()
{
if (!_changedData)
@@ -731,6 +459,47 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
+ public void set010Message(org.apache.qpid.api.Message m )
+ {
+ _010message = m;
+ }
+
+ public void dataChanged()
+ {
+ if (_010message != null)
+ {
+ _010message.clearData();
+ try
+ {
+ if (_data != null)
+ {
+ _010message.appendData(_data.buf().slice());
+ }
+ else
+ {
+ _010message.appendData(java.nio.ByteBuffer.allocate(0));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * End 010 specific
+ */
+
+ public org.apache.qpid.api.Message get010Message()
+ {
+ return _010message;
+ }
+
+
+
+
+
public int getContentLength()
{
if(_data != null)
@@ -748,4 +517,66 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_changedData = false;
}
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _delegate.setAMQSession(s);
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _delegate.getAMQSession();
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _delegate.getDeliveryTag();
+ }
+
+ /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
+ public void prepareForSending() throws JMSException
+ {
+ }
+
+
+ public void setContentType(String contentType)
+ {
+ _delegate.setContentType(contentType);
+ }
+
+ public String getContentType()
+ {
+ return _delegate.getContentType();
+ }
+
+ public void setEncoding(String encoding)
+ {
+ _delegate.setEncoding(encoding);
+ }
+
+ public String getEncoding()
+ {
+ return _delegate.getEncoding();
+ }
+
+ public String getReplyToString()
+ {
+ return _delegate.getReplyToString();
+ }
+
+ protected void removeProperty(final String propertyName) throws JMSException
+ {
+ _delegate.removeProperty(propertyName);
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index e14c343613..5e16c765af 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -38,17 +38,11 @@ import javax.jms.JMSException;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.UUID;
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
- protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
- AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException;
-
protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
AMQShortString exchange, AMQShortString routingKey,
List bodies) throws AMQException
@@ -105,13 +99,18 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
.remaining());
}
- return createMessage(messageNbr, data, exchange, routingKey,
- (BasicContentHeaderProperties) contentHeader.properties);
+ AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
+ (BasicContentHeaderProperties) contentHeader.properties,
+ exchange, routingKey);
+
+ return createMessage(delegate, data);
}
+ protected abstract AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException;
+
+
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies, String replyToURL) throws AMQException
+ List bodies) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -131,40 +130,13 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
_logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
.remaining());
}
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
// set the properties of this message
MessageProperties mprop = (MessageProperties) contentHeader[0];
DeliveryProperties devprop = (DeliveryProperties) contentHeader[1];
- props.setContentType(mprop.getContentType());
- props.setCorrelationId(asString(mprop.getCorrelationId()));
- String encoding = mprop.getContentEncoding();
- if (encoding != null && !encoding.equals(""))
- {
- props.setEncoding(encoding);
- }
- if (devprop.hasDeliveryMode())
- {
- props.setDeliveryMode((byte) devprop.getDeliveryMode().getValue());
- }
- props.setExpiration(devprop.getExpiration());
- UUID mid = mprop.getMessageId();
- props.setMessageId(mid == null ? null : "ID:" + mid.toString());
- if (devprop.hasPriority())
- {
- props.setPriority((byte) devprop.getPriority().getValue());
- }
- props.setReplyTo(replyToURL);
- props.setTimestamp(devprop.getTimestamp());
- String type = null;
- Map<String,Object> map = mprop.getApplicationHeaders();
- if (map != null)
- {
- type = (String) map.get(AbstractJMSMessage.JMS_TYPE);
- }
- props.setType(type);
- props.setUserId(asString(mprop.getUserId()));
- props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));
- AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
+
+ AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(mprop, devprop, messageNbr);
+
+ AbstractJMSMessage message = createMessage(delegate, data);
return message;
}
@@ -192,12 +164,11 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies,
- String replyToURL)
+ List bodies)
throws JMSException, AMQException
{
final AbstractJMSMessage msg =
- create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
+ create010MessageWithBody(messageNbr, contentHeader, bodies);
msg.setJMSRedelivered(redelivered);
msg.receivedFromServer();
return msg;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 4f5641bcff..cd9d7ccf8b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -33,46 +33,47 @@ import javax.jms.MessageFormatException;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
public static final String MIME_TYPE = "application/octet-stream";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
- public JMSBytesMessage()
+
+ public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory,null);
+
}
/**
* Construct a bytes message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- JMSBytesMessage(ByteBuffer data)
+ JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
+
+ super(delegateFactory, data); // this instanties a content header
}
- JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+ super(delegate, data);
}
+
public void reset()
{
super.reset();
_readableMessage = true;
}
- public AMQShortString getMimeTypeAsShortString()
+ protected String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public long getBodyLength() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
index 0202dc29df..cb04ebee1b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
@@ -25,21 +25,18 @@ import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSBytesMessage(delegate, data);
}
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSBytesMessage();
+ return new JMSBytesMessage(delegateFactory);
}
// 0_10 specific
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 8ec7437fa1..6215652c80 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -467,7 +467,7 @@ public final class JMSHeaderAdapter
return getPropertyNames();
}
- protected static void checkPropertyName(CharSequence propertyName)
+ protected void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
{
@@ -481,7 +481,7 @@ public final class JMSHeaderAdapter
checkIdentiferFormat(propertyName);
}
- protected static void checkIdentiferFormat(CharSequence propertyName)
+ protected void checkIdentiferFormat(CharSequence propertyName)
{
// JMS requirements 3.5.1 Property Names
// Identifiers:
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index fed1f1c609..b6e013ac8f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.slf4j.Logger;
@@ -44,18 +43,19 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
public static final String MIME_TYPE = "jms/map-message";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
private Map<String, Object> _map = new HashMap<String, Object>();
- public JMSMapMessage() throws JMSException
+ public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- this(null);
+ this(delegateFactory, null);
}
- JMSMapMessage(ByteBuffer data) throws JMSException
+ JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
{
- super(data); // this instantiates a content header
+
+ super(delegateFactory, data); // this instantiates a content header
if(data != null)
{
populateMapFromData();
@@ -63,10 +63,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
- JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
- ByteBuffer data) throws AMQException
+ JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+
+ super(delegate, data);
try
{
populateMapFromData();
@@ -79,14 +79,15 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
+
public String toBodyString() throws JMSException
{
return _map == null ? "" : _map.toString();
}
- public AMQShortString getMimeTypeAsShortString()
+ protected String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public ByteBuffer getData()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
index 7cb8b637e6..eccb90560b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
@@ -25,21 +25,18 @@ import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSMapMessageFactory extends AbstractJMSMessageFactory
{
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSMapMessage();
+ return new JMSMapMessage(delegateFactory);
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSMapMessage(delegate, data);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 385eee47c9..0c431d42e6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -37,52 +37,54 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
public static final String MIME_TYPE = "application/java-object-stream";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
private static final int DEFAULT_BUFFER_SIZE = 1024;
/**
* Creates empty, writable message for use by producers
+ * @param delegateFactory
*/
- public JMSObjectMessage()
+ public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory, null);
}
- private JMSObjectMessage(ByteBuffer data)
+ private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data);
+ super(delegateFactory, data);
if (data == null)
{
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
- getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
+ setContentType(getMimeType());
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
- ByteBuffer data) throws AMQException
- {
- super(messageNbr, contentHeader, exchange, routingKey, data);
- }
+
+ JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ {
+ super(delegate, data);
+ }
+
public void clearBodyImpl() throws JMSException
{
if (_data != null)
{
_data.release();
+ _data = null;
}
- _data = null;
+
}
@@ -91,9 +93,9 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
return toString(_data);
}
- public AMQShortString getMimeTypeAsShortString()
+ public String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public void setObject(Serializable serializable) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
index e7369dcb26..03851dfa01 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
@@ -25,20 +25,17 @@ import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSObjectMessage(delegate, data);
}
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSObjectMessage();
+ return new JMSObjectMessage(delegateFactory);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index 62f3150ed1..ad2620852b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -26,7 +26,6 @@ import javax.jms.StreamMessage;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
@@ -35,7 +34,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
{
public static final String MIME_TYPE="jms/stream-message";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
/**
@@ -44,38 +43,40 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
*/
private int _byteArrayRemaining = -1;
- public JMSStreamMessage()
+ public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory,null);
+
}
/**
* Construct a stream message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- JMSStreamMessage(ByteBuffer data)
+ JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
- }
+ super(delegateFactory, data); // this instanties a content header
+ }
- JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+
+ super(delegate, data);
}
+
public void reset()
{
super.reset();
_readableMessage = true;
}
- public AMQShortString getMimeTypeAsShortString()
+ protected String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
index 4bb648e090..5e25db9ae0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
@@ -25,19 +25,16 @@ import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSStreamMessage(delegate, data);
}
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSStreamMessage();
+ return new JMSStreamMessage(delegateFactory);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index b29e39a52e..470439d85c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -36,46 +36,44 @@ import org.apache.qpid.util.Strings;
public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
{
private static final String MIME_TYPE = "text/plain";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
-
private String _decodedValue;
/**
* This constant represents the name of a property that is set when the message payload is null.
*/
- private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
+ private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
- public JMSTextMessage() throws JMSException
+ public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- this(null, null);
+ this(delegateFactory, null, null);
}
- JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+ JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
{
- super(data); // this instantiates a content header
- getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
- getContentHeaderProperties().setEncoding(encoding);
+ super(delegateFactory, data); // this instantiates a content header
+ setContentType(getMimeType());
+ setEncoding(encoding);
}
- JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data)
+ JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
throws AMQException
{
- super(deliveryTag, contentHeader, exchange, routingKey, data);
- contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
+ super(delegate, data);
+ setContentType(getMimeType());
_data = data;
}
- JMSTextMessage(ByteBuffer data) throws JMSException
+
+ JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
{
- this(data, null);
+ this(delegateFactory, data, null);
}
- JMSTextMessage(String text) throws JMSException
+ JMSTextMessage(AMQMessageDelegateFactory delegateFactory, String text) throws JMSException
{
- super((ByteBuffer) null);
+ super(delegateFactory, (ByteBuffer) null);
setText(text);
}
@@ -84,8 +82,9 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
if (_data != null)
{
_data.release();
+ _data = null;
}
- _data = null;
+
_decodedValue = null;
}
@@ -94,14 +93,9 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
return getText();
}
- public void setData(ByteBuffer data)
+ protected String getMimeType()
{
- _data = data;
- }
-
- public AMQShortString getMimeTypeAsShortString()
- {
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public void setText(String text) throws JMSException
@@ -113,7 +107,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
if (text != null)
{
- final String encoding = getContentHeaderProperties().getEncodingAsString();
+ final String encoding = getEncoding();
if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
{
_data = ByteBuffer.wrap(Strings.toUTF8(text));
@@ -154,11 +148,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
return null;
}
- if (getContentHeaderProperties().getEncodingAsString() != null)
+ if (getEncoding() != null)
{
try
{
- _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder());
+ _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
}
catch (CharacterCodingException e)
{
@@ -197,4 +191,6 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
removeProperty(PAYLOAD_NULL_PROPERTY);
}
}
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
index c578c15a6a..1f4d64c78f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
@@ -26,21 +26,17 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
public class JMSTextMessageFactory extends AbstractJMSMessageFactory
{
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSTextMessage();
+ return new JMSTextMessage(delegateFactory);
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSTextMessage(deliveryTag, contentHeader,
- exchange, routingKey, data);
+ return new JMSTextMessage(delegate, data);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
index f6b11c6f6c..e606ef11c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
@@ -22,15 +22,9 @@ package org.apache.qpid.client.message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQSession;
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageEOFException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
+import javax.jms.*;
import java.util.Enumeration;
@@ -52,12 +46,11 @@ public class MessageConverter
_newMessage = message;
}
- public MessageConverter(BytesMessage message) throws JMSException
+ public MessageConverter(AMQSession session, BytesMessage bytesMessage) throws JMSException
{
- BytesMessage bytesMessage = (BytesMessage) message;
bytesMessage.reset();
- JMSBytesMessage nativeMsg = new JMSBytesMessage();
+ JMSBytesMessage nativeMsg = (JMSBytesMessage) session.createBytesMessage();
byte[] buf = new byte[1024];
@@ -69,12 +62,12 @@ public class MessageConverter
}
_newMessage = nativeMsg;
- setMessageProperties(message);
+ setMessageProperties(bytesMessage);
}
- public MessageConverter(MapMessage message) throws JMSException
+ public MessageConverter(AMQSession session, MapMessage message) throws JMSException
{
- MapMessage nativeMessage = new JMSMapMessage();
+ MapMessage nativeMessage = session.createMapMessage();
Enumeration mapNames = message.getMapNames();
while (mapNames.hasMoreElements())
@@ -87,21 +80,21 @@ public class MessageConverter
setMessageProperties(message);
}
- public MessageConverter(ObjectMessage message) throws JMSException
+ public MessageConverter(AMQSession session, ObjectMessage origMessage) throws JMSException
{
- ObjectMessage origMessage = (ObjectMessage) message;
- ObjectMessage nativeMessage = new JMSObjectMessage();
+
+ ObjectMessage nativeMessage = session.createObjectMessage();
nativeMessage.setObject(origMessage.getObject());
_newMessage = (AbstractJMSMessage) nativeMessage;
- setMessageProperties(message);
+ setMessageProperties(origMessage);
}
- public MessageConverter(TextMessage message) throws JMSException
+ public MessageConverter(AMQSession session, TextMessage message) throws JMSException
{
- TextMessage nativeMessage = new JMSTextMessage();
+ TextMessage nativeMessage = session.createTextMessage();
nativeMessage.setText(message.getText());
@@ -109,9 +102,9 @@ public class MessageConverter
setMessageProperties(message);
}
- public MessageConverter(StreamMessage message) throws JMSException
+ public MessageConverter(AMQSession session, StreamMessage message) throws JMSException
{
- StreamMessage nativeMessage = new JMSStreamMessage();
+ StreamMessage nativeMessage = session.createStreamMessage();
try
{
@@ -130,11 +123,11 @@ public class MessageConverter
setMessageProperties(message);
}
- public MessageConverter(Message message) throws JMSException
+ public MessageConverter(AMQSession session, Message message) throws JMSException
{
// Send a message with just properties.
// Throwing away content
- BytesMessage nativeMessage = new JMSBytesMessage();
+ Message nativeMessage = session.createMessage();
_newMessage = (AbstractJMSMessage) nativeMessage;
setMessageProperties(message);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 6c9acdef01..424dccc0c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -39,10 +39,9 @@ public interface MessageFactory
throws JMSException, AMQException;
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
- Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies, String replyToURL)
+ Struct[] contentHeader,
+ List bodies)
throws JMSException, AMQException;
- AbstractJMSMessage createMessage() throws JMSException;
+ AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 6117b18fde..213d3ebc9e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -31,7 +31,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,8 +91,7 @@ public class MessageFactoryRegistry
* @param deliveryTag the AMQ message id
* @param redelivered true if redelivered
* @param contentHeader the content header that was received
- * @param bodies a list of ContentBody instances
- * @return the message.
+ * @param bodies a list of ContentBody instances @return the message.
* @throws AMQException
* @throws JMSException
*/
@@ -120,9 +118,8 @@ public class MessageFactoryRegistry
}
}
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, Struct[] contentHeader, List bodies,
- String replyTo) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ Struct[] contentHeader, List bodies) throws AMQException, JMSException
{
MessageProperties mprop = (MessageProperties) contentHeader[0];
String messageType = mprop.getContentType();
@@ -138,12 +135,12 @@ public class MessageFactoryRegistry
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
}
}
- public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory, String mimeType) throws AMQException, JMSException
{
if (mimeType == null)
{
@@ -157,7 +154,7 @@ public class MessageFactoryRegistry
}
else
{
- return mf.createMessage();
+ return mf.createMessage(delegateFactory);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java b/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java
index 700c6a15ac..2c05f5ce0f 100644
--- a/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java
+++ b/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java
@@ -17,10 +17,9 @@
*/
package org.apache.qpid.filter;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.QpidException;
+import org.apache.qpid.ErrorCode;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
@@ -54,20 +53,7 @@ public class PropertyExpression implements Expression
{
public Object evaluate(AbstractJMSMessage message)
{
- try
- {
- CommonContentHeaderProperties _properties =
- message.getContentHeaderProperties();
- AMQShortString replyTo = _properties.getReplyTo();
-
- return (replyTo == null) ? null : replyTo.toString();
- }
- catch (Exception e)
- {
- _logger.warn("Error evaluating property", e);
-
- return null;
- }
+ return message.getReplyToString();
}
});
@@ -77,13 +63,9 @@ public class PropertyExpression implements Expression
{
try
{
- CommonContentHeaderProperties _properties =
- message.getContentHeaderProperties();
- AMQShortString type = _properties.getType();
-
- return (type == null) ? null : type.toString();
+ return message.getJMSType();
}
- catch (Exception e)
+ catch (JMSException e)
{
_logger.warn("Error evaluating property", e);
@@ -107,7 +89,7 @@ public class PropertyExpression implements Expression
return mode;
}
- catch (Exception e)
+ catch (JMSException e)
{
_logger.warn("Error evaluating property",e);
}
@@ -122,9 +104,7 @@ public class PropertyExpression implements Expression
{
try
{
- CommonContentHeaderProperties _properties =
- message.getContentHeaderProperties();
- return (int) _properties.getPriority();
+ return message.getJMSPriority();
}
catch (Exception e)
{
@@ -142,13 +122,9 @@ public class PropertyExpression implements Expression
try
{
- CommonContentHeaderProperties _properties =
- message.getContentHeaderProperties();
- AMQShortString messageId = _properties.getMessageId();
-
- return (messageId == null) ? null : messageId;
+ return message.getJMSMessageID();
}
- catch (Exception e)
+ catch (JMSException e)
{
_logger.warn("Error evaluating property",e);
@@ -164,9 +140,7 @@ public class PropertyExpression implements Expression
{
try
{
- CommonContentHeaderProperties _properties =
- message.getContentHeaderProperties();
- return _properties.getTimestamp();
+ return message.getJMSTimestamp();
}
catch (Exception e)
{
@@ -185,12 +159,9 @@ public class PropertyExpression implements Expression
try
{
- CommonContentHeaderProperties _properties =
- message.getContentHeaderProperties();
- AMQShortString correlationId = _properties.getCorrelationId();
- return (correlationId == null) ? null : correlationId.toString();
+ return message.getJMSCorrelationID();
}
- catch (Exception e)
+ catch (JMSException e)
{
_logger.warn("Error evaluating property",e);
@@ -207,11 +178,9 @@ public class PropertyExpression implements Expression
try
{
- CommonContentHeaderProperties _properties =
- message.getContentHeaderProperties();
- return _properties.getExpiration();
+ return message.getJMSExpiration();
}
- catch (Exception e)
+ catch (JMSException e)
{
_logger.warn("Error evaluating property",e);
return null;
@@ -257,13 +226,20 @@ public class PropertyExpression implements Expression
else
{
- CommonContentHeaderProperties _properties = message.getContentHeaderProperties();
- if (_logger.isDebugEnabled())
+ try
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Looking up property:" + name);
+ _logger.debug("Properties are:" + message.getPropertyNames());
+ }
+ return message.getObjectProperty(name);
+ }
+ catch(JMSException e)
{
- _logger.debug("Looking up property:" + name);
- _logger.debug("Properties are:" + _properties.getHeaders().keySet());
+ throw new QpidException("Exception evaluating properties for filter", ErrorCode.INTERNAL_ERROR, e);
}
- return _properties.getHeaders().getObject(name);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java
index e65f9ad2f4..53c615a1fd 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/Message.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java
@@ -24,5 +24,7 @@ import javax.jms.JMSException;
public interface Message extends javax.jms.Message
{
+ public static final String JMS_TYPE = "x-jms-type";
+
public void acknowledgeThis() throws JMSException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
index 8973127105..3b10c44d5a 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
@@ -30,6 +30,12 @@ public class ByteBufferMessage implements Message
private int _transferId;
private Header _header;
+ public ByteBufferMessage(MessageProperties messageProperties, DeliveryProperties deliveryProperties)
+ {
+ _currentMessageProps = messageProperties;
+ _currentDeliveryProps = deliveryProperties;
+ }
+
public void setHeader(Header header) {
_header = header;
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
index 9b477c19e2..7ee991b63c 100644
--- a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
+++ b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
@@ -26,21 +26,21 @@ public class TestMessageHelper
{
public static JMSTextMessage newJMSTextMessage() throws JMSException
{
- return new JMSTextMessage();
+ return new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
}
public static JMSBytesMessage newJMSBytesMessage() throws JMSException
{
- return new JMSBytesMessage();
+ return new JMSBytesMessage(AMQMessageDelegateFactory.FACTORY_0_8);
}
public static JMSMapMessage newJMSMapMessage() throws JMSException
{
- return new JMSMapMessage();
+ return new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_8);
}
public static JMSStreamMessage newJMSStreamMessage()
{
- return new JMSStreamMessage();
+ return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
index fd425b9930..b5e7ae82b5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.test.unit.message;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.TextMessage;
+import javax.jms.*;
import junit.framework.TestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.*;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import java.util.Map;
public class MessageConverterTest extends TestCase
@@ -47,36 +47,38 @@ public class MessageConverterTest extends TestCase
protected JMSTextMessage testTextMessage;
protected JMSMapMessage testMapMessage;
+ private AMQSession _session = new TestAMQSession();
+
protected void setUp() throws Exception
{
super.setUp();
- testTextMessage = new JMSTextMessage();
+ testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
//Set Message Text
testTextMessage.setText("testTextMessage text");
setMessageProperties(testTextMessage);
- testMapMessage = new JMSMapMessage();
+ testMapMessage = new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_8);
testMapMessage.setString("testMapString", "testMapStringValue");
testMapMessage.setDouble("testMapDouble", Double.MAX_VALUE);
}
public void testSetProperties() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+ AbstractJMSMessage newMessage = new MessageConverter(_session, (TextMessage) testTextMessage).getConvertedMessage();
mesagePropertiesTest(testTextMessage, newMessage);
}
public void testJMSTextMessageConversion() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+ AbstractJMSMessage newMessage = new MessageConverter(_session, (TextMessage) testTextMessage).getConvertedMessage();
assertEquals("Converted message text mismatch", ((JMSTextMessage) newMessage).getText(), testTextMessage.getText());
}
public void testJMSMapMessageConversion() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((MapMessage) testMapMessage).getConvertedMessage();
+ AbstractJMSMessage newMessage = new MessageConverter(_session, (MapMessage) testMapMessage).getConvertedMessage();
assertEquals("Converted map message String mismatch", ((JMSMapMessage) newMessage).getString("testMapString"),
testMapMessage.getString("testMapString"));
assertEquals("Converted map message Double mismatch", ((JMSMapMessage) newMessage).getDouble("testMapDouble"),
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
new file mode 100644
index 0000000000..dfb356432e
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -0,0 +1,171 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.message;
+
+import org.apache.qpid.client.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import javax.jms.*;
+import java.util.Map;
+
+public class TestAMQSession extends AMQSession
+{
+
+ public TestAMQSession()
+ {
+ super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
+ }
+
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ {
+
+ }
+
+ public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException
+ {
+
+ }
+
+ public void sendClose(long timeout) throws AMQException, FailoverException
+ {
+
+ }
+
+ public void sendCommit() throws AMQException, FailoverException
+ {
+
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+ return null;
+ }
+
+ public void sendCreateQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
+ {
+
+ }
+
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ return null;
+ }
+
+ protected void sendRecover() throws AMQException, FailoverException
+ {
+
+ }
+
+ public void rejectMessage(long deliveryTag, boolean requeue)
+ {
+
+ }
+
+ public void releaseForRollback()
+ {
+
+ }
+
+ public void sendRollback() throws AMQException, FailoverException
+ {
+
+ }
+
+ public BasicMessageConsumer createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException
+ {
+ return null;
+ }
+
+ public boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
+ {
+ return false;
+ }
+
+ public boolean isQueueBound(AMQDestination destination) throws JMSException
+ {
+ return false;
+ }
+
+ public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ {
+
+ }
+
+ public BasicMessageProducer createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
+ {
+ return null;
+ }
+
+ protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ {
+ return null;
+ }
+
+ public void sendExchangeDeclare(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
+ {
+
+ }
+
+ public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+ {
+
+ }
+
+ public void sendQueueDelete(AMQShortString queueName) throws AMQException, FailoverException
+ {
+
+ }
+
+ public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
+ {
+
+ }
+
+ protected boolean tagLE(long tag1, long tag2)
+ {
+ return false;
+ }
+
+ protected boolean updateRollbackMark(long current, long deliveryTag)
+ {
+ return false;
+ }
+
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_8;
+ }
+
+ protected Object getFailoverMutex()
+ {
+ return this;
+ }
+
+ public void checkNotClosed()
+ {
+
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
index 64ccb719b6..3ad6c021bd 100644
--- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
+++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
@@ -29,7 +29,6 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.BytesMessage;
import javax.jms.TextMessage;
-import javax.jms.Queue;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -42,10 +41,10 @@ public class TestMessageFactory
return session.createTextMessage(createMessagePayload(size));
}
- public static JMSTextMessage newJMSTextMessage(int size, String encoding) throws JMSException
+ public static TextMessage newJMSTextMessage(Session session, int size, String encoding) throws JMSException
{
- ByteBuffer byteBuffer = (new SimpleByteBufferAllocator()).allocate(size, true);
- JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding);
+
+ TextMessage message = session.createTextMessage();
message.clearBody();
message.setText(createMessagePayload(size));
return message;
diff --git a/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java b/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
index 60a26c8e62..857adaf82c 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
@@ -26,20 +26,22 @@ import java.util.Enumeration;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
+import javax.jms.Session;
public class NonQpidObjectMessage implements ObjectMessage {
- private JMSObjectMessage _realMessage;
+ private ObjectMessage _realMessage;
private String _contentString;
/**
* Allows us to construct a JMS message which
* does not inherit from the Qpid message superclasses
* and expand our unit testing of MessageConverter et al
+ * @param session
*/
- public NonQpidObjectMessage()
+ public NonQpidObjectMessage(Session session) throws JMSException
{
- _realMessage = new JMSObjectMessage();
+ _realMessage = session.createObjectMessage();
}
public String getJMSMessageID() throws JMSException {
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 98639f6970..aafddb810a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -33,7 +33,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -105,7 +105,7 @@ public class TxAckTest extends TestCase
Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
- TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
+ TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
_storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index afa0f84d71..08f78a3d28 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -34,7 +34,7 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -56,7 +56,7 @@ public class AckTest extends TestCase
private MockProtocolSession _protocolSession;
- private TestableMemoryMessageStore _messageStore;
+ private TestMemoryMessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
@@ -74,7 +74,7 @@ public class AckTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _messageStore = new TestableMemoryMessageStore();
+ _messageStore = new TestMemoryMessageStore();
_protocolSession = new MockProtocolSession(_messageStore);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index 79d428fee8..4e48435962 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -31,9 +31,9 @@ import java.util.List;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
-public class TestableMemoryMessageStore extends MemoryMessageStore
+public class TestMemoryMessageStore extends MemoryMessageStore
{
- public TestableMemoryMessageStore()
+ public TestMemoryMessageStore()
{
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
index f36e924890..2346660d25 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -29,14 +29,13 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.txn.NonTransactionalContext;
/**
* Tests that reference counting works correctly with AMQMessage and the message store
*/
public class TestReferenceCounting extends TestCase
{
- private TestableMemoryMessageStore _store;
+ private TestMemoryMessageStore _store;
private StoreContext _storeContext = new StoreContext();
@@ -44,7 +43,7 @@ public class TestReferenceCounting extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _store = new TestableMemoryMessageStore();
+ _store = new TestMemoryMessageStore();
}
/**
diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
index b34f28b1a8..84d3d313d1 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.txn;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
@@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase
}
}
- class MockStore extends TestableMemoryMessageStore
+ class MockStore extends TestMemoryMessageStore
{
final Object BEGIN = "BEGIN";
final Object ABORT = "ABORT";
diff --git a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 6864b0a80d..e69de29bb2 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -1,131 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
-import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
-import org.apache.qpid.server.security.access.plugins.AllowAll;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.MapConfiguration;
-
-import java.util.HashMap;
-import java.util.Collection;
-import java.util.Properties;
-
-public class TestApplicationRegistry extends ApplicationRegistry
-{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- private VirtualHost _vHost;
-
- public TestApplicationRegistry()
- {
- super(new MapConfiguration(new HashMap()));
- }
-
- public void initialise() throws Exception
- {
- _logger.info("Initialising TestApplicationRegistry");
-
- Properties users = new Properties();
-
- users.put("guest", "guest");
-
- _databaseManager = new PropertiesPrincipalDatabaseManager("default", users);
-
- _accessManager = new AllowAll();
-
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
-
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _managedObjectRegistry = appRegistry.getManagedObjectRegistry();
- _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _queueRegistry = _vHost.getQueueRegistry();
- _exchangeFactory = _vHost.getExchangeFactory();
- _exchangeRegistry = _vHost.getExchangeRegistry();
-
- _messageStore = new TestableMemoryMessageStore();
-
- _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public Collection<String> getVirtualHostNames()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setAccessManager(ACLPlugin newManager)
- {
- _accessManager = newManager;
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public PluginManager getPluginManager()
- {
- return null;
- }
-}
-
-
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
index 10705119e7..d2965bd52a 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
@@ -114,9 +114,14 @@ public class ObjectMessageTest extends QpidTestCase implements MessageListener
{
synchronized (received)
{
+ long endTime = System.currentTimeMillis() + 30000L;
while (received.size() < count)
{
- received.wait();
+ received.wait(30000);
+ if(received.size() < count && System.currentTimeMillis() > endTime)
+ {
+ throw new RuntimeException("Only received " + received.size() + " messages, was expecting " + count);
+ }
}
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index ca896b08bb..d9390c33df 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -25,10 +25,12 @@ import junit.framework.Assert;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +47,7 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.net.URISyntaxException;
public class PropertyValueTest extends QpidTestCase implements MessageListener
{
@@ -183,43 +186,6 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
m.setShortProperty("Short", (short) Short.MAX_VALUE);
m.setStringProperty("String", "Test");
- // AMQP Specific values
-
- // Timestamp
- long nano = System.nanoTime();
- m.setStringProperty("time-str", String.valueOf(nano));
- ((AMQMessage) m).setTimestampProperty(new AMQShortString("time"), nano);
-
- // Decimal
- BigDecimal bd = new BigDecimal(Integer.MAX_VALUE);
- ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal"), bd.setScale(Byte.MAX_VALUE));
-
- bd = new BigDecimal((long) Integer.MAX_VALUE + 1L);
-
- try
- {
- ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-value"), bd.setScale(Byte.MAX_VALUE));
- fail("UnsupportedOperationException should be thrown as value can't be correctly transmitted");
- }
- catch (UnsupportedOperationException uoe)
- {
- // normal path.
- }
-
- try
- {
- ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-scale"),
- bd.setScale(Byte.MAX_VALUE + 1));
- fail("UnsupportedOperationException should be thrown as scale can't be correctly transmitted");
- }
- catch (UnsupportedOperationException uoe)
- {
- // normal path.
- }
-
- // Void
- ((AMQMessage) m).setVoidProperty(new AMQShortString("void"));
-
_logger.debug("Sending Msg:" + m);
producer.send(m);
}
@@ -236,7 +202,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
}
}
- void check() throws JMSException
+ void check() throws JMSException, URISyntaxException
{
List<String> actual = new ArrayList<String>();
for (JMSTextMessage m : received)
@@ -259,8 +225,8 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
Assert.assertEquals("Check Priority properties are correctly transported", 8, m.getJMSPriority());
// Queue
- Assert.assertEquals("Check ReplyTo properties are correctly transported", m.getStringProperty("TempQueue"),
- m.getJMSReplyTo().toString());
+ Assert.assertEquals("Check ReplyTo properties are correctly transported", AMQDestination.createDestination(new AMQBindingURL(m.getStringProperty("TempQueue"))),
+ m.getJMSReplyTo());
Assert.assertEquals("Check Type properties are correctly transported", "Test", m.getJMSType());
@@ -271,7 +237,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
Assert.assertEquals("Check Long properties are correctly transported", (long) Long.MAX_VALUE,
m.getLongProperty("Long"));
Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String"));
-
+/*
// AMQP Tests Specific values
Assert.assertEquals("Check Timestamp properties are correctly transported", m.getStringProperty("time-str"),
@@ -288,7 +254,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener
Assert.assertTrue("Check void properties are correctly transported",
((AMQMessage) m).getPropertyHeaders().containsKey("void"));
-
+*/
//JMSXUserID
if (m.getStringProperty("JMSXUserID") != null)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index 5bf99e719e..1f90f1e29f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -76,7 +76,7 @@ public class JMSPropertiesTest extends QpidTestCase
MessageProducer producer = producerSession.createProducer(queue);
Destination JMS_REPLY_TO = new AMQQueue(con2, "my.replyto");
// create a test message to send
- ObjectMessage sentMsg = new NonQpidObjectMessage();
+ ObjectMessage sentMsg = new NonQpidObjectMessage(producerSession);
sentMsg.setJMSCorrelationID(JMS_CORR_ID);
sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE);
sentMsg.setJMSType(JMS_TYPE);
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 3027da00c7..6fa0172ae3 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -106,7 +106,8 @@ public class StreamMessageTest extends QpidTestCase
_logger.info("Starting consumer connection");
con.start();
- StreamMessage msg2 = (StreamMessage) consumer.receive();
+ StreamMessage msg2 = (StreamMessage) consumer.receive(2000);
+ assertNotNull(msg2);
msg2.readByte();
try